CMS 3D CMS Logo

DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <stdint.h>
12 # include <iostream>
13 # include <vector>
14 # include <string>
15 # include <list>
16 # include <map>
17 # include <set>
18 # include <ext/hash_set>
19 
20 //class DQMStore;
21 
22 class DQMNet
23 {
24 public:
25  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
26  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
27  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
28  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
29  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
30  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
31  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
32  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
33  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
34  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
35  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
36  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
37  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
38  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
39  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
40  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
41  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
42  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
43 
44  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
45  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
46  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
47  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
48  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
49  static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR
50  | DQM_PROP_REPORT_WARN
52 
53  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
54  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
55  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
56  static const uint32_t DQM_PROP_RESET = 0x00008000;
57 
58  static const uint32_t DQM_PROP_NEW = 0x00010000;
59  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
60  static const uint32_t DQM_PROP_LUMI = 0x00040000;
61  static const uint32_t DQM_PROP_DEAD = 0x00080000;
62  static const uint32_t DQM_PROP_STALE = 0x00100000;
63  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
64  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
65 
66  static const uint32_t DQM_MSG_HELLO = 0;
67  static const uint32_t DQM_MSG_UPDATE_ME = 1;
68  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
69  static const uint32_t DQM_MSG_GET_OBJECT = 3;
70 
71  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
72  static const uint32_t DQM_REPLY_LIST_END = 102;
73  static const uint32_t DQM_REPLY_NONE = 103;
74  static const uint32_t DQM_REPLY_OBJECT = 104;
75 
76  static const uint32_t MAX_PEER_WAITREQS = 128;
77 
78  struct Peer;
79  struct QValue;
80  struct WaitObject;
81 
82  typedef std::vector<unsigned char> DataBlob;
83  typedef std::vector<QValue> QReports;
84  typedef std::vector<uint32_t> TagList; // DEPRECATED
85  typedef std::list<WaitObject> WaitList;
86 
87  struct QValue
88  {
89  int code;
90  float qtresult;
94  };
95 
96  struct CoreObject
97  {
98  uint32_t flags;
99  uint32_t tag;
101  uint32_t run;
102  uint32_t lumi;
103  uint32_t streamId;
104  uint32_t moduleId;
107  QReports qreports;
108  };
109 
111  {
114  DataBlob rawdata;
117  };
118 
119  struct Bucket
120  {
122  DataBlob data;
123  };
124 
125  struct WaitObject
126  {
127  lat::Time time;
131  };
132 
133  struct AutoPeer;
134  struct Peer
135  {
137  lat::Socket *socket;
138  DataBlob incoming;
140  size_t sendpos;
141 
142  unsigned mask;
143  bool source;
144  bool update;
145  bool updated;
146  size_t updates;
147  size_t waiting;
149  };
150 
151  struct AutoPeer
152  {
154  lat::Time next;
156  int port;
157  bool update;
158  };
159 
160  DQMNet(const std::string &appname = "");
161  virtual ~DQMNet(void);
162 
163  void debug(bool doit);
164  void delay(int delay);
165  void startLocalServer(int port);
166  void startLocalServer(const char *path);
167  void staleObjectWaitLimit(lat::TimeSpan time);
168  void updateToCollector(const std::string &host, int port);
169  void listenToCollector(const std::string &host, int port);
170  void shutdown(void);
171  void lock(void);
172  void unlock(void);
173 
174  void start(void);
175  void run(void);
176 
177  void sendLocalChanges(void);
178 
179  static bool setOrder(const CoreObject &a, const CoreObject &b)
180  {
181  if (a.run == b.run) {
182  if (a.lumi == b.lumi) {
183  if (a.streamId == b.streamId) {
184  if (a.moduleId == b.moduleId) {
185  if (*a.dirname == *b.dirname) {
186  return a.objname < b.objname;
187  }
188  return *a.dirname < *b.dirname;
189  }
190  return a.moduleId < b.moduleId;
191  }
192  return a.streamId < b.streamId;
193  }
194  return a.lumi < b.lumi;
195  }
196  return a.run < b.run;
197  }
198 
199  struct HashOp
200  {
201  uint32_t operator()(const Object &a) const
202  {
203  return a.hash;
204  }
205  };
206 
207  struct HashEqual
208  {
209  bool operator()(const Object &a, const Object &b) const
210  {
211  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
212  }
213  };
214 
/// Hash @a keylen bytes starting at @a key.
///
/// Reduced version of Bob Jenkins' hash function at:
/// http://www.burtleburtle.net/bob/c/lookup3.c
///
/// The key is consumed one byte at a time and assembled into 32-bit
/// words with explicit shifts, so @a key needs no particular
/// alignment.  The three state words start at 0xdeadbeef + keylen; an
/// empty key therefore hashes to 0xdeadbeef.
///
/// @param key     first byte of the data to hash; not dereferenced
///                when @a keylen is zero.
/// @param keylen  number of bytes to hash.
/// @return the 32-bit hash value, widened to size_t.
static size_t
dqmhash(const void *key, size_t keylen)
{
  // The mixing steps live in a local class rather than in
  // function-like macros: they are type-checked, cannot leak past
  // this function, and need no matching #undef.
  struct Lookup3
  {
    // Rotate x left by k bits; callers only pass 0 < k < 32.
    static uint32_t rot(uint32_t x, unsigned k)
    {
      return (x << k) | (x >> (32 - k));
    }

    // Assemble four bytes into one word, least significant byte first.
    static uint32_t word(const unsigned char *p)
    {
      return static_cast<uint32_t>(p[0])
        + (static_cast<uint32_t>(p[1]) << 8)
        + (static_cast<uint32_t>(p[2]) << 16)
        + (static_cast<uint32_t>(p[3]) << 24);
    }

    // Mix the three state words after each full 12-byte block.
    static void mix(uint32_t &a, uint32_t &b, uint32_t &c)
    {
      a -= c; a ^= rot(c, 4); c += b;
      b -= a; b ^= rot(a, 6); a += c;
      c -= b; c ^= rot(b, 8); b += a;
      a -= c; a ^= rot(c,16); c += b;
      b -= a; b ^= rot(a,19); a += c;
      c -= b; c ^= rot(b, 4); b += a;
    }

    // Final scramble of the state into c.
    static void finish(uint32_t &a, uint32_t &b, uint32_t &c)
    {
      c ^= b; c -= rot(b,14);
      a ^= c; a -= rot(c,11);
      b ^= a; b -= rot(a,25);
      c ^= b; c -= rot(b,16);
      a ^= c; a -= rot(c, 4);
      b ^= a; b -= rot(a,14);
      c ^= b; c -= rot(b,24);
    }
  };

  uint32_t a, b, c;
  a = b = c = 0xdeadbeef + static_cast<uint32_t>(keylen);
  const unsigned char *k = static_cast<const unsigned char *>(key);

  // all but the last block: affect some bits of (a, b, c)
  while (keylen > 12)
  {
    a += Lookup3::word(k);
    b += Lookup3::word(k + 4);
    c += Lookup3::word(k + 8);
    Lookup3::mix(a, b, c);
    keylen -= 12;
    k += 12;
  }

  // last block: affect all 32 bits of (c); every case deliberately
  // falls through to the ones below it.
  switch (keylen)
  {
  case 12: c += static_cast<uint32_t>(k[11]) << 24; // fall through
  case 11: c += static_cast<uint32_t>(k[10]) << 16; // fall through
  case 10: c += static_cast<uint32_t>(k[9]) << 8;   // fall through
  case 9 : c += k[8];                               // fall through
  case 8 : b += static_cast<uint32_t>(k[7]) << 24;  // fall through
  case 7 : b += static_cast<uint32_t>(k[6]) << 16;  // fall through
  case 6 : b += static_cast<uint32_t>(k[5]) << 8;   // fall through
  case 5 : b += k[4];                               // fall through
  case 4 : a += static_cast<uint32_t>(k[3]) << 24;  // fall through
  case 3 : a += static_cast<uint32_t>(k[2]) << 16;  // fall through
  case 2 : a += static_cast<uint32_t>(k[1]) << 8;   // fall through
  case 1 : a += k[0];
    break;
  case 0 : return c; // nothing left to add: skip the final scramble
  }

  Lookup3::finish(a, b, c);
  return c;
}
286 
287  static void packQualityData(std::string &into, const QReports &qr);
288  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
289 
290 protected:
291  std::ostream & logme(void);
292  static void copydata(Bucket *b, const void *data, size_t len);
293  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
294 
295  virtual bool shouldStop(void);
296  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
297  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
298  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
299 
300  // bool reconstructObject(Object &o);
301  // bool reinstateObject(DQMStore *store, Object &o);
302  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
303  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
304  virtual void markObjectsDead(Peer *p) = 0;
305  virtual void purgeDeadObjects(Peer *p) = 0;
306 
307  virtual Peer * getPeer(lat::Socket *s) = 0;
308  virtual Peer * createPeer(lat::Socket *s) = 0;
309  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
310  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
311  virtual void sendObjectListToPeers(bool all) = 0;
312 
313  void updateMask(Peer *p);
314  virtual void updatePeerMasks(void) = 0;
315  static void discard(Bucket *&b);
316 
317  bool debug_;
318  pthread_mutex_t lock_;
319 
320 private:
321  void losePeer(const char *reason,
322  Peer *peer,
323  lat::IOSelectEvent *event,
324  lat::Error *err = 0);
325  void requestObjectData(Peer *p, const char *name, size_t len);
326  void releaseFromWait(WaitList::iterator i, Object *o);
327  void releaseWaiters(const std::string &name, Object *o);
328 
329  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
330  bool onPeerConnect(lat::IOSelectEvent *ev);
331  bool onLocalNotify(lat::IOSelectEvent *ev);
332 
334  int pid_;
335 
336  lat::IOSelector sel_;
337  lat::Socket *server_;
338  lat::Pipe wakeup_;
339  lat::Time version_;
340 
343  WaitList waiting_;
344 
345  pthread_t communicate_;
346  sig_atomic_t shutdown_;
347 
348  int delay_;
349  lat::TimeSpan waitStale_;
350  lat::TimeSpan waitMax_;
351  bool flush_;
352 
353  // copying is not available
354  DQMNet(const DQMNet &);
355  DQMNet &operator=(const DQMNet &);
356 };
357 
358 template <class ObjType>
359 class DQMImplNet : public DQMNet
360 {
361 public:
362  struct ImplPeer;
363 
364  typedef std::set<std::string> DirMap;
365  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
366  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
367  struct ImplPeer : Peer
368  {
369  ImplPeer(void) {}
370  ObjectMap objs;
371  DirMap dirs;
372  };
373 
374  DQMImplNet(const std::string &appname = "")
375  : DQMNet(appname)
376  {}
377 
379  {}
380 
381 protected:
382  virtual Object *
383  findObject(Peer *p, const std::string &name, Peer **owner = 0)
384  {
385  size_t slash = name.rfind('/');
386  size_t dirpos = (slash == std::string::npos ? 0 : slash);
387  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
388  std::string path(name, 0, dirpos);
389  ObjType proto;
390  proto.hash = dqmhash(name.c_str(), name.size());
391  proto.dirname = &path;
392  proto.objname.append(name, namepos, std::string::npos);
393 
394  typename ObjectMap::iterator pos;
395  typename PeerMap::iterator i, e;
396  if (owner)
397  *owner = 0;
398  if (p)
399  {
400  ImplPeer *ip = static_cast<ImplPeer *>(p);
401  pos = ip->objs.find(proto);
402  if (pos == ip->objs.end())
403  return 0;
404  else
405  {
406  if (owner) *owner = ip;
407  return const_cast<ObjType *>(&*pos);
408  }
409  }
410  else
411  {
412  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
413  {
414  pos = i->second.objs.find(proto);
415  if (pos != i->second.objs.end())
416  {
417  if (owner) *owner = &i->second;
418  return const_cast<ObjType *>(&*pos);
419  }
420  }
421  return 0;
422  }
423  }
424 
425  virtual Object *
427  {
428  ImplPeer *ip = static_cast<ImplPeer *>(p);
429  size_t slash = name.rfind('/');
430  size_t dirpos = (slash == std::string::npos ? 0 : slash);
431  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
432  ObjType o;
433  o.flags = 0;
434  o.tag = 0;
435  o.version = 0;
436  o.lastreq = 0;
437  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
438  o.objname.append(name, namepos, std::string::npos);
439  o.hash = dqmhash(name.c_str(), name.size());
440  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
441  }
442 
443  // Mark all the objects dead. This is intended to be used when
444  // starting to process a complete list of objects, in order to
445  // flag the objects that need to be killed at the end. After
446  // call to this method, revive all live objects by removing the
447  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
448  // to remove the dead ones. This also turns off object request
449  // for objects we've lost interest in.
450  virtual void
452  {
453  uint64_t minreq
454  = (lat::Time::current()
455  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
456  ImplPeer *ip = static_cast<ImplPeer *>(p);
457  typename ObjectMap::iterator i, e;
458  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
459  {
460  if (i->lastreq && i->lastreq < minreq)
461  const_cast<ObjType &>(*i).lastreq = 0;
462  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
463  }
464  }
465 
466  // Remove all objects still flagged DQM_PROP_DEAD. See markObjectsDead().
467  virtual void
469  {
470  ImplPeer *ip = static_cast<ImplPeer *>(p);
471  typename ObjectMap::iterator i, e;
472  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
473  {
474  if (i->flags & DQM_PROP_DEAD)
475  ip->objs.erase(i++);
476  else
477  ++i;
478  }
479  }
480 
481  virtual Peer *
482  getPeer(lat::Socket *s)
483  {
484  typename PeerMap::iterator pos = peers_.find(s);
485  typename PeerMap::iterator end = peers_.end();
486  return pos == end ? 0 : &pos->second;
487  }
488 
489  virtual Peer *
490  createPeer(lat::Socket *s)
491  {
492  ImplPeer *ip = &peers_[s];
493  ip->socket = 0;
494  ip->sendq = 0;
495  ip->sendpos = 0;
496  ip->mask = 0;
497  ip->source = false;
498  ip->update = false;
499  ip->updated = false;
500  ip->updates = 0;
501  ip->waiting = 0;
502  ip->automatic = 0;
503  return ip;
504  }
505 
506  virtual void
507  removePeer(Peer *p, lat::Socket *s)
508  {
509  ImplPeer *ip = static_cast<ImplPeer *>(p);
510  bool needflush = ! ip->objs.empty();
511 
512  typename ObjectMap::iterator i, e;
513  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
514  ip->objs.erase(i++);
515 
516  peers_.erase(s);
517 
518  // If we removed a peer with objects, our list of objects
519  // has changed and we need to update downstream peers.
520  if (needflush)
522  }
523 
525  virtual void
527  {
528  typename PeerMap::iterator pi, pe;
529  typename ObjectMap::iterator oi, oe;
530  size_t size = 0;
531  size_t numobjs = 0;
532  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
533  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
534  if (all || (oi->flags & DQM_PROP_NEW))
535  size += 9*sizeof(uint32_t) + oi->dirname->size()
536  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
537  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
538 
539  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
540 
541  uint32_t nupdates = 0;
542  uint32_t words [4];
543  words[0] = sizeof(words);
544  words[1] = DQM_REPLY_LIST_BEGIN;
545  words[2] = numobjs;
546  words[3] = all;
547  copydata(msg, &words[0], sizeof(words));
548 
549  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
550  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
551  if (all || (oi->flags & DQM_PROP_NEW))
552  {
553  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
554  if (clear)
555  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
556  ++nupdates;
557  }
558 
559  words[1] = DQM_REPLY_LIST_END;
560  words[2] = nupdates;
561  copydata(msg, &words[0], sizeof(words));
562  }
563 
564  virtual void
566  {
567  typename PeerMap::iterator i, e;
568  typename ObjectMap::iterator oi, oe;
569  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
570  {
571  ImplPeer &p = i->second;
572  if (! p.update)
573  continue;
574 
575  if (debug_)
576  logme()
577  << "DEBUG: notifying " << p.peeraddr << std::endl;
578 
579  Bucket msg;
580  msg.next = 0;
581  sendObjectListToPeer(&msg, !p.updated || all, true);
582 
583  if (! msg.data.empty())
584  {
585  Bucket **prev = &p.sendq;
586  while (*prev)
587  prev = &(*prev)->next;
588 
589  *prev = new Bucket;
590  (*prev)->next = 0;
591  (*prev)->data.swap(msg.data);
592  }
593  p.updated = true;
594  }
595  }
596 
597  virtual void
599  {
600  typename PeerMap::iterator i, e;
601  for (i = peers_.begin(), e = peers_.end(); i != e; )
602  updateMask(&(i++)->second);
603  }
604 
605 protected:
606  PeerMap peers_;
607 };
608 
609 
610 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
611 {
612 public:
613  DQMBasicNet(const std::string &appname = "");
614 
615  void reserveLocalSpace(uint32_t size);
616  void updateLocalObject(Object &o);
617  bool removeLocalExcept(const std::set<std::string> &known);
618 
619 private:
620  ImplPeer *local_;
621 };
622 
623 
624 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
Definition: DQMNet.h:482
size
Write out results.
void run(void)
Definition: DQMNet.cc:1295
lat::Time time
Definition: DQMNet.h:127
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:42
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:342
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1069
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
host
Definition: query.py:114
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:67
DataBlob incoming
Definition: DQMNet.h:138
virtual bool shouldStop(void)
Definition: DQMNet.cc:389
void shutdown(void)
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1239
uint32_t moduleId
Definition: DQMNet.h:104
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:32
void sendLocalChanges(void)
Definition: DQMNet.cc:1435
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:366
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1008
QReports qreports
Definition: DQMNet.h:107
virtual Object * makeObject(Peer *p, const std::string &name)
Definition: DQMNet.h:426
pthread_t communicate_
Definition: DQMNet.h:345
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:209
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:40
std::string algorithm
Definition: DQMNet.h:93
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:100
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:143
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:72
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:36
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:44
lat::Time version_
Definition: DQMNet.h:339
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:22
virtual void sendObjectListToPeers(bool all)
Definition: DQMNet.h:565
int delay_
Definition: DQMNet.h:348
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
pthread_mutex_t lock_
Definition: DQMNet.h:318
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1111
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:98
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:136
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:54
lat::Socket * socket
Definition: DQMNet.h:137
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:63
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1121
bool ev
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:37
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:56
uint32_t tag
Definition: DQMNet.h:99
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
const std::string * dirname
Definition: DQMNet.h:105
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
AutoPeer * automatic
Definition: DQMNet.h:148
AutoPeer upstream_
Definition: DQMNet.h:341
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:31
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:64
virtual void markObjectsDead(Peer *p)
Definition: DQMNet.h:451
int pid_
Definition: DQMNet.h:334
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:374
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:66
uint32_t run
Definition: DQMNet.h:101
std::string qdata
Definition: DQMNet.h:116
port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1103
bool updated
Definition: DQMNet.h:145
virtual void updatePeerMasks(void)
Definition: DQMNet.h:598
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:55
virtual ~DQMNet(void)
Definition: DQMNet.cc:1095
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:53
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:113
std::string appname_
Definition: DQMNet.h:333
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:28
DQMNet & operator=(const DQMNet &)
std::string name
Definition: DQMNet.h:128
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:468
DataBlob data
Definition: DQMNet.h:122
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:167
uint32_t lumi
Definition: DQMNet.h:102
Peer * peer
Definition: DQMNet.h:153
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:216
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define end
Definition: vmac.h:37
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:33
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
virtual void markObjectsDead(Peer *p)=0
sig_atomic_t shutdown_
Definition: DQMNet.h:346
std::string objname
Definition: DQMNet.h:106
bool update
Definition: DQMNet.h:144
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:76
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
std::string qtname
Definition: DQMNet.h:92
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:115
lat::Time next
Definition: DQMNet.h:154
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:74
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
Definition: DQMNet.h:85
unsigned mask
Definition: DQMNet.h:142
std::vector< uint32_t > TagList
Definition: DQMNet.h:84
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
void start(void)
Definition: DQMNet.cc:1280
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:69
~DQMImplNet(void)
Definition: DQMNet.h:378
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:201
ObjectMap objs
Definition: DQMNet.h:370
unsigned long long uint64_t
Definition: Time.h:15
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
size_t updates
Definition: DQMNet.h:146
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:38
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
Definition: DQMNet.h:383
std::string host
Definition: DQMNet.h:155
double b
Definition: hdecay.h:120
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:526
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:49
void startLocalServer(int port)
Definition: DQMNet.cc:1130
uint64_t hash
Definition: DQMNet.h:112
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:71
std::string info
Definition: DQMNet.h:129
virtual void purgeDeadObjects(Peer *p)
Definition: DQMNet.h:468
lat::Pipe wakeup_
Definition: DQMNet.h:338
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:317
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:147
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:179
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:73
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:35
virtual void removePeer(Peer *p, lat::Socket *s)=0
std::string message
Definition: DQMNet.h:91
size_t sendpos
Definition: DQMNet.h:140
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1222
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:68
void updateMask(Peer *p)
Definition: DQMNet.cc:1039
bool flush_
Definition: DQMNet.h:351
lat::Socket * server_
Definition: DQMNet.h:337
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:39
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:365
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
PeerMap peers_
Definition: DQMNet.h:606
lat::TimeSpan waitMax_
Definition: DQMNet.h:350
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
Bucket * next
Definition: DQMNet.h:121
std::vector< QValue > QReports
Definition: DQMNet.h:83
Bucket * sendq
Definition: DQMNet.h:139
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:103
float qtresult
Definition: DQMNet.h:90
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:60
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1203
WaitList waiting_
Definition: DQMNet.h:343
std::set< std::string > DirMap
Definition: DQMNet.h:362
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:29
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:27
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:490
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:41
Definition: event.py:1
DataBlob rawdata
Definition: DQMNet.h:114
virtual void removePeer(Peer *p, lat::Socket *s)
Definition: DQMNet.h:507
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:34
ImplPeer * local_
Definition: DQMNet.h:620
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0