CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <cstdint>
12 # include <csignal>
13 # include <iostream>
14 # include <vector>
15 # include <string>
16 # include <list>
17 # include <map>
18 # include <set>
19 # include <ext/hash_set>
20 
21 //class DQMStore;
22 
23 class DQMNet
24 {
25 public:
26  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
27  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
28  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
29  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
30  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
31  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
32  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
33  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
34  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
35  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
36  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
37  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
38  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
39  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
40  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
41  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
42  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
43  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
44 
45  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
46  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
47  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
48  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
49  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
50  static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR
51  | DQM_PROP_REPORT_WARN
53 
54  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
55  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
56  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
57  static const uint32_t DQM_PROP_RESET = 0x00008000;
58 
59  static const uint32_t DQM_PROP_NEW = 0x00010000;
60  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
61  static const uint32_t DQM_PROP_LUMI = 0x00040000;
62  static const uint32_t DQM_PROP_DEAD = 0x00080000;
63  static const uint32_t DQM_PROP_STALE = 0x00100000;
64  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
65  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
66 
67  static const uint32_t DQM_MSG_HELLO = 0;
68  static const uint32_t DQM_MSG_UPDATE_ME = 1;
69  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
70  static const uint32_t DQM_MSG_GET_OBJECT = 3;
71 
72  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
73  static const uint32_t DQM_REPLY_LIST_END = 102;
74  static const uint32_t DQM_REPLY_NONE = 103;
75  static const uint32_t DQM_REPLY_OBJECT = 104;
76 
77  static const uint32_t MAX_PEER_WAITREQS = 128;
78 
79  struct Peer;
80  struct QValue;
81  struct WaitObject;
82 
83  typedef std::vector<unsigned char> DataBlob;
84  typedef std::vector<QValue> QReports;
85  typedef std::vector<uint32_t> TagList; // DEPRECATED
86  typedef std::list<WaitObject> WaitList;
87 
88  struct QValue
89  {
90  int code;
91  float qtresult;
95  };
96 
97  struct CoreObject
98  {
99  uint32_t flags;
100  uint32_t tag;
102  uint32_t run;
103  uint32_t lumi;
104  uint32_t streamId;
105  uint32_t moduleId;
108  QReports qreports;
109  };
110 
112  {
115  DataBlob rawdata;
118  };
119 
120  struct Bucket
121  {
123  DataBlob data;
124  };
125 
126  struct WaitObject
127  {
128  lat::Time time;
132  };
133 
134  struct AutoPeer;
135  struct Peer
136  {
138  lat::Socket *socket;
139  DataBlob incoming;
141  size_t sendpos;
142 
143  unsigned mask;
144  bool source;
145  bool update;
146  bool updated;
147  size_t updates;
148  size_t waiting;
150  };
151 
152  struct AutoPeer
153  {
155  lat::Time next;
157  int port;
158  bool update;
159  };
160 
161  DQMNet(const std::string &appname = "");
162  virtual ~DQMNet(void);
163 
164  void debug(bool doit);
165  void delay(int delay);
166  void startLocalServer(int port);
167  void startLocalServer(const char *path);
168  void staleObjectWaitLimit(lat::TimeSpan time);
169  void updateToCollector(const std::string &host, int port);
170  void listenToCollector(const std::string &host, int port);
171  void shutdown(void);
172  void lock(void);
173  void unlock(void);
174 
175  void start(void);
176  void run(void);
177 
178  void sendLocalChanges(void);
179 
180  static bool setOrder(const CoreObject &a, const CoreObject &b)
181  {
182  if (a.run == b.run) {
183  if (a.lumi == b.lumi) {
184  if (a.streamId == b.streamId) {
185  if (a.moduleId == b.moduleId) {
186  if (*a.dirname == *b.dirname) {
187  return a.objname < b.objname;
188  }
189  return *a.dirname < *b.dirname;
190  }
191  return a.moduleId < b.moduleId;
192  }
193  return a.streamId < b.streamId;
194  }
195  return a.lumi < b.lumi;
196  }
197  return a.run < b.run;
198  }
199 
200  struct HashOp
201  {
202  uint32_t operator()(const Object &a) const
203  {
204  return a.hash;
205  }
206  };
207 
208  struct HashEqual
209  {
210  bool operator()(const Object &a, const Object &b) const
211  {
212  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
213  }
214  };
215 
216  static size_t
217  dqmhash(const void *key, size_t keylen)
218  {
219  // Reduced version of Bob Jenkins' hash function at:
220  // http://www.burtleburtle.net/bob/c/lookup3.c
221 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
222 # define dqmhashmix(a,b,c) { \
223  a -= c; a ^= dqmhashrot(c, 4); c += b; \
224  b -= a; b ^= dqmhashrot(a, 6); a += c; \
225  c -= b; c ^= dqmhashrot(b, 8); b += a; \
226  a -= c; a ^= dqmhashrot(c,16); c += b; \
227  b -= a; b ^= dqmhashrot(a,19); a += c; \
228  c -= b; c ^= dqmhashrot(b, 4); b += a; }
229 # define dqmhashfinal(a,b,c) { \
230  c ^= b; c -= dqmhashrot(b,14); \
231  a ^= c; a -= dqmhashrot(c,11); \
232  b ^= a; b -= dqmhashrot(a,25); \
233  c ^= b; c -= dqmhashrot(b,16); \
234  a ^= c; a -= dqmhashrot(c,4); \
235  b ^= a; b -= dqmhashrot(a,14); \
236  c ^= b; c -= dqmhashrot(b,24); }
237 
238  uint32_t a, b, c;
239  a = b = c = 0xdeadbeef + (uint32_t) keylen;
240  const unsigned char *k = (const unsigned char *) key;
241 
242  // all but the last block: affect some bits of (a, b, c)
243  while (keylen > 12)
244  {
245  a += k[0];
246  a += ((uint32_t)k[1]) << 8;
247  a += ((uint32_t)k[2]) << 16;
248  a += ((uint32_t)k[3]) << 24;
249  b += k[4];
250  b += ((uint32_t)k[5]) << 8;
251  b += ((uint32_t)k[6]) << 16;
252  b += ((uint32_t)k[7]) << 24;
253  c += k[8];
254  c += ((uint32_t)k[9]) << 8;
255  c += ((uint32_t)k[10]) << 16;
256  c += ((uint32_t)k[11]) << 24;
257  dqmhashmix(a,b,c);
258  keylen -= 12;
259  k += 12;
260  }
261 
262  // last block: affect all 32 bits of (c); all case statements fall through
263  switch (keylen)
264  {
265  case 12: c += ((uint32_t)k[11]) << 24;
266  case 11: c += ((uint32_t)k[10]) << 16;
267  case 10: c += ((uint32_t)k[9]) << 8;
268  case 9 : c += k[8];
269  case 8 : b += ((uint32_t)k[7]) << 24;
270  case 7 : b += ((uint32_t)k[6]) << 16;
271  case 6 : b += ((uint32_t)k[5]) << 8;
272  case 5 : b += k[4];
273  case 4 : a += ((uint32_t)k[3]) << 24;
274  case 3 : a += ((uint32_t)k[2]) << 16;
275  case 2 : a += ((uint32_t)k[1]) << 8;
276  case 1 : a += k[0];
277  break;
278  case 0 : return c;
279  }
280 
281  dqmhashfinal(a, b, c);
282  return c;
283 # undef dqmhashrot
284 # undef dqmhashmix
285 # undef dqmhashfinal
286  }
287 
288  static void packQualityData(std::string &into, const QReports &qr);
289  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
290 
291 protected:
292  std::ostream & logme(void);
293  static void copydata(Bucket *b, const void *data, size_t len);
294  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
295 
296  virtual bool shouldStop(void);
297  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
298  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
299  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
300 
301  // bool reconstructObject(Object &o);
302  // bool reinstateObject(DQMStore *store, Object &o);
303  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
304  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
305  virtual void markObjectsDead(Peer *p) = 0;
306  virtual void purgeDeadObjects(Peer *p) = 0;
307 
308  virtual Peer * getPeer(lat::Socket *s) = 0;
309  virtual Peer * createPeer(lat::Socket *s) = 0;
310  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
311  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
312  virtual void sendObjectListToPeers(bool all) = 0;
313 
314  void updateMask(Peer *p);
315  virtual void updatePeerMasks(void) = 0;
316  static void discard(Bucket *&b);
317 
318  bool debug_;
319  pthread_mutex_t lock_;
320 
321 private:
322  void losePeer(const char *reason,
323  Peer *peer,
324  lat::IOSelectEvent *event,
325  lat::Error *err = nullptr);
326  void requestObjectData(Peer *p, const char *name, size_t len);
327  void releaseFromWait(WaitList::iterator i, Object *o);
328  void releaseWaiters(const std::string &name, Object *o);
329 
330  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
331  bool onPeerConnect(lat::IOSelectEvent *ev);
332  bool onLocalNotify(lat::IOSelectEvent *ev);
333 
335  int pid_;
336 
337  lat::IOSelector sel_;
338  lat::Socket *server_;
339  lat::Pipe wakeup_;
340  lat::Time version_;
341 
344  WaitList waiting_;
345 
346  pthread_t communicate_;
347  sig_atomic_t shutdown_;
348 
349  int delay_;
350  lat::TimeSpan waitStale_;
351  lat::TimeSpan waitMax_;
352  bool flush_;
353 
354  // copying is not available
355  DQMNet(const DQMNet &) = delete;
356  DQMNet &operator=(const DQMNet &) = delete;
357 };
358 
359 template <class ObjType>
360 class DQMImplNet : public DQMNet
361 {
362 public:
363  struct ImplPeer;
364 
365  typedef std::set<std::string> DirMap;
366  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
367  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
368  struct ImplPeer : Peer
369  {
370  ImplPeer(void) {}
371  ObjectMap objs;
372  DirMap dirs;
373  };
374 
375  DQMImplNet(const std::string &appname = "")
376  : DQMNet(appname)
377  {}
378 
379  ~DQMImplNet(void) override
380  {}
381 
382 protected:
383  Object *
384  findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override
385  {
386  size_t slash = name.rfind('/');
387  size_t dirpos = (slash == std::string::npos ? 0 : slash);
388  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
389  std::string path(name, 0, dirpos);
390  ObjType proto;
391  proto.hash = dqmhash(name.c_str(), name.size());
392  proto.dirname = &path;
393  proto.objname.append(name, namepos, std::string::npos);
394 
395  typename ObjectMap::iterator pos;
396  typename PeerMap::iterator i, e;
397  if (owner)
398  *owner = nullptr;
399  if (p)
400  {
401  ImplPeer *ip = static_cast<ImplPeer *>(p);
402  pos = ip->objs.find(proto);
403  if (pos == ip->objs.end())
404  return nullptr;
405  else
406  {
407  if (owner) *owner = ip;
408  return const_cast<ObjType *>(&*pos);
409  }
410  }
411  else
412  {
413  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
414  {
415  pos = i->second.objs.find(proto);
416  if (pos != i->second.objs.end())
417  {
418  if (owner) *owner = &i->second;
419  return const_cast<ObjType *>(&*pos);
420  }
421  }
422  return nullptr;
423  }
424  }
425 
426  Object *
427  makeObject(Peer *p, const std::string &name) override
428  {
429  ImplPeer *ip = static_cast<ImplPeer *>(p);
430  size_t slash = name.rfind('/');
431  size_t dirpos = (slash == std::string::npos ? 0 : slash);
432  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
433  ObjType o;
434  o.flags = 0;
435  o.tag = 0;
436  o.version = 0;
437  o.lastreq = 0;
438  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
439  o.objname.append(name, namepos, std::string::npos);
440  o.hash = dqmhash(name.c_str(), name.size());
441  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
442  }
443 
444  // Mark all the objects dead. This is intended to be used when
445  // starting to process a complete list of objects, in order to
446  // flag the objects that need to be killed at the end. After
447  // call to this method, revive all live objects by removing the
448  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
449  // to remove the dead ones. This also turns off object request
450  // for objects we've lost interest in.
451  void
452  markObjectsDead(Peer *p) override
453  {
454  uint64_t minreq
455  = (lat::Time::current()
456  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
457  ImplPeer *ip = static_cast<ImplPeer *>(p);
458  typename ObjectMap::iterator i, e;
459  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
460  {
461  if (i->lastreq && i->lastreq < minreq)
462  const_cast<ObjType &>(*i).lastreq = 0;
463  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
464  }
465  }
466 
467  // Mark remaining zombie objects as dead. See markObjectsDead().
468  void
470  {
471  ImplPeer *ip = static_cast<ImplPeer *>(p);
472  typename ObjectMap::iterator i, e;
473  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
474  {
475  if (i->flags & DQM_PROP_DEAD)
476  ip->objs.erase(i++);
477  else
478  ++i;
479  }
480  }
481 
482  Peer *
483  getPeer(lat::Socket *s) override
484  {
485  typename PeerMap::iterator pos = peers_.find(s);
486  typename PeerMap::iterator end = peers_.end();
487  return pos == end ? nullptr : &pos->second;
488  }
489 
490  Peer *
491  createPeer(lat::Socket *s) override
492  {
493  ImplPeer *ip = &peers_[s];
494  ip->socket = nullptr;
495  ip->sendq = nullptr;
496  ip->sendpos = 0;
497  ip->mask = 0;
498  ip->source = false;
499  ip->update = false;
500  ip->updated = false;
501  ip->updates = 0;
502  ip->waiting = 0;
503  ip->automatic = nullptr;
504  return ip;
505  }
506 
507  void
508  removePeer(Peer *p, lat::Socket *s) override
509  {
510  ImplPeer *ip = static_cast<ImplPeer *>(p);
511  bool needflush = ! ip->objs.empty();
512 
513  typename ObjectMap::iterator i, e;
514  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
515  ip->objs.erase(i++);
516 
517  peers_.erase(s);
518 
519  // If we removed a peer with objects, our list of objects
520  // has changed and we need to update downstream peers.
521  if (needflush)
523  }
524 
526  void
527  sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
528  {
529  typename PeerMap::iterator pi, pe;
530  typename ObjectMap::iterator oi, oe;
531  size_t size = 0;
532  size_t numobjs = 0;
533  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
534  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
535  if (all || (oi->flags & DQM_PROP_NEW))
536  size += 9*sizeof(uint32_t) + oi->dirname->size()
537  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
538  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
539 
540  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
541 
542  uint32_t nupdates = 0;
543  uint32_t words [4];
544  words[0] = sizeof(words);
545  words[1] = DQM_REPLY_LIST_BEGIN;
546  words[2] = numobjs;
547  words[3] = all;
548  copydata(msg, &words[0], sizeof(words));
549 
550  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
551  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
552  if (all || (oi->flags & DQM_PROP_NEW))
553  {
554  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
555  if (clear)
556  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
557  ++nupdates;
558  }
559 
560  words[1] = DQM_REPLY_LIST_END;
561  words[2] = nupdates;
562  copydata(msg, &words[0], sizeof(words));
563  }
564 
565  void
566  sendObjectListToPeers(bool all) override
567  {
568  typename PeerMap::iterator i, e;
569  typename ObjectMap::iterator oi, oe;
570  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
571  {
572  ImplPeer &p = i->second;
573  if (! p.update)
574  continue;
575 
576  if (debug_)
577  logme()
578  << "DEBUG: notifying " << p.peeraddr << std::endl;
579 
580  Bucket msg;
581  msg.next = nullptr;
582  sendObjectListToPeer(&msg, !p.updated || all, true);
583 
584  if (! msg.data.empty())
585  {
586  Bucket **prev = &p.sendq;
587  while (*prev)
588  prev = &(*prev)->next;
589 
590  *prev = new Bucket;
591  (*prev)->next = nullptr;
592  (*prev)->data.swap(msg.data);
593  }
594  p.updated = true;
595  }
596  }
597 
598  void
599  updatePeerMasks(void) override
600  {
601  typename PeerMap::iterator i, e;
602  for (i = peers_.begin(), e = peers_.end(); i != e; )
603  updateMask(&(i++)->second);
604  }
605 
606 protected:
607  PeerMap peers_;
608 };
609 
610 
611 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
612 {
613 public:
614  DQMBasicNet(const std::string &appname = "");
615 
616  void reserveLocalSpace(uint32_t size);
617  void updateLocalObject(Object &o);
618  bool removeLocalExcept(const std::set<std::string> &known);
619 
620 private:
621  ImplPeer *local_;
622 };
623 
624 
625 #endif // DQMSERVICES_CORE_DQM_NET_H
size
Write out results.
void run(void)
Definition: DQMNet.cc:1295
lat::Time time
Definition: DQMNet.h:128
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:43
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:343
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1069
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:48
host
Definition: query.py:114
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
DataBlob incoming
Definition: DQMNet.h:139
virtual bool shouldStop(void)
Definition: DQMNet.cc:389
void shutdown(void)
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1239
uint32_t moduleId
Definition: DQMNet.h:105
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:33
void sendLocalChanges(void)
Definition: DQMNet.cc:1435
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:367
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1008
QReports qreports
Definition: DQMNet.h:108
pthread_t communicate_
Definition: DQMNet.h:346
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:210
void sendObjectListToPeers(bool all) override
Definition: DQMNet.h:566
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:41
std::string algorithm
Definition: DQMNet.h:94
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:101
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:144
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:73
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:37
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:45
lat::Time version_
Definition: DQMNet.h:340
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:23
int delay_
Definition: DQMNet.h:349
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:27
pthread_mutex_t lock_
Definition: DQMNet.h:319
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1111
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:99
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
void updatePeerMasks(void) override
Definition: DQMNet.h:599
std::string peeraddr
Definition: DQMNet.h:137
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:55
lat::Socket * socket
Definition: DQMNet.h:138
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:64
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1121
bool ev
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:38
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:57
uint32_t tag
Definition: DQMNet.h:100
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
const std::string * dirname
Definition: DQMNet.h:106
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
AutoPeer * automatic
Definition: DQMNet.h:149
AutoPeer upstream_
Definition: DQMNet.h:342
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:32
void removePeer(Peer *p, lat::Socket *s) override
Definition: DQMNet.h:508
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:65
int pid_
Definition: DQMNet.h:335
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:375
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:67
uint32_t run
Definition: DQMNet.h:102
std::string qdata
Definition: DQMNet.h:117
port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1103
bool updated
Definition: DQMNet.h:146
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:56
Peer * getPeer(lat::Socket *s) override
Definition: DQMNet.h:483
virtual ~DQMNet(void)
Definition: DQMNet.cc:1095
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:54
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:114
std::string appname_
Definition: DQMNet.h:334
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:29
std::string name
Definition: DQMNet.h:129
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:468
DataBlob data
Definition: DQMNet.h:123
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:167
uint32_t lumi
Definition: DQMNet.h:103
Peer * peer
Definition: DQMNet.h:154
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:81
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
void markObjectsDead(Peer *p) override
Definition: DQMNet.h:452
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:217
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:47
#define end
Definition: vmac.h:37
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:49
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:34
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
virtual void markObjectsDead(Peer *p)=0
sig_atomic_t shutdown_
Definition: DQMNet.h:347
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:491
std::string objname
Definition: DQMNet.h:107
bool update
Definition: DQMNet.h:145
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
std::string qtname
Definition: DQMNet.h:93
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:116
lat::Time next
Definition: DQMNet.h:155
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
Definition: DQMNet.h:86
unsigned mask
Definition: DQMNet.h:143
std::vector< uint32_t > TagList
Definition: DQMNet.h:85
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
void start(void)
Definition: DQMNet.cc:1280
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:202
ObjectMap objs
Definition: DQMNet.h:371
unsigned long long uint64_t
Definition: Time.h:15
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
size_t updates
Definition: DQMNet.h:147
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:39
std::string host
Definition: DQMNet.h:156
double b
Definition: hdecay.h:120
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:50
void startLocalServer(int port)
Definition: DQMNet.cc:1130
uint64_t hash
Definition: DQMNet.h:113
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
tuple msg
Definition: mps_check.py:277
DQMNet & operator=(const DQMNet &)=delete
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:72
std::string info
Definition: DQMNet.h:130
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:77
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:527
lat::Pipe wakeup_
Definition: DQMNet.h:339
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:318
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:148
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:180
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:36
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
Definition: DQMNet.h:427
std::string message
Definition: DQMNet.h:92
size_t sendpos
Definition: DQMNet.h:141
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1222
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
void updateMask(Peer *p)
Definition: DQMNet.cc:1039
bool flush_
Definition: DQMNet.h:352
lat::Socket * server_
Definition: DQMNet.h:338
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:31
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:40
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:366
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
Definition: DQMNet.h:384
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:59
PeerMap peers_
Definition: DQMNet.h:607
lat::TimeSpan waitMax_
Definition: DQMNet.h:351
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:26
Bucket * next
Definition: DQMNet.h:122
std::vector< QValue > QReports
Definition: DQMNet.h:84
Bucket * sendq
Definition: DQMNet.h:140
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:104
float qtresult
Definition: DQMNet.h:91
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:46
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:61
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1203
WaitList waiting_
Definition: DQMNet.h:344
std::set< std::string > DirMap
Definition: DQMNet.h:363
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:60
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:28
~DQMImplNet(void) override
Definition: DQMNet.h:379
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:42
Definition: event.py:1
DataBlob rawdata
Definition: DQMNet.h:115
void purgeDeadObjects(Peer *p) override
Definition: DQMNet.h:469
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:35
ImplPeer * local_
Definition: DQMNet.h:621
lat::IOSelector sel_
Definition: DQMNet.h:337