CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
3 
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Error.h"
8 #include "classlib/utils/Time.h"
9 #include <pthread.h>
10 #include <cstdint>
11 #include <csignal>
12 #include <iostream>
13 #include <vector>
14 #include <string>
15 #include <list>
16 #include <map>
17 #include <set>
18 #include <unordered_set>
19 
20 // for definition of QValue
22 
23 //class DQMStore;
24 
25 class DQMNet {
26 public:
27  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
28  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
29  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
30  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
31  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
32  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
33  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
34  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
35  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
36  static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013;
37  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
38  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
39  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
40  static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023;
41  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
42  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
43  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
44  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
45  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
46  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
47 
48  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
49  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
50  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
51  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
52  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
54 
55  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
56  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
57  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
58  static const uint32_t DQM_PROP_RESET = 0x00008000;
59 
60  static const uint32_t DQM_PROP_NEW = 0x00010000;
61  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
62  static const uint32_t DQM_PROP_LUMI = 0x00040000;
63  static const uint32_t DQM_PROP_DEAD = 0x00080000;
64  static const uint32_t DQM_PROP_STALE = 0x00100000;
65  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
66  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
67 
68  static const uint32_t DQM_MSG_HELLO = 0;
69  static const uint32_t DQM_MSG_UPDATE_ME = 1;
70  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
71  static const uint32_t DQM_MSG_GET_OBJECT = 3;
72 
73  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
74  static const uint32_t DQM_REPLY_LIST_END = 102;
75  static const uint32_t DQM_REPLY_NONE = 103;
76  static const uint32_t DQM_REPLY_OBJECT = 104;
77 
78  static const uint32_t MAX_PEER_WAITREQS = 128;
79 
80  struct Peer;
81  struct WaitObject;
82 
84  using DataBlob = std::vector<unsigned char>;
85  using QReports = std::vector<QValue>;
86  using TagList = std::vector<uint32_t>; // DEPRECATED
87  using WaitList = std::list<WaitObject>;
88 
89  struct CoreObject {
90  uint32_t flags;
91  uint32_t tag;
93  uint32_t run;
94  uint32_t lumi;
95  uint32_t streamId;
96  uint32_t moduleId;
100  };
101 
102  struct Object : CoreObject {
108  };
109 
110  struct Bucket {
113  };
114 
115  struct WaitObject {
120  };
121 
122  struct AutoPeer;
123  struct Peer {
125  lat::Socket *socket;
128  size_t sendpos;
129 
130  unsigned mask;
131  bool source;
132  bool update;
133  bool updated;
134  size_t updates;
135  size_t waiting;
137  };
138 
139  struct AutoPeer {
143  int port;
144  bool update;
145  };
146 
147  DQMNet(const std::string &appname = "");
148  virtual ~DQMNet();
149 
150  void debug(bool doit);
151  void delay(int delay);
152  void startLocalServer(int port);
153  void startLocalServer(const char *path);
154  void staleObjectWaitLimit(lat::TimeSpan time);
155  void updateToCollector(const std::string &host, int port);
156  void listenToCollector(const std::string &host, int port);
157  void shutdown();
158  void lock();
159  void unlock();
160 
161  void start();
162  void run();
163 
164  void sendLocalChanges();
165 
166  static bool setOrder(const CoreObject &a, const CoreObject &b) {
167  if (a.run == b.run) {
168  if (a.lumi == b.lumi) {
169  if (a.streamId == b.streamId) {
170  if (a.moduleId == b.moduleId) {
171  if (a.dirname == b.dirname) {
172  return a.objname < b.objname;
173  }
174  return a.dirname < b.dirname;
175  }
176  return a.moduleId < b.moduleId;
177  }
178  return a.streamId < b.streamId;
179  }
180  return a.lumi < b.lumi;
181  }
182  return a.run < b.run;
183  }
184 
185  struct HashOp {
186  uint32_t operator()(const Object &a) const { return a.hash; }
187  };
188 
189  struct HashEqual {
190  bool operator()(const Object &a, const Object &b) const {
191  return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
192  }
193  };
194 
195  static size_t dqmhash(const void *key, size_t keylen) {
196  // Reduced version of Bob Jenkins' hash function at:
197  // http://www.burtleburtle.net/bob/c/lookup3.c
198 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
199 #define dqmhashmix(a, b, c) \
200  { \
201  a -= c; \
202  a ^= dqmhashrot(c, 4); \
203  c += b; \
204  b -= a; \
205  b ^= dqmhashrot(a, 6); \
206  a += c; \
207  c -= b; \
208  c ^= dqmhashrot(b, 8); \
209  b += a; \
210  a -= c; \
211  a ^= dqmhashrot(c, 16); \
212  c += b; \
213  b -= a; \
214  b ^= dqmhashrot(a, 19); \
215  a += c; \
216  c -= b; \
217  c ^= dqmhashrot(b, 4); \
218  b += a; \
219  }
220 #define dqmhashfinal(a, b, c) \
221  { \
222  c ^= b; \
223  c -= dqmhashrot(b, 14); \
224  a ^= c; \
225  a -= dqmhashrot(c, 11); \
226  b ^= a; \
227  b -= dqmhashrot(a, 25); \
228  c ^= b; \
229  c -= dqmhashrot(b, 16); \
230  a ^= c; \
231  a -= dqmhashrot(c, 4); \
232  b ^= a; \
233  b -= dqmhashrot(a, 14); \
234  c ^= b; \
235  c -= dqmhashrot(b, 24); \
236  }
237 
238  uint32_t a, b, c;
239  a = b = c = 0xdeadbeef + (uint32_t)keylen;
240  const auto *k = (const unsigned char *)key;
241 
242  // all but the last block: affect some bits of (a, b, c)
243  while (keylen > 12) {
244  a += k[0];
245  a += ((uint32_t)k[1]) << 8;
246  a += ((uint32_t)k[2]) << 16;
247  a += ((uint32_t)k[3]) << 24;
248  b += k[4];
249  b += ((uint32_t)k[5]) << 8;
250  b += ((uint32_t)k[6]) << 16;
251  b += ((uint32_t)k[7]) << 24;
252  c += k[8];
253  c += ((uint32_t)k[9]) << 8;
254  c += ((uint32_t)k[10]) << 16;
255  c += ((uint32_t)k[11]) << 24;
256  dqmhashmix(a, b, c);
257  keylen -= 12;
258  k += 12;
259  }
260 
261  // last block: affect all 32 bits of (c); all case statements fall through
262  switch (keylen) {
263  case 12:
264  c += ((uint32_t)k[11]) << 24;
265  [[fallthrough]];
266  case 11:
267  c += ((uint32_t)k[10]) << 16;
268  [[fallthrough]];
269  case 10:
270  c += ((uint32_t)k[9]) << 8;
271  [[fallthrough]];
272  case 9:
273  c += k[8];
274  [[fallthrough]];
275  case 8:
276  b += ((uint32_t)k[7]) << 24;
277  [[fallthrough]];
278  case 7:
279  b += ((uint32_t)k[6]) << 16;
280  [[fallthrough]];
281  case 6:
282  b += ((uint32_t)k[5]) << 8;
283  [[fallthrough]];
284  case 5:
285  b += k[4];
286  [[fallthrough]];
287  case 4:
288  a += ((uint32_t)k[3]) << 24;
289  [[fallthrough]];
290  case 3:
291  a += ((uint32_t)k[2]) << 16;
292  [[fallthrough]];
293  case 2:
294  a += ((uint32_t)k[1]) << 8;
295  [[fallthrough]];
296  case 1:
297  a += k[0];
298  break;
299  case 0:
300  return c;
301  }
302 
303  dqmhashfinal(a, b, c);
304  return c;
305 #undef dqmhashrot
306 #undef dqmhashmix
307 #undef dqmhashfinal
308  }
309 
310  static void packQualityData(std::string &into, const QReports &qr);
311  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
312 
313 protected:
314  std::ostream &logme();
315  static void copydata(Bucket *b, const void *data, size_t len);
316  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
317 
318  virtual bool shouldStop();
319  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
320  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
321  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
322 
323  // bool reconstructObject(Object &o);
324  // bool reinstateObject(DQMStore *store, Object &o);
325  virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
326  virtual Object *makeObject(Peer *p, const std::string &name) = 0;
327  virtual void markObjectsDead(Peer *p) = 0;
328  virtual void purgeDeadObjects(Peer *p) = 0;
329 
330  virtual Peer *getPeer(lat::Socket *s) = 0;
331  virtual Peer *createPeer(lat::Socket *s) = 0;
332  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
333  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
334  virtual void sendObjectListToPeers(bool all) = 0;
335 
336  void updateMask(Peer *p);
337  virtual void updatePeerMasks() = 0;
338  static void discard(Bucket *&b);
339 
340  bool debug_;
341  pthread_mutex_t lock_;
342 
343 private:
344  void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
345  void requestObjectData(Peer *p, const char *name, size_t len);
346  void releaseFromWait(WaitList::iterator i, Object *o);
347  void releaseWaiters(const std::string &name, Object *o);
348 
349  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
350  bool onPeerConnect(lat::IOSelectEvent *ev);
351  bool onLocalNotify(lat::IOSelectEvent *ev);
352 
354  int pid_;
355 
356  lat::IOSelector sel_;
357  lat::Socket *server_;
358  lat::Pipe wakeup_;
360 
364 
365  pthread_t communicate_;
366  sig_atomic_t shutdown_;
367 
368  int delay_;
369  lat::TimeSpan waitStale_;
370  lat::TimeSpan waitMax_;
371  bool flush_;
372 
373 public:
374  // copying is not available
375  DQMNet(const DQMNet &) = delete;
376  DQMNet &operator=(const DQMNet &) = delete;
377 };
378 
379 template <class ObjType>
380 class DQMImplNet : public DQMNet {
381 public:
382  struct ImplPeer;
383 
384  using DirMap = std::set<std::string>;
385  typedef std::unordered_set<ObjType, HashOp, HashEqual> ObjectMap;
386  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
387  struct ImplPeer : Peer {
388  ImplPeer() = default;
391  };
392 
393  DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
394 
395  ~DQMImplNet() override = default;
396 
397 protected:
398  Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
399  size_t slash = name.rfind('/');
400  size_t dirpos = (slash == std::string::npos ? 0 : slash);
401  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
402  std::string path(name, 0, dirpos);
403  ObjType proto;
404  proto.hash = dqmhash(name.c_str(), name.size());
405  proto.dirname = path;
406  proto.objname.append(name, namepos, std::string::npos);
407 
408  typename ObjectMap::iterator pos;
409  typename PeerMap::iterator i, e;
410  if (owner)
411  *owner = nullptr;
412  if (p) {
413  auto *ip = static_cast<ImplPeer *>(p);
414  pos = ip->objs.find(proto);
415  if (pos == ip->objs.end())
416  return nullptr;
417  else {
418  if (owner)
419  *owner = ip;
420  return const_cast<ObjType *>(&*pos);
421  }
422  } else {
423  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
424  pos = i->second.objs.find(proto);
425  if (pos != i->second.objs.end()) {
426  if (owner)
427  *owner = &i->second;
428  return const_cast<ObjType *>(&*pos);
429  }
430  }
431  return nullptr;
432  }
433  }
434 
435  Object *makeObject(Peer *p, const std::string &name) override {
436  auto *ip = static_cast<ImplPeer *>(p);
437  size_t slash = name.rfind('/');
438  size_t dirpos = (slash == std::string::npos ? 0 : slash);
439  size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
440  ObjType o;
441  o.flags = 0;
442  o.tag = 0;
443  o.version = 0;
444  o.lastreq = 0;
445  o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
446  o.objname.append(name, namepos, std::string::npos);
447  o.hash = dqmhash(name.c_str(), name.size());
448  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
449  }
450 
451  // Mark all the objects dead. This is intended to be used when
452  // starting to process a complete list of objects, in order to
453  // flag the objects that need to be killed at the end. After
454  // call to this method, revive all live objects by removing the
455  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
456  // to remove the dead ones. This also turns off object request
457  // for objects we've lost interest in.
458  void markObjectsDead(Peer *p) override {
459  uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
460  auto *ip = static_cast<ImplPeer *>(p);
461  typename ObjectMap::iterator i, e;
462  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
463  if (i->lastreq && i->lastreq < minreq)
464  const_cast<ObjType &>(*i).lastreq = 0;
465  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
466  }
467  }
468 
469  // Mark remaining zombie objects as dead. See markObjectsDead().
470  void purgeDeadObjects(Peer *p) override {
471  auto *ip = static_cast<ImplPeer *>(p);
472  typename ObjectMap::iterator i, e;
473  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
474  if (i->flags & DQM_PROP_DEAD)
475  ip->objs.erase(i++);
476  else
477  ++i;
478  }
479  }
480 
481  Peer *getPeer(lat::Socket *s) override {
482  auto pos = peers_.find(s);
483  auto end = peers_.end();
484  return pos == end ? nullptr : &pos->second;
485  }
486 
487  Peer *createPeer(lat::Socket *s) override {
488  ImplPeer *ip = &peers_[s];
489  ip->socket = nullptr;
490  ip->sendq = nullptr;
491  ip->sendpos = 0;
492  ip->mask = 0;
493  ip->source = false;
494  ip->update = false;
495  ip->updated = false;
496  ip->updates = 0;
497  ip->waiting = 0;
498  ip->automatic = nullptr;
499  return ip;
500  }
501 
502  void removePeer(Peer *p, lat::Socket *s) override {
503  auto *ip = static_cast<ImplPeer *>(p);
504  bool needflush = !ip->objs.empty();
505 
506  typename ObjectMap::iterator i, e;
507  for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
508  ip->objs.erase(i++);
509 
510  peers_.erase(s);
511 
512  // If we removed a peer with objects, our list of objects
513  // has changed and we need to update downstream peers.
514  if (needflush)
516  }
517 
519  void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
520  typename PeerMap::iterator pi, pe;
521  typename ObjectMap::iterator oi, oe;
522  size_t size = 0;
523  size_t numobjs = 0;
524  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
525  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
526  if (all || (oi->flags & DQM_PROP_NEW))
527  size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
528  oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
529 
530  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
531 
532  uint32_t nupdates = 0;
533  uint32_t words[4];
534  words[0] = sizeof(words);
535  words[1] = DQM_REPLY_LIST_BEGIN;
536  words[2] = numobjs;
537  words[3] = all;
538  copydata(msg, &words[0], sizeof(words));
539 
540  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
541  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
542  if (all || (oi->flags & DQM_PROP_NEW)) {
543  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
544  if (clear)
545  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
546  ++nupdates;
547  }
548 
549  words[1] = DQM_REPLY_LIST_END;
550  words[2] = nupdates;
551  copydata(msg, &words[0], sizeof(words));
552  }
553 
554  void sendObjectListToPeers(bool all) override {
555  typename PeerMap::iterator i, e;
556  typename ObjectMap::iterator oi, oe;
557  for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
558  ImplPeer &p = i->second;
559  if (!p.update)
560  continue;
561 
562  if (debug_)
563  logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
564 
565  Bucket msg;
566  msg.next = nullptr;
567  sendObjectListToPeer(&msg, !p.updated || all, true);
568 
569  if (!msg.data.empty()) {
570  Bucket **prev = &p.sendq;
571  while (*prev)
572  prev = &(*prev)->next;
573 
574  *prev = new Bucket;
575  (*prev)->next = nullptr;
576  (*prev)->data.swap(msg.data);
577  }
578  p.updated = true;
579  }
580  }
581 
582  void updatePeerMasks() override {
583  typename PeerMap::iterator i, e;
584  for (i = peers_.begin(), e = peers_.end(); i != e;)
585  updateMask(&(i++)->second);
586  }
587 
588 protected:
590 };
591 
592 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
593 public:
594  DQMBasicNet(const std::string &appname = "");
595 
596  void reserveLocalSpace(uint32_t size);
597  void updateLocalObject(Object &o);
598  bool removeLocalExcept(const std::set<std::string> &known);
599 
600 private:
601  ImplPeer *local_;
602 };
603 
604 #endif // DQMSERVICES_CORE_DQM_NET_H
size
Write out results.
lat::Time time
Definition: DQMNet.h:116
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:46
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:169
AutoPeer downstream_
Definition: DQMNet.h:362
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:910
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:51
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:69
DataBlob incoming
Definition: DQMNet.h:126
std::list< WaitObject > WaitList
Definition: DQMNet.h:87
uint32_t moduleId
Definition: DQMNet.h:96
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:34
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:386
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:860
QReports qreports
Definition: DQMNet.h:99
pthread_t communicate_
Definition: DQMNet.h:365
void sendObjectListToPeers(bool all) override
Definition: DQMNet.h:554
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:44
string host
Definition: query.py:115
def all(container)
workaround iterator generators for ROOT classes
Definition: cmstools.py:25
uint64_t version
Definition: DQMNet.h:92
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:131
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:74
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:39
std::ostream & logme()
Definition: DQMNet.cc:46
virtual Peer * getPeer(lat::Socket *s)=0
std::unordered_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:385
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:48
T w() const
lat::Time version_
Definition: DQMNet.h:359
static void discard(Bucket *&b)
Definition: DQMNet.cc:58
Definition: DQMNet.h:25
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:368
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:28
pthread_mutex_t lock_
Definition: DQMNet.h:341
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:945
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:90
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:143
std::string peeraddr
Definition: DQMNet.h:124
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:56
lat::Socket * socket
Definition: DQMNet.h:125
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:65
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:951
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:41
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:58
uint32_t tag
Definition: DQMNet.h:91
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:63
AutoPeer * automatic
Definition: DQMNet.h:136
AutoPeer upstream_
Definition: DQMNet.h:361
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:33
int port
Definition: query.py:116
void removePeer(Peer *p, lat::Socket *s) override
Definition: DQMNet.h:502
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:66
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1042
int pid_
Definition: DQMNet.h:354
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:393
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:68
uint32_t run
Definition: DQMNet.h:93
std::string qdata
Definition: DQMNet.h:107
void debug(bool doit)
Definition: DQMNet.cc:941
bool updated
Definition: DQMNet.h:133
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:57
Peer * getPeer(lat::Socket *s) override
Definition: DQMNet.h:481
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:55
std::set< std::string > DirMap
Definition: DQMNet.h:384
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:114
virtual bool shouldStop()
Definition: DQMNet.cc:372
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:96
uint64_t lastreq
Definition: DQMNet.h:104
std::string appname_
Definition: DQMNet.h:353
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:30
std::string name
Definition: DQMNet.h:117
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:437
DataBlob data
Definition: DQMNet.h:112
void start()
Definition: DQMNet.cc:1076
uint32_t lumi
Definition: DQMNet.h:94
Peer * peer
Definition: DQMNet.h:140
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:667
void sendLocalChanges()
Definition: DQMNet.cc:1199
key
prepare the HTCondor submission files and eventually submit them
void markObjectsDead(Peer *p) override
Definition: DQMNet.h:458
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:195
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:50
std::string dirname
Definition: DQMNet.h:97
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:52
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:35
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:376
virtual void markObjectsDead(Peer *p)=0
sig_atomic_t shutdown_
Definition: DQMNet.h:366
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:487
std::string objname
Definition: DQMNet.h:98
static const uint32_t DQM_PROP_TYPE_TH1I
Definition: DQMNet.h:36
bool update
Definition: DQMNet.h:132
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:78
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:190
std::string scalar
Definition: DQMNet.h:106
lat::Time next
Definition: DQMNet.h:141
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:76
virtual Object * makeObject(Peer *p, const std::string &name)=0
unsigned mask
Definition: DQMNet.h:130
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:394
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:71
ObjectMap objs
Definition: DQMNet.h:389
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1237
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:815
size_t updates
Definition: DQMNet.h:134
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:42
std::string host
Definition: DQMNet.h:142
double b
Definition: hdecay.h:120
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:53
void run()
Definition: DQMNet.cc:1087
void startLocalServer(int port)
Definition: DQMNet.cc:956
uint64_t hash
Definition: DQMNet.h:103
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:154
tuple msg
Definition: mps_check.py:286
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
Definition: DQMNet.h:582
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:64
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:73
std::string info
Definition: DQMNet.h:118
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:70
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:519
lat::Pipe wakeup_
Definition: DQMNet.h:358
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:340
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
size_t waiting
Definition: DQMNet.h:135
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:166
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:75
double a
Definition: hdecay.h:121
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
virtual ~DQMNet()
Definition: DQMNet.cc:935
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:38
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
Definition: DQMNet.h:435
size_t sendpos
Definition: DQMNet.h:128
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1029
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:70
std::vector< uint32_t > TagList
Definition: DQMNet.h:86
void updateMask(Peer *p)
Definition: DQMNet.cc:884
bool flush_
Definition: DQMNet.h:371
lat::Socket * server_
Definition: DQMNet.h:357
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:32
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:43
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
Definition: DQMNet.h:398
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1216
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:60
void clear(EGIsoObj &c)
Definition: egamma.h:82
PeerMap peers_
Definition: DQMNet.h:589
lat::TimeSpan waitMax_
Definition: DQMNet.h:370
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:27
Bucket * next
Definition: DQMNet.h:111
Bucket * sendq
Definition: DQMNet.h:127
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:186
std::vector< QValue > QReports
Definition: DQMNet.h:85
lat::TimeSpan waitStale_
Definition: DQMNet.h:369
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1212
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:95
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:49
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:62
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1014
WaitList waiting_
Definition: DQMNet.h:363
static const uint32_t DQM_PROP_TYPE_TH2I
Definition: DQMNet.h:40
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:84
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:61
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:31
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:29
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1207
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:45
Definition: event.py:1
DataBlob rawdata
Definition: DQMNet.h:105
void purgeDeadObjects(Peer *p) override
Definition: DQMNet.h:470
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:37
ImplPeer * local_
Definition: DQMNet.h:601
lat::IOSelector sel_
Definition: DQMNet.h:356