CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
3 
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Signal.h"
8 #include "classlib/utils/Error.h"
9 #include "classlib/utils/Time.h"
10 #include <pthread.h>
11 #include <cstdint>
12 #include <csignal>
13 #include <iostream>
14 #include <vector>
15 #include <string>
16 #include <list>
17 #include <map>
18 #include <set>
19 #include <ext/hash_set>
20 
21 // for definition of QValue
23 
24 //class DQMStore;
25 
26 class DQMNet {
27 public:
28  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
29  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
30  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
31  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
32  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
33  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
34  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
35  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
36  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
37  static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013;
38  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
39  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
40  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
41  static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023;
42  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
43  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
44  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
45  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
46  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
47  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
48 
49  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
50  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
51  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
52  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
53  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
55 
56  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
57  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
58  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
59  static const uint32_t DQM_PROP_RESET = 0x00008000;
60 
61  static const uint32_t DQM_PROP_NEW = 0x00010000;
62  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
63  static const uint32_t DQM_PROP_LUMI = 0x00040000;
64  static const uint32_t DQM_PROP_DEAD = 0x00080000;
65  static const uint32_t DQM_PROP_STALE = 0x00100000;
66  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
67  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
68 
69  static const uint32_t DQM_MSG_HELLO = 0;
70  static const uint32_t DQM_MSG_UPDATE_ME = 1;
71  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
72  static const uint32_t DQM_MSG_GET_OBJECT = 3;
73 
74  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
75  static const uint32_t DQM_REPLY_LIST_END = 102;
76  static const uint32_t DQM_REPLY_NONE = 103;
77  static const uint32_t DQM_REPLY_OBJECT = 104;
78 
79  static const uint32_t MAX_PEER_WAITREQS = 128;
80 
81  struct Peer;
82  struct WaitObject;
83 
85  using DataBlob = std::vector<unsigned char>;
86  using QReports = std::vector<QValue>;
87  using TagList = std::vector<uint32_t>; // DEPRECATED
88  using WaitList = std::list<WaitObject>;
89 
90  struct CoreObject {
91  uint32_t flags;
92  uint32_t tag;
94  uint32_t run;
95  uint32_t lumi;
96  uint32_t streamId;
97  uint32_t moduleId;
101  };
102 
103  struct Object : CoreObject {
109  };
110 
111  struct Bucket {
114  };
115 
116  struct WaitObject {
121  };
122 
123  struct AutoPeer;
124  struct Peer {
126  lat::Socket *socket;
129  size_t sendpos;
130 
131  unsigned mask;
132  bool source;
133  bool update;
134  bool updated;
135  size_t updates;
136  size_t waiting;
138  };
139 
140  struct AutoPeer {
144  int port;
145  bool update;
146  };
147 
148  DQMNet(const std::string &appname = "");
149  virtual ~DQMNet();
150 
151  void debug(bool doit);
152  void delay(int delay);
153  void startLocalServer(int port);
154  void startLocalServer(const char *path);
155  void staleObjectWaitLimit(lat::TimeSpan time);
156  void updateToCollector(const std::string &host, int port);
157  void listenToCollector(const std::string &host, int port);
158  void shutdown();
159  void lock();
160  void unlock();
161 
162  void start();
163  void run();
164 
165  void sendLocalChanges();
166 
167  static bool setOrder(const CoreObject &a, const CoreObject &b) {
168  if (a.run == b.run) {
169  if (a.lumi == b.lumi) {
170  if (a.streamId == b.streamId) {
171  if (a.moduleId == b.moduleId) {
172  if (a.dirname == b.dirname) {
173  return a.objname < b.objname;
174  }
175  return a.dirname < b.dirname;
176  }
177  return a.moduleId < b.moduleId;
178  }
179  return a.streamId < b.streamId;
180  }
181  return a.lumi < b.lumi;
182  }
183  return a.run < b.run;
184  }
185 
186  struct HashOp {
187  uint32_t operator()(const Object &a) const { return a.hash; }
188  };
189 
190  struct HashEqual {
191  bool operator()(const Object &a, const Object &b) const {
192  return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
193  }
194  };
195 
196  static size_t dqmhash(const void *key, size_t keylen) {
197  // Reduced version of Bob Jenkins' hash function at:
198  // http://www.burtleburtle.net/bob/c/lookup3.c
199 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
200 #define dqmhashmix(a, b, c) \
201  { \
202  a -= c; \
203  a ^= dqmhashrot(c, 4); \
204  c += b; \
205  b -= a; \
206  b ^= dqmhashrot(a, 6); \
207  a += c; \
208  c -= b; \
209  c ^= dqmhashrot(b, 8); \
210  b += a; \
211  a -= c; \
212  a ^= dqmhashrot(c, 16); \
213  c += b; \
214  b -= a; \
215  b ^= dqmhashrot(a, 19); \
216  a += c; \
217  c -= b; \
218  c ^= dqmhashrot(b, 4); \
219  b += a; \
220  }
221 #define dqmhashfinal(a, b, c) \
222  { \
223  c ^= b; \
224  c -= dqmhashrot(b, 14); \
225  a ^= c; \
226  a -= dqmhashrot(c, 11); \
227  b ^= a; \
228  b -= dqmhashrot(a, 25); \
229  c ^= b; \
230  c -= dqmhashrot(b, 16); \
231  a ^= c; \
232  a -= dqmhashrot(c, 4); \
233  b ^= a; \
234  b -= dqmhashrot(a, 14); \
235  c ^= b; \
236  c -= dqmhashrot(b, 24); \
237  }
238 
239  uint32_t a, b, c;
240  a = b = c = 0xdeadbeef + (uint32_t)keylen;
241  const auto *k = (const unsigned char *)key;
242 
243  // all but the last block: affect some bits of (a, b, c)
244  while (keylen > 12) {
245  a += k[0];
246  a += ((uint32_t)k[1]) << 8;
247  a += ((uint32_t)k[2]) << 16;
248  a += ((uint32_t)k[3]) << 24;
249  b += k[4];
250  b += ((uint32_t)k[5]) << 8;
251  b += ((uint32_t)k[6]) << 16;
252  b += ((uint32_t)k[7]) << 24;
253  c += k[8];
254  c += ((uint32_t)k[9]) << 8;
255  c += ((uint32_t)k[10]) << 16;
256  c += ((uint32_t)k[11]) << 24;
257  dqmhashmix(a, b, c);
258  keylen -= 12;
259  k += 12;
260  }
261 
262  // last block: affect all 32 bits of (c); all case statements fall through
263  switch (keylen) {
264  case 12:
265  c += ((uint32_t)k[11]) << 24;
266  [[fallthrough]];
267  case 11:
268  c += ((uint32_t)k[10]) << 16;
269  [[fallthrough]];
270  case 10:
271  c += ((uint32_t)k[9]) << 8;
272  [[fallthrough]];
273  case 9:
274  c += k[8];
275  [[fallthrough]];
276  case 8:
277  b += ((uint32_t)k[7]) << 24;
278  [[fallthrough]];
279  case 7:
280  b += ((uint32_t)k[6]) << 16;
281  [[fallthrough]];
282  case 6:
283  b += ((uint32_t)k[5]) << 8;
284  [[fallthrough]];
285  case 5:
286  b += k[4];
287  [[fallthrough]];
288  case 4:
289  a += ((uint32_t)k[3]) << 24;
290  [[fallthrough]];
291  case 3:
292  a += ((uint32_t)k[2]) << 16;
293  [[fallthrough]];
294  case 2:
295  a += ((uint32_t)k[1]) << 8;
296  [[fallthrough]];
297  case 1:
298  a += k[0];
299  break;
300  case 0:
301  return c;
302  }
303 
304  dqmhashfinal(a, b, c);
305  return c;
306 #undef dqmhashrot
307 #undef dqmhashmix
308 #undef dqmhashfinal
309  }
310 
311  static void packQualityData(std::string &into, const QReports &qr);
312  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
313 
314 protected:
315  std::ostream &logme();
316  static void copydata(Bucket *b, const void *data, size_t len);
317  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
318 
319  virtual bool shouldStop();
320  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
321  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
322  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
323 
324  // bool reconstructObject(Object &o);
325  // bool reinstateObject(DQMStore *store, Object &o);
326  virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
327  virtual Object *makeObject(Peer *p, const std::string &name) = 0;
328  virtual void markObjectsDead(Peer *p) = 0;
329  virtual void purgeDeadObjects(Peer *p) = 0;
330 
331  virtual Peer *getPeer(lat::Socket *s) = 0;
332  virtual Peer *createPeer(lat::Socket *s) = 0;
333  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
334  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
335  virtual void sendObjectListToPeers(bool all) = 0;
336 
337  void updateMask(Peer *p);
338  virtual void updatePeerMasks() = 0;
339  static void discard(Bucket *&b);
340 
341  bool debug_;
342  pthread_mutex_t lock_;
343 
344 private:
345  void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
346  void requestObjectData(Peer *p, const char *name, size_t len);
347  void releaseFromWait(WaitList::iterator i, Object *o);
348  void releaseWaiters(const std::string &name, Object *o);
349 
350  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
351  bool onPeerConnect(lat::IOSelectEvent *ev);
352  bool onLocalNotify(lat::IOSelectEvent *ev);
353 
355  int pid_;
356 
357  lat::IOSelector sel_;
358  lat::Socket *server_;
359  lat::Pipe wakeup_;
361 
365 
366  pthread_t communicate_;
367  sig_atomic_t shutdown_;
368 
369  int delay_;
370  lat::TimeSpan waitStale_;
371  lat::TimeSpan waitMax_;
372  bool flush_;
373 
374 public:
375  // copying is not available
376  DQMNet(const DQMNet &) = delete;
377  DQMNet &operator=(const DQMNet &) = delete;
378 };
379 
380 template <class ObjType>
381 class DQMImplNet : public DQMNet {
382 public:
383  struct ImplPeer;
384 
385  using DirMap = std::set<std::string>;
386  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
387  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
388  struct ImplPeer : Peer {
389  ImplPeer() = default;
392  };
393 
394  DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
395 
396  ~DQMImplNet() override = default;
397 
398 protected:
399  Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
400  size_t slash = name.rfind('/');
401  size_t dirpos = (slash == std::string::npos ? 0 : slash);
402  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
403  std::string path(name, 0, dirpos);
404  ObjType proto;
405  proto.hash = dqmhash(name.c_str(), name.size());
406  proto.dirname = path;
407  proto.objname.append(name, namepos, std::string::npos);
408 
409  typename ObjectMap::iterator pos;
410  typename PeerMap::iterator i, e;
411  if (owner)
412  *owner = nullptr;
413  if (p) {
414  auto *ip = static_cast<ImplPeer *>(p);
415  pos = ip->objs.find(proto);
416  if (pos == ip->objs.end())
417  return nullptr;
418  else {
419  if (owner)
420  *owner = ip;
421  return const_cast<ObjType *>(&*pos);
422  }
423  } else {
424  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
425  pos = i->second.objs.find(proto);
426  if (pos != i->second.objs.end()) {
427  if (owner)
428  *owner = &i->second;
429  return const_cast<ObjType *>(&*pos);
430  }
431  }
432  return nullptr;
433  }
434  }
435 
436  Object *makeObject(Peer *p, const std::string &name) override {
437  auto *ip = static_cast<ImplPeer *>(p);
438  size_t slash = name.rfind('/');
439  size_t dirpos = (slash == std::string::npos ? 0 : slash);
440  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
441  ObjType o;
442  o.flags = 0;
443  o.tag = 0;
444  o.version = 0;
445  o.lastreq = 0;
446  o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
447  o.objname.append(name, namepos, std::string::npos);
448  o.hash = dqmhash(name.c_str(), name.size());
449  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
450  }
451 
452  // Mark all the objects dead. This is intended to be used when
453  // starting to process a complete list of objects, in order to
454  // flag the objects that need to be killed at the end. After
455  // call to this method, revive all live objects by removing the
456  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
457  // to remove the dead ones. This also turns off object request
458  // for objects we've lost interest in.
459  void markObjectsDead(Peer *p) override {
460  uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
461  auto *ip = static_cast<ImplPeer *>(p);
462  typename ObjectMap::iterator i, e;
463  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
464  if (i->lastreq && i->lastreq < minreq)
465  const_cast<ObjType &>(*i).lastreq = 0;
466  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
467  }
468  }
469 
470  // Mark remaining zombie objects as dead. See markObjectsDead().
471  void purgeDeadObjects(Peer *p) override {
472  auto *ip = static_cast<ImplPeer *>(p);
473  typename ObjectMap::iterator i, e;
474  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
475  if (i->flags & DQM_PROP_DEAD)
476  ip->objs.erase(i++);
477  else
478  ++i;
479  }
480  }
481 
482  Peer *getPeer(lat::Socket *s) override {
483  auto pos = peers_.find(s);
484  auto end = peers_.end();
485  return pos == end ? nullptr : &pos->second;
486  }
487 
488  Peer *createPeer(lat::Socket *s) override {
489  ImplPeer *ip = &peers_[s];
490  ip->socket = nullptr;
491  ip->sendq = nullptr;
492  ip->sendpos = 0;
493  ip->mask = 0;
494  ip->source = false;
495  ip->update = false;
496  ip->updated = false;
497  ip->updates = 0;
498  ip->waiting = 0;
499  ip->automatic = nullptr;
500  return ip;
501  }
502 
503  void removePeer(Peer *p, lat::Socket *s) override {
504  auto *ip = static_cast<ImplPeer *>(p);
505  bool needflush = !ip->objs.empty();
506 
507  typename ObjectMap::iterator i, e;
508  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
509  ip->objs.erase(i++);
510 
511  peers_.erase(s);
512 
513  // If we removed a peer with objects, our list of objects
514  // has changed and we need to update downstream peers.
515  if (needflush)
517  }
518 
520  void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
521  typename PeerMap::iterator pi, pe;
522  typename ObjectMap::iterator oi, oe;
523  size_t size = 0;
524  size_t numobjs = 0;
525  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
526  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
527  if (all || (oi->flags & DQM_PROP_NEW))
528  size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
529  oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
530 
531  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
532 
533  uint32_t nupdates = 0;
534  uint32_t words[4];
535  words[0] = sizeof(words);
536  words[1] = DQM_REPLY_LIST_BEGIN;
537  words[2] = numobjs;
538  words[3] = all;
539  copydata(msg, &words[0], sizeof(words));
540 
541  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
542  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
543  if (all || (oi->flags & DQM_PROP_NEW)) {
544  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
545  if (clear)
546  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
547  ++nupdates;
548  }
549 
550  words[1] = DQM_REPLY_LIST_END;
551  words[2] = nupdates;
552  copydata(msg, &words[0], sizeof(words));
553  }
554 
555  void sendObjectListToPeers(bool all) override {
556  typename PeerMap::iterator i, e;
557  typename ObjectMap::iterator oi, oe;
558  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
559  ImplPeer &p = i->second;
560  if (!p.update)
561  continue;
562 
563  if (debug_)
564  logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
565 
566  Bucket msg;
567  msg.next = nullptr;
568  sendObjectListToPeer(&msg, !p.updated || all, true);
569 
570  if (!msg.data.empty()) {
571  Bucket **prev = &p.sendq;
572  while (*prev)
573  prev = &(*prev)->next;
574 
575  *prev = new Bucket;
576  (*prev)->next = nullptr;
577  (*prev)->data.swap(msg.data);
578  }
579  p.updated = true;
580  }
581  }
582 
583  void updatePeerMasks() override {
584  typename PeerMap::iterator i, e;
585  for (i = peers_.begin(), e = peers_.end(); i != e;)
586  updateMask(&(i++)->second);
587  }
588 
589 protected:
591 };
592 
593 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
594 public:
595  DQMBasicNet(const std::string &appname = "");
596 
597  void reserveLocalSpace(uint32_t size);
598  void updateLocalObject(Object &o);
599  bool removeLocalExcept(const std::set<std::string> &known);
600 
601 private:
602  ImplPeer *local_;
603 };
604 
605 #endif // DQMSERVICES_CORE_DQM_NET_H
size
Write out results.
lat::Time time
Definition: DQMNet.h:117
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:47
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:173
AutoPeer downstream_
Definition: DQMNet.h:363
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:914
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:52
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:70
DataBlob incoming
Definition: DQMNet.h:127
std::list< WaitObject > WaitList
Definition: DQMNet.h:88
uint32_t moduleId
Definition: DQMNet.h:97
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:35
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:387
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:864
QReports qreports
Definition: DQMNet.h:100
pthread_t communicate_
Definition: DQMNet.h:366
void sendObjectListToPeers(bool all) override
Definition: DQMNet.h:555
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:45
string host
Definition: query.py:115
def all(container)
workaround iterator generators for ROOT classes
Definition: cmstools.py:25
uint64_t version
Definition: DQMNet.h:93
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:132
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:75
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:40
std::ostream & logme()
Definition: DQMNet.cc:50
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:49
T w() const
lat::Time version_
Definition: DQMNet.h:360
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:26
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:369
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:29
pthread_mutex_t lock_
Definition: DQMNet.h:342
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:949
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:91
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
std::string peeraddr
Definition: DQMNet.h:125
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:57
lat::Socket * socket
Definition: DQMNet.h:126
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:66
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:955
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:42
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:59
uint32_t tag
Definition: DQMNet.h:92
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:64
AutoPeer * automatic
Definition: DQMNet.h:137
AutoPeer upstream_
Definition: DQMNet.h:362
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:34
int port
Definition: query.py:116
void removePeer(Peer *p, lat::Socket *s) override
Definition: DQMNet.h:503
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:67
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1046
int pid_
Definition: DQMNet.h:355
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:394
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:69
uint32_t run
Definition: DQMNet.h:94
std::string qdata
Definition: DQMNet.h:108
void debug(bool doit)
Definition: DQMNet.cc:945
bool updated
Definition: DQMNet.h:134
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:58
Peer * getPeer(lat::Socket *s) override
Definition: DQMNet.h:482
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:56
std::set< std::string > DirMap
Definition: DQMNet.h:385
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
virtual bool shouldStop()
Definition: DQMNet.cc:376
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
uint64_t lastreq
Definition: DQMNet.h:105
std::string appname_
Definition: DQMNet.h:354
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:31
std::string name
Definition: DQMNet.h:118
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:441
DataBlob data
Definition: DQMNet.h:113
void start()
Definition: DQMNet.cc:1080
uint32_t lumi
Definition: DQMNet.h:95
Peer * peer
Definition: DQMNet.h:141
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:671
void sendLocalChanges()
Definition: DQMNet.cc:1203
void markObjectsDead(Peer *p) override
Definition: DQMNet.h:459
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:196
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:51
std::string dirname
Definition: DQMNet.h:98
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:53
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:36
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:380
virtual void markObjectsDead(Peer *p)=0
sig_atomic_t shutdown_
Definition: DQMNet.h:367
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:488
std::string objname
Definition: DQMNet.h:99
static const uint32_t DQM_PROP_TYPE_TH1I
Definition: DQMNet.h:37
bool update
Definition: DQMNet.h:133
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:79
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:191
std::string scalar
Definition: DQMNet.h:107
lat::Time next
Definition: DQMNet.h:142
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:77
virtual Object * makeObject(Peer *p, const std::string &name)=0
unsigned mask
Definition: DQMNet.h:131
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:398
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:72
ObjectMap objs
Definition: DQMNet.h:390
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1241
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:819
size_t updates
Definition: DQMNet.h:135
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:43
std::string host
Definition: DQMNet.h:143
double b
Definition: hdecay.h:118
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:54
void run()
Definition: DQMNet.cc:1091
void startLocalServer(int port)
Definition: DQMNet.cc:960
uint64_t hash
Definition: DQMNet.h:104
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:158
tuple msg
Definition: mps_check.py:285
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
Definition: DQMNet.h:583
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:65
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:74
std::string info
Definition: DQMNet.h:119
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:520
lat::Pipe wakeup_
Definition: DQMNet.h:359
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:341
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
size_t waiting
Definition: DQMNet.h:136
#define dqmhashfinal(a, b, c)
void clear(HadCaloObj &c)
Definition: data.h:124
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:167
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:76
double a
Definition: hdecay.h:119
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
virtual ~DQMNet()
Definition: DQMNet.cc:939
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:39
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
Definition: DQMNet.h:436
size_t sendpos
Definition: DQMNet.h:129
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1033
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:71
std::vector< uint32_t > TagList
Definition: DQMNet.h:87
void updateMask(Peer *p)
Definition: DQMNet.cc:888
bool flush_
Definition: DQMNet.h:372
lat::Socket * server_
Definition: DQMNet.h:358
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:33
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:44
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:386
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
Definition: DQMNet.h:399
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1220
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:61
PeerMap peers_
Definition: DQMNet.h:590
lat::TimeSpan waitMax_
Definition: DQMNet.h:371
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
Bucket * next
Definition: DQMNet.h:112
Bucket * sendq
Definition: DQMNet.h:128
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:187
std::vector< QValue > QReports
Definition: DQMNet.h:86
lat::TimeSpan waitStale_
Definition: DQMNet.h:370
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1216
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:96
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:50
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:63
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1018
WaitList waiting_
Definition: DQMNet.h:364
static const uint32_t DQM_PROP_TYPE_TH2I
Definition: DQMNet.h:41
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:85
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:62
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:32
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:30
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1211
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:46
Definition: event.py:1
DataBlob rawdata
Definition: DQMNet.h:106
void purgeDeadObjects(Peer *p) override
Definition: DQMNet.h:471
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:38
ImplPeer * local_
Definition: DQMNet.h:602
lat::IOSelector sel_
Definition: DQMNet.h:357