|
|
#include <DQMNet.h>
|
virtual Peer * | createPeer (lat::Socket *s)=0 |
|
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0 |
|
virtual Peer * | getPeer (lat::Socket *s)=0 |
|
std::ostream & | logme () |
|
virtual Object * | makeObject (Peer *p, const std::string &name)=0 |
|
virtual void | markObjectsDead (Peer *p)=0 |
|
virtual bool | onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len) |
|
virtual void | purgeDeadObjects (Peer *p)=0 |
|
virtual void | releaseFromWait (Bucket *msg, WaitObject &w, Object *o) |
|
virtual void | removePeer (Peer *p, lat::Socket *s)=0 |
|
virtual void | sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0 |
|
virtual void | sendObjectListToPeers (bool all)=0 |
|
virtual void | sendObjectToPeer (Bucket *msg, Object &o, bool data) |
|
virtual bool | shouldStop () |
|
void | updateMask (Peer *p) |
|
virtual void | updatePeerMasks ()=0 |
|
void | waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner) |
|
Definition at line 26 of file DQMNet.h.
◆ DataBlob
◆ QReports
◆ QValue
◆ TagList
◆ WaitList
◆ DQMNet() [1/2]
DQMNet::DQMNet |
( |
const std::string & |
appname = "" | ) |
|
Definition at line 906 of file DQMNet.cc.
908 appname_(appname.empty() ?
"DQMNet" : appname.c_str()),
References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.
◆ ~DQMNet()
◆ DQMNet() [2/2]
DQMNet::DQMNet |
( |
const DQMNet & |
| ) |
|
|
delete |
◆ copydata()
void DQMNet::copydata |
( |
Bucket * |
b, |
|
|
const void * |
data, |
|
|
size_t |
len |
|
) |
| |
|
staticprotected |
◆ createPeer()
virtual Peer* DQMNet::createPeer |
( |
lat::Socket * |
s | ) |
|
|
protectedpure virtual |
◆ debug()
void DQMNet::debug |
( |
bool |
doit | ) |
|
◆ delay()
void DQMNet::delay |
( |
int |
delay | ) |
|
◆ discard()
void DQMNet::discard |
( |
Bucket *& |
b | ) |
|
|
staticprotected |
◆ dqmhash()
static size_t DQMNet::dqmhash |
( |
const void * |
key, |
|
|
size_t |
keylen |
|
) |
| |
|
inlinestatic |
Definition at line 194 of file DQMNet.h.
197 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
198 #define dqmhashmix(a, b, c) \
201 a ^= dqmhashrot(c, 4); \
204 b ^= dqmhashrot(a, 6); \
207 c ^= dqmhashrot(b, 8); \
210 a ^= dqmhashrot(c, 16); \
213 b ^= dqmhashrot(a, 19); \
216 c ^= dqmhashrot(b, 4); \
219 #define dqmhashfinal(a, b, c) \
222 c -= dqmhashrot(b, 14); \
224 a -= dqmhashrot(c, 11); \
226 b -= dqmhashrot(a, 25); \
228 c -= dqmhashrot(b, 16); \
230 a -= dqmhashrot(c, 4); \
232 b -= dqmhashrot(a, 14); \
234 c -= dqmhashrot(b, 24); \
238 a =
b =
c = 0xdeadbeef + (uint32_t)keylen;
239 const auto *
k = (
const unsigned char *)
key;
242 while (keylen > 12) {
244 a += ((uint32_t)
k[1]) << 8;
245 a += ((uint32_t)
k[2]) << 16;
246 a += ((uint32_t)
k[3]) << 24;
248 b += ((uint32_t)
k[5]) << 8;
249 b += ((uint32_t)
k[6]) << 16;
250 b += ((uint32_t)
k[7]) << 24;
252 c += ((uint32_t)
k[9]) << 8;
253 c += ((uint32_t)
k[10]) << 16;
254 c += ((uint32_t)
k[11]) << 24;
263 c += ((uint32_t)
k[11]) << 24;
266 c += ((uint32_t)
k[10]) << 16;
269 c += ((uint32_t)
k[9]) << 8;
275 b += ((uint32_t)
k[7]) << 24;
278 b += ((uint32_t)
k[6]) << 16;
281 b += ((uint32_t)
k[5]) << 8;
287 a += ((uint32_t)
k[3]) << 24;
290 a += ((uint32_t)
k[2]) << 16;
293 a += ((uint32_t)
k[1]) << 8;
References a, b, HltBtagPostValidation_cff::c, dqmhashfinal, dqmhashmix, dqmdumpme::k, and crabWrapper::key.
Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().
◆ findObject()
virtual Object* DQMNet::findObject |
( |
Peer * |
p, |
|
|
const std::string & |
name, |
|
|
Peer ** |
owner = nullptr |
|
) |
| |
|
protectedpure virtual |
◆ getPeer()
virtual Peer* DQMNet::getPeer |
( |
lat::Socket * |
s | ) |
|
|
protectedpure virtual |
◆ listenToCollector()
void DQMNet::listenToCollector |
( |
const std::string & |
host, |
|
|
int |
port |
|
) |
| |
◆ lock()
◆ logme()
std::ostream & DQMNet::logme |
( |
| ) |
|
|
protected |
◆ losePeer()
void DQMNet::losePeer |
( |
const char * |
reason, |
|
|
Peer * |
peer, |
|
|
lat::IOSelectEvent * |
event, |
|
|
lat::Error * |
err = nullptr |
|
) |
| |
|
private |
Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.
Definition at line 74 of file DQMNet.cc.
78 Socket *
s = peer->socket;
91 peer->automatic->peer =
nullptr;
References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, submitPVResolutionJobs::err, ev, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, PixelMapPlotter::reason, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.
◆ makeObject()
virtual Object* DQMNet::makeObject |
( |
Peer * |
p, |
|
|
const std::string & |
name |
|
) |
| |
|
protectedpure virtual |
◆ markObjectsDead()
virtual void DQMNet::markObjectsDead |
( |
Peer * |
p | ) |
|
|
protectedpure virtual |
◆ onLocalNotify()
bool DQMNet::onLocalNotify |
( |
lat::IOSelectEvent * |
ev | ) |
|
|
private |
React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.
Definition at line 856 of file DQMNet.cc.
860 unsigned char buf[1024];
861 while ((sz =
ev->source->read(
buf,
sizeof(
buf))))
864 auto *
next = dynamic_cast<SystemError *>(
e.next());
865 if (
next &&
next->portable() == SysErr::ErrTryAgain)
868 logme() <<
"WARNING: error reading from notification pipe: " <<
e.explain() << std::endl;
References visDQMUpload::buf, MillePedeFileConverter_cfg::e, ev, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.
Referenced by DQMNet().
◆ onMessage()
bool DQMNet::onMessage |
( |
Bucket * |
msg, |
|
|
Peer * |
p, |
|
|
unsigned char * |
data, |
|
|
size_t |
len |
|
) |
| |
|
protectedvirtual |
Definition at line 433 of file DQMNet.cc.
439 if (len != 2 *
sizeof(uint32_t)) {
440 logme() <<
"ERROR: corrupt 'UPDATE_ME' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
445 logme() <<
"DEBUG: received message 'UPDATE ME' from peer " <<
p->peeraddr <<
", size " << len << std::endl;
453 logme() <<
"DEBUG: received message 'LIST OBJECTS' from peer " <<
p->peeraddr <<
", size " << len << std::endl;
462 logme() <<
"DEBUG: received message 'GET OBJECT' from peer " <<
p->peeraddr <<
", size " << len << std::endl;
464 if (len < 3 *
sizeof(uint32_t)) {
465 logme() <<
"ERROR: corrupt 'GET IMAGE' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
470 memcpy(&namelen,
data + 2 *
sizeof(uint32_t),
sizeof(namelen));
471 if (len != 3 *
sizeof(uint32_t) + namelen) {
472 logme() <<
"ERROR: corrupt 'GET OBJECT' message of length " << len <<
" from peer " <<
p->peeraddr
473 <<
", expected length " << (3 *
sizeof(uint32_t)) <<
" + " << namelen << std::endl;
478 Peer *owner =
nullptr;
481 o->lastreq = Time::current().ns();
489 words[0] =
sizeof(words) +
name.size();
491 words[2] =
name.size();
493 msg->data.reserve(
msg->data.size() + words[0]);
501 if (len != 4 *
sizeof(uint32_t)) {
502 logme() <<
"ERROR: corrupt 'LIST BEGIN' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
508 memcpy(&
flags,
data + 3 *
sizeof(uint32_t),
sizeof(uint32_t));
511 logme() <<
"DEBUG: received message 'LIST BEGIN " << (
flags ?
"FULL" :
"INCREMENTAL") <<
"' from "
512 <<
p->peeraddr <<
", size " << len << std::endl;
526 if (len != 4 *
sizeof(uint32_t)) {
527 logme() <<
"ERROR: corrupt 'LIST END' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
533 memcpy(&
flags,
data + 3 *
sizeof(uint32_t),
sizeof(uint32_t));
543 logme() <<
"DEBUG: received message 'LIST END " << (
flags ?
"FULL" :
"INCREMENTAL") <<
"' from " <<
p->peeraddr
544 <<
", size " << len << std::endl;
555 if (len <
sizeof(words)) {
556 logme() <<
"ERROR: corrupt 'OBJECT' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
560 memcpy(&words[0],
data,
sizeof(words));
561 uint32_t &namelen = words[6];
562 uint32_t &datalen = words[7];
563 uint32_t &qlen = words[8];
565 if (len !=
sizeof(words) + namelen + datalen + qlen) {
566 logme() <<
"ERROR: corrupt 'OBJECT' message of length " << len <<
" from peer " <<
p->peeraddr
567 <<
", expected length " <<
sizeof(words) <<
" + " << namelen <<
" + " << datalen <<
" + " << qlen
572 unsigned char *namedata =
data +
sizeof(words);
573 unsigned char *objdata = namedata + namelen;
574 unsigned char *qdata = objdata + datalen;
575 unsigned char *enddata = qdata + qlen;
580 logme() <<
"DEBUG: received message 'OBJECT " <<
name <<
"' from " <<
p->peeraddr <<
", size " << len
593 o->version = ((
uint64_t)words[4] << 32 | words[3]);
598 o->scalar.insert(
o->scalar.end(), objdata, qdata);
599 }
else if (datalen) {
601 o->rawdata.insert(
o->rawdata.end(), objdata, qdata);
602 }
else if (!
o->rawdata.empty())
604 o->qdata.insert(
o->qdata.end(), qdata, enddata);
619 if (len <
sizeof(words)) {
620 logme() <<
"ERROR: corrupt 'NONE' message of length " << len <<
" from peer " <<
p->peeraddr << std::endl;
624 memcpy(&words[0],
data,
sizeof(words));
625 uint32_t &namelen = words[2];
627 if (len !=
sizeof(words) + namelen) {
628 logme() <<
"ERROR: corrupt 'NONE' message of length " << len <<
" from peer " <<
p->peeraddr
629 <<
", expected length " <<
sizeof(words) <<
" + " << namelen << std::endl;
633 unsigned char *namedata =
data +
sizeof(words);
637 logme() <<
"DEBUG: received message 'NONE " <<
name <<
"' from " <<
p->peeraddr <<
", size " << len
655 logme() <<
"ERROR: unrecognised message of length " << len <<
" and type " <<
type <<
" from peer " <<
p->peeraddr
References cms::cuda::assert(), data, HLT_FULL_cff::flags, fileCollector::logme(), mps_check::msg, Skims_PA_cff::name, EcalTangentSkim_cfg::o, AlCaHLTBitMon_ParallelJobs::p, and AlCaHLTBitMon_QueryRunRegistry::string.
◆ onPeerConnect()
bool DQMNet::onPeerConnect |
( |
lat::IOSelectEvent * |
ev | ) |
|
|
private |
Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Always returns false to tell the IOSelector to keep processing events for the server socket.
Definition at line 811 of file DQMNet.cc.
824 if (
auto *inet = dynamic_cast<InetSocket *>(
s)) {
825 InetAddress peeraddr = inet->peername();
826 InetAddress myaddr = inet->sockname();
827 p->peeraddr = StringFormat(
"%1:%2").arg(peeraddr.hostname()).
arg(peeraddr.port()).
value();
828 localaddr = StringFormat(
"%1:%2").arg(myaddr.hostname()).
arg(myaddr.port()).
value();
829 }
else if (
auto *
local = dynamic_cast<LocalSocket *>(
s)) {
830 p->peeraddr =
local->peername().path();
831 localaddr =
local->sockname().path();
840 logme() <<
"INFO: new peer " <<
p->peeraddr <<
" is now connected to " << localaddr << std::endl;
References cms::cuda::assert(), ev, IORead, IOUrgent, DTRecHitClients_cfi::local, CommonMethods::lock(), fileCollector::logme(), onPeerData(), AlCaHLTBitMon_ParallelJobs::p, alignCSCRings::s, AlCaHLTBitMon_QueryRunRegistry::string, and relativeConstraints::value.
Referenced by startLocalServer().
◆ onPeerData()
bool DQMNet::onPeerData |
( |
lat::IOSelectEvent * |
ev, |
|
|
Peer * |
p |
|
) |
| |
|
private |
Handle communication to a particular client.
Definition at line 663 of file DQMNet.cc.
673 logme() <<
"WARNING: connection to the DQM server at " <<
p->peeraddr
674 <<
" lost (will attempt to reconnect in 15 seconds)\n";
685 while (Bucket *
b =
p->sendq) {
686 IOSize len =
b->data.size() -
p->sendpos;
687 const void *
data = (len ? (
const void *)&
b->data[
p->sendpos] : (
const void *)&
data);
691 done = (len ?
ev->source->write(
data, len) : 0);
693 logme() <<
"DEBUG: sent " <<
done <<
" bytes to peer " <<
p->peeraddr << std::endl;
695 losePeer(
"WARNING: unable to write to peer ",
p,
ev, &
e);
701 if (
p->sendpos ==
b->data.size()) {
702 Bucket *old =
p->sendq;
703 p->sendq = old->next;
725 if ((sz =
ev->source->read(&
buf[0],
buf.size()))) {
727 logme() <<
"DEBUG: received " << sz <<
" bytes from peer " <<
p->peeraddr << std::endl;
729 if (
data.capacity() <
data.size() + sz)
733 while (sz ==
sizeof(
buf));
735 auto *
next = dynamic_cast<SystemError *>(
e.next());
736 if (
next &&
next->portable() == SysErr::ErrTryAgain)
740 losePeer(
"WARNING: failed to read from peer ",
p,
ev, &
e);
751 memcpy(&msglen, &
data[0] + consumed,
sizeof(msglen));
754 losePeer(
"WARNING: excessively large message from ",
p,
ev);
759 if (
data.size() - consumed >= msglen) {
761 if (msglen < 2 *
sizeof(uint32_t)) {
762 logme() <<
"ERROR: corrupt peer message of length " << msglen <<
" from peer " <<
p->peeraddr << std::endl;
771 if (!
msg.data.empty()) {
772 Bucket **prev = &
p->sendq;
774 prev = &(*prev)->next;
777 (*prev)->next =
nullptr;
778 (*prev)->data.swap(
msg.data);
783 losePeer(
"WARNING: data stream error with ",
p,
ev);
References cms::cuda::assert(), b, visDQMUpload::buf, data, DQMNet::Bucket::data, fileCollector::done, MillePedeFileConverter_cfg::e, ev, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), MESSAGE_SIZE_LIMIT, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, AlCaHLTBitMon_ParallelJobs::p, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and RunInfoPI::valid.
Referenced by onPeerConnect(), and run().
◆ operator=()
◆ packQualityData()
void DQMNet::packQualityData |
( |
std::string & |
into, |
|
|
const QReports & |
qr |
|
) |
| |
|
static |
Pack quality results in 'qr' into a string 'into' for persistent storage, such as network transfer or archival.
Definition at line 158 of file DQMNet.cc.
160 std::ostringstream qrs;
161 QReports::const_iterator qi, qe;
162 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
164 sprintf(
buf,
"%d%c%n%.*g", qi->code, 0, &
pos, DBL_DIG + 2, qi->qtresult);
165 qrs <<
buf <<
'\0' <<
buf +
pos <<
'\0' << qi->qtname <<
'\0' << qi->algorithm <<
'\0' << qi->message <<
'\0'
References visDQMUpload::buf.
Referenced by DQMService::flushStandalone(), and dqm::impl::MonitorElement::packQualityData().
◆ purgeDeadObjects()
virtual void DQMNet::purgeDeadObjects |
( |
Peer * |
p | ) |
|
|
protectedpure virtual |
◆ releaseFromWait() [1/2]
◆ releaseFromWait() [2/2]
void DQMNet::releaseFromWait |
( |
WaitList::iterator |
i, |
|
|
Object * |
o |
|
) |
| |
|
private |
◆ releaseWaiters()
void DQMNet::releaseWaiters |
( |
const std::string & |
name, |
|
|
Object * |
o |
|
) |
| |
|
private |
◆ removePeer()
virtual void DQMNet::removePeer |
( |
Peer * |
p, |
|
|
lat::Socket * |
s |
|
) |
| |
|
protectedpure virtual |
◆ requestObjectData()
void DQMNet::requestObjectData |
( |
Peer * |
p, |
|
|
const char * |
name, |
|
|
size_t |
len |
|
) |
| |
|
private |
◆ run()
Run the actual I/O processing loop.
Definition at line 1083 of file DQMNet.cc.
1090 for (
auto ap : automatic) {
1094 if (!ap->host.empty() && !ap->peer && (
now = Time::current()) > ap->next) {
1095 ap->
next =
now + TimeSpan(0, 0, 0, 15 , 0);
1096 InetSocket *
s =
nullptr;
1098 InetAddress
addr(ap->host.c_str(), ap->port);
1099 s =
new InetSocket(SOCK_STREAM, 0,
addr.family());
1100 s->setBlocking(
false);
1105 auto *sys = dynamic_cast<SystemError *>(
e.next());
1106 if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1123 InetAddress peeraddr = ((InetSocket *)
s)->peername();
1124 InetAddress myaddr = ((InetSocket *)
s)->sockname();
1125 p->peeraddr = StringFormat(
"%1:%2").arg(peeraddr.hostname()).
arg(peeraddr.port()).
value();
1127 p->update = ap->update;
1133 p->sendq =
new Bucket;
1134 p->sendq->next =
nullptr;
1135 copydata(
p->sendq, words,
sizeof(words));
1140 logme() <<
"INFO: now connected to " <<
p->peeraddr <<
" from " << myaddr.hostname() <<
":" << myaddr.port()
1148 now = Time::current();
1155 nextFlush =
now + TimeSpan(0, 0, 0, 15 , 0);
1174 if (
i->time < waitold) {
1175 logme() <<
"WARNING: source not responding in " << (
waitMax_.ns() * 1
e-9) <<
"s to retrieval, releasing '"
1176 <<
i->name <<
"' from wait, have " << (
o ?
o->rawdata.size() : 0) <<
" data available\n";
1179 logme() <<
"WARNING: source not responding in " << (
waitStale_.ns() * 1
e-9) <<
"s to update, releasing '"
1180 <<
i->name <<
"' from wait, have " <<
o->rawdata.size() <<
" data available\n";
References generateTowerEtThresholdLUT::addr, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), flush_, mps_fire::i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::AutoPeer::next, submitPVValidationJobs::now, EcalTangentSkim_cfg::o, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), shouldStop(), SOCKET_BUF_SIZE, RecoSummaryTask_cfi::Time, unlock(), updatePeerMasks(), upstream_, relativeConstraints::value, waiting_, waitMax_, and waitStale_.
Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().
◆ sendLocalChanges()
void DQMNet::sendLocalChanges |
( |
| ) |
|
◆ sendObjectListToPeer()
virtual void DQMNet::sendObjectListToPeer |
( |
Bucket * |
msg, |
|
|
bool |
all, |
|
|
bool |
clear |
|
) |
| |
|
protectedpure virtual |
◆ sendObjectListToPeers()
virtual void DQMNet::sendObjectListToPeers |
( |
bool |
all | ) |
|
|
protectedpure virtual |
◆ sendObjectToPeer()
void DQMNet::sendObjectToPeer |
( |
Bucket * |
msg, |
|
|
Object & |
o, |
|
|
bool |
data |
|
) |
| |
|
protectedvirtual |
Definition at line 390 of file DQMNet.cc.
395 objdata.insert(objdata.end(), &
o.scalar[0], &
o.scalar[0] +
o.scalar.size());
397 objdata.insert(objdata.end(), &
o.rawdata[0], &
o.rawdata[0] +
o.rawdata.size());
400 uint32_t namelen =
o.dirname.size() +
o.objname.size() + 1;
401 uint32_t datalen = objdata.size();
402 uint32_t qlen =
o.qdata.size();
404 if (
o.dirname.empty())
407 words[0] = 9 *
sizeof(uint32_t) + namelen + datalen + qlen;
410 words[3] = (
o.version >> 0) & 0xffffffff;
411 words[4] = (
o.version >> 32) & 0xffffffff;
417 msg->data.reserve(
msg->data.size() + words[0]);
421 if (!
o.dirname.empty())
References data, HLT_FULL_cff::flags, mps_check::msg, and EcalTangentSkim_cfg::o.
Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
◆ setOrder()
Definition at line 165 of file DQMNet.h.
166 if (
a.run ==
b.run) {
167 if (
a.lumi ==
b.lumi) {
168 if (
a.streamId ==
b.streamId) {
169 if (
a.moduleId ==
b.moduleId) {
170 if (
a.dirname ==
b.dirname) {
171 return a.objname <
b.objname;
173 return a.dirname <
b.dirname;
175 return a.moduleId <
b.moduleId;
177 return a.streamId <
b.streamId;
179 return a.lumi <
b.lumi;
181 return a.run <
b.run;
References a, and b.
Referenced by dqm::impl::MonitorElement::operator<().
◆ shouldStop()
bool DQMNet::shouldStop |
( |
| ) |
|
|
protectedvirtual |
◆ shutdown()
void DQMNet::shutdown |
( |
| ) |
|
◆ staleObjectWaitLimit()
void DQMNet::staleObjectWaitLimit |
( |
lat::TimeSpan |
time | ) |
|
Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has been received; another time limit is applied when no data payload has been received at all.
Definition at line 947 of file DQMNet.cc.
References ntuplemaker::time, and waitStale_.
◆ start()
◆ startLocalServer() [1/2]
void DQMNet::startLocalServer |
( |
const char * |
path | ) |
|
◆ startLocalServer() [2/2]
void DQMNet::startLocalServer |
( |
int |
port | ) |
|
Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.
Definition at line 952 of file DQMNet.cc.
954 logme() <<
"ERROR: DQM server was already started.\n";
960 auto *
s =
new InetSocket(SOCK_STREAM, 0,
addr.family());
965 s->setBlocking(
false);
970 logme() <<
"ERROR: Failed to start server at port " <<
port <<
": " <<
e.explain() << std::endl;
972 throw cms::Exception(
"DQMNet::startLocalServer") <<
"Failed to start server at port " <<
974 port <<
": " <<
e.explain().c_str();
977 logme() <<
"INFO: DQM server started at port " <<
port << std::endl;
References generateTowerEtThresholdLUT::addr, MillePedeFileConverter_cfg::e, Exception, IOAccept, logme(), onPeerConnect(), query::port, alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.
◆ unlock()
◆ unpackQualityData()
void DQMNet::unpackQualityData |
( |
QReports & |
qr, |
|
|
uint32_t & |
flags, |
|
|
const char * |
from |
|
) |
| |
|
static |
Unpack the quality results from the string 'from' into 'qr'. Assumes the data was saved with packQualityData().
Definition at line 173 of file DQMNet.cc.
174 const char *qdata = from;
204 qv.
code = atoi(qdata);
References MonitorElementData::QReport::QValue::algorithm, MonitorElementData::QReport::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, HLT_FULL_cff::flags, MonitorElementData::QReport::QValue::message, MonitorElementData::QReport::QValue::qtname, MonitorElementData::QReport::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.
◆ updateMask()
void DQMNet::updateMask |
( |
Peer * |
p | ) |
|
|
protected |
◆ updatePeerMasks()
virtual void DQMNet::updatePeerMasks |
( |
| ) |
|
|
protectedpure virtual |
◆ updateToCollector()
void DQMNet::updateToCollector |
( |
const std::string & |
host, |
|
|
int |
port |
|
) |
| |
◆ waitForData()
void DQMNet::waitForData |
( |
Peer * |
p, |
|
|
const std::string & |
name, |
|
|
const std::string & |
info, |
|
|
Peer * |
owner |
|
) |
| |
|
protected |
◆ appname_
std::string DQMNet::appname_ |
|
private |
◆ communicate_
pthread_t DQMNet::communicate_ |
|
private |
◆ debug_
◆ delay_
◆ downstream_
◆ DQM_MSG_GET_OBJECT
const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3 |
|
static |
◆ DQM_MSG_HELLO
const uint32_t DQMNet::DQM_MSG_HELLO = 0 |
|
static |
◆ DQM_MSG_LIST_OBJECTS
const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2 |
|
static |
◆ DQM_MSG_UPDATE_ME
const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1 |
|
static |
◆ DQM_PROP_ACCUMULATE
const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000 |
|
static |
◆ DQM_PROP_DEAD
const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000 |
|
static |
◆ DQM_PROP_EFFICIENCY_PLOT
const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000 |
|
static |
◆ DQM_PROP_HAS_REFERENCE
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000 |
|
static |
◆ DQM_PROP_LUMI
const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000 |
|
static |
◆ DQM_PROP_MARKTODELETE
const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000 |
|
static |
◆ DQM_PROP_NEW
const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000 |
|
static |
◆ DQM_PROP_RECEIVED
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000 |
|
static |
◆ DQM_PROP_REPORT_ALARM
◆ DQM_PROP_REPORT_CLEAR
const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000 |
|
static |
◆ DQM_PROP_REPORT_ERROR
const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100 |
|
static |
◆ DQM_PROP_REPORT_MASK
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00 |
|
static |
◆ DQM_PROP_REPORT_OTHER
const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400 |
|
static |
◆ DQM_PROP_REPORT_WARN
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200 |
|
static |
◆ DQM_PROP_RESET
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000 |
|
static |
◆ DQM_PROP_STALE
const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000 |
|
static |
◆ DQM_PROP_TAGGED
const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000 |
|
static |
◆ DQM_PROP_TYPE_DATABLOB
const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050 |
|
static |
◆ DQM_PROP_TYPE_INT
const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001 |
|
static |
◆ DQM_PROP_TYPE_INVALID
const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000 |
|
static |
◆ DQM_PROP_TYPE_MASK
const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff |
|
static |
◆ DQM_PROP_TYPE_REAL
const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002 |
|
static |
◆ DQM_PROP_TYPE_SCALAR
const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f |
|
static |
◆ DQM_PROP_TYPE_STRING
const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003 |
|
static |
◆ DQM_PROP_TYPE_TH1D
const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012 |
|
static |
◆ DQM_PROP_TYPE_TH1F
const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010 |
|
static |
◆ DQM_PROP_TYPE_TH1S
const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011 |
|
static |
◆ DQM_PROP_TYPE_TH2D
const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022 |
|
static |
◆ DQM_PROP_TYPE_TH2F
const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020 |
|
static |
◆ DQM_PROP_TYPE_TH2S
const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021 |
|
static |
◆ DQM_PROP_TYPE_TH3D
const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032 |
|
static |
◆ DQM_PROP_TYPE_TH3F
const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030 |
|
static |
◆ DQM_PROP_TYPE_TH3S
const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031 |
|
static |
◆ DQM_PROP_TYPE_TPROF
const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040 |
|
static |
◆ DQM_PROP_TYPE_TPROF2D
const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041 |
|
static |
◆ DQM_REPLY_LIST_BEGIN
const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101 |
|
static |
◆ DQM_REPLY_LIST_END
const uint32_t DQMNet::DQM_REPLY_LIST_END = 102 |
|
static |
◆ DQM_REPLY_NONE
const uint32_t DQMNet::DQM_REPLY_NONE = 103 |
|
static |
◆ DQM_REPLY_OBJECT
const uint32_t DQMNet::DQM_REPLY_OBJECT = 104 |
|
static |
◆ flush_
◆ lock_
pthread_mutex_t DQMNet::lock_ |
|
protected |
◆ MAX_PEER_WAITREQS
const uint32_t DQMNet::MAX_PEER_WAITREQS = 128 |
|
static |
◆ pid_
◆ sel_
lat::IOSelector DQMNet::sel_ |
|
private |
◆ server_
lat::Socket* DQMNet::server_ |
|
private |
◆ shutdown_
sig_atomic_t DQMNet::shutdown_ |
|
private |
◆ upstream_
◆ version_
lat::Time DQMNet::version_ |
|
private |
◆ waiting_
◆ waitMax_
lat::TimeSpan DQMNet::waitMax_ |
|
private |
◆ waitStale_
lat::TimeSpan DQMNet::waitStale_ |
|
private |
◆ wakeup_
lat::Pipe DQMNet::wakeup_ |
|
private |
virtual void updatePeerMasks()=0
virtual Peer * getPeer(lat::Socket *s)=0
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
#define dqmhashmix(a, b, c)
static const uint32_t DQM_PROP_STALE
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_REPLY_OBJECT
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_MSG_UPDATE_ME
static const uint32_t DQM_PROP_RECEIVED
void releaseWaiters(const std::string &name, Object *o)
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void unlock()
Release the lock on the DQM net layer.
virtual Object * makeObject(Peer *p, const std::string &name)=0
static const uint32_t DQM_PROP_DEAD
virtual bool shouldStop()
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
#define MESSAGE_SIZE_LIMIT
static const uint32_t DQM_PROP_TYPE_SCALAR
virtual void sendObjectListToPeers(bool all)=0
bool onPeerConnect(lat::IOSelectEvent *ev)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_PROP_NEW
static void * communicate(void *obj)
static const uint32_t DQM_PROP_TYPE_MASK
static void discard(Bucket *&b)
static const uint32_t DQM_REPLY_LIST_BEGIN
std::vector< unsigned char > DataBlob
#define dqmhashfinal(a, b, c)
static const uint32_t DQM_PROP_REPORT_ERROR
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
static const uint32_t DQM_PROP_REPORT_OTHER
virtual void markObjectsDead(Peer *p)=0
#define SOCKET_READ_GROWTH
virtual void removePeer(Peer *p, lat::Socket *s)=0
edm::ErrorSummaryEntry Error
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
void lock()
Acquire a lock on the DQM net layer.
virtual Peer * createPeer(lat::Socket *s)=0
static const uint32_t DQM_MSG_LIST_OBJECTS
static const int STATUS_OK
char data[epos_bytes_allocation]
unsigned long long uint64_t
static const uint32_t DQM_MSG_GET_OBJECT
bool onLocalNotify(lat::IOSelectEvent *ev)
static void copydata(Bucket *b, const void *data, size_t len)
static const uint32_t MAX_PEER_WAITREQS
virtual void purgeDeadObjects(Peer *p)=0