#include <DQMServices/Core/src/DQMService.h>
Public Member Functions
DQMService (const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
~DQMService (void)
Private Member Functions
void flush (const edm::Event &, const edm::EventSetup &)
void shutdown (void)
Private Attributes
lat::Regexp * filter_
double lastFlush_
DQMNet * net_
double publishFrequency_
DQMStore * store_
Definition at line 13 of file DQMService.h.
DQMService::DQMService (const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition at line 46 of file DQMService.cc.
References DQMNet::debug(), e, lat::Regexp::errorMessage(), lat::Regexp::errorOffset(), Exception, lat::Error::explain(), filter, filter_, flush(), edm::ParameterSet::getUntrackedParameter(), cmsBenchmark::host, net_, python_dbs::port, publishFrequency_, releaseDQMAccess(), releaseDQMAccessM(), restrictDQMAccess(), restrictDQMAccessM(), shutdown(), DQMNet::start(), lat::Regexp::study(), DQMNet::updateToCollector(), lat::Regexp::valid(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostModule(), edm::ActivityRegistry::watchPostProcessEvent(), edm::ActivityRegistry::watchPostSource(), edm::ActivityRegistry::watchPostSourceConstruction(), edm::ActivityRegistry::watchPreModule(), edm::ActivityRegistry::watchPreSource(), and edm::ActivityRegistry::watchPreSourceConstruction().
00047   : store_(&*edm::Service<DQMStore>()),
00048     net_(0),
00049     filter_(0),
00050     lastFlush_(0),
00051     publishFrequency_(5.0)
00052 {
00053   ar.watchPreSourceConstruction(&restrictDQMAccessM);
00054   ar.watchPostSourceConstruction(&releaseDQMAccessM);
00055   ar.watchPreSource(&restrictDQMAccess);
00056   ar.watchPostSource(&releaseDQMAccess);
00057   ar.watchPreModule(&restrictDQMAccessM);
00058   ar.watchPostModule(&releaseDQMAccessM);
00059   ar.watchPostProcessEvent(this, &DQMService::flush);
00060   ar.watchPostEndJob(this, &DQMService::shutdown);
00061
00062   std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
00063   int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00064   bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00065   publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00066   std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00067
00068   if (host != "" && port > 0)
00069   {
00070     net_ = new DQMBasicNet;
00071     net_->debug(verbose);
00072     net_->updateToCollector(host, port);
00073     net_->start();
00074   }
00075
00076   if (! filter.empty())
00077   {
00078     try
00079     {
00080       filter_ = new lat::Regexp(filter);
00081       if (! filter_->valid())
00082         throw cms::Exception("DQMService")
00083           << "Invalid 'filter' parameter value '" << filter << "':"
00084           << " bad regular expression syntax at character "
00085           << filter_->errorOffset() << ": " << filter_->errorMessage();
00086       filter_->study();
00087     }
00088     catch (lat::Error &e)
00089     {
00090       throw cms::Exception("DQMService")
00091         << "Invalid regular expression 'filter' parameter value '"
00092         << filter << "': " << e.explain();
00093     }
00094   }
00095 }
DQMService::~DQMService (void)
Definition at line 97 of file DQMService.cc.
References shutdown().
00098 {
00099   shutdown();
00100 }
void DQMService::flush (const edm::Event &, const edm::EventSetup &) [private]
Definition at line 106 of file DQMService.cc.
References lat::Time::current(), DQMStore::data_, MonitorElement::data_, e, filter_, DQMNet::CoreObject::flags, i, lastFlush_, DQMNet::Object::lastreq, DQMNet::lock(), me, DQMNet::CoreObject::name, net_, DQMNet::CoreObject::object, publishFrequency_, DQMNet::CoreObject::qreports, MonitorElement::qualityTagString(), DQMNet::Object::rawdata, DQMNet::CoreObject::reference, DQMStore::removed_, DQMNet::removeLocalObject(), DQMStore::reset(), s, lat::Regexp::search(), DQMNet::sendLocalChanges(), store_, DQMNet::CoreObject::tags, DQMNet::unlock(), DQMNet::updateLocalObject(), version(), DQMNet::CoreObject::version, and MonitorElement::wasUpdated().
Referenced by DQMService().
00107 {
00108   // Avoid sending updates excessively often.
00109   uint64_t version = lat::Time::current().ns();
00110   double vtime = version * 1e-9;
00111   if (vtime - lastFlush_ < publishFrequency_)
00112     return;
00113   lastFlush_ = vtime;
00114
00115   // OK, send an update.
00116   if (net_)
00117   {
00118     // Lock the network layer so we can modify the data.
00119     net_->lock();
00120     bool updated = false;
00121
00122     // Find updated contents and update the network cache.
00123     DQMStore::MEMap::iterator i, e;
00124     for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00125     {
00126       MonitorElement &me = i->second;
00127       if (! me.wasUpdated())
00128         continue;
00129
00130       if (filter_ && filter_->search(me.data_.name) < 0)
00131         continue;
00132
00133       assert(me.data_.object);
00134
00135       DQMNet::Object o;
00136       o.version = version;
00137       o.name = me.data_.name;
00138       o.tags = me.data_.tags;
00139       o.object = 0;
00140       o.reference = 0;
00141       o.flags = me.data_.flags;
00142       o.lastreq = 0;
00143
00144       DQMRootBuffer buffer (DQMRootBuffer::kWrite);
00145       buffer.WriteObject(me.data_.object);
00146       if (me.data_.reference)
00147         buffer.WriteObject(me.data_.reference);
00148       else
00149         buffer.WriteObjectAny(0, 0);
00150
00151       // Save the quality test results.
00152       DQMNet::QReports::iterator qi, qe;
00153       for (qi = me.data_.qreports.begin(), qe = me.data_.qreports.end(); qi != qe; ++qi)
00154       {
00155         TObjString s (me.qualityTagString(*qi).c_str());
00156         buffer.WriteObject(&s);
00157       }
00158
00159       // Save this ensemble to the buffer.
00160       o.rawdata.resize(buffer.Length());
00161       memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00162       net_->updateLocalObject(o);
00163       updated = true;
00164     }
00165
00166     // Find removed contents and clear the network cache.
00167     std::vector<std::string>::iterator ri, re;
00168     for (ri = store_->removed_.begin(), re = store_->removed_.end(); ri != re; ++ri, updated = true)
00169       net_->removeLocalObject(*ri);
00170
00171     // Unlock the network layer.
00172     net_->unlock();
00173
00174     // Tell network to flush if we updated something.
00175     if (updated)
00176       net_->sendLocalChanges();
00177   }
00178
00179   store_->reset();
00180 }
void DQMService::shutdown (void) [private]
Definition at line 184 of file DQMService.cc.
References net_, and DQMNet::shutdown().
Referenced by DQMService(), and ~DQMService().
lat::Regexp* DQMService::filter_ [private]
double DQMService::lastFlush_ [private]
DQMNet* DQMService::net_ [private]
double DQMService::publishFrequency_ [private]
DQMStore* DQMService::store_ [private]