CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/DQMServices/Core/src/DQMService.cc

Go to the documentation of this file.
00001 #if !WITHOUT_CMS_FRAMEWORK
00002 # include "DQMServices/Core/src/DQMService.h"
00003 # include "DQMServices/Core/interface/DQMNet.h"
00004 # include "DQMServices/Core/interface/DQMStore.h"
00005 # include "DQMServices/Core/interface/DQMScope.h"
00006 # include "DQMServices/Core/interface/MonitorElement.h"
00007 # include "FWCore/ServiceRegistry/interface/Service.h"
00008 # include "classlib/utils/Regexp.h"
00009 # include "classlib/utils/Error.h"
00010 # include <mutex>
00011 # include <iostream>
00012 # include <string>
00013 # include <memory>
00014 #include "TBufferFile.h"
00015 
00016 // -------------------------------------------------------------------
namespace
{
  // File-local lock serialising access to the DQM core.  It is recursive,
  // so a thread that already holds it may acquire it again; every lock()
  // must still be matched by one unlock().
  std::recursive_mutex s_mutex;
}
00018 
00021 DQMScope::DQMScope(void)
00022 { s_mutex.lock(); }
00023 
00025 DQMScope::~DQMScope(void)
00026 { s_mutex.unlock(); }
00027 
00029 static void
00030 restrictDQMAccess(void)
00031 { s_mutex.lock(); }
00032 
00033 static void
00034 restrictDQMAccessM(const edm::ModuleDescription &)
00035 { restrictDQMAccess(); }
00036 
00038 static void
00039 releaseDQMAccess(void)
00040 { s_mutex.unlock(); }
00041 
00042 static void
00043 releaseDQMAccessM(const edm::ModuleDescription &)
00044 { releaseDQMAccess(); }
00045 
00046 // -------------------------------------------------------------------
00047 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
00048   : store_(&*edm::Service<DQMStore>()),
00049     net_(0),
00050     filter_(0),
00051     lastFlush_(0),
00052     publishFrequency_(5.0)
00053 {
00054   ar.watchPreSourceConstruction(&restrictDQMAccessM);
00055   ar.watchPostSourceConstruction(&releaseDQMAccessM);
00056   ar.watchPreSource(&restrictDQMAccess);
00057   ar.watchPostSource(&releaseDQMAccess);
00058   ar.watchPreModule(&restrictDQMAccessM);
00059   ar.watchPostModule(&releaseDQMAccessM);
00060   ar.watchPostProcessEvent(this, &DQMService::flush);
00061   ar.watchPostEndJob(this, &DQMService::shutdown);
00062 
00063   std::string host = pset.getUntrackedParameter<std::string>("collectorHost", ""); 
00064   int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00065   bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00066   publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00067   std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00068 
00069   if (host != "" && port > 0)
00070   {
00071     net_ = new DQMBasicNet;
00072     net_->debug(verbose);
00073     net_->updateToCollector(host, port);
00074     net_->start();
00075   }
00076 
00077   if (! filter.empty())
00078   {
00079     try
00080     {
00081       filter_ = new lat::Regexp(filter);
00082       if (! filter_->valid())
00083         throw cms::Exception("DQMService")
00084           << "Invalid 'filter' parameter value '" << filter << "':"
00085           << " bad regular expression syntax at character "
00086           << filter_->errorOffset() << ": " << filter_->errorMessage();
00087       filter_->study();
00088     }
00089     catch (lat::Error &e)
00090     {
00091       throw cms::Exception("DQMService")
00092         << "Invalid regular expression 'filter' parameter value '"
00093         << filter << "': " << e.explain();
00094     }
00095   }
00096 }
00097 
00098 DQMService::~DQMService(void)
00099 {
00100   shutdown();
00101 }
00102 
00103 // Flush updates to the network layer at the end of each event.  This
00104 // is the only point at which the main application and the network
00105 // layer interact outside initialisation and exit.
00106 void DQMService::flushStandalone()
00107 {
00108   // Avoid sending updates excessively often.
00109   uint64_t version = lat::Time::current().ns();
00110   double vtime = version * 1e-9;
00111   if (vtime - lastFlush_ < publishFrequency_)
00112     return;
00113 
00114   // OK, send an update.
00115   if (net_)
00116   {
00117     DQMNet::Object o;
00118     std::set<std::string> seen;
00119     std::string fullpath;
00120 
00121     // Lock the network layer so we can modify the data.
00122     net_->lock();
00123     bool updated = false;
00124 
00125     // Find updated contents and update the network cache.
00126     DQMStore::MEMap::iterator i, e;
00127     net_->reserveLocalSpace(store_->data_.size());
00128     for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00129     {
00130       const MonitorElement &me = *i;
00131       fullpath.clear();
00132       fullpath += *me.data_.dirname;
00133       if (! me.data_.dirname->empty())
00134         fullpath += '/';
00135       fullpath += me.data_.objname;
00136 
00137       if (filter_ && filter_->search(fullpath) < 0)
00138         continue;
00139 
00140       seen.insert(fullpath);
00141       if (! me.wasUpdated())
00142         continue;
00143 
00144       o.lastreq = 0;
00145       o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
00146       o.flags = me.data_.flags;
00147       o.tag = me.data_.tag;
00148       o.version = version;
00149       o.dirname = me.data_.dirname;
00150       o.objname = me.data_.objname;
00151       assert(o.rawdata.empty());
00152       assert(o.scalar.empty());
00153       assert(o.qdata.empty());
00154 
00155       // Pack object and reference, scalar and quality data.
00156       switch (me.kind())
00157       {
00158       case MonitorElement::DQM_KIND_INT:
00159       case MonitorElement::DQM_KIND_REAL:
00160       case MonitorElement::DQM_KIND_STRING:
00161         me.packScalarData(o.scalar, "");
00162         break;
00163 
00164       default:
00165         {
00166           TBufferFile buffer(TBufferFile::kWrite);
00167           buffer.WriteObject(me.object_);
00168           if (me.reference_)
00169             buffer.WriteObject(me.reference_);
00170           else
00171             buffer.WriteObjectAny(0, 0);
00172           o.rawdata.resize(buffer.Length());
00173           memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00174           DQMNet::packQualityData(o.qdata, me.data_.qreports);
00175           break;
00176         }
00177       }
00178 
00179       // Update.
00180       net_->updateLocalObject(o);
00181       DQMNet::DataBlob().swap(o.rawdata);
00182       std::string().swap(o.scalar);
00183       std::string().swap(o.qdata);
00184       updated = true;
00185     }
00186 
00187     // Find removed contents and clear the network cache.
00188     if (net_->removeLocalExcept(seen))
00189       updated = true;
00190 
00191     // Unlock the network layer.
00192     net_->unlock();
00193 
00194     // Tell network to flush if we updated something.
00195     if (updated)
00196       net_->sendLocalChanges();
00197   }
00198 
00199   store_->reset();
00200   lastFlush_ = lat::Time::current().ns() * 1e-9;
00201 
00202 
00203 }
00204 void
00205 DQMService::flush(const edm::Event &, const edm::EventSetup &)
00206 {
00207   // Call a function independent to the framework
00208   flushStandalone();
00209 }
00210 
00211 // Disengage the network service.
00212 void
00213 DQMService::shutdown(void)
00214 {
00215   // If we have a network, let it go.
00216   if (net_)
00217     net_->shutdown();
00218 }
00219 
00220 #endif // !WITHOUT_CMS_FRAMEWORK