CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/DQMServices/Core/src/DQMService.cc

Go to the documentation of this file.
00001 #if !WITHOUT_CMS_FRAMEWORK
00002 # include "DQMServices/Core/src/DQMService.h"
00003 # include "DQMServices/Core/interface/DQMNet.h"
00004 # include "DQMServices/Core/interface/DQMStore.h"
00005 # include "DQMServices/Core/interface/DQMScope.h"
00006 # include "DQMServices/Core/interface/MonitorElement.h"
00007 # include "FWCore/ServiceRegistry/interface/Service.h"
00008 # include "classlib/utils/Regexp.h"
00009 # include "classlib/utils/Error.h"
00010 # include <pthread.h>
00011 # include <iostream>
00012 # include <string>
00013 # include <memory>
00014 #include "TBufferFile.h"
00015 
00016 // -------------------------------------------------------------------
// Process-wide mutex serialising access to DQM data.  It is taken and
// released by DQMScope and by the framework signal callbacks below.
// Kept file-local through an unnamed namespace.
namespace
{
  pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
}
00018 
00021 DQMScope::DQMScope(void)
00022 { pthread_mutex_lock(&s_mutex); }
00023 
00025 DQMScope::~DQMScope(void)
00026 { pthread_mutex_unlock(&s_mutex); }
00027 
00029 static void
00030 restrictDQMAccess(void)
00031 { pthread_mutex_lock(&s_mutex); }
00032 
00033 static void
00034 restrictDQMAccessM(const edm::ModuleDescription &)
00035 { restrictDQMAccess(); }
00036 
00038 static void
00039 releaseDQMAccess(void)
00040 { pthread_mutex_unlock(&s_mutex); }
00041 
00042 static void
00043 releaseDQMAccessM(const edm::ModuleDescription &)
00044 { releaseDQMAccess(); }
00045 
00046 // -------------------------------------------------------------------
00047 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
00048   : store_(&*edm::Service<DQMStore>()),
00049     net_(0),
00050     filter_(0),
00051     lastFlush_(0),
00052     publishFrequency_(5.0)
00053 {
00054   ar.watchPreSourceConstruction(&restrictDQMAccessM);
00055   ar.watchPostSourceConstruction(&releaseDQMAccessM);
00056   ar.watchPreSource(&restrictDQMAccess);
00057   ar.watchPostSource(&releaseDQMAccess);
00058   ar.watchPreModule(&restrictDQMAccessM);
00059   ar.watchPostModule(&releaseDQMAccessM);
00060   ar.watchPostProcessEvent(this, &DQMService::flush);
00061   ar.watchPostEndJob(this, &DQMService::shutdown);
00062 
00063   std::string host = pset.getUntrackedParameter<std::string>("collectorHost", ""); 
00064   int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00065   bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00066   publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00067   std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00068 
00069   if (host != "" && port > 0)
00070   {
00071     net_ = new DQMBasicNet;
00072     net_->debug(verbose);
00073     net_->updateToCollector(host, port);
00074     net_->start();
00075   }
00076 
00077   if (! filter.empty())
00078   {
00079     try
00080     {
00081       filter_ = new lat::Regexp(filter);
00082       if (! filter_->valid())
00083         throw cms::Exception("DQMService")
00084           << "Invalid 'filter' parameter value '" << filter << "':"
00085           << " bad regular expression syntax at character "
00086           << filter_->errorOffset() << ": " << filter_->errorMessage();
00087       filter_->study();
00088     }
00089     catch (lat::Error &e)
00090     {
00091       throw cms::Exception("DQMService")
00092         << "Invalid regular expression 'filter' parameter value '"
00093         << filter << "': " << e.explain();
00094     }
00095   }
00096 }
00097 
00098 DQMService::~DQMService(void)
00099 {
00100   shutdown();
00101 }
00102 
00103 // Flush updates to the network layer at the end of each event.  This
00104 // is the only point at which the main application and the network
00105 // layer interact outside initialisation and exit.
00106 void
00107 DQMService::flush(const edm::Event &, const edm::EventSetup &)
00108 {
00109   // Avoid sending updates excessively often.
00110   uint64_t version = lat::Time::current().ns();
00111   double vtime = version * 1e-9;
00112   if (vtime - lastFlush_ < publishFrequency_)
00113     return;
00114 
00115   // OK, send an update.
00116   if (net_)
00117   {
00118     DQMNet::Object o;
00119     std::set<std::string> seen;
00120     std::string fullpath;
00121 
00122     // Lock the network layer so we can modify the data.
00123     net_->lock();
00124     bool updated = false;
00125 
00126     // Find updated contents and update the network cache.
00127     DQMStore::MEMap::iterator i, e;
00128     net_->reserveLocalSpace(store_->data_.size());
00129     for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00130     {
00131       const MonitorElement &me = *i;
00132       fullpath.clear();
00133       fullpath += *me.data_.dirname;
00134       if (! me.data_.dirname->empty())
00135         fullpath += '/';
00136       fullpath += me.data_.objname;
00137 
00138       if (filter_ && filter_->search(fullpath) < 0)
00139         continue;
00140 
00141       seen.insert(fullpath);
00142       if (! me.wasUpdated())
00143         continue;
00144 
00145       o.lastreq = 0;
00146       o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
00147       o.flags = me.data_.flags;
00148       o.tag = me.data_.tag;
00149       o.version = version;
00150       o.dirname = me.data_.dirname;
00151       o.objname = me.data_.objname;
00152       assert(o.rawdata.empty());
00153       assert(o.scalar.empty());
00154       assert(o.qdata.empty());
00155 
00156       // Pack object and reference, scalar and quality data.
00157       switch (me.kind())
00158       {
00159       case MonitorElement::DQM_KIND_INT:
00160       case MonitorElement::DQM_KIND_REAL:
00161       case MonitorElement::DQM_KIND_STRING:
00162         me.packScalarData(o.scalar, "");
00163         break;
00164 
00165       default:
00166         {
00167           TBufferFile buffer(TBufferFile::kWrite);
00168           buffer.WriteObject(me.object_);
00169           if (me.reference_)
00170             buffer.WriteObject(me.reference_);
00171           else
00172             buffer.WriteObjectAny(0, 0);
00173           o.rawdata.resize(buffer.Length());
00174           memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00175           DQMNet::packQualityData(o.qdata, me.data_.qreports);
00176           break;
00177         }
00178       }
00179 
00180       // Update.
00181       net_->updateLocalObject(o);
00182       DQMNet::DataBlob().swap(o.rawdata);
00183       std::string().swap(o.scalar);
00184       std::string().swap(o.qdata);
00185       updated = true;
00186     }
00187 
00188     // Find removed contents and clear the network cache.
00189     if (net_->removeLocalExcept(seen))
00190       updated = true;
00191 
00192     // Unlock the network layer.
00193     net_->unlock();
00194 
00195     // Tell network to flush if we updated something.
00196     if (updated)
00197       net_->sendLocalChanges();
00198   }
00199 
00200   store_->reset();
00201   lastFlush_ = lat::Time::current().ns() * 1e-9;
00202 }
00203 
00204 // Disengage the network service.
00205 void
00206 DQMService::shutdown(void)
00207 {
00208   // If we have a network, let it go.
00209   if (net_)
00210     net_->shutdown();
00211 }
00212 
00213 #endif // !WITHOUT_CMS_FRAMEWORK