CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <stdint.h>
12 # include <iostream>
13 # include <vector>
14 # include <string>
15 # include <list>
16 # include <map>
17 # include <set>
18 # include <ext/hash_set>
19 
20 //class DQMStore;
21 
22 class DQMNet
23 {
24 public:
 // Object type codes. DQM_PROP_TYPE_MASK (0xff) spans the low byte and
 // every DQM_PROP_TYPE_* value below fits inside it.
 // DQM_PROP_TYPE_SCALAR (0x0f) covers the INT/REAL/STRING codes but not
 // the histogram/profile/blob codes (0x10 and up) -- presumably used to
 // test for scalar types; confirm against DQMNet.cc.
25  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
26  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
27  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
28  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
29  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
30  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
31  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
32  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
33  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
34  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
35  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
36  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
37  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
38  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
39  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
40  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
41  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
42  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
43 
 // Report status bits; all of them lie within DQM_PROP_REPORT_MASK
 // (0x00000f00). CLEAR is the all-zero value.
44  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
45  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
46  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
47  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
48  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
52 
 // Single-bit property flags (each value has exactly one bit set, all
 // above the type and report fields).
53  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
54  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
55  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
56  static const uint32_t DQM_PROP_RESET = 0x00008000;
57 
 // Further single-bit flags. DQM_PROP_NEW and DQM_PROP_DEAD are tested
 // and modified on the object's flags word by DQMImplNet in this header.
58  static const uint32_t DQM_PROP_NEW = 0x00010000;
59  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
60  static const uint32_t DQM_PROP_LUMI = 0x00040000;
61  static const uint32_t DQM_PROP_DEAD = 0x00080000;
62  static const uint32_t DQM_PROP_STALE = 0x00100000;
63  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
64 
 // Message codes (small integers starting at 0).
 // NOTE(review): presumably the request types a peer can send; the
 // handling code is not in this header.
65  static const uint32_t DQM_MSG_HELLO = 0;
66  static const uint32_t DQM_MSG_UPDATE_ME = 1;
67  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
68  static const uint32_t DQM_MSG_GET_OBJECT = 3;
69 
 // Reply codes (numbered from 101). DQM_REPLY_LIST_BEGIN and
 // DQM_REPLY_LIST_END are written as the second header word of the
 // object list produced by DQMImplNet::sendObjectListToPeer().
70  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
71  static const uint32_t DQM_REPLY_LIST_END = 102;
72  static const uint32_t DQM_REPLY_NONE = 103;
73  static const uint32_t DQM_REPLY_OBJECT = 104;
74 
 // NOTE(review): presumably the cap on outstanding wait requests per
 // peer; it is not used in this header -- confirm in DQMNet.cc.
75  static const uint32_t MAX_PEER_WAITREQS = 128;
76 
77  struct Peer;
78  struct QValue;
79  struct WaitObject;
80 
81  typedef std::vector<unsigned char> DataBlob;
82  typedef std::vector<QValue> QReports;
83  typedef std::vector<uint32_t> TagList; // DEPRECATED
84  typedef std::list<WaitObject> WaitList;
85 
86  struct QValue
87  {
88  int code;
89  float qtresult;
93  };
94 
95  struct CoreObject
96  {
97  uint32_t flags;
98  uint32_t tag;
103  };
104 
106  {
112  };
113 
114  struct Bucket
115  {
118  };
119 
120  struct WaitObject
121  {
122  lat::Time time;
126  };
127 
128  struct AutoPeer;
129  struct Peer
130  {
132  lat::Socket *socket;
135  size_t sendpos;
136 
137  unsigned mask;
138  bool source;
139  bool update;
140  bool updated;
141  size_t updates;
142  size_t waiting;
144  };
145 
146  struct AutoPeer
147  {
149  lat::Time next;
151  int port;
152  bool update;
153  };
154 
 // Public control interface; all of these are implemented in DQMNet.cc.
 // The constructor takes an application name that defaults to "".
155  DQMNet(const std::string &appname = "");
156  virtual ~DQMNet(void);
157 
 // Configuration setters.
 // NOTE(review): debug() presumably toggles the debug_ flag and delay()
 // the delay_ member; units of delay are not visible here -- confirm.
158  void debug(bool doit);
159  void delay(int delay);
 // Two overloads: by port number or by path.
 // NOTE(review): presumably a TCP port vs. a local socket path -- confirm.
160  void startLocalServer(int port);
161  void startLocalServer(const char *path);
162  void staleObjectWaitLimit(lat::TimeSpan time);
163  void updateToCollector(const std::string &host, int port);
164  void listenToCollector(const std::string &host, int port);
 // Stop the network layer and wait for it to finish.
165  void shutdown(void);
 // Acquire / release the lock on the DQM net layer.
166  void lock(void);
167  void unlock(void);
168 
 // NOTE(review): start() presumably launches run() on the communicate_
 // thread while run() executes the loop in the caller -- confirm.
169  void start(void);
170  void run(void);
171 
172  void sendLocalChanges(void);
173 
174  static bool setOrder(const CoreObject &a, const CoreObject &b)
175  {
176  int diff = a.dirname->compare(*b.dirname);
177  return (diff < 0 ? true
178  : diff == 0 ? a.objname < b.objname
179  : false);
180  }
181 
182  struct HashOp
183  {
184  uint32_t operator()(const Object &a) const
185  {
186  return a.hash;
187  }
188  };
189 
190  struct HashEqual
191  {
192  bool operator()(const Object &a, const Object &b) const
193  {
194  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
195  }
196  };
197 
198  static size_t
199  dqmhash(const void *key, size_t keylen)
200  {
201  // Reduced version of Bob Jenkins' hash function at:
202  // http://www.burtleburtle.net/bob/c/lookup3.c
203 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
204 # define dqmhashmix(a,b,c) { \
205  a -= c; a ^= dqmhashrot(c, 4); c += b; \
206  b -= a; b ^= dqmhashrot(a, 6); a += c; \
207  c -= b; c ^= dqmhashrot(b, 8); b += a; \
208  a -= c; a ^= dqmhashrot(c,16); c += b; \
209  b -= a; b ^= dqmhashrot(a,19); a += c; \
210  c -= b; c ^= dqmhashrot(b, 4); b += a; }
211 # define dqmhashfinal(a,b,c) { \
212  c ^= b; c -= dqmhashrot(b,14); \
213  a ^= c; a -= dqmhashrot(c,11); \
214  b ^= a; b -= dqmhashrot(a,25); \
215  c ^= b; c -= dqmhashrot(b,16); \
216  a ^= c; a -= dqmhashrot(c,4); \
217  b ^= a; b -= dqmhashrot(a,14); \
218  c ^= b; c -= dqmhashrot(b,24); }
219 
220  uint32_t a, b, c;
221  a = b = c = 0xdeadbeef + (uint32_t) keylen;
222  const unsigned char *k = (const unsigned char *) key;
223 
224  // all but the last block: affect some bits of (a, b, c)
225  while (keylen > 12)
226  {
227  a += k[0];
228  a += ((uint32_t)k[1]) << 8;
229  a += ((uint32_t)k[2]) << 16;
230  a += ((uint32_t)k[3]) << 24;
231  b += k[4];
232  b += ((uint32_t)k[5]) << 8;
233  b += ((uint32_t)k[6]) << 16;
234  b += ((uint32_t)k[7]) << 24;
235  c += k[8];
236  c += ((uint32_t)k[9]) << 8;
237  c += ((uint32_t)k[10]) << 16;
238  c += ((uint32_t)k[11]) << 24;
239  dqmhashmix(a,b,c);
240  keylen -= 12;
241  k += 12;
242  }
243 
244  // last block: affect all 32 bits of (c); all case statements fall through
245  switch (keylen)
246  {
247  case 12: c += ((uint32_t)k[11]) << 24;
248  case 11: c += ((uint32_t)k[10]) << 16;
249  case 10: c += ((uint32_t)k[9]) << 8;
250  case 9 : c += k[8];
251  case 8 : b += ((uint32_t)k[7]) << 24;
252  case 7 : b += ((uint32_t)k[6]) << 16;
253  case 6 : b += ((uint32_t)k[5]) << 8;
254  case 5 : b += k[4];
255  case 4 : a += ((uint32_t)k[3]) << 24;
256  case 3 : a += ((uint32_t)k[2]) << 16;
257  case 2 : a += ((uint32_t)k[1]) << 8;
258  case 1 : a += k[0];
259  break;
260  case 0 : return c;
261  }
262 
263  dqmhashfinal(a, b, c);
264  return c;
265 # undef dqmhashrot
266 # undef dqmhashmix
267 # undef dqmhashfinal
268  }
269 
270  static void packQualityData(std::string &into, const QReports &qr);
271  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
272 
273 protected:
274  std::ostream & logme(void);
275  static void copydata(Bucket *b, const void *data, size_t len);
276  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
277 
278  virtual bool shouldStop(void);
279  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
280  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
281  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
282 
283  // bool reconstructObject(Object &o);
284  // bool reinstateObject(DQMStore *store, Object &o);
285  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
286  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
287  virtual void markObjectsDead(Peer *p) = 0;
288  virtual void purgeDeadObjects(Peer *p) = 0;
289 
290  virtual Peer * getPeer(lat::Socket *s) = 0;
291  virtual Peer * createPeer(lat::Socket *s) = 0;
292  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
293  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
294  virtual void sendObjectListToPeers(bool all) = 0;
295 
296  void updateMask(Peer *p);
297  virtual void updatePeerMasks(void) = 0;
298  static void discard(Bucket *&b);
299 
300  bool debug_;
301  pthread_mutex_t lock_;
302 
303 private:
304  void losePeer(const char *reason,
305  Peer *peer,
306  lat::IOSelectEvent *event,
307  lat::Error *err = 0);
308  void requestObjectData(Peer *p, const char *name, size_t len);
309  void releaseFromWait(WaitList::iterator i, Object *o);
310  void releaseWaiters(const std::string &name, Object *o);
311 
312  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
313  bool onPeerConnect(lat::IOSelectEvent *ev);
314  bool onLocalNotify(lat::IOSelectEvent *ev);
315 
317  int pid_;
318 
319  lat::IOSelector sel_;
320  lat::Socket *server_;
321  lat::Pipe wakeup_;
322  lat::Time version_;
323 
327 
328  pthread_t communicate_;
329  sig_atomic_t shutdown_;
330 
331  int delay_;
332  lat::TimeSpan waitStale_;
333  lat::TimeSpan waitMax_;
334  bool flush_;
335 
336  // copying is not available
337  DQMNet(const DQMNet &);
338  DQMNet &operator=(const DQMNet &);
339 };
340 
341 template <class ObjType>
342 class DQMImplNet : public DQMNet
343 {
344 public:
345  struct ImplPeer;
346 
347  typedef std::set<std::string> DirMap;
348  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
349  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
350  struct ImplPeer : Peer
351  {
352  ImplPeer(void) {}
355  };
356 
 // Forwards the application name (default "") to the DQMNet base
 // constructor; the body is empty.
357  DQMImplNet(const std::string &appname = "")
358  : DQMNet(appname)
359  {}
360 
362  {}
363 
364 protected:
365  virtual Object *
366  findObject(Peer *p, const std::string &name, Peer **owner = 0)
367  {
368  size_t slash = name.rfind('/');
369  size_t dirpos = (slash == std::string::npos ? 0 : slash);
370  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
371  std::string path(name, 0, dirpos);
372  ObjType proto;
373  proto.hash = dqmhash(name.c_str(), name.size());
374  proto.dirname = &path;
375  proto.objname.append(name, namepos, std::string::npos);
376 
377  typename ObjectMap::iterator pos;
378  typename PeerMap::iterator i, e;
379  if (owner)
380  *owner = 0;
381  if (p)
382  {
383  ImplPeer *ip = static_cast<ImplPeer *>(p);
384  pos = ip->objs.find(proto);
385  if (pos == ip->objs.end())
386  return 0;
387  else
388  {
389  if (owner) *owner = ip;
390  return const_cast<ObjType *>(&*pos);
391  }
392  }
393  else
394  {
395  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
396  {
397  pos = i->second.objs.find(proto);
398  if (pos != i->second.objs.end())
399  {
400  if (owner) *owner = &i->second;
401  return const_cast<ObjType *>(&*pos);
402  }
403  }
404  return 0;
405  }
406  }
407 
408  virtual Object *
410  {
411  ImplPeer *ip = static_cast<ImplPeer *>(p);
412  size_t slash = name.rfind('/');
413  size_t dirpos = (slash == std::string::npos ? 0 : slash);
414  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
415  ObjType o;
416  o.flags = 0;
417  o.tag = 0;
418  o.version = 0;
419  o.lastreq = 0;
420  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
421  o.objname.append(name, namepos, std::string::npos);
422  o.hash = dqmhash(name.c_str(), name.size());
423  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
424  }
425 
426  // Mark all the objects dead. This is intended to be used when
427  // starting to process a complete list of objects, in order to
428  // flag the objects that need to be killed at the end. After
429  // call to this method, revive all live objects by removing the
430  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
431  // to remove the dead ones. This also turns off object request
432  // for objects we've lost interest in.
433  virtual void
435  {
436  uint64_t minreq
437  = (lat::Time::current()
438  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
439  ImplPeer *ip = static_cast<ImplPeer *>(p);
440  typename ObjectMap::iterator i, e;
441  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
442  {
443  if (i->lastreq && i->lastreq < minreq)
444  const_cast<ObjType &>(*i).lastreq = 0;
445  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
446  }
447  }
448 
449  // Mark remaining zombie objects as dead. See markObjectsDead().
450  virtual void
452  {
453  ImplPeer *ip = static_cast<ImplPeer *>(p);
454  typename ObjectMap::iterator i, e;
455  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
456  {
457  if (i->flags & DQM_PROP_DEAD)
458  ip->objs.erase(i++);
459  else
460  ++i;
461  }
462  }
463 
464  virtual Peer *
465  getPeer(lat::Socket *s)
466  {
467  typename PeerMap::iterator pos = peers_.find(s);
468  typename PeerMap::iterator end = peers_.end();
469  return pos == end ? 0 : &pos->second;
470  }
471 
472  virtual Peer *
473  createPeer(lat::Socket *s)
474  {
475  ImplPeer *ip = &peers_[s];
476  ip->socket = 0;
477  ip->sendq = 0;
478  ip->sendpos = 0;
479  ip->mask = 0;
480  ip->source = false;
481  ip->update = false;
482  ip->updated = false;
483  ip->updates = 0;
484  ip->waiting = 0;
485  ip->automatic = 0;
486  return ip;
487  }
488 
489  virtual void
490  removePeer(Peer *p, lat::Socket *s)
491  {
492  ImplPeer *ip = static_cast<ImplPeer *>(p);
493  bool needflush = ! ip->objs.empty();
494 
495  typename ObjectMap::iterator i, e;
496  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
497  ip->objs.erase(i++);
498 
499  peers_.erase(s);
500 
501  // If we removed a peer with objects, our list of objects
502  // has changed and we need to update downstream peers.
503  if (needflush)
505  }
506 
508  virtual void
510  {
511  typename PeerMap::iterator pi, pe;
512  typename ObjectMap::iterator oi, oe;
513  size_t size = 0;
514  size_t numobjs = 0;
515  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
516  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
517  if (all || (oi->flags & DQM_PROP_NEW))
518  size += 9*sizeof(uint32_t) + oi->dirname->size()
519  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
520  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
521 
522  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
523 
524  uint32_t nupdates = 0;
525  uint32_t words [4];
526  words[0] = sizeof(words);
527  words[1] = DQM_REPLY_LIST_BEGIN;
528  words[2] = numobjs;
529  words[3] = all;
530  copydata(msg, &words[0], sizeof(words));
531 
532  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
533  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
534  if (all || (oi->flags & DQM_PROP_NEW))
535  {
536  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
537  if (clear)
538  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
539  ++nupdates;
540  }
541 
542  words[1] = DQM_REPLY_LIST_END;
543  words[2] = nupdates;
544  copydata(msg, &words[0], sizeof(words));
545  }
546 
547  virtual void
549  {
550  typename PeerMap::iterator i, e;
551  typename ObjectMap::iterator oi, oe;
552  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
553  {
554  ImplPeer &p = i->second;
555  if (! p.update)
556  continue;
557 
558  if (debug_)
559  logme()
560  << "DEBUG: notifying " << p.peeraddr << std::endl;
561 
562  Bucket msg;
563  msg.next = 0;
564  sendObjectListToPeer(&msg, !p.updated || all, true);
565 
566  if (! msg.data.empty())
567  {
568  Bucket **prev = &p.sendq;
569  while (*prev)
570  prev = &(*prev)->next;
571 
572  *prev = new Bucket;
573  (*prev)->next = 0;
574  (*prev)->data.swap(msg.data);
575  }
576  p.updated = true;
577  }
578  }
579 
580  virtual void
582  {
583  typename PeerMap::iterator i, e;
584  for (i = peers_.begin(), e = peers_.end(); i != e; )
585  updateMask(&(i++)->second);
586  }
587 
588 protected:
590 };
591 
592 
// Concrete network layer: DQMImplNet instantiated for DQMNet::Object,
// extended with operations on local objects.  The member functions
// declared here are defined in DQMNet.cc.
593 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
594 {
595 public:
 // The application name defaults to "".
596  DQMBasicNet(const std::string &appname = "");
597 
 // Give a hint of how much capacity to allocate for local objects.
598  void reserveLocalSpace(uint32_t size);
 // NOTE(review): presumably inserts or refreshes `o` among the local
 // objects -- confirm in DQMNet.cc.
599  void updateLocalObject(Object &o);
 // NOTE(review): presumably drops local objects whose names are not in
 // `known`; the meaning of the bool result is not visible here.
600  bool removeLocalExcept(const std::set<std::string> &known);
601 
602 private:
 // NOTE(review): presumably the peer entry that holds this process's
 // own objects -- confirm in DQMNet.cc.
603  ImplPeer *local_;
604 };
605 
606 
607 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
Definition: DQMNet.h:465
void run(void)
Definition: DQMNet.cc:1290
lat::Time time
Definition: DQMNet.h:122
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:42
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:325
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1064
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:66
int i
Definition: DBlmapReader.cc:9
DataBlob incoming
Definition: DQMNet.h:133
virtual bool shouldStop(void)
Definition: DQMNet.cc:384
void shutdown(void)
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1234
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:32
void sendLocalChanges(void)
Definition: DQMNet.cc:1430
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:349
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1003
QReports qreports
Definition: DQMNet.h:102
virtual Object * makeObject(Peer *p, const std::string &name)
Definition: DQMNet.h:409
pthread_t communicate_
Definition: DQMNet.h:328
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:192
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:40
std::string algorithm
Definition: DQMNet.h:92
uint64_t version
Definition: DQMNet.h:99
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:138
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:71
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:36
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:44
lat::Time version_
Definition: DQMNet.h:322
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:22
virtual void sendObjectListToPeers(bool all)
Definition: DQMNet.h:548
int delay_
Definition: DQMNet.h:331
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
pthread_mutex_t lock_
Definition: DQMNet.h:301
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1106
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:97
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:131
string reason
Definition: Association.py:177
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:54
lat::Socket * socket
Definition: DQMNet.h:132
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:63
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1116
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:37
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:56
uint32_t tag
Definition: DQMNet.h:98
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
const std::string * dirname
Definition: DQMNet.h:100
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
AutoPeer * automatic
Definition: DQMNet.h:143
AutoPeer upstream_
Definition: DQMNet.h:324
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:31
virtual void markObjectsDead(Peer *p)
Definition: DQMNet.h:434
int pid_
Definition: DQMNet.h:317
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:357
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:65
std::string qdata
Definition: DQMNet.h:111
int port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1098
bool updated
Definition: DQMNet.h:140
virtual void updatePeerMasks(void)
Definition: DQMNet.h:581
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:55
virtual ~DQMNet(void)
Definition: DQMNet.cc:1090
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:53
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:108
std::string appname_
Definition: DQMNet.h:316
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:28
DQMNet & operator=(const DQMNet &)
std::string name
Definition: DQMNet.h:123
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:463
DataBlob data
Definition: DQMNet.h:117
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:168
Peer * peer
Definition: DQMNet.h:148
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:79
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:199
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define end
Definition: vmac.h:38
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:33
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool first
Definition: L1TdeRCT.cc:94
sig_atomic_t shutdown_
Definition: DQMNet.h:329
std::string objname
Definition: DQMNet.h:101
bool update
Definition: DQMNet.h:139
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:75
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
std::string qtname
Definition: DQMNet.h:91
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:110
lat::Time next
Definition: DQMNet.h:149
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:73
virtual Object * makeObject(Peer *p, const std::string &name)=0
string host
Definition: query.py:114
std::list< WaitObject > WaitList
Definition: DQMNet.h:84
unsigned mask
Definition: DQMNet.h:137
std::vector< uint32_t > TagList
Definition: DQMNet.h:83
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
void start(void)
Definition: DQMNet.cc:1275
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:68
~DQMImplNet(void)
Definition: DQMNet.h:361
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:184
ObjectMap objs
Definition: DQMNet.h:353
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1479
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
size_t updates
Definition: DQMNet.h:141
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:38
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
Definition: DQMNet.h:366
std::string host
Definition: DQMNet.h:150
double b
Definition: hdecay.h:120
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:509
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:49
void startLocalServer(int port)
Definition: DQMNet.cc:1125
uint64_t hash
Definition: DQMNet.h:107
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:70
std::string info
Definition: DQMNet.h:124
virtual void purgeDeadObjects(Peer *p)
Definition: DQMNet.h:451
lat::Pipe wakeup_
Definition: DQMNet.h:321
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:300
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:142
#define dqmhashfinal(a, b, c)
list key
Definition: combine.py:13
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:174
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:72
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:35
virtual void removePeer(Peer *p, lat::Socket *s)=0
ObjType
Definition: DisplayCommon.h:6
std::string message
Definition: DQMNet.h:90
size_t sendpos
Definition: DQMNet.h:135
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1217
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:67
void updateMask(Peer *p)
Definition: DQMNet.cc:1034
bool flush_
Definition: DQMNet.h:334
lat::Socket * server_
Definition: DQMNet.h:320
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:39
T w() const
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:348
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1455
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
double pi
PeerMap peers_
Definition: DQMNet.h:589
lat::TimeSpan waitMax_
Definition: DQMNet.h:333
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
Bucket * next
Definition: DQMNet.h:116
std::vector< QValue > QReports
Definition: DQMNet.h:82
Bucket * sendq
Definition: DQMNet.h:134
lat::TimeSpan waitStale_
Definition: DQMNet.h:332
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1447
virtual void purgeDeadObjects(Peer *p)=0
float qtresult
Definition: DQMNet.h:89
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:60
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1198
WaitList waiting_
Definition: DQMNet.h:326
std::set< std::string > DirMap
Definition: DQMNet.h:345
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:29
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:27
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1439
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:473
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:41
DataBlob rawdata
Definition: DQMNet.h:109
virtual void removePeer(Peer *p, lat::Socket *s)
Definition: DQMNet.h:490
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:34
ImplPeer * local_
Definition: DQMNet.h:603
lat::IOSelector sel_
Definition: DQMNet.h:319
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0