CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMNet.h
Go to the documentation of this file.
1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
3 
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
10 # include <pthread.h>
11 # include <stdint.h>
12 # include <iostream>
13 # include <vector>
14 # include <string>
15 # include <list>
16 # include <map>
17 # include <set>
18 # include <ext/hash_set>
19 
20 //class DQMStore;
21 
22 class DQMNet
23 {
24 public:
25  static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
26  static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
27  static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
28  static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
29  static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
30  static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
31  static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
32  static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
33  static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
34  static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
35  static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
36  static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
37  static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
38  static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
39  static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
40  static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
41  static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
42  static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
43 
44  static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
45  static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
46  static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
47  static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
48  static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
52 
53  static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
54  static const uint32_t DQM_PROP_TAGGED = 0x00002000;
55  static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
56  static const uint32_t DQM_PROP_RESET = 0x00008000;
57 
58  static const uint32_t DQM_PROP_NEW = 0x00010000;
59  static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
60  static const uint32_t DQM_PROP_LUMI = 0x00040000;
61  static const uint32_t DQM_PROP_DEAD = 0x00080000;
62  static const uint32_t DQM_PROP_STALE = 0x00100000;
63  static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
64 
65  static const uint32_t DQM_MSG_HELLO = 0;
66  static const uint32_t DQM_MSG_UPDATE_ME = 1;
67  static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
68  static const uint32_t DQM_MSG_GET_OBJECT = 3;
69 
70  static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
71  static const uint32_t DQM_REPLY_LIST_END = 102;
72  static const uint32_t DQM_REPLY_NONE = 103;
73  static const uint32_t DQM_REPLY_OBJECT = 104;
74 
75  static const uint32_t MAX_PEER_WAITREQS = 128;
76 
77  struct Peer;
78  struct QValue;
79  struct WaitObject;
80 
81  typedef std::vector<unsigned char> DataBlob;
82  typedef std::vector<QValue> QReports;
83  typedef std::vector<uint32_t> TagList; // DEPRECATED
84  typedef std::list<WaitObject> WaitList;
85 
86  struct QValue
87  {
88  int code;
89  float qtresult;
93  };
94 
95  struct CoreObject
96  {
97  uint32_t flags;
98  uint32_t tag;
100  uint32_t run;
101  uint32_t lumi;
102  uint32_t streamId;
103  uint32_t moduleId;
107  };
108 
110  {
116  };
117 
118  struct Bucket
119  {
122  };
123 
124  struct WaitObject
125  {
126  lat::Time time;
130  };
131 
132  struct AutoPeer;
133  struct Peer
134  {
136  lat::Socket *socket;
139  size_t sendpos;
140 
141  unsigned mask;
142  bool source;
143  bool update;
144  bool updated;
145  size_t updates;
146  size_t waiting;
148  };
149 
150  struct AutoPeer
151  {
153  lat::Time next;
155  int port;
156  bool update;
157  };
158 
159  DQMNet(const std::string &appname = "");
160  virtual ~DQMNet(void);
161 
162  void debug(bool doit);
163  void delay(int delay);
164  void startLocalServer(int port);
165  void startLocalServer(const char *path);
166  void staleObjectWaitLimit(lat::TimeSpan time);
167  void updateToCollector(const std::string &host, int port);
168  void listenToCollector(const std::string &host, int port);
169  void shutdown(void);
170  void lock(void);
171  void unlock(void);
172 
173  void start(void);
174  void run(void);
175 
176  void sendLocalChanges(void);
177 
178  static bool setOrder(const CoreObject &a, const CoreObject &b)
179  {
180  if (a.run == b.run) {
181  if (a.lumi == b.lumi) {
182  if (a.streamId == b.streamId) {
183  if (a.moduleId == b.moduleId) {
184  if (*a.dirname == *b.dirname) {
185  return a.objname < b.objname;
186  }
187  return *a.dirname < *b.dirname;
188  }
189  return a.moduleId < b.moduleId;
190  }
191  return a.streamId < b.streamId;
192  }
193  return a.lumi < b.lumi;
194  }
195  return a.run < b.run;
196  }
197 
198  struct HashOp
199  {
200  uint32_t operator()(const Object &a) const
201  {
202  return a.hash;
203  }
204  };
205 
206  struct HashEqual
207  {
208  bool operator()(const Object &a, const Object &b) const
209  {
210  return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
211  }
212  };
213 
// Hash the `keylen` bytes starting at `key`.
//
// Reduced version of Bob Jenkins' hash function at:
//   http://www.burtleburtle.net/bob/c/lookup3.c
//
// The key is read one byte at a time and assembled into 32-bit words
// with explicit shifts.  The result is a 32-bit value returned as
// size_t.  A zero-length key returns the initial state, 0xdeadbeef,
// without dereferencing `key` and without the final mixing step.
static size_t
dqmhash(const void *key, size_t keylen)
{
  // The mixing steps live in a local helper type rather than in
  // function-like macros: this is a header, and #define/#undef pairs
  // here would silently clobber any macro of the same name that an
  // including translation unit had defined.
  struct Lookup3
  {
    // Rotate the 32-bit value x left by k bits (0 < k < 32).
    static uint32_t rot(uint32_t x, unsigned k)
    {
      return (x << k) | (x >> (32 - k));
    }

    // Mix three 32-bit values reversibly (applied to every full
    // 12-byte block except the last).
    static void mix(uint32_t &a, uint32_t &b, uint32_t &c)
    {
      a -= c; a ^= rot(c, 4); c += b;
      b -= a; b ^= rot(a, 6); a += c;
      c -= b; c ^= rot(b, 8); b += a;
      a -= c; a ^= rot(c,16); c += b;
      b -= a; b ^= rot(a,19); a += c;
      c -= b; c ^= rot(b, 4); b += a;
    }

    // Final mixing of the three values into c (applied once, after
    // the last block).
    static void finish(uint32_t &a, uint32_t &b, uint32_t &c)
    {
      c ^= b; c -= rot(b,14);
      a ^= c; a -= rot(c,11);
      b ^= a; b -= rot(a,25);
      c ^= b; c -= rot(b,16);
      a ^= c; a -= rot(c,4);
      b ^= a; b -= rot(a,14);
      c ^= b; c -= rot(b,24);
    }
  };

  uint32_t a, b, c;
  a = b = c = 0xdeadbeef + (uint32_t) keylen;
  const unsigned char *k = (const unsigned char *) key;

  // all but the last block: affect some bits of (a, b, c)
  while (keylen > 12)
  {
    a += k[0];
    a += ((uint32_t)k[1]) << 8;
    a += ((uint32_t)k[2]) << 16;
    a += ((uint32_t)k[3]) << 24;
    b += k[4];
    b += ((uint32_t)k[5]) << 8;
    b += ((uint32_t)k[6]) << 16;
    b += ((uint32_t)k[7]) << 24;
    c += k[8];
    c += ((uint32_t)k[9]) << 8;
    c += ((uint32_t)k[10]) << 16;
    c += ((uint32_t)k[11]) << 24;
    Lookup3::mix(a, b, c);
    keylen -= 12;
    k += 12;
  }

  // last block (1..12 bytes left, or 0 for an empty key): affect all
  // 32 bits of (c).  Every case deliberately falls through to the
  // next so that a tail of N bytes consumes bytes N-1 down to 0.
  switch (keylen)
  {
  case 12: c += ((uint32_t)k[11]) << 24; // fall through
  case 11: c += ((uint32_t)k[10]) << 16; // fall through
  case 10: c += ((uint32_t)k[9]) << 8;   // fall through
  case 9 : c += k[8];                    // fall through
  case 8 : b += ((uint32_t)k[7]) << 24;  // fall through
  case 7 : b += ((uint32_t)k[6]) << 16;  // fall through
  case 6 : b += ((uint32_t)k[5]) << 8;   // fall through
  case 5 : b += k[4];                    // fall through
  case 4 : a += ((uint32_t)k[3]) << 24;  // fall through
  case 3 : a += ((uint32_t)k[2]) << 16;  // fall through
  case 2 : a += ((uint32_t)k[1]) << 8;   // fall through
  case 1 : a += k[0];
    break;
  case 0 : return c; // empty key: nothing left to mix in
  }

  Lookup3::finish(a, b, c);
  return c;
}
285 
286  static void packQualityData(std::string &into, const QReports &qr);
287  static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
288 
289 protected:
290  std::ostream & logme(void);
291  static void copydata(Bucket *b, const void *data, size_t len);
292  virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
293 
294  virtual bool shouldStop(void);
295  void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
296  virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
297  virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
298 
299  // bool reconstructObject(Object &o);
300  // bool reinstateObject(DQMStore *store, Object &o);
301  virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
302  virtual Object * makeObject(Peer *p, const std::string &name) = 0;
303  virtual void markObjectsDead(Peer *p) = 0;
304  virtual void purgeDeadObjects(Peer *p) = 0;
305 
306  virtual Peer * getPeer(lat::Socket *s) = 0;
307  virtual Peer * createPeer(lat::Socket *s) = 0;
308  virtual void removePeer(Peer *p, lat::Socket *s) = 0;
309  virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
310  virtual void sendObjectListToPeers(bool all) = 0;
311 
312  void updateMask(Peer *p);
313  virtual void updatePeerMasks(void) = 0;
314  static void discard(Bucket *&b);
315 
316  bool debug_;
317  pthread_mutex_t lock_;
318 
319 private:
320  void losePeer(const char *reason,
321  Peer *peer,
322  lat::IOSelectEvent *event,
323  lat::Error *err = 0);
324  void requestObjectData(Peer *p, const char *name, size_t len);
325  void releaseFromWait(WaitList::iterator i, Object *o);
326  void releaseWaiters(const std::string &name, Object *o);
327 
328  bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
329  bool onPeerConnect(lat::IOSelectEvent *ev);
330  bool onLocalNotify(lat::IOSelectEvent *ev);
331 
333  int pid_;
334 
335  lat::IOSelector sel_;
336  lat::Socket *server_;
337  lat::Pipe wakeup_;
338  lat::Time version_;
339 
343 
344  pthread_t communicate_;
345  sig_atomic_t shutdown_;
346 
347  int delay_;
348  lat::TimeSpan waitStale_;
349  lat::TimeSpan waitMax_;
350  bool flush_;
351 
352  // copying is not available
353  DQMNet(const DQMNet &);
354  DQMNet &operator=(const DQMNet &);
355 };
356 
357 template <class ObjType>
358 class DQMImplNet : public DQMNet
359 {
360 public:
361  struct ImplPeer;
362 
363  typedef std::set<std::string> DirMap;
364  typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
365  typedef std::map<lat::Socket *, ImplPeer> PeerMap;
366  struct ImplPeer : Peer
367  {
368  ImplPeer(void) {}
371  };
372 
373  DQMImplNet(const std::string &appname = "")
374  : DQMNet(appname)
375  {}
376 
378  {}
379 
380 protected:
381  virtual Object *
382  findObject(Peer *p, const std::string &name, Peer **owner = 0)
383  {
384  size_t slash = name.rfind('/');
385  size_t dirpos = (slash == std::string::npos ? 0 : slash);
386  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
387  std::string path(name, 0, dirpos);
388  ObjType proto;
389  proto.hash = dqmhash(name.c_str(), name.size());
390  proto.dirname = &path;
391  proto.objname.append(name, namepos, std::string::npos);
392 
393  typename ObjectMap::iterator pos;
394  typename PeerMap::iterator i, e;
395  if (owner)
396  *owner = 0;
397  if (p)
398  {
399  ImplPeer *ip = static_cast<ImplPeer *>(p);
400  pos = ip->objs.find(proto);
401  if (pos == ip->objs.end())
402  return 0;
403  else
404  {
405  if (owner) *owner = ip;
406  return const_cast<ObjType *>(&*pos);
407  }
408  }
409  else
410  {
411  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
412  {
413  pos = i->second.objs.find(proto);
414  if (pos != i->second.objs.end())
415  {
416  if (owner) *owner = &i->second;
417  return const_cast<ObjType *>(&*pos);
418  }
419  }
420  return 0;
421  }
422  }
423 
424  virtual Object *
426  {
427  ImplPeer *ip = static_cast<ImplPeer *>(p);
428  size_t slash = name.rfind('/');
429  size_t dirpos = (slash == std::string::npos ? 0 : slash);
430  size_t namepos = (slash == std::string::npos ? 0 : slash+1);
431  ObjType o;
432  o.flags = 0;
433  o.tag = 0;
434  o.version = 0;
435  o.lastreq = 0;
436  o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
437  o.objname.append(name, namepos, std::string::npos);
438  o.hash = dqmhash(name.c_str(), name.size());
439  return const_cast<ObjType *>(&*ip->objs.insert(o).first);
440  }
441 
442  // Mark all the objects dead. This is intended to be used when
443  // starting to process a complete list of objects, in order to
444  // flag the objects that need to be killed at the end. After
445  // calling this method, revive all live objects by removing the
446  // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
447  // to remove the dead ones. This also turns off object requests
448  // for objects we've lost interest in: any object whose last
 // request is more than five minutes old has its lastreq reset to 0.
449  virtual void
451  {
452  uint64_t minreq
453  = (lat::Time::current()
454  - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
455  ImplPeer *ip = static_cast<ImplPeer *>(p);
456  typename ObjectMap::iterator i, e;
457  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
458  {
459  if (i->lastreq && i->lastreq < minreq)
460  const_cast<ObjType &>(*i).lastreq = 0;
461  const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
462  }
463  }
464 
465  // Erase every object still flagged DQM_PROP_DEAD. See markObjectsDead().
466  virtual void
468  {
469  ImplPeer *ip = static_cast<ImplPeer *>(p);
470  typename ObjectMap::iterator i, e;
471  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
472  {
473  if (i->flags & DQM_PROP_DEAD)
474  ip->objs.erase(i++);
475  else
476  ++i;
477  }
478  }
479 
480  virtual Peer *
481  getPeer(lat::Socket *s)
482  {
483  typename PeerMap::iterator pos = peers_.find(s);
484  typename PeerMap::iterator end = peers_.end();
485  return pos == end ? 0 : &pos->second;
486  }
487 
488  virtual Peer *
489  createPeer(lat::Socket *s)
490  {
491  ImplPeer *ip = &peers_[s];
492  ip->socket = 0;
493  ip->sendq = 0;
494  ip->sendpos = 0;
495  ip->mask = 0;
496  ip->source = false;
497  ip->update = false;
498  ip->updated = false;
499  ip->updates = 0;
500  ip->waiting = 0;
501  ip->automatic = 0;
502  return ip;
503  }
504 
505  virtual void
506  removePeer(Peer *p, lat::Socket *s)
507  {
508  ImplPeer *ip = static_cast<ImplPeer *>(p);
509  bool needflush = ! ip->objs.empty();
510 
511  typename ObjectMap::iterator i, e;
512  for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
513  ip->objs.erase(i++);
514 
515  peers_.erase(s);
516 
517  // If we removed a peer with objects, our list of objects
518  // has changed and we need to update downstream peers.
519  if (needflush)
521  }
522 
524  virtual void
526  {
527  typename PeerMap::iterator pi, pe;
528  typename ObjectMap::iterator oi, oe;
529  size_t size = 0;
530  size_t numobjs = 0;
531  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
532  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
533  if (all || (oi->flags & DQM_PROP_NEW))
534  size += 9*sizeof(uint32_t) + oi->dirname->size()
535  + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
536  + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
537 
538  msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
539 
540  uint32_t nupdates = 0;
541  uint32_t words [4];
542  words[0] = sizeof(words);
543  words[1] = DQM_REPLY_LIST_BEGIN;
544  words[2] = numobjs;
545  words[3] = all;
546  copydata(msg, &words[0], sizeof(words));
547 
548  for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
549  for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
550  if (all || (oi->flags & DQM_PROP_NEW))
551  {
552  sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
553  if (clear)
554  const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
555  ++nupdates;
556  }
557 
558  words[1] = DQM_REPLY_LIST_END;
559  words[2] = nupdates;
560  copydata(msg, &words[0], sizeof(words));
561  }
562 
563  virtual void
565  {
566  typename PeerMap::iterator i, e;
567  typename ObjectMap::iterator oi, oe;
568  for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
569  {
570  ImplPeer &p = i->second;
571  if (! p.update)
572  continue;
573 
574  if (debug_)
575  logme()
576  << "DEBUG: notifying " << p.peeraddr << std::endl;
577 
578  Bucket msg;
579  msg.next = 0;
580  sendObjectListToPeer(&msg, !p.updated || all, true);
581 
582  if (! msg.data.empty())
583  {
584  Bucket **prev = &p.sendq;
585  while (*prev)
586  prev = &(*prev)->next;
587 
588  *prev = new Bucket;
589  (*prev)->next = 0;
590  (*prev)->data.swap(msg.data);
591  }
592  p.updated = true;
593  }
594  }
595 
596  virtual void
598  {
599  typename PeerMap::iterator i, e;
600  for (i = peers_.begin(), e = peers_.end(); i != e; )
601  updateMask(&(i++)->second);
602  }
603 
604 protected:
606 };
607 
608 
// Network layer specialised for the plain DQMNet::Object type: it
// inherits all peer and object bookkeeping from
// DQMImplNet<DQMNet::Object> and adds calls for managing local objects.
609 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
610 {
611 public:
 // Construct the network layer; `appname` is forwarded to the base
 // class and defaults to the empty string.
612  DQMBasicNet(const std::string &appname = "");
613 
 // Give a hint of how much capacity to allocate for local objects.
614  void reserveLocalSpace(uint32_t size);
 // NOTE(review): presumably inserts or refreshes `o` among the local
 // objects - the definition is in DQMNet.cc; confirm there.
615  void updateLocalObject(Object &o);
 // NOTE(review): presumably drops local objects whose names are not
 // in `known` and reports whether anything was removed - confirm
 // against the definition in DQMNet.cc.
616  bool removeLocalExcept(const std::set<std::string> &known);
617 
618 private:
 // NOTE(review): looks like the peer entry that holds this process's
 // own objects; ownership is not visible from this header - confirm.
619  ImplPeer *local_;
620 };
621 
622 
623 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
Definition: DQMNet.h:481
void run(void)
Definition: DQMNet.cc:1290
lat::Time time
Definition: DQMNet.h:126
static const uint32_t DQM_PROP_TYPE_DATABLOB
Definition: DQMNet.h:42
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:341
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1064
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:66
int i
Definition: DBlmapReader.cc:9
DataBlob incoming
Definition: DQMNet.h:137
virtual bool shouldStop(void)
Definition: DQMNet.cc:384
void shutdown(void)
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1234
uint32_t moduleId
Definition: DQMNet.h:103
static const uint32_t DQM_PROP_TYPE_TH1S
Definition: DQMNet.h:32
void sendLocalChanges(void)
Definition: DQMNet.cc:1430
std::map< lat::Socket *, ImplPeer > PeerMap
Definition: DQMNet.h:365
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1003
QReports qreports
Definition: DQMNet.h:106
virtual Object * makeObject(Peer *p, const std::string &name)
Definition: DQMNet.h:425
pthread_t communicate_
Definition: DQMNet.h:344
bool operator()(const Object &a, const Object &b) const
Definition: DQMNet.h:208
static const uint32_t DQM_PROP_TYPE_TPROF
Definition: DQMNet.h:40
std::string algorithm
Definition: DQMNet.h:92
uint64_t version
Definition: DQMNet.h:99
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:142
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:71
static const uint32_t DQM_PROP_TYPE_TH2D
Definition: DQMNet.h:36
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
Definition: DQMNet.h:44
lat::Time version_
Definition: DQMNet.h:338
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:22
virtual void sendObjectListToPeers(bool all)
Definition: DQMNet.h:564
int delay_
Definition: DQMNet.h:347
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
pthread_mutex_t lock_
Definition: DQMNet.h:317
#define dqmhashmix(a, b, c)
void delay(int delay)
Definition: DQMNet.cc:1106
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:97
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:135
static const uint32_t DQM_PROP_TAGGED
Definition: DQMNet.h:54
lat::Socket * socket
Definition: DQMNet.h:136
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
Definition: DQMNet.h:63
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1116
static const uint32_t DQM_PROP_TYPE_TH3F
Definition: DQMNet.h:37
static const uint32_t DQM_PROP_RESET
Definition: DQMNet.h:56
uint32_t tag
Definition: DQMNet.h:98
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
const std::string * dirname
Definition: DQMNet.h:104
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
AutoPeer * automatic
Definition: DQMNet.h:147
AutoPeer upstream_
Definition: DQMNet.h:340
static const uint32_t DQM_PROP_TYPE_TH1F
Definition: DQMNet.h:31
virtual void markObjectsDead(Peer *p)
Definition: DQMNet.h:450
int pid_
Definition: DQMNet.h:333
DQMImplNet(const std::string &appname="")
Definition: DQMNet.h:373
static const uint32_t DQM_MSG_HELLO
Definition: DQMNet.h:65
uint32_t run
Definition: DQMNet.h:100
std::string qdata
Definition: DQMNet.h:115
int port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1098
bool updated
Definition: DQMNet.h:144
virtual void updatePeerMasks(void)
Definition: DQMNet.h:597
static const uint32_t DQM_PROP_ACCUMULATE
Definition: DQMNet.h:55
virtual ~DQMNet(void)
Definition: DQMNet.cc:1090
static const uint32_t DQM_PROP_HAS_REFERENCE
Definition: DQMNet.h:53
tuple path
else: Piece not in the list, fine.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:112
std::string appname_
Definition: DQMNet.h:332
static const uint32_t DQM_PROP_TYPE_INT
Definition: DQMNet.h:28
DQMNet & operator=(const DQMNet &)
std::string name
Definition: DQMNet.h:127
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:463
DataBlob data
Definition: DQMNet.h:121
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:167
uint32_t lumi
Definition: DQMNet.h:101
Peer * peer
Definition: DQMNet.h:152
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:79
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:215
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define end
Definition: vmac.h:37
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
static const uint32_t DQM_PROP_TYPE_TH1D
Definition: DQMNet.h:33
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool first
Definition: L1TdeRCT.cc:75
sig_atomic_t shutdown_
Definition: DQMNet.h:345
std::string objname
Definition: DQMNet.h:105
bool update
Definition: DQMNet.h:143
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:75
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
std::string qtname
Definition: DQMNet.h:91
int k[5][pyjets_maxn]
std::string scalar
Definition: DQMNet.h:114
lat::Time next
Definition: DQMNet.h:153
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:73
virtual Object * makeObject(Peer *p, const std::string &name)=0
string host
Definition: query.py:114
std::list< WaitObject > WaitList
Definition: DQMNet.h:84
unsigned mask
Definition: DQMNet.h:141
std::vector< uint32_t > TagList
Definition: DQMNet.h:83
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
void start(void)
Definition: DQMNet.cc:1275
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:68
~DQMImplNet(void)
Definition: DQMNet.h:377
uint32_t operator()(const Object &a) const
Definition: DQMNet.h:200
ObjectMap objs
Definition: DQMNet.h:369
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1479
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
size_t updates
Definition: DQMNet.h:145
static const uint32_t DQM_PROP_TYPE_TH3S
Definition: DQMNet.h:38
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
Definition: DQMNet.h:382
std::string host
Definition: DQMNet.h:154
double b
Definition: hdecay.h:120
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
Definition: DQMNet.h:525
static const uint32_t DQM_PROP_REPORT_ALARM
Definition: DQMNet.h:49
void startLocalServer(int port)
Definition: DQMNet.cc:1125
uint64_t hash
Definition: DQMNet.h:111
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:70
std::string info
Definition: DQMNet.h:128
virtual void purgeDeadObjects(Peer *p)
Definition: DQMNet.h:467
lat::Pipe wakeup_
Definition: DQMNet.h:337
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:316
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:146
#define dqmhashfinal(a, b, c)
list key
Definition: combine.py:13
static bool setOrder(const CoreObject &a, const CoreObject &b)
Definition: DQMNet.h:178
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:72
double a
Definition: hdecay.h:121
static const uint32_t DQM_PROP_TYPE_TH2S
Definition: DQMNet.h:35
virtual void removePeer(Peer *p, lat::Socket *s)=0
std::string message
Definition: DQMNet.h:90
size_t sendpos
Definition: DQMNet.h:139
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1217
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:67
void updateMask(Peer *p)
Definition: DQMNet.cc:1034
bool flush_
Definition: DQMNet.h:350
lat::Socket * server_
Definition: DQMNet.h:336
static const uint32_t DQM_PROP_TYPE_STRING
Definition: DQMNet.h:30
static const uint32_t DQM_PROP_TYPE_TH3D
Definition: DQMNet.h:39
T w() const
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
Definition: DQMNet.h:364
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1455
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
double pi
PeerMap peers_
Definition: DQMNet.h:605
lat::TimeSpan waitMax_
Definition: DQMNet.h:349
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
Bucket * next
Definition: DQMNet.h:120
std::vector< QValue > QReports
Definition: DQMNet.h:82
Bucket * sendq
Definition: DQMNet.h:138
lat::TimeSpan waitStale_
Definition: DQMNet.h:348
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1447
virtual void purgeDeadObjects(Peer *p)=0
uint32_t streamId
Definition: DQMNet.h:102
float qtresult
Definition: DQMNet.h:89
static const uint32_t DQM_PROP_REPORT_CLEAR
Definition: DQMNet.h:45
static const uint32_t DQM_PROP_LUMI
Definition: DQMNet.h:60
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1198
WaitList waiting_
Definition: DQMNet.h:342
std::set< std::string > DirMap
Definition: DQMNet.h:361
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_REAL
Definition: DQMNet.h:29
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
Definition: DQMNet.h:27
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1439
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:489
static const uint32_t DQM_PROP_TYPE_TPROF2D
Definition: DQMNet.h:41
DataBlob rawdata
Definition: DQMNet.h:113
virtual void removePeer(Peer *p, lat::Socket *s)
Definition: DQMNet.h:506
static const uint32_t DQM_PROP_TYPE_TH2F
Definition: DQMNet.h:34
ImplPeer * local_
Definition: DQMNet.h:619
lat::IOSelector sel_
Definition: DQMNet.h:335
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0