#include <DQMService.h>
Public Member Functions | |
DQMService (const edm::ParameterSet &pset, edm::ActivityRegistry &ar) | |
~DQMService (void) | |
Private Member Functions | |
void | flush (const edm::Event &, const edm::EventSetup &) |
void | shutdown (void) |
Private Attributes | |
lat::Regexp * | filter_ |
double | lastFlush_ |
DQMBasicNet * | net_ |
double | publishFrequency_ |
DQMStore * | store_ |
A bridge to update the DQM network layer at the end of every event.
Definition at line 13 of file DQMService.h.
DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition at line 47 of file DQMService.cc.
References ExpressReco_HICollisions_FallBack::e, Exception, align_tpl::filter, filter_, flush(), edm::ParameterSet::getUntrackedParameter(), query::host, query::port, publishFrequency_, releaseDQMAccess(), releaseDQMAccessM(), restrictDQMAccess(), restrictDQMAccessM(), shutdown(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostModule(), edm::ActivityRegistry::watchPostProcessEvent(), edm::ActivityRegistry::watchPostSource(), edm::ActivityRegistry::watchPostSourceConstruction(), edm::ActivityRegistry::watchPreModule(), edm::ActivityRegistry::watchPreSource(), and edm::ActivityRegistry::watchPreSourceConstruction().
// Member-initializer list and body of
// DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar).
//
// Registers the DQM access restrict/release callbacks and this service's
// flush()/shutdown() hooks with the activity registry, reads the untracked
// configuration parameters, compiles the optional name filter and, when a
// collector is configured, starts the network layer.
//
// Untracked parameters (with the defaults used when absent):
//   collectorHost     (string, "")    - no network layer is created if empty
//   collectorPort     (int,    9090)  - must be > 0 for the network to start
//   verbose           (bool,   false) - forwarded to DQMBasicNet::debug()
//   publishFrequency  (double, 5.0)   - compared against a time in seconds by flush()
//   filter            (string, "")    - regular expression; no filter if empty
//
// Throws cms::Exception("DQMService") if 'filter' is not a valid regular
// expression.
  : store_(&*edm::Service<DQMStore>()),
    net_(0),
    filter_(0),
    lastFlush_(0),
    publishFrequency_(5.0)
{
  // NOTE(review): these registrations happen before the code below that may
  // throw; whether the registry tolerates a failed construction afterwards is
  // not visible from here.
  ar.watchPreSourceConstruction(&restrictDQMAccessM);
  ar.watchPostSourceConstruction(&releaseDQMAccessM);
  ar.watchPreSource(&restrictDQMAccess);
  ar.watchPostSource(&releaseDQMAccess);
  ar.watchPreModule(&restrictDQMAccessM);
  ar.watchPostModule(&releaseDQMAccessM);
  ar.watchPostProcessEvent(this, &DQMService::flush);
  ar.watchPostEndJob(this, &DQMService::shutdown);

  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
  std::string filter = pset.getUntrackedParameter<std::string>("filter", "");

  // Compile the filter BEFORE starting the network layer. If this constructor
  // throws, the destructor is never run, so anything already allocated or
  // started here would be leaked. The regexp is held in a local pointer and
  // freed on every failure path; filter_ is only assigned on success.
  if (! filter.empty())
  {
    lat::Regexp *re = 0;
    try
    {
      re = new lat::Regexp(filter);
      if (! re->valid())
        throw cms::Exception("DQMService")
          << "Invalid 'filter' parameter value '" << filter << "':"
          << " bad regular expression syntax at character "
          << re->errorOffset() << ": " << re->errorMessage();
      re->study();
    }
    catch (lat::Error &e)
    {
      // Build the message first, then release the regexp, then throw.
      cms::Exception err("DQMService");
      err << "Invalid regular expression 'filter' parameter value '"
          << filter << "': " << e.explain();
      delete re;
      throw err;
    }
    catch (...)
    {
      // Covers the cms::Exception raised above for an invalid expression:
      // its message is already formatted, so the regexp can be freed.
      delete re;
      throw;
    }
    filter_ = re;
  }

  // Nothing below is expected to fail on bad configuration, so the network
  // is only brought up once the configuration has been validated.
  if (host != "" && port > 0)
  {
    net_ = new DQMBasicNet;
    net_->debug(verbose);
    net_->updateToCollector(host, port);
    net_->start();
  }
}
DQMService::~DQMService(void)
void DQMService::flush(const edm::Event &, const edm::EventSetup &) [private]
Definition at line 107 of file DQMService.cc.
References cond::rpcobimon::current, DQMStore::data_, MonitorElement::data_, DQMNet::CoreObject::dirname, MonitorElement::DQM_KIND_INT, MonitorElement::DQM_KIND_REAL, MonitorElement::DQM_KIND_STRING, DQMNet::dqmhash(), ExpressReco_HICollisions_FallBack::e, filter_, DQMNet::CoreObject::flags, DQMNet::Object::hash, i, MonitorElement::kind(), lastFlush_, DQMNet::Object::lastreq, connectstrParser::o, MonitorElement::object_, DQMNet::CoreObject::objname, DQMNet::packQualityData(), MonitorElement::packScalarData(), publishFrequency_, DQMNet::Object::qdata, DQMNet::CoreObject::qreports, DQMNet::Object::rawdata, MonitorElement::reference_, DQMStore::reset(), DQMNet::Object::scalar, store_, DQMNet::CoreObject::tag, DQMNet::CoreObject::version, AlCaRecoCosmics_cfg::version, and MonitorElement::wasUpdated().
Referenced by DQMService().
{
  // Body of DQMService::flush(const edm::Event &, const edm::EventSetup &).
  //
  // Rate-limited publication of the DQM store contents: does nothing until
  // at least publishFrequency_ has elapsed since lastFlush_. Otherwise, if a
  // network layer exists, it pushes every updated monitor element that
  // passes the filter into the network's local cache, drops cached entries
  // that were not seen, and asks the network to send the changes. It then
  // resets the store and records the flush time.

  // The current time in nanoseconds doubles as the object version stamp.
  uint64_t stamp = lat::Time::current().ns();
  double now = stamp * 1e-9;
  if (now - lastFlush_ < publishFrequency_)
    return;

  if (net_)
  {
    DQMNet::Object obj;
    std::set<std::string> seen;
    std::string path;

    // The network cache is modified only between lock() and unlock().
    net_->lock();
    bool dirty = false;

    net_->reserveLocalSpace(store_->data_.size());
    DQMStore::MEMap::iterator it = store_->data_.begin();
    DQMStore::MEMap::iterator last = store_->data_.end();
    for ( ; it != last; ++it)
    {
      const MonitorElement &me = *it;

      // Full name is "<dirname>/<objname>", or just "<objname>" when the
      // directory name is empty.
      path.clear();
      path += *me.data_.dirname;
      if (! path.empty())
        path += '/';
      path += me.data_.objname;

      // Elements rejected by the filter are neither published nor
      // remembered as seen.
      if (filter_ && filter_->search(path) < 0)
        continue;

      // Remember every accepted element, updated or not, so that only
      // elements missing from this set are removed from the cache below.
      seen.insert(path);
      if (! me.wasUpdated())
        continue;

      // Describe the object.
      obj.lastreq = 0;
      obj.hash = DQMNet::dqmhash(path.c_str(), path.size());
      obj.flags = me.data_.flags;
      obj.tag = me.data_.tag;
      obj.version = stamp;
      obj.dirname = me.data_.dirname;
      obj.objname = me.data_.objname;

      // The payload buffers are expected to be empty at this point (they
      // are released again after every update below).
      assert(obj.rawdata.empty());
      assert(obj.scalar.empty());
      assert(obj.qdata.empty());

      // Scalars are packed as text; anything else is streamed through a
      // ROOT buffer together with its reference and quality data.
      const MonitorElement::Kind kind = me.kind();
      if (kind == MonitorElement::DQM_KIND_INT
          || kind == MonitorElement::DQM_KIND_REAL
          || kind == MonitorElement::DQM_KIND_STRING)
      {
        me.packScalarData(obj.scalar, "");
      }
      else
      {
        TBufferFile buffer(TBufferFile::kWrite);
        buffer.WriteObject(me.object_);
        if (me.reference_)
          buffer.WriteObject(me.reference_);
        else
          buffer.WriteObjectAny(0, 0);

        obj.rawdata.resize(buffer.Length());
        memcpy(&obj.rawdata[0], buffer.Buffer(), buffer.Length());
        DQMNet::packQualityData(obj.qdata, me.data_.qreports);
      }

      // Hand the object to the network cache, then swap the payload
      // buffers with empty temporaries so the next element starts clean.
      net_->updateLocalObject(obj);
      DQMNet::DataBlob().swap(obj.rawdata);
      std::string().swap(obj.scalar);
      std::string().swap(obj.qdata);
      dirty = true;
    }

    // Purge cached entries whose names were not seen in this pass.
    if (net_->removeLocalExcept(seen))
      dirty = true;

    net_->unlock();

    // Only ask the network to send when something actually changed.
    if (dirty)
      net_->sendLocalChanges();
  }

  store_->reset();
  lastFlush_ = lat::Time::current().ns() * 1e-9;
}
void DQMService::shutdown(void) [private]
Definition at line 206 of file DQMService.cc.
Referenced by DQMService(), and ~DQMService().
lat::Regexp* DQMService::filter_ [private] |
Definition at line 25 of file DQMService.h.
Referenced by DQMService(), and flush().
double DQMService::lastFlush_ [private] |
Definition at line 26 of file DQMService.h.
Referenced by flush().
DQMBasicNet* DQMService::net_ [private] |
Definition at line 24 of file DQMService.h.
double DQMService::publishFrequency_ [private] |
Definition at line 27 of file DQMService.h.
Referenced by DQMService(), and flush().
DQMStore* DQMService::store_ [private] |
Definition at line 23 of file DQMService.h.
Referenced by flush().