CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <stdint.h>
12 # include <iostream>
13 # include <vector>
14 # include <string>
15 # include <list>
16 # include <map>
17 # include <set>
18 # include <ext/hash_set>
19 
20 //class DQMStore;
21 
22 class DQMNet
23 {
24 public:
25  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
26  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
27  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
28  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
29  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
30  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
31  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
32  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
33  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
34  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
35  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
36  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
37  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
38  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
39  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
40  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
41  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
42  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
43 
44  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
45  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
46  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
47  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
48  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
52 
53  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
54  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
55  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
56  static const uint32_t DQM_PROP_RESET = 0x00008000;
57 
58  static const uint32_t DQM_PROP_NEW = 0x00010000;
59  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
60  static const uint32_t DQM_PROP_LUMI = 0x00040000;
61  static const uint32_t DQM_PROP_DEAD = 0x00080000;
62  static const uint32_t DQM_PROP_STALE = 0x00100000;
63 
64  static const uint32_t DQM_MSG_HELLO = 0;
65  static const uint32_t DQM_MSG_UPDATE_ME = 1;
66  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
67  static const uint32_t DQM_MSG_GET_OBJECT = 3;
68 
69  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
70  static const uint32_t DQM_REPLY_LIST_END = 102;
71  static const uint32_t DQM_REPLY_NONE = 103;
72  static const uint32_t DQM_REPLY_OBJECT = 104;
73 
74  static const uint32_t MAX_PEER_WAITREQS = 128;
75 
76  struct Peer;
77  struct QValue;
78  struct WaitObject;
79 
80  typedef std::vector<unsigned char> DataBlob;
81  typedef std::vector<QValue> QReports;
82  typedef std::vector<uint32_t> TagList; // DEPRECATED
83  typedef std::list<WaitObject> WaitList;
84 
85  struct QValue
86  {
87  int code;
88  float qtresult;
89  std::string message;
90  std::string qtname;
91  std::string algorithm;
92  };
93 
94  struct CoreObject
95  {
96  uint32_t flags;
97  uint32_t tag;
99  const std::string *dirname;
100  std::string objname;
102  };
103 
105  {
109  std::string scalar;
110  std::string qdata;
111  };
112 
113  struct Bucket
114  {
117  };
118 
119  struct WaitObject
120  {
121  lat::Time time;
122  std::string name;
123  std::string info;
125  };
126 
127  struct AutoPeer;
128  struct Peer
129  {
130  std::string peeraddr;
131  lat::Socket *socket;
134  size_t sendpos;
135 
136  unsigned mask;
137  bool source;
138  bool update;
139  bool updated;
140  size_t updates;
141  size_t waiting;
143  };
144 
145  struct AutoPeer
146  {
148  lat::Time next;
149  std::string host;
150  int port;
151  bool update;
152  };
153 
154  DQMNet(const std::string &appname = "");
155  virtual ~DQMNet(void);
156 
157  void debug(bool doit);
158  void delay(int delay);
159  void startLocalServer(int port);
160  void startLocalServer(const char *path);
161  void staleObjectWaitLimit(lat::TimeSpan time);
162  void updateToCollector(const std::string &host, int port);
163  void listenToCollector(const std::string &host, int port);
164  void shutdown(void);
165  void lock(void);
166  void unlock(void);
167 
168  void start(void);
169  void run(void);
170 
171  void sendLocalChanges(void);
172 
173  static bool setOrder(const CoreObject &a, const CoreObject &b)
174  {
175  int diff = a.dirname->compare(*b.dirname);
176  return (diff < 0 ? true
177  : diff == 0 ? a.objname < b.objname
178  : false);
179  }
180 
181  struct HashOp
182  {
183  uint32_t operator()(const Object &a) const
184  {
185  return a.hash;
186  }
187  };
188 
189  struct HashEqual
190  {
191  bool operator()(const Object &a, const Object &b) const
192  {
193  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
194  }
195  };
196 
197  static size_t
198  dqmhash(const void *key, size_t keylen)
199  {
200  // Reduced version of Bob Jenkins' hash function at:
201  // http://www.burtleburtle.net/bob/c/lookup3.c
202 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
203 # define dqmhashmix(a,b,c) { \
204  a -= c; a ^= dqmhashrot(c, 4); c += b; \
205  b -= a; b ^= dqmhashrot(a, 6); a += c; \
206  c -= b; c ^= dqmhashrot(b, 8); b += a; \
207  a -= c; a ^= dqmhashrot(c,16); c += b; \
208  b -= a; b ^= dqmhashrot(a,19); a += c; \
209  c -= b; c ^= dqmhashrot(b, 4); b += a; }
210 # define dqmhashfinal(a,b,c) { \
211  c ^= b; c -= dqmhashrot(b,14); \
212  a ^= c; a -= dqmhashrot(c,11); \
213  b ^= a; b -= dqmhashrot(a,25); \
214  c ^= b; c -= dqmhashrot(b,16); \
215  a ^= c; a -= dqmhashrot(c,4); \
216  b ^= a; b -= dqmhashrot(a,14); \
217  c ^= b; c -= dqmhashrot(b,24); }
218 
219  uint32_t a, b, c;
220  a = b = c = 0xdeadbeef + (uint32_t) keylen;
221  const unsigned char *k = (const unsigned char *) key;
222 
223  // all but the last block: affect some bits of (a, b, c)
224  while (keylen > 12)
225  {
226  a += k[0];
227  a += ((uint32_t)k[1]) << 8;
228  a += ((uint32_t)k[2]) << 16;
229  a += ((uint32_t)k[3]) << 24;
230  b += k[4];
231  b += ((uint32_t)k[5]) << 8;
232  b += ((uint32_t)k[6]) << 16;
233  b += ((uint32_t)k[7]) << 24;
234  c += k[8];
235  c += ((uint32_t)k[9]) << 8;
236  c += ((uint32_t)k[10]) << 16;
237  c += ((uint32_t)k[11]) << 24;
238  dqmhashmix(a,b,c);
239  keylen -= 12;
240  k += 12;
241  }
242 
243  // last block: affect all 32 bits of (c); all case statements fall through
244  switch (keylen)
245  {
246  case 12: c += ((uint32_t)k[11]) << 24;
247  case 11: c += ((uint32_t)k[10]) << 16;
248  case 10: c += ((uint32_t)k[9]) << 8;
249  case 9 : c += k[8];
250  case 8 : b += ((uint32_t)k[7]) << 24;
251  case 7 : b += ((uint32_t)k[6]) << 16;
252  case 6 : b += ((uint32_t)k[5]) << 8;
253  case 5 : b += k[4];
254  case 4 : a += ((uint32_t)k[3]) << 24;
255  case 3 : a += ((uint32_t)k[2]) << 16;
256  case 2 : a += ((uint32_t)k[1]) << 8;
257  case 1 : a += k[0];
258  break;
259  case 0 : return c;
260  }
261 
262  dqmhashfinal(a, b, c);
263  return c;
264 # undef dqmhashrot
265 # undef dqmhashmix
266 # undef dqmhashfinal
267  }
268 
269  static void packQualityData(std::string &into, const QReports &qr);
270  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
271 
272 protected:
273  std::ostream & logme(void);
274  static void copydata(Bucket *b, const void *data, size_t len);
275  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
276 
277  virtual bool shouldStop(void);
278  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
279  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
280  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
281 
282  // bool reconstructObject(Object &o);
283  // bool reinstateObject(DQMStore *store, Object &o);
284  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
285  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
286  virtual void markObjectsDead(Peer *p) = 0;
287  virtual void purgeDeadObjects(Peer *p) = 0;
288 
289  virtual Peer * getPeer(lat::Socket *s) = 0;
290  virtual Peer * createPeer(lat::Socket *s) = 0;
291  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
292  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
293  virtual void sendObjectListToPeers(bool all) = 0;
294 
295  void updateMask(Peer *p);
296  virtual void updatePeerMasks(void) = 0;
297  static void discard(Bucket *&b);
298 
299  bool debug_;
300  pthread_mutex_t lock_;
301 
302 private:
303  void losePeer(const char *reason,
304  Peer *peer,
305  lat::IOSelectEvent *event,
306  lat::Error *err = 0);
307  void requestObjectData(Peer *p, const char *name, size_t len);
308  void releaseFromWait(WaitList::iterator i, Object *o);
309  void releaseWaiters(const std::string &name, Object *o);
310 
311  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
312  bool onPeerConnect(lat::IOSelectEvent *ev);
313  bool onLocalNotify(lat::IOSelectEvent *ev);
314 
315  std::string appname_;
316  int pid_;
317 
318  lat::IOSelector sel_;
319  lat::Socket *server_;
320  lat::Pipe wakeup_;
321  lat::Time version_;
322 
326 
327  pthread_t communicate_;
328  sig_atomic_t shutdown_;
329 
330  int delay_;
331  lat::TimeSpan waitStale_;
332  lat::TimeSpan waitMax_;
333  bool flush_;
334 
335  // copying is not available
336  DQMNet(const DQMNet &);
337  DQMNet &operator=(const DQMNet &);
338 };
339 
340 template <class ObjType>
341 class DQMImplNet : public DQMNet
342 {
343 public:
344  struct ImplPeer;
345 
346  typedef std::set<std::string> DirMap;
347  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
348  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
349  struct ImplPeer : Peer
350  {
351  ImplPeer(void) {}
354  };
355 
356  DQMImplNet(const std::string &appname = "")
357  : DQMNet(appname)
358  {}
359 
361  {}
362 
363 protected:
364  virtual Object *
365  findObject(Peer *p, const std::string &name, Peer **owner = 0)
366  {
367  size_t slash = name.rfind('/');
368  size_t dirpos = (slash == std::string::npos ? 0 : slash);
369  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
370  std::string path(name, 0, dirpos);
371  ObjType proto;
372  proto.hash = dqmhash(name.c_str(), name.size());
373  proto.dirname = &path;
374  proto.objname.append(name, namepos, std::string::npos);
375 
376  typename ObjectMap::iterator pos;
377  typename PeerMap::iterator i, e;
378  if (owner)
379  *owner = 0;
380  if (p)
381  {
382  ImplPeer *ip = static_cast<ImplPeer *>(p);
383  pos = ip->objs.find(proto);
384  if (pos == ip->objs.end())
385  return 0;
386  else
387  {
388  if (owner) *owner = ip;
389  return const_cast<ObjType *>(&*pos);
390  }
391  }
392  else
393  {
394  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
395  {
396  pos = i->second.objs.find(proto);
397  if (pos != i->second.objs.end())
398  {
399  if (owner) *owner = &i->second;
400  return const_cast<ObjType *>(&*pos);
401  }
402  }
403  return 0;
404  }
405  }
406 
407  virtual Object *
408  makeObject(Peer *p, const std::string &name)
409  {
410  ImplPeer *ip = static_cast<ImplPeer *>(p);
411  size_t slash = name.rfind('/');
412  size_t dirpos = (slash == std::string::npos ? 0 : slash);
413  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
414  ObjType o;
415  o.flags = 0;
416  o.tag = 0;
417  o.version = 0;
418  o.lastreq = 0;
419  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
420  o.objname.append(name, namepos, std::string::npos);
421  o.hash = dqmhash(name.c_str(), name.size());
422  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
423  }
424 
425  // Mark all the objects dead. This is intended to be used when
426  // starting to process a complete list of objects, in order to
427  // flag the objects that need to be killed at the end. After
428  // call to this method, revive all live objects by removing the
429  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
430  // to remove the dead ones. This also turns off object request
431  // for objects we've lost interest in.
432  virtual void
434  {
435  uint64_t minreq
436  = (lat::Time::current()
437  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
438  ImplPeer *ip = static_cast<ImplPeer *>(p);
439  typename ObjectMap::iterator i, e;
440  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
441  {
442  if (i->lastreq && i->lastreq < minreq)
443  const_cast<ObjType &>(*i).lastreq = 0;
444  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
445  }
446  }
447 
448  // Mark remaining zombie objects as dead. See markObjectsDead().
449  virtual void
451  {
452  ImplPeer *ip = static_cast<ImplPeer *>(p);
453  typename ObjectMap::iterator i, e;
454  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
455  {
456  if (i->flags & DQM_PROP_DEAD)
457  ip->objs.erase(i++);
458  else
459  ++i;
460  }
461  }
462 
463  virtual Peer *
464  getPeer(lat::Socket *s)
465  {
466  typename PeerMap::iterator pos = peers_.find(s);
467  typename PeerMap::iterator end = peers_.end();
468  return pos == end ? 0 : &pos->second;
469  }
470 
471  virtual Peer *
472  createPeer(lat::Socket *s)
473  {
474  ImplPeer *ip = &peers_[s];
475  ip->socket = 0;
476  ip->sendq = 0;
477  ip->sendpos = 0;
478  ip->mask = 0;
479  ip->source = false;
480  ip->update = false;
481  ip->updated = false;
482  ip->updates = 0;
483  ip->waiting = 0;
484  ip->automatic = 0;
485  return ip;
486  }
487 
488  virtual void
489  removePeer(Peer *p, lat::Socket *s)
490  {
491  ImplPeer *ip = static_cast<ImplPeer *>(p);
492  bool needflush = ! ip->objs.empty();
493 
494  typename ObjectMap::iterator i, e;
495  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
496  ip->objs.erase(i++);
497 
498  peers_.erase(s);
499 
500  // If we removed a peer with objects, our list of objects
501  // has changed and we need to update downstream peers.
502  if (needflush)
504  }
505 
507  virtual void
508  sendObjectListToPeer(Bucket *msg, bool all, bool clear)
509  {
510  typename PeerMap::iterator pi, pe;
511  typename ObjectMap::iterator oi, oe;
512  size_t size = 0;
513  size_t numobjs = 0;
514  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
515  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
516  if (all || (oi->flags & DQM_PROP_NEW))
517  size += 9*sizeof(uint32_t) + oi->dirname->size()
518  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
519  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
520 
521  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
522 
523  uint32_t nupdates = 0;
524  uint32_t words [4];
525  words[0] = sizeof(words);
526  words[1] = DQM_REPLY_LIST_BEGIN;
527  words[2] = numobjs;
528  words[3] = all;
529  copydata(msg, &words[0], sizeof(words));
530 
531  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
532  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
533  if (all || (oi->flags & DQM_PROP_NEW))
534  {
535  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
536  if (clear)
537  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
538  ++nupdates;
539  }
540 
541  words[1] = DQM_REPLY_LIST_END;
542  words[2] = nupdates;
543  copydata(msg, &words[0], sizeof(words));
544  }
545 
546  virtual void
548  {
549  typename PeerMap::iterator i, e;
550  typename ObjectMap::iterator oi, oe;
551  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
552  {
553  ImplPeer &p = i->second;
554  if (! p.update)
555  continue;
556 
557  if (debug_)
558  logme()
559  << "DEBUG: notifying " << p.peeraddr << std::endl;
560 
561  Bucket msg;
562  msg.next = 0;
563  sendObjectListToPeer(&msg, !p.updated || all, true);
564 
565  if (! msg.data.empty())
566  {
567  Bucket **prev = &p.sendq;
568  while (*prev)
569  prev = &(*prev)->next;
570 
571  *prev = new Bucket;
572  (*prev)->next = 0;
573  (*prev)->data.swap(msg.data);
574  }
575  p.updated = true;
576  }
577  }
578 
579  virtual void
581  {
582  typename PeerMap::iterator i, e;
583  for (i = peers_.begin(), e = peers_.end(); i != e; )
584  updateMask(&(i++)->second);
585  }
586 
587 protected:
589 };
590 
591 
592 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
593 {
594 public:
595  DQMBasicNet(const std::string &appname = "");
596 
597  void reserveLocalSpace(uint32_t size);
598  void updateLocalObject(Object &o);
599  bool removeLocalExcept(const std::set<std::string> &known);
600 
601 private:
602  ImplPeer *local_;
603 };
604 
605 
606 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
Definition: DQMNet.h:464
void run(void)
Definition: DQMNet.cc:1285
lat::Time time
Definition: DQMNet.h:121
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:42
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:194
AutoPeer downstream_
Definition: DQMNet.h:324
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1059
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
int i
Definition: DBlmapReader.cc:9
DataBlob incoming
Definition: DQMNet.h:132
virtual bool shouldStop(void)
Definition: DQMNet.cc:379
void shutdown(void)
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1229
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:32
void sendLocalChanges(void)
Definition: DQMNet.cc:1425
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:348
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:998
QReports qreports
Definition: DQMNet.h:101
virtual Object * makeObject(Peer *p, const std::string &name)
Definition: DQMNet.h:408
pthread_t communicate_
Definition: DQMNet.h:327
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:191
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:40
std::string algorithm
Definition: DQMNet.h:91
uint64_t version
Definition: DQMNet.h:98
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:137
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:70
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:36
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:44
lat::Time version_
Definition: DQMNet.h:321
static void discard(Bucket *&b)
Definition: DQMNet.cc:57
Definition: DQMNet.h:22
virtual void sendObjectListToPeers(bool all)
Definition: DQMNet.h:547
int delay_
Definition: DQMNet.h:330
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
pthread_mutex_t lock_
Definition: DQMNet.h:300
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1101
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:96
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:159
std::string peeraddr
Definition: DQMNet.h:130
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:54
lat::Socket * socket
Definition: DQMNet.h:131
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1111
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:37
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:56
uint32_t tag
Definition: DQMNet.h:97
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
const std::string * dirname
Definition: DQMNet.h:99
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1252
AutoPeer * automatic
Definition: DQMNet.h:142
AutoPeer upstream_
Definition: DQMNet.h:323
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:31
virtual void markObjectsDead(Peer *p)
Definition: DQMNet.h:433
int pid_
Definition: DQMNet.h:316
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:356
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:64
std::string qdata
Definition: DQMNet.h:110
int port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1093
bool updated
Definition: DQMNet.h:139
virtual void updatePeerMasks(void)
Definition: DQMNet.h:580
int path() const
Definition: HLTadd.h:3
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:55
virtual ~DQMNet(void)
Definition: DQMNet.cc:1085
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:53
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:126
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:106
uint64_t lastreq
Definition: DQMNet.h:107
std::string appname_
Definition: DQMNet.h:315
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:28
DQMNet & operator=(const DQMNet &)
std::string name
Definition: DQMNet.h:122
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:458
DataBlob data
Definition: DQMNet.h:116
Peer * peer
Definition: DQMNet.h:147
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:78
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:758
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:198
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define end
Definition: vmac.h:38
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:33
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:387
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool first
Definition: L1TdeRCT.cc:79
sig_atomic_t shutdown_
Definition: DQMNet.h:328
std::string objname
Definition: DQMNet.h:100
bool update
Definition: DQMNet.h:138
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:74
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:72
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1260
std::string qtname
Definition: DQMNet.h:90
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:109
lat::Time next
Definition: DQMNet.h:148
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:72
static std::string from(" from ")
virtual Object * makeObject(Peer *p, const std::string &name)=0
string host
Definition: query.py:114
std::list< WaitObject > WaitList
Definition: DQMNet.h:83
unsigned mask
Definition: DQMNet.h:136
std::vector< uint32_t > TagList
Definition: DQMNet.h:82
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:408
void start(void)
Definition: DQMNet.cc:1270
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:67
~DQMImplNet(void)
Definition: DQMNet.h:360
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:183
ObjectMap objs
Definition: DQMNet.h:352
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1474
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:941
size_t updates
Definition: DQMNet.h:140
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:38
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
Definition: DQMNet.h:365
std::string host
Definition: DQMNet.h:149
double b
Definition: hdecay.h:120
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:508
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:49
void startLocalServer(int port)
Definition: DQMNet.cc:1120
uint64_t hash
Definition: DQMNet.h:106
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:172
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:69
std::string info
Definition: DQMNet.h:123
virtual void purgeDeadObjects(Peer *p)
Definition: DQMNet.h:450
lat::Pipe wakeup_
Definition: DQMNet.h:320
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:299
size_t waiting
Definition: DQMNet.h:141
#define dqmhashfinal(a, b, c)
list key
Definition: combine.py:13
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:173
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:71
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:35
virtual void removePeer(Peer *p, lat::Socket *s)=0
ObjType
Definition: DisplayCommon.h:6
std::string message
Definition: DQMNet.h:89
size_t sendpos
Definition: DQMNet.h:134
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1212
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
void updateMask(Peer *p)
Definition: DQMNet.cc:1029
bool flush_
Definition: DQMNet.h:333
lat::Socket * server_
Definition: DQMNet.h:319
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:39
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:347
string s
Definition: asciidump.py:422
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1450
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
double pi
PeerMap peers_
Definition: DQMNet.h:588
lat::TimeSpan waitMax_
Definition: DQMNet.h:332
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
Bucket * next
Definition: DQMNet.h:115
std::vector< QValue > QReports
Definition: DQMNet.h:81
Bucket * sendq
Definition: DQMNet.h:133
lat::TimeSpan waitStale_
Definition: DQMNet.h:331
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1442
virtual void purgeDeadObjects(Peer *p)=0
float qtresult
Definition: DQMNet.h:88
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:60
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1193
WaitList waiting_
Definition: DQMNet.h:325
std::set< std::string > DirMap
Definition: DQMNet.h:344
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:29
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:27
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1434
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:472
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:41
DataBlob rawdata
Definition: DQMNet.h:108
virtual void removePeer(Peer *p, lat::Socket *s)
Definition: DQMNet.h:489
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:34
ImplPeer * local_
Definition: DQMNet.h:602
lat::IOSelector sel_
Definition: DQMNet.h:318
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0