CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <stdint.h>
12 # include <iostream>
13 # include <vector>
14 # include <string>
15 # include <list>
16 # include <map>
17 # include <set>
18 # include <ext/hash_set>
19 
20 //class DQMStore;
21 
22 class DQMNet
23 {
24 public:
25  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
26  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
27  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
28  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
29  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
30  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
31  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
32  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
33  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
34  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
35  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
36  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
37  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
38  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
39  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
40  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
41  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
42  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
43 
44  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
45  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
46  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
47  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
48  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
52 
53  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
54  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
55  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
56  static const uint32_t DQM_PROP_RESET = 0x00008000;
57 
58  static const uint32_t DQM_PROP_NEW = 0x00010000;
59  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
60  static const uint32_t DQM_PROP_LUMI = 0x00040000;
61  static const uint32_t DQM_PROP_DEAD = 0x00080000;
62  static const uint32_t DQM_PROP_STALE = 0x00100000;
63  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
64  static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
65 
66  static const uint32_t DQM_MSG_HELLO = 0;
67  static const uint32_t DQM_MSG_UPDATE_ME = 1;
68  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
69  static const uint32_t DQM_MSG_GET_OBJECT = 3;
70 
71  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
72  static const uint32_t DQM_REPLY_LIST_END = 102;
73  static const uint32_t DQM_REPLY_NONE = 103;
74  static const uint32_t DQM_REPLY_OBJECT = 104;
75 
76  static const uint32_t MAX_PEER_WAITREQS = 128;
77 
78  struct Peer;
79  struct QValue;
80  struct WaitObject;
81 
82  typedef std::vector<unsigned char> DataBlob;
83  typedef std::vector<QValue> QReports;
84  typedef std::vector<uint32_t> TagList; // DEPRECATED
85  typedef std::list<WaitObject> WaitList;
86 
87  struct QValue
88  {
89  int code;
90  float qtresult;
94  };
95 
96  struct CoreObject
97  {
98  uint32_t flags;
99  uint32_t tag;
101  uint32_t run;
102  uint32_t lumi;
103  uint32_t streamId;
104  uint32_t moduleId;
108  };
109 
111  {
117  };
118 
119  struct Bucket
120  {
123  };
124 
125  struct WaitObject
126  {
127  lat::Time time;
131  };
132 
133  struct AutoPeer;
134  struct Peer
135  {
137  lat::Socket *socket;
140  size_t sendpos;
141 
142  unsigned mask;
143  bool source;
144  bool update;
145  bool updated;
146  size_t updates;
147  size_t waiting;
149  };
150 
151  struct AutoPeer
152  {
154  lat::Time next;
156  int port;
157  bool update;
158  };
159 
160  DQMNet(const std::string &appname = "");
161  virtual ~DQMNet(void);
162 
163  void debug(bool doit);
164  void delay(int delay);
165  void startLocalServer(int port);
166  void startLocalServer(const char *path);
167  void staleObjectWaitLimit(lat::TimeSpan time);
168  void updateToCollector(const std::string &host, int port);
169  void listenToCollector(const std::string &host, int port);
170  void shutdown(void);
171  void lock(void);
172  void unlock(void);
173 
174  void start(void);
175  void run(void);
176 
177  void sendLocalChanges(void);
178 
// Ordering predicate for CoreObject: returns true when `a` sorts before `b`.
// Keys are compared in sequence -- run, lumi, streamId, moduleId, the string
// pointed to by dirname, and finally objname -- and the first key that
// differs decides the result.
// NOTE(review): dirname is dereferenced unconditionally on both arguments,
// so callers presumably guarantee it is non-null -- confirm.
179  static bool setOrder(const CoreObject &a, const CoreObject &b)
180  {
181  if (a.run == b.run) {
182  if (a.lumi == b.lumi) {
183  if (a.streamId == b.streamId) {
184  if (a.moduleId == b.moduleId) {
185  if (*a.dirname == *b.dirname) {
// Every other key is equal: the object name breaks the tie.
186  return a.objname < b.objname;
187  }
188  return *a.dirname < *b.dirname;
189  }
190  return a.moduleId < b.moduleId;
191  }
192  return a.streamId < b.streamId;
193  }
194  return a.lumi < b.lumi;
195  }
196  return a.run < b.run;
197  }
198 
// Hash functor for Object: simply returns the hash value already stored in
// the object rather than computing anything.
// NOTE(review): the generated index lists Object::hash as uint64_t while
// this operator returns uint32_t; if that is accurate the value is narrowed
// here -- confirm against the full header.
199  struct HashOp
200  {
201  uint32_t operator()(const Object &a) const
202  {
203  return a.hash;
204  }
205  };
206 
// Equality functor for Object: two objects are equal when their stored
// hashes match, the strings their dirname pointers refer to match, and
// their objname strings match.
// NOTE(review): dirname is dereferenced without a null check -- confirm
// that it is always set before objects are compared.
207  struct HashEqual
208  {
209  bool operator()(const Object &a, const Object &b) const
210  {
// The integer hash comparison comes first, so && short-circuits before the
// string comparisons when the hashes differ.
211  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
212  }
213  };
214 
// Hash `keylen` bytes starting at `key` and return the result.
// The state is three uint32_t words (a, b, c); the value returned is c,
// widened to size_t. The three helper macros are defined inside the function
// and #undef'ed again before it ends.
215  static size_t
216  dqmhash(const void *key, size_t keylen)
217  {
218  // Reduced version of Bob Jenkins' hash function at:
219  // http://www.burtleburtle.net/bob/c/lookup3.c
// dqmhashrot: rotate x left by k bits; the (32-(k)) term assumes a 32-bit x.
220 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
// dqmhashmix: mixing step applied after each full 12-byte block.
221 # define dqmhashmix(a,b,c) { \
222  a -= c; a ^= dqmhashrot(c, 4); c += b; \
223  b -= a; b ^= dqmhashrot(a, 6); a += c; \
224  c -= b; c ^= dqmhashrot(b, 8); b += a; \
225  a -= c; a ^= dqmhashrot(c,16); c += b; \
226  b -= a; b ^= dqmhashrot(a,19); a += c; \
227  c -= b; c ^= dqmhashrot(b, 4); b += a; }
// dqmhashfinal: final scrambling applied once, after the last block.
228 # define dqmhashfinal(a,b,c) { \
229  c ^= b; c -= dqmhashrot(b,14); \
230  a ^= c; a -= dqmhashrot(c,11); \
231  b ^= a; b -= dqmhashrot(a,25); \
232  c ^= b; c -= dqmhashrot(b,16); \
233  a ^= c; a -= dqmhashrot(c,4); \
234  b ^= a; b -= dqmhashrot(a,14); \
235  c ^= b; c -= dqmhashrot(b,24); }
236 
// All three words start from the same seed, which folds in the key length
// (truncated to 32 bits by the cast).
237  uint32_t a, b, c;
238  a = b = c = 0xdeadbeef + (uint32_t) keylen;
239  const unsigned char *k = (const unsigned char *) key;
240 
241  // all but the last block: affect some bits of (a, b, c)
// Bytes are combined one at a time with the lowest-index byte in the lowest
// bits, so the input is never read through a wider pointer type.
// The loop condition is "> 12", so a key of exactly 12 bytes is handled by
// the switch below, not here.
242  while (keylen > 12)
243  {
244  a += k[0];
245  a += ((uint32_t)k[1]) << 8;
246  a += ((uint32_t)k[2]) << 16;
247  a += ((uint32_t)k[3]) << 24;
248  b += k[4];
249  b += ((uint32_t)k[5]) << 8;
250  b += ((uint32_t)k[6]) << 16;
251  b += ((uint32_t)k[7]) << 24;
252  c += k[8];
253  c += ((uint32_t)k[9]) << 8;
254  c += ((uint32_t)k[10]) << 16;
255  c += ((uint32_t)k[11]) << 24;
256  dqmhashmix(a,b,c);
257  keylen -= 12;
258  k += 12;
259  }
260 
261  // last block: affect all 32 bits of (c); all case statements fall through
// After the loop keylen is 1..12 for any non-empty key; it is 0 only when the
// key was empty to begin with, in which case the seed is returned unmixed.
262  switch (keylen)
263  {
264  case 12: c += ((uint32_t)k[11]) << 24;
265  case 11: c += ((uint32_t)k[10]) << 16;
266  case 10: c += ((uint32_t)k[9]) << 8;
267  case 9 : c += k[8];
268  case 8 : b += ((uint32_t)k[7]) << 24;
269  case 7 : b += ((uint32_t)k[6]) << 16;
270  case 6 : b += ((uint32_t)k[5]) << 8;
271  case 5 : b += k[4];
272  case 4 : a += ((uint32_t)k[3]) << 24;
273  case 3 : a += ((uint32_t)k[2]) << 16;
274  case 2 : a += ((uint32_t)k[1]) << 8;
275  case 1 : a += k[0];
276  break;
277  case 0 : return c;
278  }
279 
280  dqmhashfinal(a, b, c);
281  return c;
// Preprocessor directives are handled regardless of the preceding return, so
// the helper macros do not leak into code that follows this function.
282 # undef dqmhashrot
283 # undef dqmhashmix
284 # undef dqmhashfinal
285  }
286 
287  static void packQualityData(std::string &into, const QReports &qr);
288  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
289 
290 protected:
291  std::ostream & logme(void);
292  static void copydata(Bucket *b, const void *data, size_t len);
293  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
294 
295  virtual bool shouldStop(void);
296  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
297  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
298  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
299 
300  // bool reconstructObject(Object &o);
301  // bool reinstateObject(DQMStore *store, Object &o);
302  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
303  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
304  virtual void markObjectsDead(Peer *p) = 0;
305  virtual void purgeDeadObjects(Peer *p) = 0;
306 
307  virtual Peer * getPeer(lat::Socket *s) = 0;
308  virtual Peer * createPeer(lat::Socket *s) = 0;
309  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
310  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
311  virtual void sendObjectListToPeers(bool all) = 0;
312 
313  void updateMask(Peer *p);
314  virtual void updatePeerMasks(void) = 0;
315  static void discard(Bucket *&b);
316 
317  bool debug_;
318  pthread_mutex_t lock_;
319 
320 private:
321  void losePeer(const char *reason,
322  Peer *peer,
323  lat::IOSelectEvent *event,
324  lat::Error *err = 0);
325  void requestObjectData(Peer *p, const char *name, size_t len);
326  void releaseFromWait(WaitList::iterator i, Object *o);
327  void releaseWaiters(const std::string &name, Object *o);
328 
329  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
330  bool onPeerConnect(lat::IOSelectEvent *ev);
331  bool onLocalNotify(lat::IOSelectEvent *ev);
332 
334  int pid_;
335 
336  lat::IOSelector sel_;
337  lat::Socket *server_;
338  lat::Pipe wakeup_;
339  lat::Time version_;
340 
344 
345  pthread_t communicate_;
346  sig_atomic_t shutdown_;
347 
348  int delay_;
349  lat::TimeSpan waitStale_;
350  lat::TimeSpan waitMax_;
351  bool flush_;
352 
353  // copying is not available
354  DQMNet(const DQMNet &);
355  DQMNet &operator=(const DQMNet &);
356 };
357 
358 template <class ObjType>
359 class DQMImplNet : public DQMNet
360 {
361 public:
362  struct ImplPeer;
363 
364  typedef std::set<std::string> DirMap;
365  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
366  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
367  struct ImplPeer : Peer
368  {
369  ImplPeer(void) {}
372  };
373 
374  DQMImplNet(const std::string &appname = "")
375  : DQMNet(appname)
376  {}
377 
379  {}
380 
381 protected:
// Look up the object called `name`.
//   p     - when non-null, only this peer's object set is searched;
//           when null, every peer in peers_ is searched in map order.
//   name  - full name; the text before the last '/' is the directory and the
//           text after it is the object name (no '/' means empty directory).
//   owner - optional out-parameter: reset to 0 on entry, then set to the peer
//           that holds the object when one is found.
// Returns a pointer to the stored object, or 0 when nothing matches.
382  virtual Object *
383  findObject(Peer *p, const std::string &name, Peer **owner = 0)
384  {
385  size_t slash = name.rfind('/');
386  size_t dirpos = (slash == std::string::npos ? 0 : slash);
387  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
388  std::string path(name, 0, dirpos);
// Build a throw-away prototype carrying just the fields used for lookup.
// Its dirname points at the local `path`, so it must not outlive this call.
389  ObjType proto;
390  proto.hash = dqmhash(name.c_str(), name.size());
391  proto.dirname = &path;
392  proto.objname.append(name, namepos, std::string::npos);
393 
394  typename ObjectMap::iterator pos;
395  typename PeerMap::iterator i, e;
396  if (owner)
397  *owner = 0;
398  if (p)
399  {
// NOTE(review): assumes every Peer handed in here is really an ImplPeer.
400  ImplPeer *ip = static_cast<ImplPeer *>(p);
401  pos = ip->objs.find(proto);
402  if (pos == ip->objs.end())
403  return 0;
404  else
405  {
406  if (owner) *owner = ip;
// The const_cast suggests the set iterator yields const elements; the caller
// receives a mutable pointer to the stored object.
407  return const_cast<ObjType *>(&*pos);
408  }
409  }
410  else
411  {
// No peer given: the first peer that has a matching object wins.
412  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
413  {
414  pos = i->second.objs.find(proto);
415  if (pos != i->second.objs.end())
416  {
417  if (owner) *owner = &i->second;
418  return const_cast<ObjType *>(&*pos);
419  }
420  }
421  return 0;
422  }
423  }
424 
// makeObject: create an object called `name` in peer `p`'s object set and
// return a pointer to the stored element.
// NOTE(review): the declarator line (listing line 426) is missing from this
// rendering; the generated index gives it as
// makeObject(Peer *p, const std::string &name).
425  virtual Object *
427  {
// NOTE(review): assumes `p` is really an ImplPeer and non-null.
428  ImplPeer *ip = static_cast<ImplPeer *>(p);
// Split at the last '/': directory before it, object name after it.
429  size_t slash = name.rfind('/');
430  size_t dirpos = (slash == std::string::npos ? 0 : slash);
431  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
432  ObjType o;
433  o.flags = 0;
434  o.tag = 0;
435  o.version = 0;
436  o.lastreq = 0;
// The directory string is stored in the peer's `dirs` set and the object
// keeps only a pointer to that stored copy.
437  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
438  o.objname.append(name, namepos, std::string::npos);
// The hash covers the full name, matching what findObject() computes.
439  o.hash = dqmhash(name.c_str(), name.size());
// Returns whatever element insert() reports through `.first`.
440  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
441  }
442 
443  // Mark all of the peer's objects dead. This is intended to be used
444  // when starting to process a complete list of objects, in order to
445  // flag the objects that need to be killed at the end. After
446  // calling this method, revive all live objects by removing the
447  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
448  // to remove the dead ones. This also turns off object requests
449  // for objects we've lost interest in.
// NOTE(review): the declarator line (listing line 451) is missing from this
// rendering; the generated index gives it as markObjectsDead(Peer *p).
450  virtual void
452  {
// Cutoff: five minutes before the current time, taken via .ns().
453  uint64_t minreq
454  = (lat::Time::current()
455  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
456  ImplPeer *ip = static_cast<ImplPeer *>(p);
457  typename ObjectMap::iterator i, e;
458  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
459  {
// The `if` has no braces: only the lastreq reset is conditional. The DEAD
// flag below is set on every object.
460  if (i->lastreq && i->lastreq < minreq)
461  const_cast<ObjType &>(*i).lastreq = 0;
462  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
463  }
464  }
465 
466  // Erase every object of the peer that still carries DQM_PROP_DEAD. See markObjectsDead().
// NOTE(review): the declarator line (listing line 468) is missing from this
// rendering; the generated index gives it as purgeDeadObjects(Peer *p).
467  virtual void
469  {
470  ImplPeer *ip = static_cast<ImplPeer *>(p);
471  typename ObjectMap::iterator i, e;
// The loop header has no increment: the iterator is advanced in the body,
// using post-increment when erasing so it has moved on before the erase.
472  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
473  {
474  if (i->flags & DQM_PROP_DEAD)
475  ip->objs.erase(i++);
476  else
477  ++i;
478  }
479  }
480 
// Return the peer registered in peers_ under socket `s`, or 0 when there is
// no entry for that socket.
481  virtual Peer *
482  getPeer(lat::Socket *s)
483  {
484  typename PeerMap::iterator pos = peers_.find(s);
485  typename PeerMap::iterator end = peers_.end();
486  return pos == end ? 0 : &pos->second;
487  }
488 
// Obtain the peers_ entry keyed by socket `s`, reset its bookkeeping fields
// to zero/false, and return it.
489  virtual Peer *
490  createPeer(lat::Socket *s)
491  {
// operator[] inserts a default-constructed ImplPeer when `s` has no entry
// yet; an existing entry would be reused and its fields overwritten below.
492  ImplPeer *ip = &peers_[s];
// NOTE(review): the socket field is cleared rather than set to `s`;
// presumably the caller assigns it afterwards -- confirm.
493  ip->socket = 0;
494  ip->sendq = 0;
495  ip->sendpos = 0;
496  ip->mask = 0;
497  ip->source = false;
498  ip->update = false;
499  ip->updated = false;
500  ip->updates = 0;
501  ip->waiting = 0;
502  ip->automatic = 0;
503  return ip;
504  }
505 
// Drop peer `p`: erase all of its objects, then erase the peers_ entry
// keyed by socket `s`.
506  virtual void
507  removePeer(Peer *p, lat::Socket *s)
508  {
509  ImplPeer *ip = static_cast<ImplPeer *>(p);
// Remember whether the peer owned anything before its objects are erased.
510  bool needflush = ! ip->objs.empty();
511 
512  typename ObjectMap::iterator i, e;
513  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
514  ip->objs.erase(i++);
515 
// `ip` is not used again after this erase.
516  peers_.erase(s);
517 
518  // If we removed a peer with objects, our list of objects
519  // has changed and we need to update downstream peers.
// NOTE(review): the statement guarded by this `if` (listing line 521) is
// missing from this rendering -- recover it from the original header.
520  if (needflush)
522  }
523 
// sendObjectListToPeer: append an object list to `msg`.
//   all   - when true every object of every peer is sent; otherwise only
//           objects flagged DQM_PROP_NEW.
//   clear - when true DQM_PROP_NEW is cleared on each object that is sent.
// The list is bracketed by a DQM_REPLY_LIST_BEGIN and a DQM_REPLY_LIST_END
// record of four uint32_t words each.
// NOTE(review): the declarator line (listing line 526) is missing from this
// rendering; the generated index gives it as
// sendObjectListToPeer(Bucket *msg, bool all, bool clear).
525  virtual void
527  {
528  typename PeerMap::iterator pi, pe;
529  typename ObjectMap::iterator oi, oe;
530  size_t size = 0;
531  size_t numobjs = 0;
// First pass: estimate the payload size of the selected objects. numobjs is
// bumped in the loop header, so it counts every object, selected or not.
532  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
533  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
534  if (all || (oi->flags & DQM_PROP_NEW))
535  size += 9*sizeof(uint32_t) + oi->dirname->size()
536  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
537  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
538 
// The extra 8 words match the two 4-word begin/end records written below.
539  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
540 
541  uint32_t nupdates = 0;
// Begin record: [0] record length in bytes, [1] record type,
// [2] total object count, [3] the `all` flag.
542  uint32_t words [4];
543  words[0] = sizeof(words);
544  words[1] = DQM_REPLY_LIST_BEGIN;
545  words[2] = numobjs;
546  words[3] = all;
547  copydata(msg, &words[0], sizeof(words));
548 
// Second pass: emit each selected object; the third argument is true only
// for objects whose lastreq is non-zero.
549  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
550  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
551  if (all || (oi->flags & DQM_PROP_NEW))
552  {
553  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
554  if (clear)
555  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
556  ++nupdates;
557  }
558 
// End record reuses the array: words[0] and words[3] keep their values, and
// [2] now carries the number of objects actually sent.
559  words[1] = DQM_REPLY_LIST_END;
560  words[2] = nupdates;
561  copydata(msg, &words[0], sizeof(words));
562  }
563 
// sendObjectListToPeers: queue an object list for every peer whose `update`
// flag is set.
// NOTE(review): the declarator line (listing line 565) is missing from this
// rendering; the generated index gives it as sendObjectListToPeers(bool all).
564  virtual void
566  {
567  typename PeerMap::iterator i, e;
568  typename ObjectMap::iterator oi, oe;
569  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
570  {
571  ImplPeer &p = i->second;
572  if (! p.update)
573  continue;
574 
575  if (debug_)
576  logme()
577  << "DEBUG: notifying " << p.peeraddr << std::endl;
578 
// Build the list in a stack-local bucket first. A peer that has never been
// updated gets the full list regardless of `all`.
579  Bucket msg;
580  msg.next = 0;
581  sendObjectListToPeer(&msg, !p.updated || all, true);
582 
583  if (! msg.data.empty())
584  {
// Walk to the tail of the peer's send queue and append a new heap-allocated
// bucket there; swap() hands over the data without copying it.
585  Bucket **prev = &p.sendq;
586  while (*prev)
587  prev = &(*prev)->next;
588 
589  *prev = new Bucket;
590  (*prev)->next = 0;
591  (*prev)->data.swap(msg.data);
592  }
593  p.updated = true;
594  }
595  }
596 
// updatePeerMasks: call updateMask() on every peer in peers_.
// NOTE(review): the declarator line (listing line 598) is missing from this
// rendering; the generated index gives it as updatePeerMasks(void).
597  virtual void
599  {
600  typename PeerMap::iterator i, e;
// The iterator is post-incremented before the call, so it has already moved
// on when updateMask() runs -- presumably because updateMask() may remove
// the peer it is given; confirm in DQMNet.cc.
601  for (i = peers_.begin(), e = peers_.end(); i != e; )
602  updateMask(&(i++)->second);
603  }
604 
605 protected:
607 };
608 
609 
// DQMBasicNet: DQMImplNet instantiated with DQMNet::Object as the stored
// object type. Its member functions are only declared here; the generated
// index places their definitions in DQMNet.cc.
610 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
611 {
612 public:
// appname is forwarded-style optional, defaulting to the empty string.
613  DQMBasicNet(const std::string &appname = "");
614 
// Per the generated index: a hint of how much capacity to allocate for
// local objects.
615  void reserveLocalSpace(uint32_t size);
// NOTE(review): presumably inserts or refreshes `o` among the local
// objects -- confirm in DQMNet.cc.
616  void updateLocalObject(Object &o);
// NOTE(review): presumably removes local objects whose names are not in
// `known`; the meaning of the bool result is not visible here -- confirm.
617  bool removeLocalExcept(const std::set<std::string> &known);
618 
619 private:
// NOTE(review): presumably the peer entry that holds this process's own
// objects -- confirm in DQMNet.cc.
620  ImplPeer *local_;
621 };
622 
623 
624 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
Definition: DQMNet.h:482
void run(void)
Definition: DQMNet.cc:1290
lat::Time time
Definition: DQMNet.h:127
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:42
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:342
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1064
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:67
int i
Definition: DBlmapReader.cc:9
DataBlob incoming
Definition: DQMNet.h:138
virtual bool shouldStop(void)
Definition: DQMNet.cc:384
void shutdown(void)
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1234
uint32_t moduleId
Definition: DQMNet.h:104
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:32
void sendLocalChanges(void)
Definition: DQMNet.cc:1430
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:366
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1003
QReports qreports
Definition: DQMNet.h:107
virtual Object * makeObject(Peer *p, const std::string &name)
Definition: DQMNet.h:426
pthread_t communicate_
Definition: DQMNet.h:345
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:209
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:40
std::string algorithm
Definition: DQMNet.h:93
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:100
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:143
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:72
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:36
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:44
lat::Time version_
Definition: DQMNet.h:339
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:22
virtual void sendObjectListToPeers(bool all)
Definition: DQMNet.h:565
int delay_
Definition: DQMNet.h:348
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
pthread_mutex_t lock_
Definition: DQMNet.h:318
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1106
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:98
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:136
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:54
lat::Socket * socket
Definition: DQMNet.h:137
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:63
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1116
bool ev
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:37
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:56
uint32_t tag
Definition: DQMNet.h:99
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
const std::string * dirname
Definition: DQMNet.h:105
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
AutoPeer * automatic
Definition: DQMNet.h:148
AutoPeer upstream_
Definition: DQMNet.h:341
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:31
static const uint32_t DQM_PROP_MARKTODELETE
Definition: DQMNet.h:64
virtual void markObjectsDead(Peer *p)
Definition: DQMNet.h:451
int pid_
Definition: DQMNet.h:334
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:374
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:66
uint32_t run
Definition: DQMNet.h:101
std::string qdata
Definition: DQMNet.h:116
int port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1098
bool updated
Definition: DQMNet.h:145
virtual void updatePeerMasks(void)
Definition: DQMNet.h:598
const Double_t pi
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:55
virtual ~DQMNet(void)
Definition: DQMNet.cc:1090
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:53
tuple path
else: Piece not in the list, fine.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:113
std::string appname_
Definition: DQMNet.h:333
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:28
DQMNet & operator=(const DQMNet &)
std::string name
Definition: DQMNet.h:128
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:463
DataBlob data
Definition: DQMNet.h:122
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:167
uint32_t lumi
Definition: DQMNet.h:102
Peer * peer
Definition: DQMNet.h:153
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:216
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define end
Definition: vmac.h:37
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:33
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
bool first
Definition: L1TdeRCT.cc:75
sig_atomic_t shutdown_
Definition: DQMNet.h:346
std::string objname
Definition: DQMNet.h:106
bool update
Definition: DQMNet.h:144
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:76
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
std::string qtname
Definition: DQMNet.h:92
std::string scalar
Definition: DQMNet.h:115
lat::Time next
Definition: DQMNet.h:154
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:74
virtual Object * makeObject(Peer *p, const std::string &name)=0
string host
Definition: query.py:114
std::list< WaitObject > WaitList
Definition: DQMNet.h:85
unsigned mask
Definition: DQMNet.h:142
std::vector< uint32_t > TagList
Definition: DQMNet.h:84
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
void start(void)
Definition: DQMNet.cc:1275
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:69
~DQMImplNet(void)
Definition: DQMNet.h:378
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:201
ObjectMap objs
Definition: DQMNet.h:370
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1479
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
size_t updates
Definition: DQMNet.h:146
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:38
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
Definition: DQMNet.h:383
std::string host
Definition: DQMNet.h:155
double b
Definition: hdecay.h:120
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:526
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:49
void startLocalServer(int port)
Definition: DQMNet.cc:1125
uint64_t hash
Definition: DQMNet.h:112
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:71
std::string info
Definition: DQMNet.h:129
virtual void purgeDeadObjects(Peer *p)
Definition: DQMNet.h:468
lat::Pipe wakeup_
Definition: DQMNet.h:338
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:317
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:147
#define dqmhashfinal(a, b, c)
list key
Definition: combine.py:13
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:179
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:73
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:35
virtual void removePeer(Peer *p, lat::Socket *s)=0
std::string message
Definition: DQMNet.h:91
size_t sendpos
Definition: DQMNet.h:140
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1217
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:68
void updateMask(Peer *p)
Definition: DQMNet.cc:1034
bool flush_
Definition: DQMNet.h:351
lat::Socket * server_
Definition: DQMNet.h:337
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:39
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:365
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1455
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
PeerMap peers_
Definition: DQMNet.h:606
lat::TimeSpan waitMax_
Definition: DQMNet.h:350
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
Bucket * next
Definition: DQMNet.h:121
std::vector< QValue > QReports
Definition: DQMNet.h:83
Bucket * sendq
Definition: DQMNet.h:139
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1447
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:103
float qtresult
Definition: DQMNet.h:90
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:60
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1198
WaitList waiting_
Definition: DQMNet.h:343
std::set< std::string > DirMap
Definition: DQMNet.h:362
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:29
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:27
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1439
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:490
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:41
DataBlob rawdata
Definition: DQMNet.h:114
virtual void removePeer(Peer *p, lat::Socket *s)
Definition: DQMNet.h:507
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:34
ImplPeer * local_
Definition: DQMNet.h:620
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0