CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
3 
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Signal.h"
8 #include "classlib/utils/Error.h"
9 #include "classlib/utils/Time.h"
10 #include <pthread.h>
11 #include <cstdint>
12 #include <csignal>
13 #include <iostream>
14 #include <vector>
15 #include <string>
16 #include <list>
17 #include <map>
18 #include <set>
19 #include <ext/hash_set>
20 
21 // for definition of QValue
23 
24 //class DQMStore;
25 
26 class DQMNet {
27 public:
28  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
29  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
30  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
31  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
32  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
33  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
34  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
35  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
36  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
37  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
38  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
39  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
40  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
41  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
42  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
43  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
44  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
45  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
46 
47  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
48  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
49  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
50  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
51  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
53 
54  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
55  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
56  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
57  static const uint32_t DQM_PROP_RESET = 0x00008000;
58 
59  static const uint32_t DQM_PROP_NEW = 0x00010000;
60  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
61  static const uint32_t DQM_PROP_LUMI = 0x00040000;
62  static const uint32_t DQM_PROP_DEAD = 0x00080000;
63  static const uint32_t DQM_PROP_STALE = 0x00100000;
64  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
65  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
66 
67  static const uint32_t DQM_MSG_HELLO = 0;
68  static const uint32_t DQM_MSG_UPDATE_ME = 1;
69  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
70  static const uint32_t DQM_MSG_GET_OBJECT = 3;
71 
72  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
73  static const uint32_t DQM_REPLY_LIST_END = 102;
74  static const uint32_t DQM_REPLY_NONE = 103;
75  static const uint32_t DQM_REPLY_OBJECT = 104;
76 
77  static const uint32_t MAX_PEER_WAITREQS = 128;
78 
79  struct Peer;
80  struct WaitObject;
81 
83  using DataBlob = std::vector<unsigned char>;
84  using QReports = std::vector<QValue>;
85  using TagList = std::vector<uint32_t>; // DEPRECATED
86  using WaitList = std::list<WaitObject>;
87 
88  struct CoreObject {
89  uint32_t flags;
90  uint32_t tag;
92  uint32_t run;
93  uint32_t lumi;
94  uint32_t streamId;
95  uint32_t moduleId;
99  };
100 
101  struct Object : CoreObject {
107  };
108 
109  struct Bucket {
112  };
113 
114  struct WaitObject {
119  };
120 
121  struct AutoPeer;
122  struct Peer {
124  lat::Socket *socket;
127  size_t sendpos;
128 
129  unsigned mask;
130  bool source;
131  bool update;
132  bool updated;
133  size_t updates;
134  size_t waiting;
136  };
137 
138  struct AutoPeer {
142  int port;
143  bool update;
144  };
145 
146  DQMNet(const std::string &appname = "");
147  virtual ~DQMNet();
148 
149  void debug(bool doit);
150  void delay(int delay);
151  void startLocalServer(int port);
152  void startLocalServer(const char *path);
153  void staleObjectWaitLimit(lat::TimeSpan time);
154  void updateToCollector(const std::string &host, int port);
155  void listenToCollector(const std::string &host, int port);
156  void shutdown();
157  void lock();
158  void unlock();
159 
160  void start();
161  void run();
162 
163  void sendLocalChanges();
164 
165  static bool setOrder(const CoreObject &a, const CoreObject &b) {
166  if (a.run == b.run) {
167  if (a.lumi == b.lumi) {
168  if (a.streamId == b.streamId) {
169  if (a.moduleId == b.moduleId) {
170  if (a.dirname == b.dirname) {
171  return a.objname < b.objname;
172  }
173  return a.dirname < b.dirname;
174  }
175  return a.moduleId < b.moduleId;
176  }
177  return a.streamId < b.streamId;
178  }
179  return a.lumi < b.lumi;
180  }
181  return a.run < b.run;
182  }
183 
184  struct HashOp {
185  uint32_t operator()(const Object &a) const { return a.hash; }
186  };
187 
188  struct HashEqual {
189  bool operator()(const Object &a, const Object &b) const {
190  return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
191  }
192  };
193 
194  static size_t dqmhash(const void *key, size_t keylen) {
195  // Reduced version of Bob Jenkins' hash function at:
196  // http://www.burtleburtle.net/bob/c/lookup3.c
197 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
198 #define dqmhashmix(a, b, c) \
199  { \
200  a -= c; \
201  a ^= dqmhashrot(c, 4); \
202  c += b; \
203  b -= a; \
204  b ^= dqmhashrot(a, 6); \
205  a += c; \
206  c -= b; \
207  c ^= dqmhashrot(b, 8); \
208  b += a; \
209  a -= c; \
210  a ^= dqmhashrot(c, 16); \
211  c += b; \
212  b -= a; \
213  b ^= dqmhashrot(a, 19); \
214  a += c; \
215  c -= b; \
216  c ^= dqmhashrot(b, 4); \
217  b += a; \
218  }
219 #define dqmhashfinal(a, b, c) \
220  { \
221  c ^= b; \
222  c -= dqmhashrot(b, 14); \
223  a ^= c; \
224  a -= dqmhashrot(c, 11); \
225  b ^= a; \
226  b -= dqmhashrot(a, 25); \
227  c ^= b; \
228  c -= dqmhashrot(b, 16); \
229  a ^= c; \
230  a -= dqmhashrot(c, 4); \
231  b ^= a; \
232  b -= dqmhashrot(a, 14); \
233  c ^= b; \
234  c -= dqmhashrot(b, 24); \
235  }
236 
237  uint32_t a, b, c;
238  a = b = c = 0xdeadbeef + (uint32_t)keylen;
239  const auto *k = (const unsigned char *)key;
240 
241  // all but the last block: affect some bits of (a, b, c)
242  while (keylen > 12) {
243  a += k[0];
244  a += ((uint32_t)k[1]) << 8;
245  a += ((uint32_t)k[2]) << 16;
246  a += ((uint32_t)k[3]) << 24;
247  b += k[4];
248  b += ((uint32_t)k[5]) << 8;
249  b += ((uint32_t)k[6]) << 16;
250  b += ((uint32_t)k[7]) << 24;
251  c += k[8];
252  c += ((uint32_t)k[9]) << 8;
253  c += ((uint32_t)k[10]) << 16;
254  c += ((uint32_t)k[11]) << 24;
255  dqmhashmix(a, b, c);
256  keylen -= 12;
257  k += 12;
258  }
259 
260  // last block: affect all 32 bits of (c); all case statements fall through
261  switch (keylen) {
262  case 12:
263  c += ((uint32_t)k[11]) << 24;
264  [[fallthrough]];
265  case 11:
266  c += ((uint32_t)k[10]) << 16;
267  [[fallthrough]];
268  case 10:
269  c += ((uint32_t)k[9]) << 8;
270  [[fallthrough]];
271  case 9:
272  c += k[8];
273  [[fallthrough]];
274  case 8:
275  b += ((uint32_t)k[7]) << 24;
276  [[fallthrough]];
277  case 7:
278  b += ((uint32_t)k[6]) << 16;
279  [[fallthrough]];
280  case 6:
281  b += ((uint32_t)k[5]) << 8;
282  [[fallthrough]];
283  case 5:
284  b += k[4];
285  [[fallthrough]];
286  case 4:
287  a += ((uint32_t)k[3]) << 24;
288  [[fallthrough]];
289  case 3:
290  a += ((uint32_t)k[2]) << 16;
291  [[fallthrough]];
292  case 2:
293  a += ((uint32_t)k[1]) << 8;
294  [[fallthrough]];
295  case 1:
296  a += k[0];
297  break;
298  case 0:
299  return c;
300  }
301 
302  dqmhashfinal(a, b, c);
303  return c;
304 #undef dqmhashrot
305 #undef dqmhashmix
306 #undef dqmhashfinal
307  }
308 
309  static void packQualityData(std::string &into, const QReports &qr);
310  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
311 
312 protected:
313  std::ostream &logme();
314  static void copydata(Bucket *b, const void *data, size_t len);
315  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
316 
317  virtual bool shouldStop();
318  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
319  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
320  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
321 
322  // bool reconstructObject(Object &o);
323  // bool reinstateObject(DQMStore *store, Object &o);
324  virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
325  virtual Object *makeObject(Peer *p, const std::string &name) = 0;
326  virtual void markObjectsDead(Peer *p) = 0;
327  virtual void purgeDeadObjects(Peer *p) = 0;
328 
329  virtual Peer *getPeer(lat::Socket *s) = 0;
330  virtual Peer *createPeer(lat::Socket *s) = 0;
331  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
332  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
333  virtual void sendObjectListToPeers(bool all) = 0;
334 
335  void updateMask(Peer *p);
336  virtual void updatePeerMasks() = 0;
337  static void discard(Bucket *&b);
338 
339  bool debug_;
340  pthread_mutex_t lock_;
341 
342 private:
343  void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
344  void requestObjectData(Peer *p, const char *name, size_t len);
345  void releaseFromWait(WaitList::iterator i, Object *o);
346  void releaseWaiters(const std::string &name, Object *o);
347 
348  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
349  bool onPeerConnect(lat::IOSelectEvent *ev);
350  bool onLocalNotify(lat::IOSelectEvent *ev);
351 
353  int pid_;
354 
355  lat::IOSelector sel_;
356  lat::Socket *server_;
357  lat::Pipe wakeup_;
359 
363 
364  pthread_t communicate_;
365  sig_atomic_t shutdown_;
366 
367  int delay_;
368  lat::TimeSpan waitStale_;
369  lat::TimeSpan waitMax_;
370  bool flush_;
371 
372 public:
373  // copying is not available
374  DQMNet(const DQMNet &) = delete;
375  DQMNet &operator=(const DQMNet &) = delete;
376 };
377 
378 template <class ObjType>
379 class DQMImplNet : public DQMNet {
380 public:
381  struct ImplPeer;
382 
383  using DirMap = std::set<std::string>;
384  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
385  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
386  struct ImplPeer : Peer {
387  ImplPeer() = default;
390  };
391 
392  DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
393 
394  ~DQMImplNet() override = default;
395 
396 protected:
397  Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
398  size_t slash = name.rfind('/');
399  size_t dirpos = (slash == std::string::npos ? 0 : slash);
400  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
401  std::string path(name, 0, dirpos);
402  ObjType proto;
403  proto.hash = dqmhash(name.c_str(), name.size());
404  proto.dirname = path;
405  proto.objname.append(name, namepos, std::string::npos);
406 
407  typename ObjectMap::iterator pos;
408  typename PeerMap::iterator i, e;
409  if (owner)
410  *owner = nullptr;
411  if (p) {
412  auto *ip = static_cast<ImplPeer *>(p);
413  pos = ip->objs.find(proto);
414  if (pos == ip->objs.end())
415  return nullptr;
416  else {
417  if (owner)
418  *owner = ip;
419  return const_cast<ObjType *>(&*pos);
420  }
421  } else {
422  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
423  pos = i->second.objs.find(proto);
424  if (pos != i->second.objs.end()) {
425  if (owner)
426  *owner = &i->second;
427  return const_cast<ObjType *>(&*pos);
428  }
429  }
430  return nullptr;
431  }
432  }
433 
434  Object *makeObject(Peer *p, const std::string &name) override {
435  auto *ip = static_cast<ImplPeer *>(p);
436  size_t slash = name.rfind('/');
437  size_t dirpos = (slash == std::string::npos ? 0 : slash);
438  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
439  ObjType o;
440  o.flags = 0;
441  o.tag = 0;
442  o.version = 0;
443  o.lastreq = 0;
444  o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
445  o.objname.append(name, namepos, std::string::npos);
446  o.hash = dqmhash(name.c_str(), name.size());
447  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
448  }
449 
450  // Mark all the objects dead. This is intended to be used when
451  // starting to process a complete list of objects, in order to
452  // flag the objects that need to be killed at the end. After
453  // call to this method, revive all live objects by removing the
454  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
455  // to remove the dead ones. This also turns off object request
456  // for objects we've lost interest in.
457  void markObjectsDead(Peer *p) override {
458  uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
459  auto *ip = static_cast<ImplPeer *>(p);
460  typename ObjectMap::iterator i, e;
461  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
462  if (i->lastreq && i->lastreq < minreq)
463  const_cast<ObjType &>(*i).lastreq = 0;
464  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
465  }
466  }
467 
468  // Mark remaining zombie objects as dead. See markObjectsDead().
469  void purgeDeadObjects(Peer *p) override {
470  auto *ip = static_cast<ImplPeer *>(p);
471  typename ObjectMap::iterator i, e;
472  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
473  if (i->flags & DQM_PROP_DEAD)
474  ip->objs.erase(i++);
475  else
476  ++i;
477  }
478  }
479 
480  Peer *getPeer(lat::Socket *s) override {
481  auto pos = peers_.find(s);
482  auto end = peers_.end();
483  return pos == end ? nullptr : &pos->second;
484  }
485 
486  Peer *createPeer(lat::Socket *s) override {
487  ImplPeer *ip = &peers_[s];
488  ip->socket = nullptr;
489  ip->sendq = nullptr;
490  ip->sendpos = 0;
491  ip->mask = 0;
492  ip->source = false;
493  ip->update = false;
494  ip->updated = false;
495  ip->updates = 0;
496  ip->waiting = 0;
497  ip->automatic = nullptr;
498  return ip;
499  }
500 
501  void removePeer(Peer *p, lat::Socket *s) override {
502  auto *ip = static_cast<ImplPeer *>(p);
503  bool needflush = !ip->objs.empty();
504 
505  typename ObjectMap::iterator i, e;
506  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
507  ip->objs.erase(i++);
508 
509  peers_.erase(s);
510 
511  // If we removed a peer with objects, our list of objects
512  // has changed and we need to update downstream peers.
513  if (needflush)
515  }
516 
518  void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
519  typename PeerMap::iterator pi, pe;
520  typename ObjectMap::iterator oi, oe;
521  size_t size = 0;
522  size_t numobjs = 0;
523  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
524  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
525  if (all || (oi->flags & DQM_PROP_NEW))
526  size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
527  oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
528 
529  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
530 
531  uint32_t nupdates = 0;
532  uint32_t words[4];
533  words[0] = sizeof(words);
534  words[1] = DQM_REPLY_LIST_BEGIN;
535  words[2] = numobjs;
536  words[3] = all;
537  copydata(msg, &words[0], sizeof(words));
538 
539  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
540  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
541  if (all || (oi->flags & DQM_PROP_NEW)) {
542  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
543  if (clear)
544  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
545  ++nupdates;
546  }
547 
548  words[1] = DQM_REPLY_LIST_END;
549  words[2] = nupdates;
550  copydata(msg, &words[0], sizeof(words));
551  }
552 
553  void sendObjectListToPeers(bool all) override {
554  typename PeerMap::iterator i, e;
555  typename ObjectMap::iterator oi, oe;
556  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
557  ImplPeer &p = i->second;
558  if (!p.update)
559  continue;
560 
561  if (debug_)
562  logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
563 
564  Bucket msg;
565  msg.next = nullptr;
566  sendObjectListToPeer(&msg, !p.updated || all, true);
567 
568  if (!msg.data.empty()) {
569  Bucket **prev = &p.sendq;
570  while (*prev)
571  prev = &(*prev)->next;
572 
573  *prev = new Bucket;
574  (*prev)->next = nullptr;
575  (*prev)->data.swap(msg.data);
576  }
577  p.updated = true;
578  }
579  }
580 
581  void updatePeerMasks() override {
582  typename PeerMap::iterator i, e;
583  for (i = peers_.begin(), e = peers_.end(); i != e;)
584  updateMask(&(i++)->second);
585  }
586 
587 protected:
589 };
590 
591 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
592 public:
593  DQMBasicNet(const std::string &appname = "");
594 
595  void reserveLocalSpace(uint32_t size);
596  void updateLocalObject(Object &o);
597  bool removeLocalExcept(const std::set<std::string> &known);
598 
599 private:
600  ImplPeer *local_;
601 };
602 
603 #endif // DQMSERVICES_CORE_DQM_NET_H
DQMImplNet::markObjectsDead
void markObjectsDead(Peer *p) override
Definition: DQMNet.h:457
DQMNet::Object::rawdata
DataBlob rawdata
Definition: DQMNet.h:104
DQMNet::updatePeerMasks
virtual void updatePeerMasks()=0
DQMNet::staleObjectWaitLimit
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:947
DQMNet::getPeer
virtual Peer * getPeer(lat::Socket *s)=0
DQMNet::releaseFromWait
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:372
DQMNet::DQM_PROP_RESET
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:57
DQMNet::sendObjectToPeer
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:390
dqmhashmix
#define dqmhashmix(a, b, c)
DQMNet::DQM_PROP_STALE
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
DQMNet::setOrder
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:165
mps_fire.i
i
Definition: mps_fire.py:355
DQMNet::onMessage
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:433
DQMNet::DQM_REPLY_NONE
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
DQMBasicNet::updateLocalObject
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1212
DQMNet::DQM_PROP_TYPE_TH2S
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:38
DQMNet::pid_
int pid_
Definition: DQMNet.h:353
DQMNet::operator=
DQMNet & operator=(const DQMNet &)=delete
DQMNet::DQM_REPLY_OBJECT
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
DQMImplNet::updatePeerMasks
void updatePeerMasks() override
Definition: DQMNet.h:581
DQMNet::DQM_REPLY_LIST_END
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:73
DQMNet::QReports
std::vector< QValue > QReports
Definition: DQMNet.h:84
DQMImplNet::removePeer
void removePeer(Peer *p, lat::Socket *s) override
Definition: DQMNet.h:501
DQMNet::version_
lat::Time version_
Definition: DQMNet.h:358
DQMNet::shutdown
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1038
DQMNet::HashOp
Definition: DQMNet.h:184
DQMNet::Object
Definition: DQMNet.h:101
DQMImplNet::getPeer
Peer * getPeer(lat::Socket *s) override
Definition: DQMNet.h:480
DQMImplNet::sendObjectListToPeer
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:518
DQMImplNet::createPeer
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:486
DQMNet::dqmhash
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:194
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
DQMNet::DQM_PROP_REPORT_ALARM
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:52
DQMNet::DQM_MSG_UPDATE_ME
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
DQMNet::DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:60
pos
Definition: PixelAliasList.h:18
DQMNet::releaseWaiters
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
DQMNet::WaitList
std::list< WaitObject > WaitList
Definition: DQMNet.h:86
DQMNet::waitForData
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
DQMNet::wakeup_
lat::Pipe wakeup_
Definition: DQMNet.h:357
DQMNet::DQM_PROP_TYPE_TH2F
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:37
DQMNet::~DQMNet
virtual ~DQMNet()
Definition: DQMNet.cc:931
DQMBasicNet::DQMBasicNet
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1203
DQMNet::unlock
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1064
DQMNet::AutoPeer::host
std::string host
Definition: DQMNet.h:141
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:152
mps_check.msg
tuple msg
Definition: mps_check.py:285
DQMNet::Peer::updated
bool updated
Definition: DQMNet.h:132
DQMNet::sel_
lat::IOSelector sel_
Definition: DQMNet.h:355
DQMNet::WaitObject
Definition: DQMNet.h:114
DQMNet::flush_
bool flush_
Definition: DQMNet.h:370
DQMNet::waitStale_
lat::TimeSpan waitStale_
Definition: DQMNet.h:368
MonitorElementData::QReport::QValue
Definition: MonitorElementCollection.h:57
DQMNet::makeObject
virtual Object * makeObject(Peer *p, const std::string &name)=0
DQMImplNet::ImplPeer::dirs
DirMap dirs
Definition: DQMNet.h:389
DQMNet::DQM_PROP_TAGGED
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:55
DQMNet::DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:41
DQMNet::DQM_PROP_DEAD
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
DQMNet::DQM_PROP_TYPE_INVALID
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:30
dqmdumpme.first
first
Definition: dqmdumpme.py:55
python.cmstools.all
def all(container)
workaround iterator generators for ROOT classes
Definition: cmstools.py:26
DQMNet::shouldStop
virtual bool shouldStop()
Definition: DQMNet.cc:368
DQMNet::onPeerData
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:663
DQMNet::HashEqual
Definition: DQMNet.h:188
end
#define end
Definition: vmac.h:39
EcalTangentSkim_cfg.o
o
Definition: EcalTangentSkim_cfg.py:36
DQMNet::DQM_PROP_TYPE_TH1D
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:36
DQMNet::CoreObject::flags
uint32_t flags
Definition: DQMNet.h:89
DQMNet::packQualityData
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:158
DQMNet::AutoPeer
Definition: DQMNet.h:138
query.host
host
Definition: query.py:115
DQMImplNet::findObject
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
Definition: DQMNet.h:397
DQMNet::DQM_PROP_TYPE_SCALAR
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:29
DQMNet::Object::qdata
std::string qdata
Definition: DQMNet.h:106
DQMNet::AutoPeer::update
bool update
Definition: DQMNet.h:143
alignCSCRings.s
s
Definition: alignCSCRings.py:92
DQMBasicNet
Definition: DQMNet.h:591
DQMNet::sendObjectListToPeers
virtual void sendObjectListToPeers(bool all)=0
DQMImplNet::ImplPeer
Definition: DQMNet.h:386
DQMNet::onPeerConnect
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:811
DQMNet::DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:48
DQMNet::DQMNet
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:906
DQMNet::shutdown_
sig_atomic_t shutdown_
Definition: DQMNet.h:365
DQMNet::debug
void debug(bool doit)
Definition: DQMNet.cc:937
DQMNet
Definition: DQMNet.h:26
DQMNet::CoreObject::streamId
uint32_t streamId
Definition: DQMNet.h:94
DQMNet::CoreObject::lumi
uint32_t lumi
Definition: DQMNet.h:93
DQMNet::Bucket::data
DataBlob data
Definition: DQMNet.h:111
DQMNet::CoreObject::tag
uint32_t tag
Definition: DQMNet.h:90
w
const double w
Definition: UKUtility.cc:23
DQMNet::findObject
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
MonitorElementCollection.h
DQMNet::Bucket
Definition: DQMNet.h:109
DQMNet::downstream_
AutoPeer downstream_
Definition: DQMNet.h:361
DQMNet::DQM_PROP_REPORT_WARN
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:50
DQMNet::DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:40
DQMImplNet::PeerMap
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:385
DQMNet::DQM_PROP_EFFICIENCY_PLOT
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:64
DQMNet::DQM_PROP_NEW
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:59
DQMNet::DQM_PROP_TYPE_MASK
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
DQMNet::upstream_
AutoPeer upstream_
Definition: DQMNet.h:360
DQMNet::appname_
std::string appname_
Definition: DQMNet.h:352
DQMNet::Peer::automatic
AutoPeer * automatic
Definition: DQMNet.h:135
dqmdumpme.k
k
Definition: dqmdumpme.py:60
DQMNet::Peer::socket
lat::Socket * socket
Definition: DQMNet.h:124
DQMNet::Object::lastreq
uint64_t lastreq
Definition: DQMNet.h:103
DQMNet::CoreObject::objname
std::string objname
Definition: DQMNet.h:97
b
double b
Definition: hdecay.h:118
DQMNet::CoreObject::version
uint64_t version
Definition: DQMNet.h:91
DQMNet::DQM_PROP_TYPE_TH1F
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:34
DQMNet::DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:32
DQMNet::Peer::updates
size_t updates
Definition: DQMNet.h:133
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
DQMNet::WaitObject::time
lat::Time time
Definition: DQMNet.h:115
DQMNet::discard
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
DQMNet::HashEqual::operator()
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:189
DQMNet::DQM_REPLY_LIST_BEGIN
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:72
DQMBasicNet::removeLocalExcept
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1233
DQMNet::DataBlob
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:83
DQMImplNet::ObjectMap
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:384
DQMImplNet::peers_
PeerMap peers_
Definition: DQMNet.h:588
a
double a
Definition: hdecay.h:119
DQMNet::WaitObject::name
std::string name
Definition: DQMNet.h:116
dqmhashfinal
#define dqmhashfinal(a, b, c)
clear
void clear(HadCaloObj &c)
Definition: data.h:124
DQMNet::DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:49
runTheMatrix.err
err
Definition: runTheMatrix.py:288
DQMNet::Peer::mask
unsigned mask
Definition: DQMNet.h:129
DQMImplNet::makeObject
Object * makeObject(Peer *p, const std::string &name) override
Definition: DQMNet.h:434
DQMNet::sendObjectListToPeer
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
DQMNet::DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:51
DQMNet::DQM_PROP_TYPE_DATABLOB
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:45
DQMImplNet
Definition: DQMNet.h:379
DQMNet::markObjectsDead
virtual void markObjectsDead(Peer *p)=0
DQMNet::Peer
Definition: DQMNet.h:122
DQMNet::updateMask
void updateMask(Peer *p)
Definition: DQMNet.cc:880
DQMNet::removePeer
virtual void removePeer(Peer *p, lat::Socket *s)=0
leef::Error
edm::ErrorSummaryEntry Error
Definition: LogErrorEventFilter.cc:29
DQMNet::CoreObject
Definition: DQMNet.h:88
DQMNet::CoreObject::qreports
QReports qreports
Definition: DQMNet.h:98
DQMNet::losePeer
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
DQMImplNet::purgeDeadObjects
void purgeDeadObjects(Peer *p) override
Definition: DQMNet.h:469
DQMNet::Peer::sendpos
size_t sendpos
Definition: DQMNet.h:127
DQMNet::unpackQualityData
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:173
DQMNet::Object::scalar
std::string scalar
Definition: DQMNet.h:105
DQMNet::requestObjectData
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
DQMNet::DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:33
DQMNet::DQM_PROP_TYPE_INT
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:31
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
DQMNet::listenToCollector
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1025
DQMNet::WaitObject::info
std::string info
Definition: DQMNet.h:117
DQMBasicNet::reserveLocalSpace
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1208
DQMImplNet< DQMNet::Object >::DirMap
std::set< std::string > DirMap
Definition: DQMNet.h:383
DQMNet::DQM_PROP_TYPE_TH1S
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:35
DQMNet::start
void start()
Definition: DQMNet.cc:1072
DQMNet::Object::hash
uint64_t hash
Definition: DQMNet.h:102
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
DQMNet::delay_
int delay_
Definition: DQMNet.h:367
DQMImplNet::ImplPeer::ImplPeer
ImplPeer()=default
DQMNet::waitMax_
lat::TimeSpan waitMax_
Definition: DQMNet.h:369
DQMNet::DQM_PROP_MARKTODELETE
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:65
DQMNet::DQM_PROP_TYPE_TPROF
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:43
DQMImplNet::ImplPeer::objs
ObjectMap objs
Definition: DQMNet.h:388
DQMNet::DQM_PROP_HAS_REFERENCE
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:54
DQMImplNet::DQMImplNet
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:392
query.port
port
Definition: query.py:116
pickleFileParser.slash
slash
Definition: pickleFileParser.py:12
DQMNet::lock
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1058
DQMNet::debug_
bool debug_
Definition: DQMNet.h:339
DQMNet::createPeer
virtual Peer * createPeer(lat::Socket *s)=0
DQMNet::DQM_PROP_REPORT_MASK
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:47
DQMNet::lock_
pthread_mutex_t lock_
Definition: DQMNet.h:340
DQMBasicNet::local_
ImplPeer * local_
Definition: DQMNet.h:600
DQMNet::Peer::update
bool update
Definition: DQMNet.h:131
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
DQMNet::Peer::incoming
DataBlob incoming
Definition: DQMNet.h:125
DQMNet::Peer::sendq
Bucket * sendq
Definition: DQMNet.h:126
DQMNet::Bucket::next
Bucket * next
Definition: DQMNet.h:110
DQMNet::DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
DQMNet::CoreObject::run
uint32_t run
Definition: DQMNet.h:92
DQMImplNet::sendObjectListToPeers
void sendObjectListToPeers(bool all) override
Definition: DQMNet.h:553
DQMNet::CoreObject::moduleId
uint32_t moduleId
Definition: DQMNet.h:95
DQMNet::server_
lat::Socket * server_
Definition: DQMNet.h:356
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
DQMNet::AutoPeer::next
lat::Time next
Definition: DQMNet.h:140
RecoSummaryTask_cfi.Time
Time
Definition: RecoSummaryTask_cfi.py:34
DQMNet::DQM_PROP_TYPE_TH2D
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:39
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
DQMNet::DQM_MSG_GET_OBJECT
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
DQMNet::DQM_MSG_HELLO
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:67
DQMNet::DQM_PROP_LUMI
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:61
DQMNet::TagList
std::vector< uint32_t > TagList
Definition: DQMNet.h:85
pi
const Double_t pi
Definition: trackSplitPlot.h:36
DQMNet::Peer::peeraddr
std::string peeraddr
Definition: DQMNet.h:123
DQMNet::Peer::source
bool source
Definition: DQMNet.h:130
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
DQMNet::AutoPeer::peer
Peer * peer
Definition: DQMNet.h:139
DQMNet::CoreObject::dirname
std::string dirname
Definition: DQMNet.h:96
DQMImplNet::~DQMImplNet
~DQMImplNet() override=default
DQMNet::onLocalNotify
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:856
DQMNet::Peer::waiting
size_t waiting
Definition: DQMNet.h:134
DQMNet::communicate_
pthread_t communicate_
Definition: DQMNet.h:364
DQMNet::waiting_
WaitList waiting_
Definition: DQMNet.h:362
DQMNet::delay
void delay(int delay)
Definition: DQMNet.cc:941
DQMNet::DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:56
ntuplemaker.time
time
Definition: ntuplemaker.py:310
event
Definition: event.py:1
DQMNet::logme
std::ostream & logme()
Definition: DQMNet.cc:50
crabWrapper.key
key
Definition: crabWrapper.py:19
DQMNet::AutoPeer::port
int port
Definition: DQMNet.h:142
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
DQMNet::sendLocalChanges
void sendLocalChanges()
Definition: DQMNet.cc:1195
DQMNet::updateToCollector
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1010
DQMNet::copydata
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
DQMNet::HashOp::operator()
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:185
DQMNet::run
void run()
Definition: DQMNet.cc:1083
DQMNet::WaitObject::peer
Peer * peer
Definition: DQMNet.h:118
DQMNet::MAX_PEER_WAITREQS
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:77
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
DQMNet::startLocalServer
void startLocalServer(int port)
Definition: DQMNet.cc:952
DQMNet::DQM_PROP_TYPE_TH3D
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:42
DQMNet::purgeDeadObjects
virtual void purgeDeadObjects(Peer *p)=0
DQMNet::DQM_PROP_TYPE_TPROF2D
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:44