CMS 3D CMS Logo

DQMNet.cc
Go to the documentation of this file.
2 #include "classlib/iobase/InetServerSocket.h"
3 #include "classlib/iobase/LocalServerSocket.h"
4 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
5 #include "classlib/utils/SystemError.h"
6 #include "classlib/utils/Regexp.h"
7 #include <fmt/format.h>
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <sys/wait.h>
11 #include <cstdio>
12 #include <cstdint>
13 #include <iostream>
14 #include <sstream>
15 #include <cassert>
16 #include <cfloat>
17 #include <cinttypes>
18 
20 
21 #if __APPLE__
22 #define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
23 #define SOCKET_BUF_SIZE (1 * 1024 * 1024)
24 #else
25 #define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
26 #define SOCKET_BUF_SIZE (8 * 1024 * 1024)
27 #endif
28 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
29 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
30 
31 using namespace lat;
32 
33 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
34 
35 // TODO: Can't include the header file since that leads to ambiguities.
36 namespace dqm {
37  namespace qstatus {
38  static const int STATUS_OK = 100; //< Test was succesful.
39  static const int WARNING = 200; //< Test had some problems.
40  static const int ERROR = 300; //< Test has failed.
41  } // namespace qstatus
42 } // namespace dqm
43 
45 // Generate log prefix.
46 std::ostream &DQMNet::logme() {
47  Time now = Time::current();
48  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
49  << "]: ";
50 }
51 
52 // Append data into a bucket.
53 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
54  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
55 }
56 
57 // Discard a bucket chain.
59  while (b) {
60  Bucket *next = b->next;
61  delete b;
62  b = next;
63  }
64 }
65 
67 
70 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
71  if (reason)
72  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
73 
74  Socket *s = peer->socket;
75 
76  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
77  if (i->peer == peer)
78  waiting_.erase(i++);
79  else
80  ++i;
81 
82  if (ev)
83  ev->source = nullptr;
84 
85  discard(peer->sendq);
86  if (peer->automatic)
87  peer->automatic->peer = nullptr;
88 
89  sel_.detach(s);
90  s->close();
91  removePeer(peer, s);
92  delete s;
93 }
94 
96 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
97  // Issue request to peer.
98  Bucket **msg = &p->sendq;
99  while (*msg)
100  msg = &(*msg)->next;
101  *msg = new Bucket;
102  (*msg)->next = nullptr;
103 
104  uint32_t words[3];
105  words[0] = sizeof(words) + len;
106  words[1] = DQM_MSG_GET_OBJECT;
107  words[2] = len;
108  copydata(*msg, words, sizeof(words));
109  copydata(*msg, name, len);
110 }
111 
114 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
115  // FIXME: Should we automatically record which exact peer the waiter
116  // is expecting to deliver data so we know to release the waiter if
117  // the other peer vanishes? The current implementation stands a
118  // chance for the waiter to wait indefinitely -- although we do
119  // force terminate the wait after a while.
120  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
121  WaitObject wo = {Time::current(), name, info, p};
122  waiting_.push_back(wo);
123  p->waiting++;
124 }
125 
126 // Once an object has been updated, this is invoked for all waiting
127 // peers. Send the object back to the peer in suitable form.
128 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
129  Bucket **msg = &i->peer->sendq;
130  while (*msg)
131  msg = &(*msg)->next;
132  *msg = new Bucket;
133  (*msg)->next = nullptr;
134 
135  releaseFromWait(*msg, *i, o);
136 
137  assert(i->peer->waiting > 0);
138  i->peer->waiting--;
139  waiting_.erase(i);
140 }
141 
142 // Release everyone waiting for the object @a o.
144  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
145  if (i->name == name)
146  releaseFromWait(i++, o);
147  else
148  ++i;
149 }
150 
155  char buf[64];
156  std::ostringstream qrs;
157  QReports::const_iterator qi, qe;
158  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
159  int pos = 0;
160  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
161  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
162  << '\0';
163  }
164  into = qrs.str();
165 }
166 
169 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
170  const char *qdata = from;
171 
172  // Count how many qresults there are.
173  size_t nqv = 0;
174  while (*qdata) {
175  ++nqv;
176  while (*qdata)
177  ++qdata;
178  ++qdata;
179  while (*qdata)
180  ++qdata;
181  ++qdata;
182  while (*qdata)
183  ++qdata;
184  ++qdata;
185  while (*qdata)
186  ++qdata;
187  ++qdata;
188  while (*qdata)
189  ++qdata;
190  ++qdata;
191  }
192 
193  // Now extract the qreports.
194  qdata = from;
195  qr.reserve(nqv);
196  while (*qdata) {
197  qr.emplace_back();
198  DQMNet::QValue &qv = qr.back();
199 
200  qv.code = atoi(qdata);
201  while (*qdata)
202  ++qdata;
203  switch (qv.code) {
205  break;
208  break;
209  case dqm::qstatus::ERROR:
211  break;
212  default:
214  break;
215  }
216 
217  qv.qtresult = atof(++qdata);
218  while (*qdata)
219  ++qdata;
220 
221  qv.qtname = ++qdata;
222  while (*qdata)
223  ++qdata;
224 
225  qv.algorithm = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.message = ++qdata;
230  while (*qdata)
231  ++qdata;
232  ++qdata;
233  }
234 }
235 
236 #if 0
237 // Deserialise a ROOT object from a buffer at the current position.
238 static TObject *
239 extractNextObject(TBufferFile &buf)
240 {
241  if (buf.Length() == buf.BufferSize())
242  return 0;
243 
244  buf.InitMap();
245  Int_t pos = buf.Length();
246  TClass *c = buf.ReadClass();
247  buf.SetBufferOffset(pos);
248  buf.ResetMap();
249  return c ? buf.ReadObject(c) : 0;
250 }
251 
252 // Reconstruct an object from the raw data.
253 bool
254 DQMNet::reconstructObject(Object &o)
255 {
256  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
257  buf.Reset();
258 
259  // Extract the main object.
260  if (! (o.object = extractNextObject(buf)))
261  return false;
262 
263  // Extract the reference object.
264  o.reference = extractNextObject(buf);
265 
266  // Extract quality reports.
267  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
268  return true;
269 }
270 #endif
271 
272 #if 0
273 bool
274 DQMNet::reinstateObject(DQMStore *store, Object &o)
275 {
276  if (! reconstructObject (o))
277  return false;
278 
279  // Reconstruct the main object
280  MonitorElement *obj = 0;
281  store->setCurrentFolder(o.dirname);
282  switch (o.flags & DQM_PROP_TYPE_MASK)
283  {
284  case DQM_PROP_TYPE_INT:
285  obj = store->bookInt(o.objname);
286  obj->Fill(atoll(o.scalar.c_str()));
287  break;
288 
289  case DQM_PROP_TYPE_REAL:
290  obj = store->bookFloat(name);
291  obj->Fill(atof(o.scalar.c_str()));
292  break;
293 
294  case DQM_PROP_TYPE_STRING:
295  obj = store->bookString(name, o.scalar);
296  break;
297 
298  case DQM_PROP_TYPE_TH1F:
299  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
300  break;
301 
302  case DQM_PROP_TYPE_TH1S:
303  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
304  break;
305 
306  case DQM_PROP_TYPE_TH1D:
307  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
308  break;
309 
310  case DQM_PROP_TYPE_TH1I:
311  obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
312  break;
313 
314  case DQM_PROP_TYPE_TH2F:
315  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
316  break;
317 
318  case DQM_PROP_TYPE_TH2S:
319  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
320  break;
321 
322  case DQM_PROP_TYPE_TH2D:
323  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
324  break;
325 
326  case DQM_PROP_TYPE_TH2I:
327  obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
328  break;
329 
330  case DQM_PROP_TYPE_TH3F:
331  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
332  break;
333 
334  case DQM_PROP_TYPE_TH3S:
335  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
336  break;
337 
338  case DQM_PROP_TYPE_TH3D:
339  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
340  break;
341 
342  case DQM_PROP_TYPE_PROF:
343  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
344  break;
345 
346  case DQM_PROP_TYPE_PROF2D:
347  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
348  break;
349 
350  default:
351  logme()
352  << "ERROR: unexpected monitor element of type "
353  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
354  << o.dirname << '/' << o.objname << "'\n";
355  return false;
356  }
357 
358  // Reconstruct tag and qreports.
359  if (obj)
360  {
361  obj->data_.tag = o.tag;
362  obj->data_.qreports = o.qreports;
363  }
364 
365  // Inidicate success.
366  return true;
367 }
368 #endif
369 
371 // Check if the network layer should stop.
372 bool DQMNet::shouldStop() { return shutdown_; }
373 
374 // Once an object has been updated, this is invoked for all waiting
375 // peers. Send the requested object to the waiting peer.
377  if (o)
378  sendObjectToPeer(msg, *o, true);
379  else {
380  uint32_t words[3];
381  words[0] = sizeof(words) + w.name.size();
382  words[1] = DQM_REPLY_NONE;
383  words[2] = w.name.size();
384 
385  msg->data.reserve(msg->data.size() + words[0]);
386  copydata(msg, &words[0], sizeof(words));
387  copydata(msg, &w.name[0], w.name.size());
388  }
389 }
390 
391 // Send an object to a peer. If not @a data, only sends a summary
392 // without the object data, except the data is always sent for scalar
393 // objects.
395  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
396  DataBlob objdata;
397 
398  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
399  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
400  else if (data)
401  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
402 
403  uint32_t words[9];
404  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
405  uint32_t datalen = objdata.size();
406  uint32_t qlen = o.qdata.size();
407 
408  if (o.dirname.empty())
409  --namelen;
410 
411  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
412  words[1] = DQM_REPLY_OBJECT;
413  words[2] = flags;
414  words[3] = (o.version >> 0) & 0xffffffff;
415  words[4] = (o.version >> 32) & 0xffffffff;
416  words[5] = o.tag;
417  words[6] = namelen;
418  words[7] = datalen;
419  words[8] = qlen;
420 
421  msg->data.reserve(msg->data.size() + words[0]);
422  copydata(msg, &words[0], 9 * sizeof(uint32_t));
423  if (namelen) {
424  copydata(msg, &(o.dirname)[0], o.dirname.size());
425  if (!o.dirname.empty())
426  copydata(msg, "/", 1);
427  copydata(msg, &o.objname[0], o.objname.size());
428  }
429  if (datalen)
430  copydata(msg, &objdata[0], datalen);
431  if (qlen)
432  copydata(msg, &o.qdata[0], qlen);
433 }
434 
436 // Handle peer messages.
437 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
438  // Decode and process this message.
439  uint32_t type;
440  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
441  switch (type) {
442  case DQM_MSG_UPDATE_ME: {
443  if (len != 2 * sizeof(uint32_t)) {
444  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
445  return false;
446  }
447 
448  if (debug_)
449  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
450 
451  p->update = true;
452  }
453  return true;
454 
455  case DQM_MSG_LIST_OBJECTS: {
456  if (debug_)
457  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
458 
459  // Send over current status: list of known objects.
460  sendObjectListToPeer(msg, true, false);
461  }
462  return true;
463 
464  case DQM_MSG_GET_OBJECT: {
465  if (debug_)
466  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
467 
468  if (len < 3 * sizeof(uint32_t)) {
469  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
470  return false;
471  }
472 
473  uint32_t namelen;
474  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
475  if (len != 3 * sizeof(uint32_t) + namelen) {
476  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
477  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
478  return false;
479  }
480 
481  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
482  Peer *owner = nullptr;
483  Object *o = findObject(nullptr, name, &owner);
484  if (o) {
485  o->lastreq = Time::current().ns();
486  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
487  (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
488  waitForData(p, name, "", owner);
489  else
490  sendObjectToPeer(msg, *o, true);
491  } else {
492  uint32_t words[3];
493  words[0] = sizeof(words) + name.size();
494  words[1] = DQM_REPLY_NONE;
495  words[2] = name.size();
496 
497  msg->data.reserve(msg->data.size() + words[0]);
498  copydata(msg, &words[0], sizeof(words));
499  copydata(msg, &name[0], name.size());
500  }
501  }
502  return true;
503 
504  case DQM_REPLY_LIST_BEGIN: {
505  if (len != 4 * sizeof(uint32_t)) {
506  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
507  return false;
508  }
509 
510  // Get the update status: whether this is a full update.
511  uint32_t flags;
512  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
513 
514  if (debug_)
515  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
516  << p->peeraddr << ", size " << len << std::endl;
517 
518  // If we are about to receive a full list of objects, flag all
519  // objects as possibly dead. Subsequent object notifications
520  // will undo this for the live objects. We cannot delete
521  // objects quite yet, as we may get inquiry from another client
522  // while we are processing the incoming list, so we keep the
523  // objects tentatively alive as long as we've not seen the end.
524  if (flags)
525  markObjectsDead(p);
526  }
527  return true;
528 
529  case DQM_REPLY_LIST_END: {
530  if (len != 4 * sizeof(uint32_t)) {
531  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
532  return false;
533  }
534 
535  // Get the update status: whether this is a full update.
536  uint32_t flags;
537  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
538 
539  // If we received a full list of objects, now purge all dead
540  // objects. We need to do this in two stages in case we receive
541  // updates in many parts, and end up sending updates to others in
542  // between; this avoids us lying live objects are dead.
543  if (flags)
544  purgeDeadObjects(p);
545 
546  if (debug_)
547  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
548  << ", size " << len << std::endl;
549 
550  // Indicate we have received another update from this peer.
551  // Also indicate we should flush to our clients.
552  flush_ = true;
553  p->updates++;
554  }
555  return true;
556 
557  case DQM_REPLY_OBJECT: {
558  uint32_t words[9];
559  if (len < sizeof(words)) {
560  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
561  return false;
562  }
563 
564  memcpy(&words[0], data, sizeof(words));
565  uint32_t &namelen = words[6];
566  uint32_t &datalen = words[7];
567  uint32_t &qlen = words[8];
568 
569  if (len != sizeof(words) + namelen + datalen + qlen) {
570  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
571  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
572  << std::endl;
573  return false;
574  }
575 
576  unsigned char *namedata = data + sizeof(words);
577  unsigned char *objdata = namedata + namelen;
578  unsigned char *qdata = objdata + datalen;
579  unsigned char *enddata = qdata + qlen;
580  std::string name((char *)namedata, namelen);
581  assert(enddata == data + len);
582 
583  if (debug_)
584  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
585  << std::endl;
586 
587  // Mark the peer as a known object source.
588  p->source = true;
589 
590  // Initialise or update an object entry.
591  Object *o = findObject(p, name);
592  if (!o)
593  o = makeObject(p, name);
594 
595  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
596  o->tag = words[5];
597  o->version = ((uint64_t)words[4] << 32 | words[3]);
598  o->scalar.clear();
599  o->qdata.clear();
600  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
601  o->rawdata.clear();
602  o->scalar.insert(o->scalar.end(), objdata, qdata);
603  } else if (datalen) {
604  o->rawdata.clear();
605  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
606  } else if (!o->rawdata.empty())
607  o->flags |= DQM_PROP_STALE;
608  o->qdata.insert(o->qdata.end(), qdata, enddata);
609 
610  // If we had an object for this one already and this is a list
611  // update without data, issue an immediate data get request.
612  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
613  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
614 
615  // If we have the object data, release from wait.
616  if (datalen)
617  releaseWaiters(name, o);
618  }
619  return true;
620 
621  case DQM_REPLY_NONE: {
622  uint32_t words[3];
623  if (len < sizeof(words)) {
624  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
625  return false;
626  }
627 
628  memcpy(&words[0], data, sizeof(words));
629  uint32_t &namelen = words[2];
630 
631  if (len != sizeof(words) + namelen) {
632  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
633  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
634  return false;
635  }
636 
637  unsigned char *namedata = data + sizeof(words);
638  std::string name((char *)namedata, namelen);
639 
640  if (debug_)
641  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
642  << std::endl;
643 
644  // Mark the peer as a known object source.
645  p->source = true;
646 
647  // If this was a known object, kill it.
648  if (Object *o = findObject(p, name)) {
649  o->flags |= DQM_PROP_DEAD;
650  purgeDeadObjects(p);
651  }
652 
653  // If someone was waiting for this, let them go.
654  releaseWaiters(name, nullptr);
655  }
656  return true;
657 
658  default:
659  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
660  << std::endl;
661  return false;
662  }
663 }
664 
667 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
668  lock();
669  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
670 
671  // If there is a problem with the peer socket, discard the peer
672  // and tell the selector to stop prcessing events for it. If
673  // this is a server connection, we will eventually recreate
674  // everything if/when the data server comes back.
675  if (ev->events & IOUrgent) {
676  if (p->automatic) {
677  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
678  << " lost (will attempt to reconnect in 15 seconds)\n";
679  losePeer(nullptr, p, ev);
680  } else
681  losePeer("WARNING: lost peer connection ", p, ev);
682 
683  unlock();
684  return true;
685  }
686 
687  // If we can write to the peer socket, pump whatever we can into it.
688  if (ev->events & IOWrite) {
689  while (Bucket *b = p->sendq) {
690  IOSize len = b->data.size() - p->sendpos;
691  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
692  IOSize done;
693 
694  try {
695  done = (len ? ev->source->write(data, len) : 0);
696  if (debug_ && len)
697  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
698  } catch (Error &e) {
699  losePeer("WARNING: unable to write to peer ", p, ev, &e);
700  unlock();
701  return true;
702  }
703 
704  p->sendpos += done;
705  if (p->sendpos == b->data.size()) {
706  Bucket *old = p->sendq;
707  p->sendq = old->next;
708  p->sendpos = 0;
709  old->next = nullptr;
710  discard(old);
711  }
712 
713  if (!done && len)
714  // Cannot write any more.
715  break;
716  }
717  }
718 
719  // If there is data to be read from the peer, first receive what we
720  // can get out the socket, the process all complete requests.
721  if (ev->events & IORead) {
722  // First build up the incoming buffer of data in the socket.
723  // Remember the last size returned by the socket; we need
724  // it to determine if the remote end closed the connection.
725  IOSize sz;
726  try {
727  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
728  do
729  if ((sz = ev->source->read(&buf[0], buf.size()))) {
730  if (debug_)
731  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
732  DataBlob &data = p->incoming;
733  if (data.capacity() < data.size() + sz)
734  data.reserve(data.size() + SOCKET_READ_GROWTH);
735  data.insert(data.end(), &buf[0], &buf[0] + sz);
736  }
737  while (sz == sizeof(buf));
738  } catch (Error &e) {
739  auto *next = dynamic_cast<SystemError *>(e.next());
740  if (next && next->portable() == SysErr::ErrTryAgain)
741  sz = 1; // Ignore it, and fake no end of data.
742  else {
743  // Houston we have a problem.
744  losePeer("WARNING: failed to read from peer ", p, ev, &e);
745  unlock();
746  return true;
747  }
748  }
749 
750  // Process fully received messages as long as we can.
751  size_t consumed = 0;
752  DataBlob &data = p->incoming;
753  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
754  uint32_t msglen;
755  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
756 
757  if (msglen >= MESSAGE_SIZE_LIMIT) {
758  losePeer("WARNING: excessively large message from ", p, ev);
759  unlock();
760  return true;
761  }
762 
763  if (data.size() - consumed >= msglen) {
764  bool valid = true;
765  if (msglen < 2 * sizeof(uint32_t)) {
766  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
767  valid = false;
768  } else {
769  // Decode and process this message.
770  Bucket msg;
771  msg.next = nullptr;
772  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
773 
774  // If we created a response, chain it to the write queue.
775  if (!msg.data.empty()) {
776  Bucket **prev = &p->sendq;
777  while (*prev)
778  prev = &(*prev)->next;
779 
780  *prev = new Bucket;
781  (*prev)->next = nullptr;
782  (*prev)->data.swap(msg.data);
783  }
784  }
785 
786  if (!valid) {
787  losePeer("WARNING: data stream error with ", p, ev);
788  unlock();
789  return true;
790  }
791 
792  consumed += msglen;
793  } else
794  break;
795  }
796 
797  data.erase(data.begin(), data.begin() + consumed);
798 
799  // If the client has closed the connection, shut down our end. If
800  // we have something to send back still, leave the write direction
801  // open. Otherwise close the shop for this client.
802  if (sz == 0)
803  sel_.setMask(p->socket, p->mask &= ~IORead);
804  }
805 
806  // Yes, please keep processing events for this socket.
807  unlock();
808  return false;
809 }
810 
815 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
816  // Recover the server socket.
817  assert(ev->source == server_);
818 
819  // Accept the connection.
820  Socket *s = server_->accept();
821  assert(s);
822  assert(!s->isBlocking());
823 
824  // Record it to our list of peers.
825  lock();
826  Peer *p = createPeer(s);
827  std::string localaddr;
828  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
829  InetAddress peeraddr = inet->peername();
830  InetAddress myaddr = inet->sockname();
831  p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
832  localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
833  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
834  p->peeraddr = local->peername().path();
835  localaddr = local->sockname().path();
836  } else
837  assert(false);
838 
839  p->mask = IORead | IOUrgent;
840  p->socket = s;
841 
842  // Report the new connection.
843  if (debug_)
844  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
845 
846  // Attach it to the listener.
847  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
848  unlock();
849 
850  // We are never done.
851  return false;
852 }
853 
860 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
861  // Discard the data in the pipe, we care only about the wakeup.
862  try {
863  IOSize sz;
864  unsigned char buf[1024];
865  while ((sz = ev->source->read(buf, sizeof(buf))))
866  ;
867  } catch (Error &e) {
868  auto *next = dynamic_cast<SystemError *>(e.next());
869  if (next && next->portable() == SysErr::ErrTryAgain)
870  ; // Ignore it
871  else
872  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
873  }
874 
875  // Tell the main event pump to send an update in a little while.
876  flush_ = true;
877 
878  // We are never done, always keep going.
879  return false;
880 }
881 
885  if (!p->socket)
886  return;
887 
888  // Listen to writes iff we have data to send.
889  unsigned oldmask = p->mask;
890  if (!p->sendq && (p->mask & IOWrite))
891  sel_.setMask(p->socket, p->mask &= ~IOWrite);
892 
893  if (p->sendq && !(p->mask & IOWrite))
894  sel_.setMask(p->socket, p->mask |= IOWrite);
895 
896  if (debug_ && oldmask != p->mask)
897  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
898 
899  // If we have nothing more to send and are no longer listening
900  // for reads, close up the shop for this peer.
901  if (p->mask == IOUrgent && !p->waiting) {
902  assert(!p->sendq);
903  if (debug_)
904  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
905  losePeer(nullptr, p, nullptr);
906  }
907 }
908 
910 DQMNet::DQMNet(const std::string &appname /* = "" */)
911  : debug_(false),
912  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
913  pid_(getpid()),
914  server_(nullptr),
915  version_(Time::current()),
916  communicate_((pthread_t)-1),
917  shutdown_(0),
918  delay_(1000),
919  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
920  waitMax_(0, 0, 0, 5 /* seconds */, 0),
921  flush_(false) {
922  // Create a pipe for the local DQM to tell the communicator
923  // thread that local DQM data has changed and that the peers
924  // should be notified.
925  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
926  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
927 
928  // Initialise the upstream and downstream to empty.
929  upstream_.peer = downstream_.peer = nullptr;
933 }
934 
936  // FIXME
937 }
938 
941 void DQMNet::debug(bool doit) { debug_ = doit; }
942 
945 void DQMNet::delay(int delay) { delay_ = delay; }
946 
952 
957  if (server_) {
958  logme() << "ERROR: DQM server was already started.\n";
959  return;
960  }
961 
962  try {
963  InetAddress addr("0.0.0.0", port);
964  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
965  s->bind(addr);
966  s->listen(10);
967  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
968  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
969  s->setBlocking(false);
970  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
971  } catch (Error &e) {
972  // FIXME: Do we need to do this when we throw an exception anyway?
973  // FIXME: Abort instead?
974  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
975 
976  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
977 
978  port << ": " << e.explain().c_str();
979  }
980 
981  logme() << "INFO: DQM server started at port " << port << std::endl;
982 }
983 
987 void DQMNet::startLocalServer(const char *path) {
988  if (server_) {
989  logme() << "ERROR: DQM server was already started.\n";
990  return;
991  }
992 
993  try {
994  server_ = new LocalServerSocket(path, 10);
995  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
996  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
997  server_->setBlocking(false);
998  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
999  } catch (Error &e) {
1000  // FIXME: Do we need to do this when we throw an exception anyway?
1001  // FIXME: Abort instead?
1002  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1003 
1004  throw cms::Exception("DQMNet::startLocalServer")
1005  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1006  }
1007 
1008  logme() << "INFO: DQM server started at path " << path << std::endl;
1009 }
1010 
1015  if (!downstream_.host.empty()) {
1016  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1017  << std::endl;
1018  return;
1019  }
1020 
1021  downstream_.update = true;
1022  downstream_.host = host;
1023  downstream_.port = port;
1024 }
1025 
1030  if (!upstream_.host.empty()) {
1031  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1032  << std::endl;
1033  return;
1034  }
1035 
1036  upstream_.update = false;
1037  upstream_.host = host;
1038  upstream_.port = port;
1039 }
1040 
1043  shutdown_ = 1;
1044  if (communicate_ != (pthread_t)-1)
1045  pthread_join(communicate_, nullptr);
1046 }
1047 
1053 static void *communicate(void *obj) {
1054  sigset_t sigs;
1055  sigfillset(&sigs);
1056  pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1057  ((DQMNet *)obj)->run();
1058  return nullptr;
1059 }
1060 
1063  if (communicate_ != (pthread_t)-1)
1064  pthread_mutex_lock(&lock_);
1065 }
1066 
1069  if (communicate_ != (pthread_t)-1)
1070  pthread_mutex_unlock(&lock_);
1071 }
1072 
1077  if (communicate_ != (pthread_t)-1) {
1078  logme() << "ERROR: DQM networking thread has already been started\n";
1079  return;
1080  }
1081 
1082  pthread_mutex_init(&lock_, nullptr);
1083  pthread_create(&communicate_, nullptr, &communicate, this);
1084 }
1085 
1087 void DQMNet::run() {
1088  Time now;
1089  Time nextFlush = 0;
1090  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1091 
1092  // Perform I/O. Every once in a while flush updates to peers.
1093  while (!shouldStop()) {
1094  for (auto ap : automatic) {
1095  // If we need a server connection and don't have one yet,
1096  // initiate asynchronous connection creation. Swallow errors
1097  // in case the server won't talk to us.
1098  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1099  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1100  InetSocket *s = nullptr;
1101  try {
1102  InetAddress addr(ap->host.c_str(), ap->port);
1103  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1104  s->setBlocking(false);
1105  s->connect(addr);
1106  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1107  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1108  } catch (Error &e) {
1109  auto *sys = dynamic_cast<SystemError *>(e.next());
1110  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1111  // "In progress" just means the connection is in progress.
1112  // The connection is ready when the socket is writeable.
1113  // Anything else is a real problem.
1114  if (s)
1115  s->abort();
1116  delete s;
1117  s = nullptr;
1118  }
1119  }
1120 
1121  // Set up with the selector if we were successful. If this is
1122  // the upstream collector, queue a request for updates.
1123  if (s) {
1124  Peer *p = createPeer(s);
1125  ap->peer = p;
1126 
1127  InetAddress peeraddr = ((InetSocket *)s)->peername();
1128  InetAddress myaddr = ((InetSocket *)s)->sockname();
1129  p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
1130  p->mask = IORead | IOWrite | IOUrgent;
1131  p->update = ap->update;
1132  p->automatic = ap;
1133  p->socket = s;
1134  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1135  if (ap == &upstream_) {
1136  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1137  p->sendq = new Bucket;
1138  p->sendq->next = nullptr;
1139  copydata(p->sendq, words, sizeof(words));
1140  }
1141 
1142  // Report the new connection.
1143  if (debug_)
1144  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1145  << std::endl;
1146  }
1147  }
1148  }
1149 
1150  // Pump events for a while.
1151  sel_.dispatch(delay_);
1152  now = Time::current();
1153  lock();
1154 
1155  // Check if flush is required. Flush only if one is needed.
1156  // Always sends the full object list, but only rarely.
1157  if (flush_ && now > nextFlush) {
1158  flush_ = false;
1159  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1160  sendObjectListToPeers(true);
1161  }
1162 
1163  // Update the data server and peer selection masks. If we
1164  // have no more data to send and listening for writes, remove
1165  // the write mask. If we have something to write and aren't
1166  // listening for writes, start listening so we can send off
1167  // the data.
1168  updatePeerMasks();
1169 
1170  // Release peers that have been waiting for data for too long.
1171  Time waitold = now - waitMax_;
1172  Time waitstale = now - waitStale_;
1173  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1174  Object *o = findObject(nullptr, i->name);
1175 
1176  // If we have (stale) object data, wait only up to stale limit.
1177  // Otherwise if we have no data at all, wait up to the max limit.
1178  if (i->time < waitold) {
1179  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1180  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1181  releaseFromWait(i++, o);
1182  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1183  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1184  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1185  releaseFromWait(i++, o);
1186  }
1187 
1188  // Keep it for now.
1189  else
1190  ++i;
1191  }
1192 
1193  unlock();
1194  }
1195 }
1196 
1197 // Tell the network cache that there have been local changes that
1198 // should be advertised to the downstream listeners.
1200  char byte = 0;
1201  wakeup_.sink()->write(&byte, 1);
1202 }
1203 
1207 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1208  local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1209 }
1210 
1212 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1213 
1217  o.dirname = *local_->dirs.insert(o.dirname).first;
1218  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1219  if (!info.second) {
1220  // Somewhat hackish. Sets are supposedly immutable, but we
1221  // need to change the non-key parts of the object. Erasing
1222  // and re-inserting would produce too much memory churn.
1223  auto &old = const_cast<Object &>(*info.first);
1224  std::swap(old.flags, o.flags);
1225  std::swap(old.tag, o.tag);
1226  std::swap(old.version, o.version);
1227  std::swap(old.qreports, o.qreports);
1228  std::swap(old.rawdata, o.rawdata);
1229  std::swap(old.scalar, o.scalar);
1230  std::swap(old.qdata, o.qdata);
1231  }
1232 }
1233 
1237 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1238  size_t removed = 0;
1239  std::string path;
1240  ObjectMap::iterator i, e;
1241  for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1242  path.clear();
1243  path.reserve(i->dirname.size() + i->objname.size() + 2);
1244  path += i->dirname;
1245  if (!path.empty())
1246  path += '/';
1247  path += i->objname;
1248 
1249  if (!known.count(path))
1250  ++removed, local_->objs.erase(i++);
1251  else
1252  ++i;
1253  }
1254 
1255  return removed > 0;
1256 }
size
Write out results.
edm::ErrorSummaryEntry Error
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:169
AutoPeer downstream_
Definition: DQMNet.h:362
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:910
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:51
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:69
def logme(msg, args)
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:80
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:860
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:485
pthread_t communicate_
Definition: DQMNet.h:365
MonitorElement * book1I(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:186
string host
Definition: query.py:115
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:263
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</>")
std::ostream & logme()
Definition: DQMNet.cc:46
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:25
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:162
T w() const
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:28
static void discard(Bucket *&b)
Definition: DQMNet.cc:58
Definition: DQMNet.h:25
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:368
pthread_mutex_t lock_
Definition: DQMNet.h:341
void delay(int delay)
Definition: DQMNet.cc:945
virtual Peer * createPeer(lat::Socket *s)=0
void setCurrentFolder(std::string const &fullpath) override
Definition: DQMStore.h:656
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:143
std::string peeraddr
Definition: DQMNet.h:124
static const int WARNING
Definition: DQMNet.cc:39
lat::Socket * socket
Definition: DQMNet.h:125
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:951
AutoPeer * automatic
Definition: DQMNet.h:136
AutoPeer upstream_
Definition: DQMNet.h:361
int port
Definition: query.py:116
static void * communicate(void *obj)
Definition: DQMNet.cc:1053
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1042
assert(be >=bs)
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void debug(bool doit)
Definition: DQMNet.cc:941
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
Definition: DQMStore.h:87
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:155
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:114
virtual bool shouldStop()
Definition: DQMNet.cc:372
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:96
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:408
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:437
DataBlob data
Definition: DQMNet.h:112
void start()
Definition: DQMNet.cc:1076
Peer * peer
Definition: DQMNet.h:140
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:667
void sendLocalChanges()
Definition: DQMNet.cc:1199
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:133
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:50
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:29
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:52
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:376
sig_atomic_t shutdown_
Definition: DQMNet.h:366
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:487
lat::Time next
Definition: DQMNet.h:141
size_t IOSize
Definition: IOTypes.h:15
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:394
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1237
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:815
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:221
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:73
std::string host
Definition: DQMNet.h:142
double b
Definition: hdecay.h:120
void run()
Definition: DQMNet.cc:1087
void startLocalServer(int port)
Definition: DQMNet.cc:956
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:154
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:64
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:347
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:70
lat::Pipe wakeup_
Definition: DQMNet.h:358
bool debug_
Definition: DQMNet.h:340
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
virtual ~DQMNet()
Definition: DQMNet.cc:935
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1029
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
static const int STATUS_OK
Definition: DQMNet.cc:38
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:70
void updateMask(Peer *p)
Definition: DQMNet.cc:884
bool flush_
Definition: DQMNet.h:371
lat::Socket * server_
Definition: DQMNet.h:357
#define O_NONBLOCK
Definition: SysFile.h:23
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1216
lat::TimeSpan waitMax_
Definition: DQMNet.h:370
Bucket * next
Definition: DQMNet.h:111
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:98
Bucket * sendq
Definition: DQMNet.h:127
std::vector< QValue > QReports
Definition: DQMNet.h:85
lat::TimeSpan waitStale_
Definition: DQMNet.h:369
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1212
Definition: DQMStore.h:18
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
Definition: DQMStore.h:376
MonitorElement * book2I(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:305
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1014
WaitList waiting_
Definition: DQMNet.h:363
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:84
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1207
static const int ERROR
Definition: DQMNet.cc:40
ImplPeer * local_
Definition: DQMNet.h:601
lat::IOSelector sel_
Definition: DQMNet.h:356