#include <DQMNet.h>
Classes | |
struct | AutoPeer |
struct | Bucket |
struct | CoreObject |
struct | HashEqual |
struct | HashOp |
struct | Object |
struct | Peer |
struct | QValue |
struct | WaitObject |
Public Types | |
typedef std::vector< unsigned char > | DataBlob |
typedef std::vector< QValue > | QReports |
typedef std::vector< uint32_t > | TagList |
typedef std::list< WaitObject > | WaitList |
Public Member Functions | |
void | debug (bool doit) |
void | delay (int delay) |
DQMNet (const std::string &appname="") | |
void | listenToCollector (const std::string &host, int port) |
void | lock (void) |
Acquire a lock on the DQM net layer. More... | |
void | run (void) |
void | sendLocalChanges (void) |
void | shutdown (void) |
Stop the network layer and wait for it to finish. More... | |
void | staleObjectWaitLimit (lat::TimeSpan time) |
void | start (void) |
void | startLocalServer (int port) |
void | startLocalServer (const char *path) |
void | unlock (void) |
Release the lock on the DQM net layer. More... | |
void | updateToCollector (const std::string &host, int port) |
virtual | ~DQMNet (void) |
Static Public Member Functions | |
static size_t | dqmhash (const void *key, size_t keylen) |
static void | packQualityData (std::string &into, const QReports &qr) |
static bool | setOrder (const CoreObject &a, const CoreObject &b) |
static void | unpackQualityData (QReports &qr, uint32_t &flags, const char *from) |
Static Public Attributes | |
static const uint32_t | DQM_MSG_GET_OBJECT = 3 |
static const uint32_t | DQM_MSG_HELLO = 0 |
static const uint32_t | DQM_MSG_LIST_OBJECTS = 2 |
static const uint32_t | DQM_MSG_UPDATE_ME = 1 |
static const uint32_t | DQM_PROP_ACCUMULATE = 0x00004000 |
static const uint32_t | DQM_PROP_DEAD = 0x00080000 |
static const uint32_t | DQM_PROP_EFFICIENCY_PLOT = 0x00200000 |
static const uint32_t | DQM_PROP_HAS_REFERENCE = 0x00001000 |
static const uint32_t | DQM_PROP_LUMI = 0x00040000 |
static const uint32_t | DQM_PROP_MARKTODELETE = 0x01000000 |
static const uint32_t | DQM_PROP_NEW = 0x00010000 |
static const uint32_t | DQM_PROP_RECEIVED = 0x00020000 |
static const uint32_t | DQM_PROP_REPORT_ALARM |
static const uint32_t | DQM_PROP_REPORT_CLEAR = 0x00000000 |
static const uint32_t | DQM_PROP_REPORT_ERROR = 0x00000100 |
static const uint32_t | DQM_PROP_REPORT_MASK = 0x00000f00 |
static const uint32_t | DQM_PROP_REPORT_OTHER = 0x00000400 |
static const uint32_t | DQM_PROP_REPORT_WARN = 0x00000200 |
static const uint32_t | DQM_PROP_RESET = 0x00008000 |
static const uint32_t | DQM_PROP_STALE = 0x00100000 |
static const uint32_t | DQM_PROP_TAGGED = 0x00002000 |
static const uint32_t | DQM_PROP_TYPE_DATABLOB = 0x00000050 |
static const uint32_t | DQM_PROP_TYPE_INT = 0x00000001 |
static const uint32_t | DQM_PROP_TYPE_INVALID = 0x00000000 |
static const uint32_t | DQM_PROP_TYPE_MASK = 0x000000ff |
static const uint32_t | DQM_PROP_TYPE_REAL = 0x00000002 |
static const uint32_t | DQM_PROP_TYPE_SCALAR = 0x0000000f |
static const uint32_t | DQM_PROP_TYPE_STRING = 0x00000003 |
static const uint32_t | DQM_PROP_TYPE_TH1D = 0x00000012 |
static const uint32_t | DQM_PROP_TYPE_TH1F = 0x00000010 |
static const uint32_t | DQM_PROP_TYPE_TH1S = 0x00000011 |
static const uint32_t | DQM_PROP_TYPE_TH2D = 0x00000022 |
static const uint32_t | DQM_PROP_TYPE_TH2F = 0x00000020 |
static const uint32_t | DQM_PROP_TYPE_TH2S = 0x00000021 |
static const uint32_t | DQM_PROP_TYPE_TH3D = 0x00000032 |
static const uint32_t | DQM_PROP_TYPE_TH3F = 0x00000030 |
static const uint32_t | DQM_PROP_TYPE_TH3S = 0x00000031 |
static const uint32_t | DQM_PROP_TYPE_TPROF = 0x00000040 |
static const uint32_t | DQM_PROP_TYPE_TPROF2D = 0x00000041 |
static const uint32_t | DQM_REPLY_LIST_BEGIN = 101 |
static const uint32_t | DQM_REPLY_LIST_END = 102 |
static const uint32_t | DQM_REPLY_NONE = 103 |
static const uint32_t | DQM_REPLY_OBJECT = 104 |
static const uint32_t | MAX_PEER_WAITREQS = 128 |
Protected Member Functions | |
virtual Peer * | createPeer (lat::Socket *s)=0 |
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=0)=0 |
virtual Peer * | getPeer (lat::Socket *s)=0 |
std::ostream & | logme (void) |
virtual Object * | makeObject (Peer *p, const std::string &name)=0 |
virtual void | markObjectsDead (Peer *p)=0 |
virtual bool | onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len) |
virtual void | purgeDeadObjects (Peer *p)=0 |
virtual void | releaseFromWait (Bucket *msg, WaitObject &w, Object *o) |
virtual void | removePeer (Peer *p, lat::Socket *s)=0 |
virtual void | sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0 |
virtual void | sendObjectListToPeers (bool all)=0 |
virtual void | sendObjectToPeer (Bucket *msg, Object &o, bool data) |
virtual bool | shouldStop (void) |
void | updateMask (Peer *p) |
virtual void | updatePeerMasks (void)=0 |
void | waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner) |
Static Protected Member Functions | |
static void | copydata (Bucket *b, const void *data, size_t len) |
static void | discard (Bucket *&b) |
Protected Attributes | |
bool | debug_ |
pthread_mutex_t | lock_ |
Private Member Functions | |
DQMNet (const DQMNet &) | |
void | losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0) |
bool | onLocalNotify (lat::IOSelectEvent *ev) |
bool | onPeerConnect (lat::IOSelectEvent *ev) |
bool | onPeerData (lat::IOSelectEvent *ev, Peer *p) |
Handle communication to a particular client. More... | |
DQMNet & | operator= (const DQMNet &) |
void | releaseFromWait (WaitList::iterator i, Object *o) |
void | releaseWaiters (const std::string &name, Object *o) |
void | requestObjectData (Peer *p, const char *name, size_t len) |
Queue an object request to the data server. More... | |
Private Attributes | |
std::string | appname_ |
pthread_t | communicate_ |
int | delay_ |
AutoPeer | downstream_ |
bool | flush_ |
int | pid_ |
lat::IOSelector | sel_ |
lat::Socket * | server_ |
sig_atomic_t | shutdown_ |
AutoPeer | upstream_ |
lat::Time | version_ |
WaitList | waiting_ |
lat::TimeSpan | waitMax_ |
lat::TimeSpan | waitStale_ |
lat::Pipe | wakeup_ |
typedef std::vector<unsigned char> DQMNet::DataBlob |
typedef std::vector<QValue> DQMNet::QReports |
typedef std::vector<uint32_t> DQMNet::TagList |
typedef std::list<WaitObject> DQMNet::WaitList |
DQMNet::DQMNet | ( | const std::string & | appname = "" | ) |
Definition at line 1069 of file DQMNet.cc.
References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.
|
virtual |
|
private |
|
staticprotected |
Definition at line 53 of file DQMNet.cc.
References DQMNet::Bucket::data.
Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by run().
void DQMNet::debug | ( | bool | doit | ) |
void DQMNet::delay | ( | int | delay | ) |
|
staticprotected |
Definition at line 62 of file DQMNet.cc.
References b, GetRecoTauVFromDQM_MC_cff::next, and DQMNet::Bucket::next.
|
inlinestatic |
Definition at line 216 of file DQMNet.h.
References a, b, EnergyCorrector::c, dqmhashfinal, dqmhashmix, and relval_2017::k.
Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by run().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
void DQMNet::listenToCollector | ( | const std::string & | host, |
int | port | ||
) |
Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().
Definition at line 1222 of file DQMNet.cc.
References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.
void DQMNet::lock | ( | void | ) |
Acquire a lock on the DQM net layer.
Definition at line 1262 of file DQMNet.cc.
References communicate_, and lock_.
Referenced by run().
|
protected |
Definition at line 42 of file DQMNet.cc.
References gather_cfg::cout, and fileCollector::now.
Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().
|
private |
Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.
Definition at line 77 of file DQMNet.cc.
References DQMNet::Peer::automatic, alignCSCRings::e, i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
|
private |
React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.
Definition at line 1008 of file DQMNet.cc.
References alignCSCRings::e, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.
Referenced by DQMNet().
|
protectedvirtual |
Definition at line 468 of file DQMNet.cc.
References assert(), DQMNet::Bucket::data, DQMNet::CoreObject::flags, flags, reco::if(), DQMNet::Object::lastreq, fileCollector::logme(), mergeVDriftHistosByStation::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, AlCaHLTBitMon_QueryRunRegistry::string, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.
|
private |
Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Always returns false to tell the IOSelector to keep processing events for the server socket.
Definition at line 951 of file DQMNet.cc.
References assert(), IORead, IOUrgent, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by startLocalServer().
|
private |
Handle communication to a particular client.
Definition at line 768 of file DQMNet.cc.
References assert(), DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, fileCollector::done, alignCSCRings::e, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, visualization-live-secondInstance_cfg::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and DQMNet::Peer::waiting.
Referenced by onPeerConnect(), and run().
|
static |
Pack the quality results in `qr` into the string `into` for persistent storage, such as network transfer or archival.
Definition at line 177 of file DQMNet.cc.
Referenced by DQMService::flushStandalone(), and MonitorElement::packQualityData().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
|
protectedvirtual |
Definition at line 397 of file DQMNet.cc.
References DQMNet::Bucket::data, and DQMNet::WaitObject::name.
Referenced by run().
|
private |
Definition at line 147 of file DQMNet.cc.
References assert(), visualization-live-secondInstance_cfg::msg, and DQMNet::Bucket::next.
|
private |
Definition at line 164 of file DQMNet.cc.
References alignCSCRings::e, and i.
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
|
private |
Queue an object request to the data server.
Definition at line 111 of file DQMNet.cc.
References visualization-live-secondInstance_cfg::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.
void DQMNet::run | ( | void | ) |
Run the actual I/O processing loop.
Definition at line 1295 of file DQMNet.cc.
References DQMNet::Peer::automatic, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, alignCSCRings::e, findObject(), DQMNet::CoreObject::flags, flush_, DQMNet::AutoPeer::host, i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, fileCollector::now, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, DQMNet::Object::rawdata, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, unlock(), DQMNet::Peer::update, DQMNet::AutoPeer::update, updatePeerMasks(), upstream_, waiting_, waitMax_, and waitStale_.
Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().
void DQMNet::sendLocalChanges | ( | void | ) |
Definition at line 1435 of file DQMNet.cc.
References wakeup_.
Referenced by DQMImplNet< DQMNet::Object >::removePeer().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by run().
Definition at line 418 of file DQMNet.cc.
References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.
Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
|
inlinestatic |
Definition at line 179 of file DQMNet.h.
References DQMNet::CoreObject::dirname, DQMNet::CoreObject::lumi, DQMNet::CoreObject::moduleId, DQMNet::CoreObject::objname, DQMNet::CoreObject::run, and DQMNet::CoreObject::streamId.
Referenced by MonitorElement::operator<().
|
protectedvirtual |
void DQMNet::shutdown | ( | void | ) |
Stop the network layer and wait for it to finish.
Definition at line 1239 of file DQMNet.cc.
References communicate_, and shutdown_.
void DQMNet::staleObjectWaitLimit | ( | lat::TimeSpan | time | ) |
Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has been received; another time limit is applied when no data payload has been received at all.
Definition at line 1121 of file DQMNet.cc.
References waitStale_.
void DQMNet::start | ( | void | ) |
Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.
Definition at line 1280 of file DQMNet.cc.
References communicate(), communicate_, lock_, and logme().
Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), and Types.EventRange::cppID().
void DQMNet::startLocalServer | ( | int | port | ) |
Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.
Definition at line 1130 of file DQMNet.cc.
References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.
void DQMNet::startLocalServer | ( | const char * | path | ) |
Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.
Definition at line 1168 of file DQMNet.cc.
References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.
void DQMNet::unlock | ( | void | ) |
Release the lock on the DQM net layer.
Definition at line 1270 of file DQMNet.cc.
References communicate_, and lock_.
Referenced by run().
|
static |
Unpack the quality results from the string `from` into `qr`. Assumes the data was saved with packQualityData().
Definition at line 199 of file DQMNet.cc.
References DQMNet::QValue::algorithm, DQMNet::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, DQMNet::QValue::message, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.
|
protected |
Update the selector mask for a peer based on data queues. Close the connection if there is no reason to keep it open.
Definition at line 1039 of file DQMNet.cc.
References assert(), IOUrgent, IOWrite, fileCollector::logme(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.
Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().
|
protectedpure virtual |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by run().
void DQMNet::updateToCollector | ( | const std::string & | host, |
int | port | ||
) |
Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().
Definition at line 1203 of file DQMNet.cc.
References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.
|
protected |
Queue a request for an object and put a peer into the mode of waiting for object data to appear.
Definition at line 131 of file DQMNet.cc.
References info(), mergeVDriftHistosByStation::name, and DQMNet::Peer::waiting.
|
private |
|
protected |
Definition at line 317 of file DQMNet.h.
Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().
|
private |
|
private |
Definition at line 342 of file DQMNet.h.
Referenced by DQMNet(), run(), and updateToCollector().
|
static |
|
static |
|
static |
Definition at line 55 of file DQMNet.h.
Referenced by MonitorElement::isAccumulateEnabled(), and MonitorElement::setAccumulate().
|
static |
Definition at line 61 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::markObjectsDead(), and DQMImplNet< DQMNet::Object >::purgeDeadObjects().
|
static |
Definition at line 63 of file DQMNet.h.
Referenced by MonitorElement::isEfficiency(), DQMStore::save(), and MonitorElement::setEfficiencyFlag().
|
static |
Definition at line 53 of file DQMNet.h.
Referenced by DQMStore::book(), DQMStore::extract(), and MonitorElement::initialise().
|
static |
Definition at line 60 of file DQMNet.h.
Referenced by MonitorElement::getLumiFlag(), DQMStore::readFilePB(), and MonitorElement::setLumiFlag().
|
static |
Definition at line 64 of file DQMNet.h.
Referenced by MonitorElement::markedToDelete(), and MonitorElement::markToDelete().
|
static |
Definition at line 58 of file DQMNet.h.
Referenced by MonitorElement::MonitorElement(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), MonitorElement::update(), and MonitorElement::wasUpdated().
|
static |
|
static |
|
static |
|
static |
Definition at line 46 of file DQMNet.h.
Referenced by MonitorElement::hasError(), unpackQualityData(), and MonitorElement::updateQReportStats().
|
static |
|
static |
Definition at line 48 of file DQMNet.h.
Referenced by MonitorElement::hasOtherReport(), unpackQualityData(), and MonitorElement::updateQReportStats().
|
static |
Definition at line 47 of file DQMNet.h.
Referenced by MonitorElement::hasWarning(), unpackQualityData(), and MonitorElement::updateQReportStats().
|
static |
Definition at line 56 of file DQMNet.h.
Referenced by MonitorElement::resetMe(), and MonitorElement::setResetMe().
|
static |
|
static |
Definition at line 54 of file DQMNet.h.
Referenced by DQMStore::get(), DQMStore::getAllTags(), DQMStore::getContents(), MonitorElement::getTags(), DQMStore::save(), and DQMStore::tag().
|
static |
|
static |
|
static |
|
static |
Definition at line 25 of file DQMNet.h.
Referenced by MonitorElement::kind().
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
Definition at line 71 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
|
static |
Definition at line 72 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
|
protected |
|
private |
Definition at line 336 of file DQMNet.h.
Referenced by DQMNet(), run(), and startLocalServer().
|
private |
Definition at line 337 of file DQMNet.h.
Referenced by startLocalServer().
|
private |
Definition at line 346 of file DQMNet.h.
Referenced by shutdown().
|
private |
Definition at line 341 of file DQMNet.h.
Referenced by DQMNet(), listenToCollector(), and run().
|
private |
|
private |
Definition at line 349 of file DQMNet.h.
Referenced by run(), and staleObjectWaitLimit().
|
private |
Definition at line 338 of file DQMNet.h.
Referenced by DQMNet(), and sendLocalChanges().