CMS 3D CMS Logo

DQMNet.cc
Go to the documentation of this file.
2 #include "classlib/iobase/InetServerSocket.h"
3 #include "classlib/iobase/LocalServerSocket.h"
4 #include "classlib/iobase/Filename.h"
5 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
6 #include "classlib/utils/TimeInfo.h"
7 #include "classlib/utils/StringList.h"
8 #include "classlib/utils/StringFormat.h"
9 #include "classlib/utils/StringOps.h"
10 #include "classlib/utils/SystemError.h"
11 #include "classlib/utils/Regexp.h"
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <sys/wait.h>
15 #include <cstdio>
16 #include <cstdint>
17 #include <iostream>
18 #include <sstream>
19 #include <cassert>
20 #include <cfloat>
21 #include <cinttypes>
22 
24 
// Buffer sizing for the peer protocol.
// MESSAGE_SIZE_LIMIT: a peer announcing a message of this many bytes or more
// is disconnected by the read handler in this file.
// SOCKET_BUF_SIZE: value passed to SO_SNDBUF / SO_RCVBUF on the sockets
// created in this file.
// Both are 1 MiB on Apple platforms and 8 MiB everywhere else.
25 #if __APPLE__
26 #define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
27 #define SOCKET_BUF_SIZE (1 * 1024 * 1024)
28 #else
29 #define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
30 #define SOCKET_BUF_SIZE (8 * 1024 * 1024)
31 #endif
// Size of the temporary buffer used for each individual socket read.
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
// Extra capacity reserved whenever a peer's incoming buffer has to grow.
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
34 
35 using namespace lat;
36 
// Pattern for text of the form <NAME>T=VALUE</NAME>, where T is one of the
// markers i, f, s or qr; the closing tag must repeat the opening name (\1).
// Captures: 1 = name, 2 = marker, 3 = value.
// NOTE(review): not referenced in the part of the file visible here -- confirm it is still used.
37 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
38 
39 // TODO: Can't include the header file since that leads to ambiguities.
namespace dqm {
  namespace qstatus {
    constexpr int STATUS_OK = 100;  ///< Test was successful.
    constexpr int WARNING = 200;    ///< Test had some problems.
    constexpr int ERROR = 300;      ///< Test has failed.
  }  // namespace qstatus
}  // namespace dqm
47 
49 // Generate log prefix.
50 std::ostream &DQMNet::logme() {
51  Time now = Time::current();
52  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
53  << "]: ";
54 }
55 
56 // Append data into a bucket.
57 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
58  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
59 }
60 
61 // Discard a bucket chain.
63  while (b) {
64  Bucket *next = b->next;
65  delete b;
66  b = next;
67  }
68 }
69 
71 
74 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
75  if (reason)
76  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
77 
78  Socket *s = peer->socket;
79 
80  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
81  if (i->peer == peer)
82  waiting_.erase(i++);
83  else
84  ++i;
85 
86  if (ev)
87  ev->source = nullptr;
88 
89  discard(peer->sendq);
90  if (peer->automatic)
91  peer->automatic->peer = nullptr;
92 
93  sel_.detach(s);
94  s->close();
95  removePeer(peer, s);
96  delete s;
97 }
98 
100 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
101  // Issue request to peer.
102  Bucket **msg = &p->sendq;
103  while (*msg)
104  msg = &(*msg)->next;
105  *msg = new Bucket;
106  (*msg)->next = nullptr;
107 
108  uint32_t words[3];
109  words[0] = sizeof(words) + len;
110  words[1] = DQM_MSG_GET_OBJECT;
111  words[2] = len;
112  copydata(*msg, words, sizeof(words));
113  copydata(*msg, name, len);
114 }
115 
118 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
119  // FIXME: Should we automatically record which exact peer the waiter
120  // is expecting to deliver data so we know to release the waiter if
121  // the other peer vanishes? The current implementation stands a
122  // chance for the waiter to wait indefinitely -- although we do
123  // force terminate the wait after a while.
124  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
125  WaitObject wo = {Time::current(), name, info, p};
126  waiting_.push_back(wo);
127  p->waiting++;
128 }
129 
130 // Once an object has been updated, this is invoked for all waiting
131 // peers. Send the object back to the peer in suitable form.
132 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
133  Bucket **msg = &i->peer->sendq;
134  while (*msg)
135  msg = &(*msg)->next;
136  *msg = new Bucket;
137  (*msg)->next = nullptr;
138 
139  releaseFromWait(*msg, *i, o);
140 
141  assert(i->peer->waiting > 0);
142  i->peer->waiting--;
143  waiting_.erase(i);
144 }
145 
146 // Release everyone waiting for the object @a o.
148  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
149  if (i->name == name)
150  releaseFromWait(i++, o);
151  else
152  ++i;
153 }
154 
159  char buf[64];
160  std::ostringstream qrs;
161  QReports::const_iterator qi, qe;
162  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
163  int pos = 0;
164  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
165  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
166  << '\0';
167  }
168  into = qrs.str();
169 }
170 
// Parse packed quality reports from the NUL-separated buffer `from` and
// append one QValue per record to `qr` (`qr` is not cleared first).
//
// Each record consists of five consecutive NUL-terminated fields: numeric
// code, numeric result, test name, algorithm and message.  Parsing stops
// when the byte at the start of the next record is NUL.
//
// The buffer is trusted: there is no length argument, so a malformed buffer
// (fewer than five fields in a record) makes the scan run past its end.
//
// NOTE(review): in the visible body `flags` is never written, and the switch
// below has `break` statements without matching case labels -- some lines
// appear to be missing from this listing; confirm against the real file.
// NOTE(review): the packing code visible earlier in this file appears to emit
// an extra NUL after every record, which this parser would read as the end of
// the list after the first record -- confirm.
173 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
174  const char *qdata = from;
175 
176  // Count how many qresults there are.
// Each inner while/++ pair skips one NUL-terminated field; five per record.
177  size_t nqv = 0;
178  while (*qdata) {
179  ++nqv;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  while (*qdata)
187  ++qdata;
188  ++qdata;
189  while (*qdata)
190  ++qdata;
191  ++qdata;
192  while (*qdata)
193  ++qdata;
194  ++qdata;
195  }
196 
197  // Now extract the qreports.
198  qdata = from;
199  qr.reserve(nqv);
200  while (*qdata) {
201  qr.emplace_back();
202  DQMNet::QValue &qv = qr.back();
203 
// Field 1: status code; qdata is left on the field's terminating NUL.
204  qv.code = atoi(qdata);
205  while (*qdata)
206  ++qdata;
207  switch (qv.code) {
209  break;
212  break;
213  case dqm::qstatus::ERROR:
215  break;
216  default:
218  break;
219  }
220 
// Fields 2-5: each `++qdata` steps over the previous terminator first.
221  qv.qtresult = atof(++qdata);
222  while (*qdata)
223  ++qdata;
224 
225  qv.qtname = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.algorithm = ++qdata;
230  while (*qdata)
231  ++qdata;
232 
233  qv.message = ++qdata;
234  while (*qdata)
235  ++qdata;
// Step past the final terminator to the first byte of the next record.
236  ++qdata;
237  }
238 }
239 
// Disabled code (#if 0): nothing in this region is compiled.
240 #if 0
241 // Deserialise a ROOT object from a buffer at the current position.
// Returns 0 when the read position is already at the end of the buffer or
// when no class could be read at the current position.
242 static TObject *
243 extractNextObject(TBufferFile &buf)
244 {
245  if (buf.Length() == buf.BufferSize())
246  return 0;
247 
// Read the class, then rewind to the saved position so that ReadObject
// starts from the same offset.
248  buf.InitMap();
249  Int_t pos = buf.Length();
250  TClass *c = buf.ReadClass();
251  buf.SetBufferOffset(pos);
252  buf.ResetMap();
253  return c ? buf.ReadObject(c) : 0;
254 }
255 
256 // Reconstruct an object from the raw data.
// Reads the main object and then the reference object from o.rawdata and
// unpacks the quality reports from o.qdata.  Returns false only when the
// main object cannot be extracted.
257 bool
258 DQMNet::reconstructObject(Object &o)
259 {
260  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
261  buf.Reset();
262 
263  // Extract the main object.
264  if (! (o.object = extractNextObject(buf)))
265  return false;
266 
267  // Extract the reference object.
268  o.reference = extractNextObject(buf);
269 
270  // Extract quality reports.
271  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
272  return true;
273 }
274 #endif
275 
// Disabled code (#if 0): nothing in this region is compiled.
//
// Rebuild object `o` from its raw data and book it into `store` under
// o.dirname, choosing the booking call from the type bits of o.flags.
// Returns false if reconstruction fails or the type is not handled.
//
// NOTE(review): every case except DQM_PROP_TYPE_INT passes `name`, which is
// not declared anywhere in this function (the INT case uses o.objname) --
// this would need fixing before the code could be re-enabled.
276 #if 0
277 bool
278 DQMNet::reinstateObject(DQMStore *store, Object &o)
279 {
280  if (! reconstructObject (o))
281  return false;
282 
283  // Reconstruct the main object
284  MonitorElement *obj = 0;
285  store->setCurrentFolder(o.dirname);
286  switch (o.flags & DQM_PROP_TYPE_MASK)
287  {
288  case DQM_PROP_TYPE_INT:
289  obj = store->bookInt(o.objname);
290  obj->Fill(atoll(o.scalar.c_str()));
291  break;
292 
293  case DQM_PROP_TYPE_REAL:
294  obj = store->bookFloat(name);
295  obj->Fill(atof(o.scalar.c_str()));
296  break;
297 
298  case DQM_PROP_TYPE_STRING:
299  obj = store->bookString(name, o.scalar);
300  break;
301 
302  case DQM_PROP_TYPE_TH1F:
303  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
304  break;
305 
306  case DQM_PROP_TYPE_TH1S:
307  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
308  break;
309 
310  case DQM_PROP_TYPE_TH1D:
311  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
312  break;
313 
314  case DQM_PROP_TYPE_TH1I:
315  obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
316  break;
317 
318  case DQM_PROP_TYPE_TH2F:
319  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
320  break;
321 
322  case DQM_PROP_TYPE_TH2S:
323  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
324  break;
325 
326  case DQM_PROP_TYPE_TH2D:
327  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
328  break;
329 
330  case DQM_PROP_TYPE_TH2I:
331  obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
332  break;
333 
334  case DQM_PROP_TYPE_TH3F:
335  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
336  break;
337 
338  case DQM_PROP_TYPE_TH3S:
339  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
340  break;
341 
342  case DQM_PROP_TYPE_TH3D:
343  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
344  break;
345 
346  case DQM_PROP_TYPE_PROF:
347  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
348  break;
349 
350  case DQM_PROP_TYPE_PROF2D:
351  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
352  break;
353 
354  default:
355  logme()
356  << "ERROR: unexpected monitor element of type "
357  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
358  << o.dirname << '/' << o.objname << "'\n";
359  return false;
360  }
361 
362  // Reconstruct tag and qreports.
363  if (obj)
364  {
365  obj->data_.tag = o.tag;
366  obj->data_.qreports = o.qreports;
367  }
368 
369  // Indicate success.
370  return true;
371 }
372 #endif
373 
375 // Check if the network layer should stop.
376 bool DQMNet::shouldStop() { return shutdown_; }
377 
378 // Once an object has been updated, this is invoked for all waiting
379 // peers. Send the requested object to the waiting peer.
381  if (o)
382  sendObjectToPeer(msg, *o, true);
383  else {
384  uint32_t words[3];
385  words[0] = sizeof(words) + w.name.size();
386  words[1] = DQM_REPLY_NONE;
387  words[2] = w.name.size();
388 
389  msg->data.reserve(msg->data.size() + words[0]);
390  copydata(msg, &words[0], sizeof(words));
391  copydata(msg, &w.name[0], w.name.size());
392  }
393 }
394 
395 // Send an object to a peer. If not @a data, only sends a summary
396 // without the object data, except the data is always sent for scalar
397 // objects.
399  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
400  DataBlob objdata;
401 
402  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
403  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
404  else if (data)
405  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
406 
407  uint32_t words[9];
408  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
409  uint32_t datalen = objdata.size();
410  uint32_t qlen = o.qdata.size();
411 
412  if (o.dirname.empty())
413  --namelen;
414 
415  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
416  words[1] = DQM_REPLY_OBJECT;
417  words[2] = flags;
418  words[3] = (o.version >> 0) & 0xffffffff;
419  words[4] = (o.version >> 32) & 0xffffffff;
420  words[5] = o.tag;
421  words[6] = namelen;
422  words[7] = datalen;
423  words[8] = qlen;
424 
425  msg->data.reserve(msg->data.size() + words[0]);
426  copydata(msg, &words[0], 9 * sizeof(uint32_t));
427  if (namelen) {
428  copydata(msg, &(o.dirname)[0], o.dirname.size());
429  if (!o.dirname.empty())
430  copydata(msg, "/", 1);
431  copydata(msg, &o.objname[0], o.objname.size());
432  }
433  if (datalen)
434  copydata(msg, &objdata[0], datalen);
435  if (qlen)
436  copydata(msg, &o.qdata[0], qlen);
437 }
438 
440 // Handle peer messages.
441 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
442  // Decode and process this message.
443  uint32_t type;
444  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
445  switch (type) {
446  case DQM_MSG_UPDATE_ME: {
447  if (len != 2 * sizeof(uint32_t)) {
448  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
449  return false;
450  }
451 
452  if (debug_)
453  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  p->update = true;
456  }
457  return true;
458 
459  case DQM_MSG_LIST_OBJECTS: {
460  if (debug_)
461  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
462 
463  // Send over current status: list of known objects.
464  sendObjectListToPeer(msg, true, false);
465  }
466  return true;
467 
468  case DQM_MSG_GET_OBJECT: {
469  if (debug_)
470  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
471 
472  if (len < 3 * sizeof(uint32_t)) {
473  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
474  return false;
475  }
476 
477  uint32_t namelen;
478  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
479  if (len != 3 * sizeof(uint32_t) + namelen) {
480  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
481  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
482  return false;
483  }
484 
485  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
486  Peer *owner = nullptr;
487  Object *o = findObject(nullptr, name, &owner);
488  if (o) {
489  o->lastreq = Time::current().ns();
490  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
491  (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
492  waitForData(p, name, "", owner);
493  else
494  sendObjectToPeer(msg, *o, true);
495  } else {
496  uint32_t words[3];
497  words[0] = sizeof(words) + name.size();
498  words[1] = DQM_REPLY_NONE;
499  words[2] = name.size();
500 
501  msg->data.reserve(msg->data.size() + words[0]);
502  copydata(msg, &words[0], sizeof(words));
503  copydata(msg, &name[0], name.size());
504  }
505  }
506  return true;
507 
508  case DQM_REPLY_LIST_BEGIN: {
509  if (len != 4 * sizeof(uint32_t)) {
510  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
511  return false;
512  }
513 
514  // Get the update status: whether this is a full update.
515  uint32_t flags;
516  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
517 
518  if (debug_)
519  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
520  << p->peeraddr << ", size " << len << std::endl;
521 
522  // If we are about to receive a full list of objects, flag all
523  // objects as possibly dead. Subsequent object notifications
524  // will undo this for the live objects. We cannot delete
525  // objects quite yet, as we may get inquiry from another client
526  // while we are processing the incoming list, so we keep the
527  // objects tentatively alive as long as we've not seen the end.
528  if (flags)
529  markObjectsDead(p);
530  }
531  return true;
532 
533  case DQM_REPLY_LIST_END: {
534  if (len != 4 * sizeof(uint32_t)) {
535  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
536  return false;
537  }
538 
539  // Get the update status: whether this is a full update.
540  uint32_t flags;
541  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
542 
543  // If we received a full list of objects, now purge all dead
544  // objects. We need to do this in two stages in case we receive
545  // updates in many parts, and end up sending updates to others in
546  // between; this avoids us lying live objects are dead.
547  if (flags)
548  purgeDeadObjects(p);
549 
550  if (debug_)
551  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
552  << ", size " << len << std::endl;
553 
554  // Indicate we have received another update from this peer.
555  // Also indicate we should flush to our clients.
556  flush_ = true;
557  p->updates++;
558  }
559  return true;
560 
561  case DQM_REPLY_OBJECT: {
562  uint32_t words[9];
563  if (len < sizeof(words)) {
564  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
565  return false;
566  }
567 
568  memcpy(&words[0], data, sizeof(words));
569  uint32_t &namelen = words[6];
570  uint32_t &datalen = words[7];
571  uint32_t &qlen = words[8];
572 
573  if (len != sizeof(words) + namelen + datalen + qlen) {
574  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
575  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
576  << std::endl;
577  return false;
578  }
579 
580  unsigned char *namedata = data + sizeof(words);
581  unsigned char *objdata = namedata + namelen;
582  unsigned char *qdata = objdata + datalen;
583  unsigned char *enddata = qdata + qlen;
584  std::string name((char *)namedata, namelen);
585  assert(enddata == data + len);
586 
587  if (debug_)
588  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
589  << std::endl;
590 
591  // Mark the peer as a known object source.
592  p->source = true;
593 
594  // Initialise or update an object entry.
595  Object *o = findObject(p, name);
596  if (!o)
597  o = makeObject(p, name);
598 
599  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
600  o->tag = words[5];
601  o->version = ((uint64_t)words[4] << 32 | words[3]);
602  o->scalar.clear();
603  o->qdata.clear();
604  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
605  o->rawdata.clear();
606  o->scalar.insert(o->scalar.end(), objdata, qdata);
607  } else if (datalen) {
608  o->rawdata.clear();
609  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
610  } else if (!o->rawdata.empty())
611  o->flags |= DQM_PROP_STALE;
612  o->qdata.insert(o->qdata.end(), qdata, enddata);
613 
614  // If we had an object for this one already and this is a list
615  // update without data, issue an immediate data get request.
616  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
617  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
618 
619  // If we have the object data, release from wait.
620  if (datalen)
621  releaseWaiters(name, o);
622  }
623  return true;
624 
625  case DQM_REPLY_NONE: {
626  uint32_t words[3];
627  if (len < sizeof(words)) {
628  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
629  return false;
630  }
631 
632  memcpy(&words[0], data, sizeof(words));
633  uint32_t &namelen = words[2];
634 
635  if (len != sizeof(words) + namelen) {
636  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
637  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
638  return false;
639  }
640 
641  unsigned char *namedata = data + sizeof(words);
642  std::string name((char *)namedata, namelen);
643 
644  if (debug_)
645  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
646  << std::endl;
647 
648  // Mark the peer as a known object source.
649  p->source = true;
650 
651  // If this was a known object, kill it.
652  if (Object *o = findObject(p, name)) {
653  o->flags |= DQM_PROP_DEAD;
654  purgeDeadObjects(p);
655  }
656 
657  // If someone was waiting for this, let them go.
658  releaseWaiters(name, nullptr);
659  }
660  return true;
661 
662  default:
663  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
664  << std::endl;
665  return false;
666  }
667 }
668 
671 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
672  lock();
673  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
674 
675  // If there is a problem with the peer socket, discard the peer
676  // and tell the selector to stop prcessing events for it. If
677  // this is a server connection, we will eventually recreate
678  // everything if/when the data server comes back.
679  if (ev->events & IOUrgent) {
680  if (p->automatic) {
681  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
682  << " lost (will attempt to reconnect in 15 seconds)\n";
683  losePeer(nullptr, p, ev);
684  } else
685  losePeer("WARNING: lost peer connection ", p, ev);
686 
687  unlock();
688  return true;
689  }
690 
691  // If we can write to the peer socket, pump whatever we can into it.
692  if (ev->events & IOWrite) {
693  while (Bucket *b = p->sendq) {
694  IOSize len = b->data.size() - p->sendpos;
695  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
696  IOSize done;
697 
698  try {
699  done = (len ? ev->source->write(data, len) : 0);
700  if (debug_ && len)
701  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
702  } catch (Error &e) {
703  losePeer("WARNING: unable to write to peer ", p, ev, &e);
704  unlock();
705  return true;
706  }
707 
708  p->sendpos += done;
709  if (p->sendpos == b->data.size()) {
710  Bucket *old = p->sendq;
711  p->sendq = old->next;
712  p->sendpos = 0;
713  old->next = nullptr;
714  discard(old);
715  }
716 
717  if (!done && len)
718  // Cannot write any more.
719  break;
720  }
721  }
722 
723  // If there is data to be read from the peer, first receive what we
724  // can get out the socket, the process all complete requests.
725  if (ev->events & IORead) {
726  // First build up the incoming buffer of data in the socket.
727  // Remember the last size returned by the socket; we need
728  // it to determine if the remote end closed the connection.
729  IOSize sz;
730  try {
731  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
732  do
733  if ((sz = ev->source->read(&buf[0], buf.size()))) {
734  if (debug_)
735  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
736  DataBlob &data = p->incoming;
737  if (data.capacity() < data.size() + sz)
738  data.reserve(data.size() + SOCKET_READ_GROWTH);
739  data.insert(data.end(), &buf[0], &buf[0] + sz);
740  }
741  while (sz == sizeof(buf));
742  } catch (Error &e) {
743  auto *next = dynamic_cast<SystemError *>(e.next());
744  if (next && next->portable() == SysErr::ErrTryAgain)
745  sz = 1; // Ignore it, and fake no end of data.
746  else {
747  // Houston we have a problem.
748  losePeer("WARNING: failed to read from peer ", p, ev, &e);
749  unlock();
750  return true;
751  }
752  }
753 
754  // Process fully received messages as long as we can.
755  size_t consumed = 0;
756  DataBlob &data = p->incoming;
757  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
758  uint32_t msglen;
759  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
760 
761  if (msglen >= MESSAGE_SIZE_LIMIT) {
762  losePeer("WARNING: excessively large message from ", p, ev);
763  unlock();
764  return true;
765  }
766 
767  if (data.size() - consumed >= msglen) {
768  bool valid = true;
769  if (msglen < 2 * sizeof(uint32_t)) {
770  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
771  valid = false;
772  } else {
773  // Decode and process this message.
774  Bucket msg;
775  msg.next = nullptr;
776  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
777 
778  // If we created a response, chain it to the write queue.
779  if (!msg.data.empty()) {
780  Bucket **prev = &p->sendq;
781  while (*prev)
782  prev = &(*prev)->next;
783 
784  *prev = new Bucket;
785  (*prev)->next = nullptr;
786  (*prev)->data.swap(msg.data);
787  }
788  }
789 
790  if (!valid) {
791  losePeer("WARNING: data stream error with ", p, ev);
792  unlock();
793  return true;
794  }
795 
796  consumed += msglen;
797  } else
798  break;
799  }
800 
801  data.erase(data.begin(), data.begin() + consumed);
802 
803  // If the client has closed the connection, shut down our end. If
804  // we have something to send back still, leave the write direction
805  // open. Otherwise close the shop for this client.
806  if (sz == 0)
807  sel_.setMask(p->socket, p->mask &= ~IORead);
808  }
809 
810  // Yes, please keep processing events for this socket.
811  unlock();
812  return false;
813 }
814 
819 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
820  // Recover the server socket.
821  assert(ev->source == server_);
822 
823  // Accept the connection.
824  Socket *s = server_->accept();
825  assert(s);
826  assert(!s->isBlocking());
827 
828  // Record it to our list of peers.
829  lock();
830  Peer *p = createPeer(s);
831  std::string localaddr;
832  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
833  InetAddress peeraddr = inet->peername();
834  InetAddress myaddr = inet->sockname();
835  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
836  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
837  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
838  p->peeraddr = local->peername().path();
839  localaddr = local->sockname().path();
840  } else
841  assert(false);
842 
843  p->mask = IORead | IOUrgent;
844  p->socket = s;
845 
846  // Report the new connection.
847  if (debug_)
848  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
849 
850  // Attach it to the listener.
851  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
852  unlock();
853 
854  // We are never done.
855  return false;
856 }
857 
864 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
865  // Discard the data in the pipe, we care only about the wakeup.
866  try {
867  IOSize sz;
868  unsigned char buf[1024];
869  while ((sz = ev->source->read(buf, sizeof(buf))))
870  ;
871  } catch (Error &e) {
872  auto *next = dynamic_cast<SystemError *>(e.next());
873  if (next && next->portable() == SysErr::ErrTryAgain)
874  ; // Ignore it
875  else
876  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
877  }
878 
879  // Tell the main event pump to send an update in a little while.
880  flush_ = true;
881 
882  // We are never done, always keep going.
883  return false;
884 }
885 
889  if (!p->socket)
890  return;
891 
892  // Listen to writes iff we have data to send.
893  unsigned oldmask = p->mask;
894  if (!p->sendq && (p->mask & IOWrite))
895  sel_.setMask(p->socket, p->mask &= ~IOWrite);
896 
897  if (p->sendq && !(p->mask & IOWrite))
898  sel_.setMask(p->socket, p->mask |= IOWrite);
899 
900  if (debug_ && oldmask != p->mask)
901  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
902 
903  // If we have nothing more to send and are no longer listening
904  // for reads, close up the shop for this peer.
905  if (p->mask == IOUrgent && !p->waiting) {
906  assert(!p->sendq);
907  if (debug_)
908  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
909  losePeer(nullptr, p, nullptr);
910  }
911 }
912 
// Construct the network layer.
// appname is the name printed in log prefixes; an empty string selects
// "DQMNet".  The object starts with debugging off, no server socket, no
// communication thread (communicate_ holds the sentinel (pthread_t)-1) and
// no flush pending.
// NOTE(review): only the `peer` members of upstream_ and downstream_ are
// initialised in the body visible here -- confirm the remaining fields are
// set as well.
914 DQMNet::DQMNet(const std::string &appname /* = "" */)
915  : debug_(false),
916  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
917  pid_(getpid()),
918  server_(nullptr),
919  version_(Time::current()),
920  communicate_((pthread_t)-1),
921  shutdown_(0),
922  delay_(1000),
923  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
924  waitMax_(0, 0, 0, 5 /* seconds */, 0),
925  flush_(false) {
926  // Create a pipe for the local DQM to tell the communicator
927  // thread that local DQM data has changed and that the peers
928  // should be notified.
// wakeup_ already exists at this point; the two calls below switch its read
// end to non-blocking mode and register it with the selector.
// The return value of fcntl() is not checked.
929  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
930  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
931 
932  // Initialise the upstream and downstream to empty.
933  upstream_.peer = downstream_.peer = nullptr;
937 }
938 
940  // FIXME
941 }
942 
945 void DQMNet::debug(bool doit) { debug_ = doit; }
946 
949 void DQMNet::delay(int delay) { delay_ = delay; }
950 
956 
961  if (server_) {
962  logme() << "ERROR: DQM server was already started.\n";
963  return;
964  }
965 
966  try {
967  InetAddress addr("0.0.0.0", port);
968  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
969  s->bind(addr);
970  s->listen(10);
971  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
972  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
973  s->setBlocking(false);
974  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
975  } catch (Error &e) {
976  // FIXME: Do we need to do this when we throw an exception anyway?
977  // FIXME: Abort instead?
978  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
979 
980  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
981 
982  port << ": " << e.explain().c_str();
983  }
984 
985  logme() << "INFO: DQM server started at port " << port << std::endl;
986 }
987 
991 void DQMNet::startLocalServer(const char *path) {
992  if (server_) {
993  logme() << "ERROR: DQM server was already started.\n";
994  return;
995  }
996 
997  try {
998  server_ = new LocalServerSocket(path, 10);
999  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1000  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1001  server_->setBlocking(false);
1002  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1003  } catch (Error &e) {
1004  // FIXME: Do we need to do this when we throw an exception anyway?
1005  // FIXME: Abort instead?
1006  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1007 
1008  throw cms::Exception("DQMNet::startLocalServer")
1009  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1010  }
1011 
1012  logme() << "INFO: DQM server started at path " << path << std::endl;
1013 }
1014 
1019  if (!downstream_.host.empty()) {
1020  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1021  << std::endl;
1022  return;
1023  }
1024 
1025  downstream_.update = true;
1026  downstream_.host = host;
1027  downstream_.port = port;
1028 }
1029 
1034  if (!upstream_.host.empty()) {
1035  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1036  << std::endl;
1037  return;
1038  }
1039 
1040  upstream_.update = false;
1041  upstream_.host = host;
1042  upstream_.port = port;
1043 }
1044 
1047  shutdown_ = 1;
1048  if (communicate_ != (pthread_t)-1)
1049  pthread_join(communicate_, nullptr);
1050 }
1051 
1057 static void *communicate(void *obj) {
1058  sigset_t sigs;
1059  sigfillset(&sigs);
1060  pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1061  ((DQMNet *)obj)->run();
1062  return nullptr;
1063 }
1064 
1067  if (communicate_ != (pthread_t)-1)
1068  pthread_mutex_lock(&lock_);
1069 }
1070 
1073  if (communicate_ != (pthread_t)-1)
1074  pthread_mutex_unlock(&lock_);
1075 }
1076 
1081  if (communicate_ != (pthread_t)-1) {
1082  logme() << "ERROR: DQM networking thread has already been started\n";
1083  return;
1084  }
1085 
1086  pthread_mutex_init(&lock_, nullptr);
1087  pthread_create(&communicate_, nullptr, &communicate, this);
1088 }
1089 
// Network loop; communicate() calls this on the networking thread.
// Repeats until shouldStop() returns true. Each iteration:
//  1. For upstream_ and downstream_: if a host is configured, no peer exists
//     and the retry time has passed, start a non-blocking connect and
//     register the new peer with the selector.
//  2. Dispatch selector events via sel_.dispatch(delay_).
//  3. Between lock() and unlock(): send the full object list if a flush was
//     requested (at most once per 15 seconds), refresh the peer masks, and
//     release waiters that exceeded waitMax_, or waitStale_ when a stale
//     object is available.
1091 void DQMNet::run() {
1092  Time now;
1093  Time nextFlush = 0;
1094  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1095 
1096  // Perform I/O. Every once in a while flush updates to peers.
1097  while (!shouldStop()) {
1098  for (auto ap : automatic) {
1099  // If we need a server connection and don't have one yet,
1100  // initiate asynchronous connection creation. Swallow errors
1101  // in case the server won't talk to us.
1102  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
// The next attempt time is set before trying, so a failed attempt is not
// repeated for 15 seconds.
1103  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1104  InetSocket *s = nullptr;
1105  try {
1106  InetAddress addr(ap->host.c_str(), ap->port);
1107  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1108  s->setBlocking(false);
1109  s->connect(addr);
1110  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1111  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1112  } catch (Error &e) {
1113  auto *sys = dynamic_cast<SystemError *>(e.next());
1114  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1115  // "In progress" just means the connection is in progress.
1116  // The connection is ready when the socket is writeable.
1117  // Anything else is a real problem.
1118  if (s)
1119  s->abort();
1120  delete s;
1121  s = nullptr;
1122  }
1123  }
1124 
1125  // Set up with the selector if we were successful. If this is
1126  // the upstream collector, queue a request for updates.
1127  if (s) {
1128  Peer *p = createPeer(s);
1129  ap->peer = p;
1130 
1131  InetAddress peeraddr = ((InetSocket *)s)->peername();
1132  InetAddress myaddr = ((InetSocket *)s)->sockname();
1133  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1134  p->mask = IORead | IOWrite | IOUrgent;
1135  p->update = ap->update;
1136  p->automatic = ap;
1137  p->socket = s;
1138  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1139  if (ap == &upstream_) {
// Four 32-bit words are queued: {8, DQM_MSG_LIST_OBJECTS, 8, DQM_MSG_UPDATE_ME}.
// Presumably each pair is {message length in bytes, message type} - confirm
// against the message parsing code.
1140  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1141  p->sendq = new Bucket;
1142  p->sendq->next = nullptr;
1143  copydata(p->sendq, words, sizeof(words));
1144  }
1145 
1146  // Report the new connection.
1147  if (debug_)
1148  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1149  << std::endl;
1150  }
1151  }
1152  }
1153 
1154  // Pump events for a while.
1155  sel_.dispatch(delay_);
1156  now = Time::current();
// The rest of the iteration runs between lock() and unlock().
1157  lock();
1158 
1159  // Check if flush is required. Flush only if one is needed.
1160  // Always sends the full object list, but only rarely.
1161  if (flush_ && now > nextFlush) {
1162  flush_ = false;
1163  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1164  sendObjectListToPeers(true);
1165  }
1166 
1167  // Update the data server and peer selection masks. If we
1168  // have no more data to send and listening for writes, remove
1169  // the write mask. If we have something to write and aren't
1170  // listening for writes, start listening so we can send off
1171  // the data.
1172  updatePeerMasks();
1173 
1174  // Release peers that have been waiting for data for too long.
1175  Time waitold = now - waitMax_;
1176  Time waitstale = now - waitStale_;
// The loop header has no increment: each branch below advances `i` itself.
1177  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1178  Object *o = findObject(nullptr, i->name);
1179 
1180  // If we have (stale) object data, wait only up to stale limit.
1181  // Otherwise if we have no data at all, wait up to the max limit.
1182  if (i->time < waitold) {
1183  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1184  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
// i++ advances the iterator before the call receives the old position;
// presumably releaseFromWait removes that entry from waiting_ - confirm.
1185  releaseFromWait(i++, o);
1186  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1187  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1188  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1189  releaseFromWait(i++, o);
1190  }
1191 
1192  // Keep it for now.
1193  else
1194  ++i;
1195  }
1196 
1197  unlock();
1198  }
1199 }
1200 
1201 // Tell the network cache that there have been local changes that
1202 // should be advertised to the downstream listeners.
// Body of sendLocalChanges() (signature at DQMNet.cc:1203 per the listing's
// index, missing from this extract). Writes a single zero byte to the sink
// end of the wakeup_ pipe.
// NOTE(review): presumably the source end of the pipe is watched by the
// selector so this wakes the network loop - the handler is not visible here.
1204  char byte = 0;
1205  wakeup_.sink()->write(&byte, 1);
1206 }
1207 
1211 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1212  local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1213 }
1214 
1216 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1217 
// Body of updateLocalObject(Object &o) (signature at DQMNet.cc:1220 per the
// listing's index, missing from this extract). Inserts `o` into the local
// peer's object container, or refreshes the existing entry if one is already
// present. In the refresh case the previous field values are swapped into
// `o`, so the caller's object is modified.
// The directory name is first inserted into local_->dirs and `o.dirname` is
// reassigned from the element stored there.
1221  o.dirname = *local_->dirs.insert(o.dirname).first;
1222  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
// info.second is false when an equivalent element already existed; the
// insert then left the container unchanged.
1223  if (!info.second) {
1224  // Somewhat hackish. Sets are supposedly immutable, but we
1225  // need to change the non-key parts of the object. Erasing
1226  // and re-inserting would produce too much memory churn.
1227  auto &old = const_cast<Object &>(*info.first);
1228  std::swap(old.flags, o.flags);
1229  std::swap(old.tag, o.tag);
1230  std::swap(old.version, o.version);
1231  std::swap(old.qreports, o.qreports);
1232  std::swap(old.rawdata, o.rawdata);
1233  std::swap(old.scalar, o.scalar);
1234  std::swap(old.qdata, o.qdata);
1235  }
1236 }
1237 
1241 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1242  size_t removed = 0;
1243  std::string path;
1244  ObjectMap::iterator i, e;
1245  for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1246  path.clear();
1247  path.reserve(i->dirname.size() + i->objname.size() + 2);
1248  path += i->dirname;
1249  if (!path.empty())
1250  path += '/';
1251  path += i->objname;
1252 
1253  if (!known.count(path))
1254  ++removed, local_->objs.erase(i++);
1255  else
1256  ++i;
1257  }
1258 
1259  return removed > 0;
1260 }
size
Write out results.
edm::ErrorSummaryEntry Error
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:173
AutoPeer downstream_
Definition: DQMNet.h:363
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:914
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:52
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:70
def logme(msg, args)
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:80
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:864
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:476
pthread_t communicate_
Definition: DQMNet.h:366
MonitorElement * book1I(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:177
string host
Definition: query.py:115
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:254
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
std::ostream & logme()
Definition: DQMNet.cc:50
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:162
T w() const
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:26
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:369
pthread_mutex_t lock_
Definition: DQMNet.h:342
void delay(int delay)
Definition: DQMNet.cc:949
virtual Peer * createPeer(lat::Socket *s)=0
void setCurrentFolder(std::string const &fullpath) override
Definition: DQMStore.h:646
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
std::string peeraddr
Definition: DQMNet.h:125
static const int WARNING
Definition: DQMNet.cc:43
lat::Socket * socket
Definition: DQMNet.h:126
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:955
AutoPeer * automatic
Definition: DQMNet.h:137
AutoPeer upstream_
Definition: DQMNet.h:362
int port
Definition: query.py:116
static void * communicate(void *obj)
Definition: DQMNet.cc:1057
void shutdown()
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1046
assert(be >=bs)
A arg
Definition: Factorize.h:31
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void debug(bool doit)
Definition: DQMNet.cc:945
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
Definition: DQMStore.h:87
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:155
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
virtual bool shouldStop()
Definition: DQMNet.cc:376
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:399
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:441
DataBlob data
Definition: DQMNet.h:113
void start()
Definition: DQMNet.cc:1080
Peer * peer
Definition: DQMNet.h:141
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:671
void sendLocalChanges()
Definition: DQMNet.cc:1203
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:133
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:51
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:53
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:380
sig_atomic_t shutdown_
Definition: DQMNet.h:367
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:488
lat::Time next
Definition: DQMNet.h:142
size_t IOSize
Definition: IOTypes.h:15
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:398
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1241
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:819
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:212
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:73
std::string host
Definition: DQMNet.h:143
double b
Definition: hdecay.h:120
void run()
Definition: DQMNet.cc:1091
void startLocalServer(int port)
Definition: DQMNet.cc:960
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:158
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:65
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:338
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
lat::Pipe wakeup_
Definition: DQMNet.h:359
bool debug_
Definition: DQMNet.h:341
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
virtual ~DQMNet()
Definition: DQMNet.cc:939
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1033
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const int STATUS_OK
Definition: DQMNet.cc:42
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:71
void updateMask(Peer *p)
Definition: DQMNet.cc:888
bool flush_
Definition: DQMNet.h:372
lat::Socket * server_
Definition: DQMNet.h:358
#define O_NONBLOCK
Definition: SysFile.h:23
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1220
lat::TimeSpan waitMax_
Definition: DQMNet.h:371
Bucket * next
Definition: DQMNet.h:112
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:98
Bucket * sendq
Definition: DQMNet.h:128
std::vector< QValue > QReports
Definition: DQMNet.h:86
lat::TimeSpan waitStale_
Definition: DQMNet.h:370
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1216
Definition: DQMStore.h:18
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
Definition: DQMStore.h:367
MonitorElement * book2I(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:296
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1018
WaitList waiting_
Definition: DQMNet.h:364
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:85
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1211
static const int ERROR
Definition: DQMNet.cc:44
ImplPeer * local_
Definition: DQMNet.h:602
lat::IOSelector sel_
Definition: DQMNet.h:357