CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <cstdint>
12 # include <csignal>
13 # include <iostream>
14 # include <vector>
15 # include <string>
16 # include <list>
17 # include <map>
18 # include <set>
19 # include <ext/hash_set>
20 
21 //class DQMStore;
22 
23 class DQMNet
24 {
25 public:
26  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
27  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
28  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
29  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
30  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
31  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
32  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
33  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
34  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
35  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
36  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
37  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
38  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
39  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
40  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
41  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
42  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
43  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
44 
45  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
46  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
47  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
48  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
49  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
50  static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR
51  | DQM_PROP_REPORT_WARN
53 
54  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
55  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
56  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
57  static const uint32_t DQM_PROP_RESET = 0x00008000;
58 
59  static const uint32_t DQM_PROP_NEW = 0x00010000;
60  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
61  static const uint32_t DQM_PROP_LUMI = 0x00040000;
62  static const uint32_t DQM_PROP_DEAD = 0x00080000;
63  static const uint32_t DQM_PROP_STALE = 0x00100000;
64  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
65  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
66 
67  static const uint32_t DQM_MSG_HELLO = 0;
68  static const uint32_t DQM_MSG_UPDATE_ME = 1;
69  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
70  static const uint32_t DQM_MSG_GET_OBJECT = 3;
71 
72  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
73  static const uint32_t DQM_REPLY_LIST_END = 102;
74  static const uint32_t DQM_REPLY_NONE = 103;
75  static const uint32_t DQM_REPLY_OBJECT = 104;
76 
77  static const uint32_t MAX_PEER_WAITREQS = 128;
78 
79  struct Peer;
80  struct QValue;
81  struct WaitObject;
82 
83  using DataBlob = std::vector<unsigned char>;
84  using QReports = std::vector<QValue>;
85  using TagList = std::vector<uint32_t>; // DEPRECATED
86  using WaitList = std::list<WaitObject>;
87 
88  struct QValue
89  {
90  int code;
91  float qtresult;
95  };
96 
97  struct CoreObject
98  {
99  uint32_t flags;
100  uint32_t tag;
102  uint32_t run;
103  uint32_t lumi;
104  uint32_t streamId;
105  uint32_t moduleId;
109  };
110 
112  {
118  };
119 
120  struct Bucket
121  {
124  };
125 
126  struct WaitObject
127  {
128  lat::Time time;
132  };
133 
134  struct AutoPeer;
135  struct Peer
136  {
138  lat::Socket *socket;
141  size_t sendpos;
142 
143  unsigned mask;
144  bool source;
145  bool update;
146  bool updated;
147  size_t updates;
148  size_t waiting;
150  };
151 
152  struct AutoPeer
153  {
155  lat::Time next;
157  int port;
158  bool update;
159  };
160 
161  DQMNet(const std::string &appname = "");
162  virtual ~DQMNet();
163 
164  void debug(bool doit);
165  void delay(int delay);
166  void startLocalServer(int port);
167  void startLocalServer(const char *path);
168  void staleObjectWaitLimit(lat::TimeSpan time);
169  void updateToCollector(const std::string &host, int port);
170  void listenToCollector(const std::string &host, int port);
171  void shutdown();
172  void lock();
173  void unlock();
174 
175  void start();
176  void run();
177 
178  void sendLocalChanges();
179 
180  static bool setOrder(const CoreObject &a, const CoreObject &b)
181  {
182  if (a.run == b.run) {
183  if (a.lumi == b.lumi) {
184  if (a.streamId == b.streamId) {
185  if (a.moduleId == b.moduleId) {
186  if (*a.dirname == *b.dirname) {
187  return a.objname < b.objname;
188  }
189  return *a.dirname < *b.dirname;
190  }
191  return a.moduleId < b.moduleId;
192  }
193  return a.streamId < b.streamId;
194  }
195  return a.lumi < b.lumi;
196  }
197  return a.run < b.run;
198  }
199 
200  struct HashOp
201  {
202  uint32_t operator()(const Object &a) const
203  {
204  return a.hash;
205  }
206  };
207 
208  struct HashEqual
209  {
210  bool operator()(const Object &a, const Object &b) const
211  {
212  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
213  }
214  };
215 
216  static size_t
217  dqmhash(const void *key, size_t keylen)
218  {
219  // Reduced version of Bob Jenkins' hash function at:
220  // http://www.burtleburtle.net/bob/c/lookup3.c
221 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
222 # define dqmhashmix(a,b,c) { \
223  a -= c; a ^= dqmhashrot(c, 4); c += b; \
224  b -= a; b ^= dqmhashrot(a, 6); a += c; \
225  c -= b; c ^= dqmhashrot(b, 8); b += a; \
226  a -= c; a ^= dqmhashrot(c,16); c += b; \
227  b -= a; b ^= dqmhashrot(a,19); a += c; \
228  c -= b; c ^= dqmhashrot(b, 4); b += a; }
229 # define dqmhashfinal(a,b,c) { \
230  c ^= b; c -= dqmhashrot(b,14); \
231  a ^= c; a -= dqmhashrot(c,11); \
232  b ^= a; b -= dqmhashrot(a,25); \
233  c ^= b; c -= dqmhashrot(b,16); \
234  a ^= c; a -= dqmhashrot(c,4); \
235  b ^= a; b -= dqmhashrot(a,14); \
236  c ^= b; c -= dqmhashrot(b,24); }
237 
238  uint32_t a, b, c;
239  a = b = c = 0xdeadbeef + (uint32_t) keylen;
240  const auto *k = (const unsigned char *) key;
241 
242  // all but the last block: affect some bits of (a, b, c)
243  while (keylen > 12)
244  {
245  a += k[0];
246  a += ((uint32_t)k[1]) << 8;
247  a += ((uint32_t)k[2]) << 16;
248  a += ((uint32_t)k[3]) << 24;
249  b += k[4];
250  b += ((uint32_t)k[5]) << 8;
251  b += ((uint32_t)k[6]) << 16;
252  b += ((uint32_t)k[7]) << 24;
253  c += k[8];
254  c += ((uint32_t)k[9]) << 8;
255  c += ((uint32_t)k[10]) << 16;
256  c += ((uint32_t)k[11]) << 24;
257  dqmhashmix(a,b,c);
258  keylen -= 12;
259  k += 12;
260  }
261 
262  // last block: affect all 32 bits of (c); all case statements fall through
263  switch (keylen)
264  {
265  case 12: c += ((uint32_t)k[11]) << 24;
266  case 11: c += ((uint32_t)k[10]) << 16;
267  case 10: c += ((uint32_t)k[9]) << 8;
268  case 9 : c += k[8];
269  case 8 : b += ((uint32_t)k[7]) << 24;
270  case 7 : b += ((uint32_t)k[6]) << 16;
271  case 6 : b += ((uint32_t)k[5]) << 8;
272  case 5 : b += k[4];
273  case 4 : a += ((uint32_t)k[3]) << 24;
274  case 3 : a += ((uint32_t)k[2]) << 16;
275  case 2 : a += ((uint32_t)k[1]) << 8;
276  case 1 : a += k[0];
277  break;
278  case 0 : return c;
279  }
280 
281  dqmhashfinal(a, b, c);
282  return c;
283 # undef dqmhashrot
284 # undef dqmhashmix
285 # undef dqmhashfinal
286  }
287 
288  static void packQualityData(std::string &into, const QReports &qr);
289  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
290 
291 protected:
292  std::ostream & logme();
293  static void copydata(Bucket *b, const void *data, size_t len);
294  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
295 
296  virtual bool shouldStop();
297  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
298  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
299  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
300 
301  // bool reconstructObject(Object &o);
302  // bool reinstateObject(DQMStore *store, Object &o);
303  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
304  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
305  virtual void markObjectsDead(Peer *p) = 0;
306  virtual void purgeDeadObjects(Peer *p) = 0;
307 
308  virtual Peer * getPeer(lat::Socket *s) = 0;
309  virtual Peer * createPeer(lat::Socket *s) = 0;
310  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
311  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
312  virtual void sendObjectListToPeers(bool all) = 0;
313 
314  void updateMask(Peer *p);
315  virtual void updatePeerMasks() = 0;
316  static void discard(Bucket *&b);
317 
318  bool debug_;
319  pthread_mutex_t lock_;
320 
321 private:
322  void losePeer(const char *reason,
323  Peer *peer,
324  lat::IOSelectEvent *event,
325  lat::Error *err = nullptr);
326  void requestObjectData(Peer *p, const char *name, size_t len);
327  void releaseFromWait(WaitList::iterator i, Object *o);
328  void releaseWaiters(const std::string &name, Object *o);
329 
330  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
331  bool onPeerConnect(lat::IOSelectEvent *ev);
332  bool onLocalNotify(lat::IOSelectEvent *ev);
333 
335  int pid_;
336 
337  lat::IOSelector sel_;
338  lat::Socket *server_;
339  lat::Pipe wakeup_;
340  lat::Time version_;
341 
345 
346  pthread_t communicate_;
347  sig_atomic_t shutdown_;
348 
349  int delay_;
350  lat::TimeSpan waitStale_;
351  lat::TimeSpan waitMax_;
352  bool flush_;
353 
354 public:
355  // copying is not available
356  DQMNet(const DQMNet &) = delete;
357  DQMNet &operator=(const DQMNet &) = delete;
358 };
359 
360 template <class ObjType>
361 class DQMImplNet : public DQMNet
362 {
363 public:
364  struct ImplPeer;
365 
366  using DirMap = std::set<std::string>;
367  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
368  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
369  struct ImplPeer : Peer
370  {
371  ImplPeer() = default;
372  ObjectMap objs;
374  };
375 
376  DQMImplNet(const std::string &appname = "")
377  : DQMNet(appname)
378  {}
379 
380  ~DQMImplNet() override
381  = default;
382 
383 protected:
384  Object *
385  findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override
386  {
387  size_t slash = name.rfind('/');
388  size_t dirpos = (slash == std::string::npos ? 0 : slash);
389  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
390  std::string path(name, 0, dirpos);
391  ObjType proto;
392  proto.hash = dqmhash(name.c_str(), name.size());
393  proto.dirname = &path;
394  proto.objname.append(name, namepos, std::string::npos);
395 
396  typename ObjectMap::iterator pos;
397  typename PeerMap::iterator i, e;
398  if (owner)
399  *owner = nullptr;
400  if (p)
401  {
402  auto *ip = static_cast<ImplPeer *>(p);
403  pos = ip->objs.find(proto);
404  if (pos == ip->objs.end())
405  return nullptr;
406  else
407  {
408  if (owner) *owner = ip;
409  return const_cast<ObjType *>(&*pos);
410  }
411  }
412  else
413  {
414  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
415  {
416  pos = i->second.objs.find(proto);
417  if (pos != i->second.objs.end())
418  {
419  if (owner) *owner = &i->second;
420  return const_cast<ObjType *>(&*pos);
421  }
422  }
423  return nullptr;
424  }
425  }
426 
427  Object *
428  makeObject(Peer *p, const std::string &name) override
429  {
430  auto *ip = static_cast<ImplPeer *>(p);
431  size_t slash = name.rfind('/');
432  size_t dirpos = (slash == std::string::npos ? 0 : slash);
433  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
434  ObjType o;
435  o.flags = 0;
436  o.tag = 0;
437  o.version = 0;
438  o.lastreq = 0;
439  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
440  o.objname.append(name, namepos, std::string::npos);
441  o.hash = dqmhash(name.c_str(), name.size());
442  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
443  }
444 
445  // Mark all the objects dead. This is intended to be used when
446  // starting to process a complete list of objects, in order to
447  // flag the objects that need to be killed at the end. After
448  // call to this method, revive all live objects by removing the
449  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
450  // to remove the dead ones. This also turns off object request
451  // for objects we've lost interest in.
452  void
453  markObjectsDead(Peer *p) override
454  {
455  uint64_t minreq
456  = (lat::Time::current()
457  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
458  auto *ip = static_cast<ImplPeer *>(p);
459  typename ObjectMap::iterator i, e;
460  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
461  {
462  if (i->lastreq && i->lastreq < minreq)
463  const_cast<ObjType &>(*i).lastreq = 0;
464  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
465  }
466  }
467 
468  // Mark remaining zombie objects as dead. See markObjectsDead().
469  void
471  {
472  auto *ip = static_cast<ImplPeer *>(p);
473  typename ObjectMap::iterator i, e;
474  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
475  {
476  if (i->flags & DQM_PROP_DEAD)
477  ip->objs.erase(i++);
478  else
479  ++i;
480  }
481  }
482 
483  Peer *
484  getPeer(lat::Socket *s) override
485  {
486  auto pos = peers_.find(s);
487  auto end = peers_.end();
488  return pos == end ? nullptr : &pos->second;
489  }
490 
491  Peer *
492  createPeer(lat::Socket *s) override
493  {
494  ImplPeer *ip = &peers_[s];
495  ip->socket = nullptr;
496  ip->sendq = nullptr;
497  ip->sendpos = 0;
498  ip->mask = 0;
499  ip->source = false;
500  ip->update = false;
501  ip->updated = false;
502  ip->updates = 0;
503  ip->waiting = 0;
504  ip->automatic = nullptr;
505  return ip;
506  }
507 
508  void
509  removePeer(Peer *p, lat::Socket *s) override
510  {
511  auto *ip = static_cast<ImplPeer *>(p);
512  bool needflush = ! ip->objs.empty();
513 
514  typename ObjectMap::iterator i, e;
515  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
516  ip->objs.erase(i++);
517 
518  peers_.erase(s);
519 
520  // If we removed a peer with objects, our list of objects
521  // has changed and we need to update downstream peers.
522  if (needflush)
524  }
525 
527  void
528  sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
529  {
530  typename PeerMap::iterator pi, pe;
531  typename ObjectMap::iterator oi, oe;
532  size_t size = 0;
533  size_t numobjs = 0;
534  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
535  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
536  if (all || (oi->flags & DQM_PROP_NEW))
537  size += 9*sizeof(uint32_t) + oi->dirname->size()
538  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
539  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
540 
541  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
542 
543  uint32_t nupdates = 0;
544  uint32_t words [4];
545  words[0] = sizeof(words);
546  words[1] = DQM_REPLY_LIST_BEGIN;
547  words[2] = numobjs;
548  words[3] = all;
549  copydata(msg, &words[0], sizeof(words));
550 
551  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
552  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
553  if (all || (oi->flags & DQM_PROP_NEW))
554  {
555  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
556  if (clear)
557  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
558  ++nupdates;
559  }
560 
561  words[1] = DQM_REPLY_LIST_END;
562  words[2] = nupdates;
563  copydata(msg, &words[0], sizeof(words));
564  }
565 
566  void
567  sendObjectListToPeers(bool all) override
568  {
569  typename PeerMap::iterator i, e;
570  typename ObjectMap::iterator oi, oe;
571  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
572  {
573  ImplPeer &p = i->second;
574  if (! p.update)
575  continue;
576 
577  if (debug_)
578  logme()
579  << "DEBUG: notifying " << p.peeraddr << std::endl;
580 
581  Bucket msg;
582  msg.next = nullptr;
583  sendObjectListToPeer(&msg, !p.updated || all, true);
584 
585  if (! msg.data.empty())
586  {
587  Bucket **prev = &p.sendq;
588  while (*prev)
589  prev = &(*prev)->next;
590 
591  *prev = new Bucket;
592  (*prev)->next = nullptr;
593  (*prev)->data.swap(msg.data);
594  }
595  p.updated = true;
596  }
597  }
598 
599  void
600  updatePeerMasks() override
601  {
602  typename PeerMap::iterator i, e;
603  for (i = peers_.begin(), e = peers_.end(); i != e; )
604  updateMask(&(i++)->second);
605  }
606 
607 protected:
608  PeerMap peers_;
609 };
610 
611 
612 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
613 {
614 public:
615  DQMBasicNet(const std::string &appname = "");
616 
617  void reserveLocalSpace(uint32_t size);
618  void updateLocalObject(Object &o);
619  bool removeLocalExcept(const std::set<std::string> &known);
620 
621 private:
622  ImplPeer *local_;
623 };
624 
625 
626 #endif // DQMSERVICES_CORE_DQM_NET_H
size
Write out results.
lat::Time time
Definition: DQMNet.h:128
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:43
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:343
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1069
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:48
host
Definition: query.py:114
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
DataBlob incoming
Definition: DQMNet.h:139
std::list< WaitObject > WaitList
Definition: DQMNet.h:86
uint32_t moduleId
Definition: DQMNet.h:105
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:33
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:368
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1008
QReports qreports
Definition: DQMNet.h:108
pthread_t communicate_
Definition: DQMNet.h:346
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:210
void sendObjectListToPeers(bool all) override
Definition: DQMNet.h:567
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:41
std::string algorithm
Definition: DQMNet.h:94
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:101
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:144
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:73
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:37
std::ostream & logme()
Definition: DQMNet.cc:42
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:45
lat::Time version_
Definition: DQMNet.h:340
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:23
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:349
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:27
pthread_mutex_t lock_
Definition: DQMNet.h:319
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1111
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:99
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:137
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:55
lat::Socket * socket
Definition: DQMNet.h:138
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:64
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1121
bool ev
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:38
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:57
uint32_t tag
Definition: DQMNet.h:100
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
const std::string * dirname
Definition: DQMNet.h:106
AutoPeer * automatic
Definition: DQMNet.h:149
AutoPeer upstream_
Definition: DQMNet.h:342
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:32
void removePeer(Peer *p, lat::Socket *s) override
Definition: DQMNet.h:509
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:65
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1239
int pid_
Definition: DQMNet.h:335
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:376
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:67
uint32_t run
Definition: DQMNet.h:102
std::string qdata
Definition: DQMNet.h:117
port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1103
bool updated
Definition: DQMNet.h:146
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:56
Peer * getPeer(lat::Socket *s) override
Definition: DQMNet.h:484
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:54
std::set< std::string > DirMap
Definition: DQMNet.h:366
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
virtual bool shouldStop()
Definition: DQMNet.cc:389
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:114
std::string appname_
Definition: DQMNet.h:334
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:29
std::string name
Definition: DQMNet.h:129
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:468
DataBlob data
Definition: DQMNet.h:123
void start()
Definition: DQMNet.cc:1280
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:167
uint32_t lumi
Definition: DQMNet.h:103
Peer * peer
Definition: DQMNet.h:154
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
void sendLocalChanges()
Definition: DQMNet.cc:1433
void markObjectsDead(Peer *p) override
Definition: DQMNet.h:453
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:217
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:47
#define end
Definition: vmac.h:39
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:49
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:34
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
virtual void markObjectsDead(Peer *p)=0
sig_atomic_t shutdown_
Definition: DQMNet.h:347
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:492
std::string objname
Definition: DQMNet.h:107
bool update
Definition: DQMNet.h:145
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:77
std::string qtname
Definition: DQMNet.h:93
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:116
lat::Time next
Definition: DQMNet.h:155
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
virtual Object * makeObject(Peer *p, const std::string &name)=0
unsigned mask
Definition: DQMNet.h:143
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:202
ObjectMap objs
Definition: DQMNet.h:372
unsigned long long uint64_t
Definition: Time.h:15
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
size_t updates
Definition: DQMNet.h:147
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:39
std::string host
Definition: DQMNet.h:156
double b
Definition: hdecay.h:120
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:50
void run()
Definition: DQMNet.cc:1295
void startLocalServer(int port)
Definition: DQMNet.cc:1130
uint64_t hash
Definition: DQMNet.h:113
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
tuple msg
Definition: mps_check.py:277
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
Definition: DQMNet.h:600
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:72
std::string info
Definition: DQMNet.h:130
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:77
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:528
lat::Pipe wakeup_
Definition: DQMNet.h:339
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:318
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:148
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:180
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
double a
Definition: hdecay.h:121
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
virtual ~DQMNet()
Definition: DQMNet.cc:1095
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:36
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
Definition: DQMNet.h:428
std::string message
Definition: DQMNet.h:92
size_t sendpos
Definition: DQMNet.h:141
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1222
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
std::vector< uint32_t > TagList
Definition: DQMNet.h:85
void updateMask(Peer *p)
Definition: DQMNet.cc:1039
bool flush_
Definition: DQMNet.h:352
lat::Socket * server_
Definition: DQMNet.h:338
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:31
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:40
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:367
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
Definition: DQMNet.h:385
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:59
PeerMap peers_
Definition: DQMNet.h:608
lat::TimeSpan waitMax_
Definition: DQMNet.h:351
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:26
Bucket * next
Definition: DQMNet.h:122
Bucket * sendq
Definition: DQMNet.h:140
std::vector< QValue > QReports
Definition: DQMNet.h:84
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:104
float qtresult
Definition: DQMNet.h:91
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:46
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:61
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1203
WaitList waiting_
Definition: DQMNet.h:344
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:83
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:60
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:28
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:42
Definition: event.py:1
DataBlob rawdata
Definition: DQMNet.h:115
void purgeDeadObjects(Peer *p) override
Definition: DQMNet.h:470
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:35
ImplPeer * local_
Definition: DQMNet.h:622
lat::IOSelector sel_
Definition: DQMNet.h:337