CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  WaitObject
 

Public Types

using DataBlob = std::vector< unsigned char >
 
using QReports = std::vector< QValue >
 
using QValue = MonitorElementData::QReport::QValue
 
using TagList = std::vector< uint32_t >
 
using WaitList = std::list< WaitObject >
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const DQMNet &)=delete
 
 DQMNet (const std::string &appname="")
 
void listenToCollector (const std::string &host, int port)
 
void lock ()
 Acquire a lock on the DQM net layer. More...
 
DQMNet & operator= (const DQMNet &)=delete
 
void run ()
 
void sendLocalChanges ()
 
void shutdown ()
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start ()
 
void startLocalServer (const char *path)
 
void startLocalServer (int port)
 
void unlock ()
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet ()
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme ()
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop ()
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks ()=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 26 of file DQMNet.h.

Member Typedef Documentation

◆ DataBlob

using DQMNet::DataBlob = std::vector<unsigned char>

Definition at line 83 of file DQMNet.h.

◆ QReports

using DQMNet::QReports = std::vector<QValue>

Definition at line 84 of file DQMNet.h.

◆ QValue

Definition at line 82 of file DQMNet.h.

◆ TagList

using DQMNet::TagList = std::vector<uint32_t>

Definition at line 85 of file DQMNet.h.

◆ WaitList

Definition at line 86 of file DQMNet.h.

Constructor & Destructor Documentation

◆ DQMNet() [1/2]

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 906 of file DQMNet.cc.

907  : debug_(false),
908  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
909  pid_(getpid()),
910  server_(nullptr),
911  version_(Time::current()),
912  communicate_((pthread_t)-1),
913  shutdown_(0),
914  delay_(1000),
915  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
916  waitMax_(0, 0, 0, 5 /* seconds */, 0),
917  flush_(false) {
918  // Create a pipe for the local DQM to tell the communicator
919  // thread that local DQM data has changed and that the peers
920  // should be notified.
921  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
922  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
923 
924  // Initialise the upstream and downstream to empty.
925  upstream_.peer = downstream_.peer = nullptr;
929 }

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

◆ ~DQMNet()

DQMNet::~DQMNet ( )
virtual

Definition at line 931 of file DQMNet.cc.

931  {
932  // FIXME
933 }

◆ DQMNet() [2/2]

DQMNet::DQMNet ( const DQMNet & )
delete

Member Function Documentation

◆ copydata()

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
)
static protected

Definition at line 57 of file DQMNet.cc.

57  {
58  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
59 }

References b, and data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ createPeer()

virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ debug()

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 937 of file DQMNet.cc.

937 { debug_ = doit; }

References debug_.

Referenced by DQMService::DQMService().

◆ delay()

void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 941 of file DQMNet.cc.

941 { delay_ = delay; }

References delay_.

◆ discard()

void DQMNet::discard ( Bucket *&  b)
static protected

Definition at line 62 of file DQMNet.cc.

62  {
63  while (b) {
64  Bucket *next = b->next;
65  delete b;
66  b = next;
67  }
68 }

References b, and GetRecoTauVFromDQM_MC_cff::next.

Referenced by OrderedSet.OrderedSet::pop().

◆ dqmhash()

static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inline static

Definition at line 194 of file DQMNet.h.

194  {
195  // Reduced version of Bob Jenkins' hash function at:
196  // http://www.burtleburtle.net/bob/c/lookup3.c
197 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
198 #define dqmhashmix(a, b, c) \
199  { \
200  a -= c; \
201  a ^= dqmhashrot(c, 4); \
202  c += b; \
203  b -= a; \
204  b ^= dqmhashrot(a, 6); \
205  a += c; \
206  c -= b; \
207  c ^= dqmhashrot(b, 8); \
208  b += a; \
209  a -= c; \
210  a ^= dqmhashrot(c, 16); \
211  c += b; \
212  b -= a; \
213  b ^= dqmhashrot(a, 19); \
214  a += c; \
215  c -= b; \
216  c ^= dqmhashrot(b, 4); \
217  b += a; \
218  }
219 #define dqmhashfinal(a, b, c) \
220  { \
221  c ^= b; \
222  c -= dqmhashrot(b, 14); \
223  a ^= c; \
224  a -= dqmhashrot(c, 11); \
225  b ^= a; \
226  b -= dqmhashrot(a, 25); \
227  c ^= b; \
228  c -= dqmhashrot(b, 16); \
229  a ^= c; \
230  a -= dqmhashrot(c, 4); \
231  b ^= a; \
232  b -= dqmhashrot(a, 14); \
233  c ^= b; \
234  c -= dqmhashrot(b, 24); \
235  }
236 
237  uint32_t a, b, c;
238  a = b = c = 0xdeadbeef + (uint32_t)keylen;
239  const auto *k = (const unsigned char *)key;
240 
241  // all but the last block: affect some bits of (a, b, c)
242  while (keylen > 12) {
243  a += k[0];
244  a += ((uint32_t)k[1]) << 8;
245  a += ((uint32_t)k[2]) << 16;
246  a += ((uint32_t)k[3]) << 24;
247  b += k[4];
248  b += ((uint32_t)k[5]) << 8;
249  b += ((uint32_t)k[6]) << 16;
250  b += ((uint32_t)k[7]) << 24;
251  c += k[8];
252  c += ((uint32_t)k[9]) << 8;
253  c += ((uint32_t)k[10]) << 16;
254  c += ((uint32_t)k[11]) << 24;
255  dqmhashmix(a, b, c);
256  keylen -= 12;
257  k += 12;
258  }
259 
260  // last block: affect all 32 bits of (c); all case statements fall through
261  switch (keylen) {
262  case 12:
263  c += ((uint32_t)k[11]) << 24;
264  [[fallthrough]];
265  case 11:
266  c += ((uint32_t)k[10]) << 16;
267  [[fallthrough]];
268  case 10:
269  c += ((uint32_t)k[9]) << 8;
270  [[fallthrough]];
271  case 9:
272  c += k[8];
273  [[fallthrough]];
274  case 8:
275  b += ((uint32_t)k[7]) << 24;
276  [[fallthrough]];
277  case 7:
278  b += ((uint32_t)k[6]) << 16;
279  [[fallthrough]];
280  case 6:
281  b += ((uint32_t)k[5]) << 8;
282  [[fallthrough]];
283  case 5:
284  b += k[4];
285  [[fallthrough]];
286  case 4:
287  a += ((uint32_t)k[3]) << 24;
288  [[fallthrough]];
289  case 3:
290  a += ((uint32_t)k[2]) << 16;
291  [[fallthrough]];
292  case 2:
293  a += ((uint32_t)k[1]) << 8;
294  [[fallthrough]];
295  case 1:
296  a += k[0];
297  break;
298  case 0:
299  return c;
300  }
301 
302  dqmhashfinal(a, b, c);
303  return c;
304 #undef dqmhashrot
305 #undef dqmhashmix
306 #undef dqmhashfinal
307  }

References a, b, HltBtagPostValidation_cff::c, dqmhashfinal, dqmhashmix, dqmdumpme::k, and crabWrapper::key.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

◆ findObject()

virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = nullptr 
)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ getPeer()

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protected pure virtual

◆ listenToCollector()

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1025 of file DQMNet.cc.

1025  {
1026  if (!upstream_.host.empty()) {
1027  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1028  << std::endl;
1029  return;
1030  }
1031 
1032  upstream_.update = false;
1033  upstream_.host = host;
1034  upstream_.port = port;
1035 }

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

◆ lock()

void DQMNet::lock ( )

Acquire a lock on the DQM net layer.

Definition at line 1058 of file DQMNet.cc.

1058  {
1059  if (communicate_ != (pthread_t)-1)
1060  pthread_mutex_lock(&lock_);
1061 }

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

◆ logme()

std::ostream & DQMNet::logme ( )
protected

Definition at line 50 of file DQMNet.cc.

50  {
51  Time now = Time::current();
52  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
53  << "]: ";
54 }

References gather_cfg::cout, fileCollector::now, and RecoSummaryTask_cfi::Time.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

◆ losePeer()

void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = nullptr 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 74 of file DQMNet.cc.

74  {
75  if (reason)
76  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
77 
78  Socket *s = peer->socket;
79 
80  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
81  if (i->peer == peer)
82  waiting_.erase(i++);
83  else
84  ++i;
85 
86  if (ev)
87  ev->source = nullptr;
88 
89  discard(peer->sendq);
90  if (peer->automatic)
91  peer->automatic->peer = nullptr;
92 
93  sel_.detach(s);
94  s->close();
95  removePeer(peer, s);
96  delete s;
97 }

References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, runTheMatrix::err, ev, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, PixelMapPlotter::reason, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ makeObject()

virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
)
protected pure virtual

◆ markObjectsDead()

virtual void DQMNet::markObjectsDead ( Peer *  p)
protected pure virtual

◆ onLocalNotify()

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 856 of file DQMNet.cc.

856  {
857  // Discard the data in the pipe, we care only about the wakeup.
858  try {
859  IOSize sz;
860  unsigned char buf[1024];
861  while ((sz = ev->source->read(buf, sizeof(buf))))
862  ;
863  } catch (Error &e) {
864  auto *next = dynamic_cast<SystemError *>(e.next());
865  if (next && next->portable() == SysErr::ErrTryAgain)
866  ; // Ignore it
867  else
868  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
869  }
870 
871  // Tell the main event pump to send an update in a little while.
872  flush_ = true;
873 
874  // We are never done, always keep going.
875  return false;
876 }

References visDQMUpload::buf, MillePedeFileConverter_cfg::e, ev, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

◆ onMessage()

bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
)
protected virtual

Definition at line 433 of file DQMNet.cc.

433  {
434  // Decode and process this message.
435  uint32_t type;
436  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
437  switch (type) {
438  case DQM_MSG_UPDATE_ME: {
439  if (len != 2 * sizeof(uint32_t)) {
440  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
441  return false;
442  }
443 
444  if (debug_)
445  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
446 
447  p->update = true;
448  }
449  return true;
450 
451  case DQM_MSG_LIST_OBJECTS: {
452  if (debug_)
453  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  // Send over current status: list of known objects.
456  sendObjectListToPeer(msg, true, false);
457  }
458  return true;
459 
460  case DQM_MSG_GET_OBJECT: {
461  if (debug_)
462  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
463 
464  if (len < 3 * sizeof(uint32_t)) {
465  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
466  return false;
467  }
468 
469  uint32_t namelen;
470  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
471  if (len != 3 * sizeof(uint32_t) + namelen) {
472  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
473  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
474  return false;
475  }
476 
477  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
478  Peer *owner = nullptr;
479  Object *o = findObject(nullptr, name, &owner);
480  if (o) {
481  o->lastreq = Time::current().ns();
482  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
484  waitForData(p, name, "", owner);
485  else
486  sendObjectToPeer(msg, *o, true);
487  } else {
488  uint32_t words[3];
489  words[0] = sizeof(words) + name.size();
490  words[1] = DQM_REPLY_NONE;
491  words[2] = name.size();
492 
493  msg->data.reserve(msg->data.size() + words[0]);
494  copydata(msg, &words[0], sizeof(words));
495  copydata(msg, &name[0], name.size());
496  }
497  }
498  return true;
499 
500  case DQM_REPLY_LIST_BEGIN: {
501  if (len != 4 * sizeof(uint32_t)) {
502  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
503  return false;
504  }
505 
506  // Get the update status: whether this is a full update.
507  uint32_t flags;
508  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
509 
510  if (debug_)
511  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
512  << p->peeraddr << ", size " << len << std::endl;
513 
514  // If we are about to receive a full list of objects, flag all
515  // objects as possibly dead. Subsequent object notifications
516  // will undo this for the live objects. We cannot delete
517  // objects quite yet, as we may get inquiry from another client
518  // while we are processing the incoming list, so we keep the
519  // objects tentatively alive as long as we've not seen the end.
520  if (flags)
522  }
523  return true;
524 
525  case DQM_REPLY_LIST_END: {
526  if (len != 4 * sizeof(uint32_t)) {
527  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
528  return false;
529  }
530 
531  // Get the update status: whether this is a full update.
532  uint32_t flags;
533  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
534 
535  // If we received a full list of objects, now purge all dead
536  // objects. We need to do this in two stages in case we receive
537  // updates in many parts, and end up sending updates to others in
538  // between; this avoids us lying live objects are dead.
539  if (flags)
541 
542  if (debug_)
543  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
544  << ", size " << len << std::endl;
545 
546  // Indicate we have received another update from this peer.
547  // Also indicate we should flush to our clients.
548  flush_ = true;
549  p->updates++;
550  }
551  return true;
552 
553  case DQM_REPLY_OBJECT: {
554  uint32_t words[9];
555  if (len < sizeof(words)) {
556  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
557  return false;
558  }
559 
560  memcpy(&words[0], data, sizeof(words));
561  uint32_t &namelen = words[6];
562  uint32_t &datalen = words[7];
563  uint32_t &qlen = words[8];
564 
565  if (len != sizeof(words) + namelen + datalen + qlen) {
566  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
567  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
568  << std::endl;
569  return false;
570  }
571 
572  unsigned char *namedata = data + sizeof(words);
573  unsigned char *objdata = namedata + namelen;
574  unsigned char *qdata = objdata + datalen;
575  unsigned char *enddata = qdata + qlen;
576  std::string name((char *)namedata, namelen);
577  assert(enddata == data + len);
578 
579  if (debug_)
580  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
581  << std::endl;
582 
583  // Mark the peer as a known object source.
584  p->source = true;
585 
586  // Initialise or update an object entry.
587  Object *o = findObject(p, name);
588  if (!o)
589  o = makeObject(p, name);
590 
591  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
592  o->tag = words[5];
593  o->version = ((uint64_t)words[4] << 32 | words[3]);
594  o->scalar.clear();
595  o->qdata.clear();
596  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
597  o->rawdata.clear();
598  o->scalar.insert(o->scalar.end(), objdata, qdata);
599  } else if (datalen) {
600  o->rawdata.clear();
601  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
602  } else if (!o->rawdata.empty())
603  o->flags |= DQM_PROP_STALE;
604  o->qdata.insert(o->qdata.end(), qdata, enddata);
605 
606  // If we had an object for this one already and this is a list
607  // update without data, issue an immediate data get request.
608  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
609  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
610 
611  // If we have the object data, release from wait.
612  if (datalen)
614  }
615  return true;
616 
617  case DQM_REPLY_NONE: {
618  uint32_t words[3];
619  if (len < sizeof(words)) {
620  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
621  return false;
622  }
623 
624  memcpy(&words[0], data, sizeof(words));
625  uint32_t &namelen = words[2];
626 
627  if (len != sizeof(words) + namelen) {
628  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
629  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
630  return false;
631  }
632 
633  unsigned char *namedata = data + sizeof(words);
634  std::string name((char *)namedata, namelen);
635 
636  if (debug_)
637  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
638  << std::endl;
639 
640  // Mark the peer as a known object source.
641  p->source = true;
642 
643  // If this was a known object, kill it.
644  if (Object *o = findObject(p, name)) {
645  o->flags |= DQM_PROP_DEAD;
647  }
648 
649  // If someone was waiting for this, let them go.
650  releaseWaiters(name, nullptr);
651  }
652  return true;
653 
654  default:
655  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
656  << std::endl;
657  return false;
658  }
659 }

References cms::cuda::assert(), data, HLT_2018_cff::flags, fileCollector::logme(), mps_check::msg, Skims_PA_cff::name, EcalTangentSkim_cfg::o, AlCaHLTBitMon_ParallelJobs::p, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ onPeerConnect()

bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 811 of file DQMNet.cc.

811  {
812  // Recover the server socket.
813  assert(ev->source == server_);
814 
815  // Accept the connection.
816  Socket *s = server_->accept();
817  assert(s);
818  assert(!s->isBlocking());
819 
820  // Record it to our list of peers.
821  lock();
822  Peer *p = createPeer(s);
823  std::string localaddr;
824  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
825  InetAddress peeraddr = inet->peername();
826  InetAddress myaddr = inet->sockname();
827  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
828  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
829  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
830  p->peeraddr = local->peername().path();
831  localaddr = local->sockname().path();
832  } else
833  assert(false);
834 
835  p->mask = IORead | IOUrgent;
836  p->socket = s;
837 
838  // Report the new connection.
839  if (debug_)
840  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
841 
842  // Attach it to the listener.
843  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
844  unlock();
845 
846  // We are never done.
847  return false;
848 }

References cms::cuda::assert(), ev, IORead, IOUrgent, DTRecHitClients_cfi::local, CommonMethods::lock(), fileCollector::logme(), onPeerData(), AlCaHLTBitMon_ParallelJobs::p, alignCSCRings::s, AlCaHLTBitMon_QueryRunRegistry::string, and relativeConstraints::value.

Referenced by startLocalServer().

◆ onPeerData()

bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
)
private

Handle communication to a particular client.

Definition at line 663 of file DQMNet.cc.

663  {
664  lock();
665  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
666 
667  // If there is a problem with the peer socket, discard the peer
668  // and tell the selector to stop prcessing events for it. If
669  // this is a server connection, we will eventually recreate
670  // everything if/when the data server comes back.
671  if (ev->events & IOUrgent) {
672  if (p->automatic) {
673  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
674  << " lost (will attempt to reconnect in 15 seconds)\n";
675  losePeer(nullptr, p, ev);
676  } else
677  losePeer("WARNING: lost peer connection ", p, ev);
678 
679  unlock();
680  return true;
681  }
682 
683  // If we can write to the peer socket, pump whatever we can into it.
684  if (ev->events & IOWrite) {
685  while (Bucket *b = p->sendq) {
686  IOSize len = b->data.size() - p->sendpos;
687  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
688  IOSize done;
689 
690  try {
691  done = (len ? ev->source->write(data, len) : 0);
692  if (debug_ && len)
693  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
694  } catch (Error &e) {
695  losePeer("WARNING: unable to write to peer ", p, ev, &e);
696  unlock();
697  return true;
698  }
699 
700  p->sendpos += done;
701  if (p->sendpos == b->data.size()) {
702  Bucket *old = p->sendq;
703  p->sendq = old->next;
704  p->sendpos = 0;
705  old->next = nullptr;
706  discard(old);
707  }
708 
709  if (!done && len)
710  // Cannot write any more.
711  break;
712  }
713  }
714 
715  // If there is data to be read from the peer, first receive what we
716  // can get out the socket, the process all complete requests.
717  if (ev->events & IORead) {
718  // First build up the incoming buffer of data in the socket.
719  // Remember the last size returned by the socket; we need
720  // it to determine if the remote end closed the connection.
721  IOSize sz;
722  try {
723  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
724  do
725  if ((sz = ev->source->read(&buf[0], buf.size()))) {
726  if (debug_)
727  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
728  DataBlob &data = p->incoming;
729  if (data.capacity() < data.size() + sz)
730  data.reserve(data.size() + SOCKET_READ_GROWTH);
731  data.insert(data.end(), &buf[0], &buf[0] + sz);
732  }
733  while (sz == sizeof(buf));
734  } catch (Error &e) {
735  auto *next = dynamic_cast<SystemError *>(e.next());
736  if (next && next->portable() == SysErr::ErrTryAgain)
737  sz = 1; // Ignore it, and fake no end of data.
738  else {
739  // Houston we have a problem.
740  losePeer("WARNING: failed to read from peer ", p, ev, &e);
741  unlock();
742  return true;
743  }
744  }
745 
746  // Process fully received messages as long as we can.
747  size_t consumed = 0;
748  DataBlob &data = p->incoming;
749  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
750  uint32_t msglen;
751  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
752 
753  if (msglen >= MESSAGE_SIZE_LIMIT) {
754  losePeer("WARNING: excessively large message from ", p, ev);
755  unlock();
756  return true;
757  }
758 
759  if (data.size() - consumed >= msglen) {
760  bool valid = true;
761  if (msglen < 2 * sizeof(uint32_t)) {
762  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
763  valid = false;
764  } else {
765  // Decode and process this message.
766  Bucket msg;
767  msg.next = nullptr;
768  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
769 
770  // If we created a response, chain it to the write queue.
771  if (!msg.data.empty()) {
772  Bucket **prev = &p->sendq;
773  while (*prev)
774  prev = &(*prev)->next;
775 
776  *prev = new Bucket;
777  (*prev)->next = nullptr;
778  (*prev)->data.swap(msg.data);
779  }
780  }
781 
782  if (!valid) {
783  losePeer("WARNING: data stream error with ", p, ev);
784  unlock();
785  return true;
786  }
787 
788  consumed += msglen;
789  } else
790  break;
791  }
792 
793  data.erase(data.begin(), data.begin() + consumed);
794 
795  // If the client has closed the connection, shut down our end. If
796  // we have something to send back still, leave the write direction
797  // open. Otherwise close the shop for this client.
798  if (sz == 0)
799  sel_.setMask(p->socket, p->mask &= ~IORead);
800  }
801 
802  // Yes, please keep processing events for this socket.
803  unlock();
804  return false;
805 }

References cms::cuda::assert(), b, visDQMUpload::buf, data, DQMNet::Bucket::data, fileCollector::done, MillePedeFileConverter_cfg::e, ev, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), MESSAGE_SIZE_LIMIT, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, AlCaHLTBitMon_ParallelJobs::p, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and validateGeometry_cfg::valid.

Referenced by onPeerConnect(), and run().

◆ operator=()

DQMNet& DQMNet::operator= ( const DQMNet & )
delete

◆ packQualityData()

void DQMNet::packQualityData ( std::string &  into,
const QReports &  qr 
)
static

Pack the quality results in `qr` into the string `into` for persistent storage, such as network transfer or archival.

Definition at line 158 of file DQMNet.cc.

158  {
159  char buf[64];
160  std::ostringstream qrs;
161  QReports::const_iterator qi, qe;
162  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
163  int pos = 0;
164  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
165  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
166  << '\0';
167  }
168  into = qrs.str();
169 }

References visDQMUpload::buf.

Referenced by DQMService::flushStandalone(), and dqm::impl::MonitorElement::packQualityData().

◆ purgeDeadObjects()

virtual void DQMNet::purgeDeadObjects ( Peer p)
protectedpure virtual

◆ releaseFromWait() [1/2]

void DQMNet::releaseFromWait ( Bucket *  msg,
WaitObject &  w,
Object *  o 
)
protectedvirtual

Definition at line 372 of file DQMNet.cc.

372  {
373  if (o)
374  sendObjectToPeer(msg, *o, true);
375  else {
376  uint32_t words[3];
377  words[0] = sizeof(words) + w.name.size();
378  words[1] = DQM_REPLY_NONE;
379  words[2] = w.name.size();
380 
381  msg->data.reserve(msg->data.size() + words[0]);
382  copydata(msg, &words[0], sizeof(words));
383  copydata(msg, &w.name[0], w.name.size());
384  }
385 }

References mps_check::msg, EcalTangentSkim_cfg::o, and w.

Referenced by run().

◆ releaseFromWait() [2/2]

void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
)
private

Definition at line 132 of file DQMNet.cc.

132  {
133  Bucket **msg = &i->peer->sendq;
134  while (*msg)
135  msg = &(*msg)->next;
136  *msg = new Bucket;
137  (*msg)->next = nullptr;
138 
139  releaseFromWait(*msg, *i, o);
140 
141  assert(i->peer->waiting > 0);
142  i->peer->waiting--;
143  waiting_.erase(i);
144 }

References cms::cuda::assert(), mps_fire::i, mps_check::msg, DQMNet::Bucket::next, and EcalTangentSkim_cfg::o.

◆ releaseWaiters()

void DQMNet::releaseWaiters ( const std::string &  name,
Object *  o 
)
private

Definition at line 147 of file DQMNet.cc.

147  {
148  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
149  if (i->name == name)
150  releaseFromWait(i++, o);
151  else
152  ++i;
153 }

References MillePedeFileConverter_cfg::e, mps_fire::i, Skims_PA_cff::name, and EcalTangentSkim_cfg::o.

◆ removePeer()

virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual

◆ requestObjectData()

void DQMNet::requestObjectData ( Peer *  p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 100 of file DQMNet.cc.

100  {
101  // Issue request to peer.
102  Bucket **msg = &p->sendq;
103  while (*msg)
104  msg = &(*msg)->next;
105  *msg = new Bucket;
106  (*msg)->next = nullptr;
107 
108  uint32_t words[3];
109  words[0] = sizeof(words) + len;
110  words[1] = DQM_MSG_GET_OBJECT;
111  words[2] = len;
112  copydata(*msg, words, sizeof(words));
113  copydata(*msg, name, len);
114 }

References mps_check::msg, Skims_PA_cff::name, DQMNet::Bucket::next, and AlCaHLTBitMon_ParallelJobs::p.

◆ run()

void DQMNet::run ( )

Run the actual I/O processing loop.

Definition at line 1083 of file DQMNet.cc.

1083  {
1084  Time now;
1085  Time nextFlush = 0;
1086  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1087 
1088  // Perform I/O. Every once in a while flush updates to peers.
1089  while (!shouldStop()) {
1090  for (auto ap : automatic) {
1091  // If we need a server connection and don't have one yet,
1092  // initiate asynchronous connection creation. Swallow errors
1093  // in case the server won't talk to us.
1094  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1095  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1096  InetSocket *s = nullptr;
1097  try {
1098  InetAddress addr(ap->host.c_str(), ap->port);
1099  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1100  s->setBlocking(false);
1101  s->connect(addr);
1102  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1103  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1104  } catch (Error &e) {
1105  auto *sys = dynamic_cast<SystemError *>(e.next());
1106  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1107  // "In progress" just means the connection is in progress.
1108  // The connection is ready when the socket is writeable.
1109  // Anything else is a real problem.
1110  if (s)
1111  s->abort();
1112  delete s;
1113  s = nullptr;
1114  }
1115  }
1116 
1117  // Set up with the selector if we were successful. If this is
1118  // the upstream collector, queue a request for updates.
1119  if (s) {
1120  Peer *p = createPeer(s);
1121  ap->peer = p;
1122 
1123  InetAddress peeraddr = ((InetSocket *)s)->peername();
1124  InetAddress myaddr = ((InetSocket *)s)->sockname();
1125  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1126  p->mask = IORead | IOWrite | IOUrgent;
1127  p->update = ap->update;
1128  p->automatic = ap;
1129  p->socket = s;
1130  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1131  if (ap == &upstream_) {
1132  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1133  p->sendq = new Bucket;
1134  p->sendq->next = nullptr;
1135  copydata(p->sendq, words, sizeof(words));
1136  }
1137 
1138  // Report the new connection.
1139  if (debug_)
1140  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1141  << std::endl;
1142  }
1143  }
1144  }
1145 
1146  // Pump events for a while.
1147  sel_.dispatch(delay_);
1148  now = Time::current();
1149  lock();
1150 
1151  // Check if flush is required. Flush only if one is needed.
1152  // Always sends the full object list, but only rarely.
1153  if (flush_ && now > nextFlush) {
1154  flush_ = false;
1155  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1156  sendObjectListToPeers(true);
1157  }
1158 
1159  // Update the data server and peer selection masks. If we
1160  // have no more data to send and listening for writes, remove
1161  // the write mask. If we have something to write and aren't
1162  // listening for writes, start listening so we can send off
1163  // the data.
1164  updatePeerMasks();
1165 
1166  // Release peers that have been waiting for data for too long.
1167  Time waitold = now - waitMax_;
1168  Time waitstale = now - waitStale_;
1169  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1170  Object *o = findObject(nullptr, i->name);
1171 
1172  // If we have (stale) object data, wait only up to stale limit.
1173  // Otherwise if we have no data at all, wait up to the max limit.
1174  if (i->time < waitold) {
1175  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1176  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1177  releaseFromWait(i++, o);
1178  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1179  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1180  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1181  releaseFromWait(i++, o);
1182  }
1183 
1184  // Keep it for now.
1185  else
1186  ++i;
1187  }
1188 
1189  unlock();
1190  }
1191 }

References generateTowerEtThresholdLUT::addr, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), flush_, mps_fire::i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::AutoPeer::next, fileCollector::now, EcalTangentSkim_cfg::o, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), shouldStop(), SOCKET_BUF_SIZE, RecoSummaryTask_cfi::Time, unlock(), updatePeerMasks(), upstream_, relativeConstraints::value, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

◆ sendLocalChanges()

void DQMNet::sendLocalChanges ( )

Definition at line 1195 of file DQMNet.cc.

1195  {
1196  char byte = 0;
1197  wakeup_.sink()->write(&byte, 1);
1198 }

References wakeup_.

Referenced by DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::removePeer().

◆ sendObjectListToPeer()

virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual

◆ sendObjectListToPeers()

virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ sendObjectToPeer()

void DQMNet::sendObjectToPeer ( Bucket *  msg,
Object &  o,
bool  data 
)
protectedvirtual

Definition at line 390 of file DQMNet.cc.

390  {
391  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
392  DataBlob objdata;
393 
394  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
395  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
396  else if (data)
397  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
398 
399  uint32_t words[9];
400  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
401  uint32_t datalen = objdata.size();
402  uint32_t qlen = o.qdata.size();
403 
404  if (o.dirname.empty())
405  --namelen;
406 
407  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
408  words[1] = DQM_REPLY_OBJECT;
409  words[2] = flags;
410  words[3] = (o.version >> 0) & 0xffffffff;
411  words[4] = (o.version >> 32) & 0xffffffff;
412  words[5] = o.tag;
413  words[6] = namelen;
414  words[7] = datalen;
415  words[8] = qlen;
416 
417  msg->data.reserve(msg->data.size() + words[0]);
418  copydata(msg, &words[0], 9 * sizeof(uint32_t));
419  if (namelen) {
420  copydata(msg, &(o.dirname)[0], o.dirname.size());
421  if (!o.dirname.empty())
422  copydata(msg, "/", 1);
423  copydata(msg, &o.objname[0], o.objname.size());
424  }
425  if (datalen)
426  copydata(msg, &objdata[0], datalen);
427  if (qlen)
428  copydata(msg, &o.qdata[0], qlen);
429 }

References data, HLT_2018_cff::flags, mps_check::msg, and EcalTangentSkim_cfg::o.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ setOrder()

static bool DQMNet::setOrder ( const CoreObject &  a,
const CoreObject &  b 
)
inlinestatic

Definition at line 165 of file DQMNet.h.

165  {
166  if (a.run == b.run) {
167  if (a.lumi == b.lumi) {
168  if (a.streamId == b.streamId) {
169  if (a.moduleId == b.moduleId) {
170  if (a.dirname == b.dirname) {
171  return a.objname < b.objname;
172  }
173  return a.dirname < b.dirname;
174  }
175  return a.moduleId < b.moduleId;
176  }
177  return a.streamId < b.streamId;
178  }
179  return a.lumi < b.lumi;
180  }
181  return a.run < b.run;
182  }

References a, and b.

Referenced by dqm::impl::MonitorElement::operator<().

◆ shouldStop()

bool DQMNet::shouldStop ( )
protectedvirtual

Definition at line 368 of file DQMNet.cc.

368 { return shutdown_; }

Referenced by run().

◆ shutdown()

void DQMNet::shutdown ( )

Stop the network layer and wait for it to finish.

Definition at line 1038 of file DQMNet.cc.

1038  {
1039  shutdown_ = 1;
1040  if (communicate_ != (pthread_t)-1)
1041  pthread_join(communicate_, nullptr);
1042 }

References communicate_, and shutdown_.

Referenced by DQMService::shutdown().

◆ staleObjectWaitLimit()

void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has been received; a different time limit applies when no data payload has been received at all.

Definition at line 947 of file DQMNet.cc.

947 { waitStale_ = time; }

References ntuplemaker::time, and waitStale_.

◆ start()

void DQMNet::start ( )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1072 of file DQMNet.cc.

1072  {
1073  if (communicate_ != (pthread_t)-1) {
1074  logme() << "ERROR: DQM networking thread has already been started\n";
1075  return;
1076  }
1077 
1078  pthread_mutex_init(&lock_, nullptr);
1079  pthread_create(&communicate_, nullptr, &communicate, this);
1080 }

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), Types.EventRange::cppID(), and DQMService::DQMService().

◆ startLocalServer() [1/2]

void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 983 of file DQMNet.cc.

983  {
984  if (server_) {
985  logme() << "ERROR: DQM server was already started.\n";
986  return;
987  }
988 
989  try {
990  server_ = new LocalServerSocket(path, 10);
991  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
992  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
993  server_->setBlocking(false);
994  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
995  } catch (Error &e) {
996  // FIXME: Do we need to do this when we throw an exception anyway?
997  // FIXME: Abort instead?
998  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
999 
1000  throw cms::Exception("DQMNet::startLocalServer")
1001  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1002  }
1003 
1004  logme() << "INFO: DQM server started at path " << path << std::endl;
1005 }

References MillePedeFileConverter_cfg::e, Exception, IOAccept, logme(), onPeerConnect(), castor_dqm_sourceclient_file_cfg::path, sel_, server_, and SOCKET_BUF_SIZE.

◆ startLocalServer() [2/2]

void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 952 of file DQMNet.cc.

952  {
953  if (server_) {
954  logme() << "ERROR: DQM server was already started.\n";
955  return;
956  }
957 
958  try {
959  InetAddress addr("0.0.0.0", port);
960  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
961  s->bind(addr);
962  s->listen(10);
963  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
964  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
965  s->setBlocking(false);
966  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
967  } catch (Error &e) {
968  // FIXME: Do we need to do this when we throw an exception anyway?
969  // FIXME: Abort instead?
970  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
971 
972  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
973 
974  port << ": " << e.explain().c_str();
975  }
976 
977  logme() << "INFO: DQM server started at port " << port << std::endl;
978 }

References generateTowerEtThresholdLUT::addr, MillePedeFileConverter_cfg::e, Exception, IOAccept, logme(), onPeerConnect(), query::port, alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

◆ unlock()

void DQMNet::unlock ( )

Release the lock on the DQM net layer.

Definition at line 1064 of file DQMNet.cc.

1064  {
1065  if (communicate_ != (pthread_t)-1)
1066  pthread_mutex_unlock(&lock_);
1067 }

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

◆ unpackQualityData()

void DQMNet::unpackQualityData ( QReports &  qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from the string `from` into `qr`. Assumes the data was saved with packQualityData().

Definition at line 173 of file DQMNet.cc.

173  {
174  const char *qdata = from;
175 
176  // Count how many qresults there are.
177  size_t nqv = 0;
178  while (*qdata) {
179  ++nqv;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  while (*qdata)
187  ++qdata;
188  ++qdata;
189  while (*qdata)
190  ++qdata;
191  ++qdata;
192  while (*qdata)
193  ++qdata;
194  ++qdata;
195  }
196 
197  // Now extract the qreports.
198  qdata = from;
199  qr.reserve(nqv);
200  while (*qdata) {
201  qr.emplace_back();
202  DQMNet::QValue &qv = qr.back();
203 
204  qv.code = atoi(qdata);
205  while (*qdata)
206  ++qdata;
207  switch (qv.code) {
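208  case dqm::qstatus::STATUS_OK:
209  break;
210  case dqm::qstatus::WARNING:
211  flags |= DQMNet::DQM_PROP_REPORT_WARN;
212  break;
213  case dqm::qstatus::ERROR:
214  flags |= DQMNet::DQM_PROP_REPORT_ERROR;
215  break;
216  default:
217  flags |= DQMNet::DQM_PROP_REPORT_OTHER;
218  break;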
219  }
220 
221  qv.qtresult = atof(++qdata);
222  while (*qdata)
223  ++qdata;
224 
225  qv.qtname = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.algorithm = ++qdata;
230  while (*qdata)
231  ++qdata;
232 
233  qv.message = ++qdata;
234  while (*qdata)
235  ++qdata;
236  ++qdata;
237  }
238 }

References MonitorElementData::QReport::QValue::algorithm, MonitorElementData::QReport::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, HLT_2018_cff::flags, MonitorElementData::QReport::QValue::message, MonitorElementData::QReport::QValue::qtname, MonitorElementData::QReport::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

◆ updateMask()

void DQMNet::updateMask ( Peer *  p)
protected

Update the selector mask for a peer based on data queues. Close the connection if there is no reason to maintain it open.

Definition at line 880 of file DQMNet.cc.

880  {
881  if (!p->socket)
882  return;
883 
884  // Listen to writes iff we have data to send.
885  unsigned oldmask = p->mask;
886  if (!p->sendq && (p->mask & IOWrite))
887  sel_.setMask(p->socket, p->mask &= ~IOWrite);
888 
889  if (p->sendq && !(p->mask & IOWrite))
890  sel_.setMask(p->socket, p->mask |= IOWrite);
891 
892  if (debug_ && oldmask != p->mask)
893  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
894 
895  // If we have nothing more to send and are no longer listening
896  // for reads, close up the shop for this peer.
897  if (p->mask == IOUrgent && !p->waiting) {
898  assert(!p->sendq);
899  if (debug_)
900  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
901  losePeer(nullptr, p, nullptr);
902  }
903 }

References cms::cuda::assert(), IOUrgent, IOWrite, fileCollector::logme(), and AlCaHLTBitMon_ParallelJobs::p.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

◆ updatePeerMasks()

virtual void DQMNet::updatePeerMasks ( )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ updateToCollector()

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1010 of file DQMNet.cc.

1010  {
1011  if (!downstream_.host.empty()) {
1012  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1013  << std::endl;
1014  return;
1015  }
1016 
1017  downstream_.update = true;
1018  downstream_.host = host;
1019  downstream_.port = port;
1020 }

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

Referenced by DQMService::DQMService().

◆ waitForData()

void DQMNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 118 of file DQMNet.cc.

118  {
119  // FIXME: Should we automatically record which exact peer the waiter
120  // is expecting to deliver data so we know to release the waiter if
121  // the other peer vanishes? The current implementation stands a
122  // chance for the waiter to wait indefinitely -- although we do
123  // force terminate the wait after a while.
124  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
125  WaitObject wo = {Time::current(), name, info, p};
126  waiting_.push_back(wo);
127  p->waiting++;
128 }

References info(), Skims_PA_cff::name, and AlCaHLTBitMon_ParallelJobs::p.

Member Data Documentation

◆ appname_

std::string DQMNet::appname_
private

Definition at line 352 of file DQMNet.h.

◆ communicate_

pthread_t DQMNet::communicate_
private

Definition at line 364 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

◆ debug_

bool DQMNet::debug_
protected

Definition at line 339 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

◆ delay_

int DQMNet::delay_
private

Definition at line 367 of file DQMNet.h.

Referenced by delay(), and run().

◆ downstream_

AutoPeer DQMNet::downstream_
private

Definition at line 361 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

◆ DQM_MSG_GET_OBJECT

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 70 of file DQMNet.h.

◆ DQM_MSG_HELLO

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 67 of file DQMNet.h.

◆ DQM_MSG_LIST_OBJECTS

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 69 of file DQMNet.h.

Referenced by run().

◆ DQM_MSG_UPDATE_ME

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 68 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_ACCUMULATE

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 56 of file DQMNet.h.

◆ DQM_PROP_DEAD

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static

◆ DQM_PROP_EFFICIENCY_PLOT

const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static

◆ DQM_PROP_HAS_REFERENCE

const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 54 of file DQMNet.h.

◆ DQM_PROP_LUMI

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static

◆ DQM_PROP_MARKTODELETE

const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static

Definition at line 65 of file DQMNet.h.

◆ DQM_PROP_NEW

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static

◆ DQM_PROP_RECEIVED

const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 60 of file DQMNet.h.

◆ DQM_PROP_REPORT_ALARM

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
static

Definition at line 52 of file DQMNet.h.

◆ DQM_PROP_REPORT_CLEAR

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 48 of file DQMNet.h.

◆ DQM_PROP_REPORT_ERROR

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static

◆ DQM_PROP_REPORT_MASK

const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 47 of file DQMNet.h.

◆ DQM_PROP_REPORT_OTHER

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static

◆ DQM_PROP_REPORT_WARN

const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static

◆ DQM_PROP_RESET

const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 57 of file DQMNet.h.

◆ DQM_PROP_STALE

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 63 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_TAGGED

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static

Definition at line 55 of file DQMNet.h.

◆ DQM_PROP_TYPE_DATABLOB

const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 45 of file DQMNet.h.

◆ DQM_PROP_TYPE_INT

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 31 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_INVALID

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 30 of file DQMNet.h.

◆ DQM_PROP_TYPE_MASK

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

◆ DQM_PROP_TYPE_REAL

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 32 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_SCALAR

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 29 of file DQMNet.h.

◆ DQM_PROP_TYPE_STRING

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 33 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1D

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 36 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1F

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

Definition at line 34 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1S

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 35 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2D

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 39 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2F

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

Definition at line 37 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2S

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 38 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3D

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 42 of file DQMNet.h.

◆ DQM_PROP_TYPE_TH3F

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 40 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3S

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 41 of file DQMNet.h.

◆ DQM_PROP_TYPE_TPROF

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 43 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TPROF2D

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 44 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_REPLY_LIST_BEGIN

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 72 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_LIST_END

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 73 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_NONE

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 74 of file DQMNet.h.

◆ DQM_REPLY_OBJECT

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 75 of file DQMNet.h.

◆ flush_

bool DQMNet::flush_
private

Definition at line 370 of file DQMNet.h.

Referenced by run().

◆ lock_

pthread_mutex_t DQMNet::lock_
protected

Definition at line 340 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

◆ MAX_PEER_WAITREQS

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 77 of file DQMNet.h.

◆ pid_

int DQMNet::pid_
private

Definition at line 353 of file DQMNet.h.

◆ sel_

lat::IOSelector DQMNet::sel_
private

Definition at line 355 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

◆ server_

lat::Socket* DQMNet::server_
private

Definition at line 356 of file DQMNet.h.

Referenced by startLocalServer().

◆ shutdown_

sig_atomic_t DQMNet::shutdown_
private

Definition at line 365 of file DQMNet.h.

Referenced by shutdown().

◆ upstream_

AutoPeer DQMNet::upstream_
private

Definition at line 360 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

◆ version_

lat::Time DQMNet::version_
private

Definition at line 358 of file DQMNet.h.

◆ waiting_

WaitList DQMNet::waiting_
private

Definition at line 362 of file DQMNet.h.

Referenced by run().

◆ waitMax_

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 369 of file DQMNet.h.

Referenced by run().

◆ waitStale_

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 368 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

◆ wakeup_

lat::Pipe DQMNet::wakeup_
private

Definition at line 357 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().

DQMNet::updatePeerMasks
virtual void updatePeerMasks()=0
DQMNet::getPeer
virtual Peer * getPeer(lat::Socket *s)=0
DQMNet::releaseFromWait
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:372
DQMNet::sendObjectToPeer
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:390
dqmhashmix
#define dqmhashmix(a, b, c)
DQMNet::DQM_PROP_STALE
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
mps_fire.i
i
Definition: mps_fire.py:355
DQMNet::onMessage
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:433
DQMNet::DQM_REPLY_NONE
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
DQMNet::pid_
int pid_
Definition: DQMNet.h:353
IOAccept
Definition: IOTypes.h:29
DQMNet::DQM_REPLY_OBJECT
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
DQMNet::DQM_REPLY_LIST_END
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:73
DQMNet::version_
lat::Time version_
Definition: DQMNet.h:358
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
DQMNet::DQM_MSG_UPDATE_ME
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
DQMNet::DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:60
gather_cfg.cout
cout
Definition: gather_cfg.py:144
pos
Definition: PixelAliasList.h:18
DQMNet::releaseWaiters
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
DQMNet::waitForData
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
DQMNet::wakeup_
lat::Pipe wakeup_
Definition: DQMNet.h:357
dqm::qstatus::WARNING
static const int WARNING
Definition: MonitorElement.h:53
SOCKET_READ_SIZE
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
cms::cuda::assert
assert(be >=bs)
DQMNet::unlock
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1064
DQMNet::AutoPeer::host
std::string host
Definition: DQMNet.h:141
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:152
mps_check.msg
tuple msg
Definition: mps_check.py:285
DQMNet::sel_
lat::IOSelector sel_
Definition: DQMNet.h:355
MonitorElementData::QReport::QValue::qtresult
float qtresult
Definition: MonitorElementCollection.h:59
DQMNet::flush_
bool flush_
Definition: DQMNet.h:370
DQMNet::waitStale_
lat::TimeSpan waitStale_
Definition: DQMNet.h:368
MonitorElementData::QReport::QValue
Definition: MonitorElementCollection.h:57
DQMNet::makeObject
virtual Object * makeObject(Peer *p, const std::string &name)=0
DQMNet::DQM_PROP_DEAD
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
DQMNet::shouldStop
virtual bool shouldStop()
Definition: DQMNet.cc:368
DQMNet::onPeerData
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:663
validateGeometry_cfg.valid
valid
Definition: validateGeometry_cfg.py:21
MESSAGE_SIZE_LIMIT
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
EcalTangentSkim_cfg.o
o
Definition: EcalTangentSkim_cfg.py:36
MonitorElementData::QReport::QValue::algorithm
std::string algorithm
Definition: MonitorElementCollection.h:62
query.host
host
Definition: query.py:115
DQMNet::DQM_PROP_TYPE_SCALAR
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:29
fileCollector.now
now
Definition: fileCollector.py:207
DQMNet::AutoPeer::update
bool update
Definition: DQMNet.h:143
alignCSCRings.s
s
Definition: alignCSCRings.py:92
DQMNet::sendObjectListToPeers
virtual void sendObjectListToPeers(bool all)=0
generateTowerEtThresholdLUT.addr
addr
Definition: generateTowerEtThresholdLUT.py:57
DQMNet::onPeerConnect
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:811
DQMNet::shutdown_
sig_atomic_t shutdown_
Definition: DQMNet.h:365
IOWrite
Definition: IOTypes.h:27
fileCollector.done
done
Definition: fileCollector.py:123
w
const double w
Definition: UKUtility.cc:23
DQMNet::findObject
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
DQMNet::downstream_
AutoPeer downstream_
Definition: DQMNet.h:361
DQMNet::DQM_PROP_REPORT_WARN
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:50
MonitorElementData::QReport::QValue::message
std::string message
Definition: MonitorElementCollection.h:60
DQMNet::DQM_PROP_NEW
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:59
communicate
static void * communicate(void *obj)
Definition: DQMNet.cc:1049
DQMNet::DQM_PROP_TYPE_MASK
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
DQMNet::upstream_
AutoPeer upstream_
Definition: DQMNet.h:360
DQMNet::appname_
std::string appname_
Definition: DQMNet.h:352
dqmdumpme.k
k
Definition: dqmdumpme.py:60
b
double b
Definition: hdecay.h:118
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
DQMNet::discard
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
MonitorElementData::QReport::QValue::code
int code
Definition: MonitorElementCollection.h:58
DQMNet::DQM_REPLY_LIST_BEGIN
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:72
DQMNet::DataBlob
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:83
a
double a
Definition: hdecay.h:119
dqmhashfinal
#define dqmhashfinal(a, b, c)
IORead
Definition: IOTypes.h:26
DQMNet::DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:49
runTheMatrix.err
err
Definition: runTheMatrix.py:288
DQMNet::sendObjectListToPeer
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
DQMNet::DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:51
DQMNet::markObjectsDead
virtual void markObjectsDead(Peer *p)=0
SOCKET_READ_GROWTH
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
DQMNet::removePeer
virtual void removePeer(Peer *p, lat::Socket *s)=0
leef::Error
edm::ErrorSummaryEntry Error
Definition: LogErrorEventFilter.cc:29
DQMNet::losePeer
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
DQMNet::requestObjectData
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
DQMNet::delay_
int delay_
Definition: DQMNet.h:367
DQMNet::waitMax_
lat::TimeSpan waitMax_
Definition: DQMNet.h:369
type
type
Definition: HCALResponse.h:21
query.port
port
Definition: query.py:116
DQMNet::lock
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1058
DQMNet::debug_
bool debug_
Definition: DQMNet.h:339
DQMNet::createPeer
virtual Peer * createPeer(lat::Socket *s)=0
DQMNet::lock_
pthread_mutex_t lock_
Definition: DQMNet.h:340
relativeConstraints.value
value
Definition: relativeConstraints.py:53
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
Exception
Definition: hltDiff.cc:246
DQMNet::DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
O_NONBLOCK
#define O_NONBLOCK
Definition: SysFile.h:21
dqm::qstatus::STATUS_OK
static const int STATUS_OK
Definition: MonitorElement.h:52
DQMNet::server_
lat::Socket * server_
Definition: DQMNet.h:356
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
DQMNet::AutoPeer::next
lat::Time next
Definition: DQMNet.h:140
RecoSummaryTask_cfi.Time
Time
Definition: RecoSummaryTask_cfi.py:34
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
DQMNet::DQM_MSG_GET_OBJECT
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
funct::arg
A arg
Definition: Factorize.h:36
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
DTRecHitClients_cfi.local
local
Definition: DTRecHitClients_cfi.py:10
DQMNet::AutoPeer::peer
Peer * peer
Definition: DQMNet.h:139
SOCKET_BUF_SIZE
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
DQMNet::onLocalNotify
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:856
DQMNet::communicate_
pthread_t communicate_
Definition: DQMNet.h:364
MonitorElementData::QReport::QValue::qtname
std::string qtname
Definition: MonitorElementCollection.h:61
DQMNet::waiting_
WaitList waiting_
Definition: DQMNet.h:362
DQMNet::delay
void delay(int delay)
Definition: DQMNet.cc:941
ntuplemaker.time
time
Definition: ntuplemaker.py:310
DQMNet::logme
std::ostream & logme()
Definition: DQMNet.cc:50
IOSize
size_t IOSize
Definition: IOTypes.h:14
dqm::qstatus::ERROR
static const int ERROR
Definition: MonitorElement.h:54
crabWrapper.key
key
Definition: crabWrapper.py:19
DQMNet::AutoPeer::port
int port
Definition: DQMNet.h:142
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
IOUrgent
Definition: IOTypes.h:28
DQMNet::copydata
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
DQMNet::MAX_PEER_WAITREQS
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:77
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
DQMNet::purgeDeadObjects
virtual void purgeDeadObjects(Peer *p)=0