CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DQMNet.cc
Go to the documentation of this file.
2 #include "classlib/iobase/InetServerSocket.h"
3 #include "classlib/iobase/LocalServerSocket.h"
4 #include "classlib/iobase/Filename.h"
5 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
6 #include "classlib/utils/TimeInfo.h"
7 #include "classlib/utils/StringList.h"
8 #include "classlib/utils/StringFormat.h"
9 #include "classlib/utils/StringOps.h"
10 #include "classlib/utils/SystemError.h"
11 #include "classlib/utils/Regexp.h"
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <sys/wait.h>
15 #include <cstdio>
16 #include <cstdint>
17 #include <iostream>
18 #include <sstream>
19 #include <cassert>
20 #include <cfloat>
21 #include <cinttypes>
22 
24 
25 #if __APPLE__
26 #define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
27 #define SOCKET_BUF_SIZE (1 * 1024 * 1024)
28 #else
29 #define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
30 #define SOCKET_BUF_SIZE (8 * 1024 * 1024)
31 #endif
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
34 
35 using namespace lat;
36 
37 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
38 
39 // TODO: Can't include the header file since that leads to ambiguities.
// Local copy of the quality-test status codes (see the TODO above: the
// real header cannot be included here without causing ambiguities).
namespace dqm {
  namespace qstatus {
    static constexpr int STATUS_OK = 100;  //< Test was successful.
    static constexpr int WARNING = 200;    //< Test had some problems.
    static constexpr int ERROR = 300;      //< Test has failed.
  }  // namespace qstatus
}  // namespace dqm
47 
49 // Generate log prefix.
50 std::ostream &DQMNet::logme() {
51  Time now = Time::current();
52  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
53  << "]: ";
54 }
55 
56 // Append data into a bucket.
57 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
58  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
59 }
60 
61 // Discard a bucket chain.
63  while (b) {
64  Bucket *next = b->next;
65  delete b;
66  b = next;
67  }
68 }
69 
71 
74 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
75  if (reason)
76  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
77 
78  Socket *s = peer->socket;
79 
80  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
81  if (i->peer == peer)
82  waiting_.erase(i++);
83  else
84  ++i;
85 
86  if (ev)
87  ev->source = nullptr;
88 
89  discard(peer->sendq);
90  if (peer->automatic)
91  peer->automatic->peer = nullptr;
92 
93  sel_.detach(s);
94  s->close();
95  removePeer(peer, s);
96  delete s;
97 }
98 
100 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
101  // Issue request to peer.
102  Bucket **msg = &p->sendq;
103  while (*msg)
104  msg = &(*msg)->next;
105  *msg = new Bucket;
106  (*msg)->next = nullptr;
107 
108  uint32_t words[3];
109  words[0] = sizeof(words) + len;
110  words[1] = DQM_MSG_GET_OBJECT;
111  words[2] = len;
112  copydata(*msg, words, sizeof(words));
113  copydata(*msg, name, len);
114 }
115 
118 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
119  // FIXME: Should we automatically record which exact peer the waiter
120  // is expecting to deliver data so we know to release the waiter if
121  // the other peer vanishes? The current implementation stands a
122  // chance for the waiter to wait indefinitely -- although we do
123  // force terminate the wait after a while.
124  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
125  WaitObject wo = {Time::current(), name, info, p};
126  waiting_.push_back(wo);
127  p->waiting++;
128 }
129 
130 // Once an object has been updated, this is invoked for all waiting
131 // peers. Send the object back to the peer in suitable form.
132 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
133  Bucket **msg = &i->peer->sendq;
134  while (*msg)
135  msg = &(*msg)->next;
136  *msg = new Bucket;
137  (*msg)->next = nullptr;
138 
139  releaseFromWait(*msg, *i, o);
140 
141  assert(i->peer->waiting > 0);
142  i->peer->waiting--;
143  waiting_.erase(i);
144 }
145 
146 // Release everyone waiting for the object @a o.
148  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
149  if (i->name == name)
150  releaseFromWait(i++, o);
151  else
152  ++i;
153 }
154 
159  char buf[64];
160  std::ostringstream qrs;
161  QReports::const_iterator qi, qe;
162  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
163  int pos = 0;
164  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
165  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
166  << '\0';
167  }
168  into = qrs.str();
169 }
170 
173 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
174  const char *qdata = from;
175 
176  // Count how many qresults there are.
177  size_t nqv = 0;
178  while (*qdata) {
179  ++nqv;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  while (*qdata)
187  ++qdata;
188  ++qdata;
189  while (*qdata)
190  ++qdata;
191  ++qdata;
192  while (*qdata)
193  ++qdata;
194  ++qdata;
195  }
196 
197  // Now extract the qreports.
198  qdata = from;
199  qr.reserve(nqv);
200  while (*qdata) {
201  qr.emplace_back();
202  DQMNet::QValue &qv = qr.back();
203 
204  qv.code = atoi(qdata);
205  while (*qdata)
206  ++qdata;
207  switch (qv.code) {
209  break;
212  break;
213  case dqm::qstatus::ERROR:
215  break;
216  default:
218  break;
219  }
220 
221  qv.qtresult = atof(++qdata);
222  while (*qdata)
223  ++qdata;
224 
225  qv.qtname = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.algorithm = ++qdata;
230  while (*qdata)
231  ++qdata;
232 
233  qv.message = ++qdata;
234  while (*qdata)
235  ++qdata;
236  ++qdata;
237  }
238 }
239 
240 #if 0
241 // Deserialise a ROOT object from a buffer at the current position.
242 static TObject *
243 extractNextObject(TBufferFile &buf)
244 {
245  if (buf.Length() == buf.BufferSize())
246  return 0;
247 
248  buf.InitMap();
249  Int_t pos = buf.Length();
250  TClass *c = buf.ReadClass();
251  buf.SetBufferOffset(pos);
252  buf.ResetMap();
253  return c ? buf.ReadObject(c) : 0;
254 }
255 
256 // Reconstruct an object from the raw data.
257 bool
258 DQMNet::reconstructObject(Object &o)
259 {
260  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
261  buf.Reset();
262 
263  // Extract the main object.
264  if (! (o.object = extractNextObject(buf)))
265  return false;
266 
267  // Extract the reference object.
268  o.reference = extractNextObject(buf);
269 
270  // Extract quality reports.
271  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
272  return true;
273 }
274 #endif
275 
276 #if 0
277 bool
278 DQMNet::reinstateObject(DQMStore *store, Object &o)
279 {
280  if (! reconstructObject (o))
281  return false;
282 
283  // Reconstruct the main object
284  MonitorElement *obj = 0;
285  store->setCurrentFolder(o.dirname);
286  switch (o.flags & DQM_PROP_TYPE_MASK)
287  {
288  case DQM_PROP_TYPE_INT:
289  obj = store->bookInt(o.objname);
290  obj->Fill(atoll(o.scalar.c_str()));
291  break;
292 
293  case DQM_PROP_TYPE_REAL:
294  obj = store->bookFloat(name);
295  obj->Fill(atof(o.scalar.c_str()));
296  break;
297 
298  case DQM_PROP_TYPE_STRING:
299  obj = store->bookString(name, o.scalar);
300  break;
301 
302  case DQM_PROP_TYPE_TH1F:
303  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
304  break;
305 
306  case DQM_PROP_TYPE_TH1S:
307  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
308  break;
309 
310  case DQM_PROP_TYPE_TH1D:
311  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
312  break;
313 
314  case DQM_PROP_TYPE_TH2F:
315  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
316  break;
317 
318  case DQM_PROP_TYPE_TH2S:
319  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
320  break;
321 
322  case DQM_PROP_TYPE_TH2D:
323  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
324  break;
325 
326  case DQM_PROP_TYPE_TH3F:
327  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
328  break;
329 
330  case DQM_PROP_TYPE_TH3S:
331  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
332  break;
333 
334  case DQM_PROP_TYPE_TH3D:
335  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
336  break;
337 
338  case DQM_PROP_TYPE_PROF:
339  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
340  break;
341 
342  case DQM_PROP_TYPE_PROF2D:
343  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
344  break;
345 
346  default:
347  logme()
348  << "ERROR: unexpected monitor element of type "
349  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
350  << o.dirname << '/' << o.objname << "'\n";
351  return false;
352  }
353 
354  // Reconstruct tag and qreports.
355  if (obj)
356  {
357  obj->data_.tag = o.tag;
358  obj->data_.qreports = o.qreports;
359  }
360 
361  // Inidicate success.
362  return true;
363 }
364 #endif
365 
367 // Check if the network layer should stop.
368 bool DQMNet::shouldStop() { return shutdown_; }
369 
370 // Once an object has been updated, this is invoked for all waiting
371 // peers. Send the requested object to the waiting peer.
373  if (o)
374  sendObjectToPeer(msg, *o, true);
375  else {
376  uint32_t words[3];
377  words[0] = sizeof(words) + w.name.size();
378  words[1] = DQM_REPLY_NONE;
379  words[2] = w.name.size();
380 
381  msg->data.reserve(msg->data.size() + words[0]);
382  copydata(msg, &words[0], sizeof(words));
383  copydata(msg, &w.name[0], w.name.size());
384  }
385 }
386 
387 // Send an object to a peer. If not @a data, only sends a summary
388 // without the object data, except the data is always sent for scalar
389 // objects.
391  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
392  DataBlob objdata;
393 
394  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
395  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
396  else if (data)
397  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
398 
399  uint32_t words[9];
400  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
401  uint32_t datalen = objdata.size();
402  uint32_t qlen = o.qdata.size();
403 
404  if (o.dirname.empty())
405  --namelen;
406 
407  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
408  words[1] = DQM_REPLY_OBJECT;
409  words[2] = flags;
410  words[3] = (o.version >> 0) & 0xffffffff;
411  words[4] = (o.version >> 32) & 0xffffffff;
412  words[5] = o.tag;
413  words[6] = namelen;
414  words[7] = datalen;
415  words[8] = qlen;
416 
417  msg->data.reserve(msg->data.size() + words[0]);
418  copydata(msg, &words[0], 9 * sizeof(uint32_t));
419  if (namelen) {
420  copydata(msg, &(o.dirname)[0], o.dirname.size());
421  if (!o.dirname.empty())
422  copydata(msg, "/", 1);
423  copydata(msg, &o.objname[0], o.objname.size());
424  }
425  if (datalen)
426  copydata(msg, &objdata[0], datalen);
427  if (qlen)
428  copydata(msg, &o.qdata[0], qlen);
429 }
430 
432 // Handle peer messages.
433 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
434  // Decode and process this message.
435  uint32_t type;
436  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
437  switch (type) {
438  case DQM_MSG_UPDATE_ME: {
439  if (len != 2 * sizeof(uint32_t)) {
440  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
441  return false;
442  }
443 
444  if (debug_)
445  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
446 
447  p->update = true;
448  }
449  return true;
450 
451  case DQM_MSG_LIST_OBJECTS: {
452  if (debug_)
453  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  // Send over current status: list of known objects.
456  sendObjectListToPeer(msg, true, false);
457  }
458  return true;
459 
460  case DQM_MSG_GET_OBJECT: {
461  if (debug_)
462  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
463 
464  if (len < 3 * sizeof(uint32_t)) {
465  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
466  return false;
467  }
468 
469  uint32_t namelen;
470  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
471  if (len != 3 * sizeof(uint32_t) + namelen) {
472  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
473  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
474  return false;
475  }
476 
477  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
478  Peer *owner = nullptr;
479  Object *o = findObject(nullptr, name, &owner);
480  if (o) {
481  o->lastreq = Time::current().ns();
482  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
483  (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
484  waitForData(p, name, "", owner);
485  else
486  sendObjectToPeer(msg, *o, true);
487  } else {
488  uint32_t words[3];
489  words[0] = sizeof(words) + name.size();
490  words[1] = DQM_REPLY_NONE;
491  words[2] = name.size();
492 
493  msg->data.reserve(msg->data.size() + words[0]);
494  copydata(msg, &words[0], sizeof(words));
495  copydata(msg, &name[0], name.size());
496  }
497  }
498  return true;
499 
500  case DQM_REPLY_LIST_BEGIN: {
501  if (len != 4 * sizeof(uint32_t)) {
502  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
503  return false;
504  }
505 
506  // Get the update status: whether this is a full update.
507  uint32_t flags;
508  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
509 
510  if (debug_)
511  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
512  << p->peeraddr << ", size " << len << std::endl;
513 
514  // If we are about to receive a full list of objects, flag all
515  // objects as possibly dead. Subsequent object notifications
516  // will undo this for the live objects. We cannot delete
517  // objects quite yet, as we may get inquiry from another client
518  // while we are processing the incoming list, so we keep the
519  // objects tentatively alive as long as we've not seen the end.
520  if (flags)
521  markObjectsDead(p);
522  }
523  return true;
524 
525  case DQM_REPLY_LIST_END: {
526  if (len != 4 * sizeof(uint32_t)) {
527  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
528  return false;
529  }
530 
531  // Get the update status: whether this is a full update.
532  uint32_t flags;
533  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
534 
535  // If we received a full list of objects, now purge all dead
536  // objects. We need to do this in two stages in case we receive
537  // updates in many parts, and end up sending updates to others in
538  // between; this avoids us lying live objects are dead.
539  if (flags)
540  purgeDeadObjects(p);
541 
542  if (debug_)
543  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
544  << ", size " << len << std::endl;
545 
546  // Indicate we have received another update from this peer.
547  // Also indicate we should flush to our clients.
548  flush_ = true;
549  p->updates++;
550  }
551  return true;
552 
553  case DQM_REPLY_OBJECT: {
554  uint32_t words[9];
555  if (len < sizeof(words)) {
556  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
557  return false;
558  }
559 
560  memcpy(&words[0], data, sizeof(words));
561  uint32_t &namelen = words[6];
562  uint32_t &datalen = words[7];
563  uint32_t &qlen = words[8];
564 
565  if (len != sizeof(words) + namelen + datalen + qlen) {
566  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
567  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
568  << std::endl;
569  return false;
570  }
571 
572  unsigned char *namedata = data + sizeof(words);
573  unsigned char *objdata = namedata + namelen;
574  unsigned char *qdata = objdata + datalen;
575  unsigned char *enddata = qdata + qlen;
576  std::string name((char *)namedata, namelen);
577  assert(enddata == data + len);
578 
579  if (debug_)
580  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
581  << std::endl;
582 
583  // Mark the peer as a known object source.
584  p->source = true;
585 
586  // Initialise or update an object entry.
587  Object *o = findObject(p, name);
588  if (!o)
589  o = makeObject(p, name);
590 
591  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
592  o->tag = words[5];
593  o->version = ((uint64_t)words[4] << 32 | words[3]);
594  o->scalar.clear();
595  o->qdata.clear();
596  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
597  o->rawdata.clear();
598  o->scalar.insert(o->scalar.end(), objdata, qdata);
599  } else if (datalen) {
600  o->rawdata.clear();
601  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
602  } else if (!o->rawdata.empty())
603  o->flags |= DQM_PROP_STALE;
604  o->qdata.insert(o->qdata.end(), qdata, enddata);
605 
606  // If we had an object for this one already and this is a list
607  // update without data, issue an immediate data get request.
608  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
609  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
610 
611  // If we have the object data, release from wait.
612  if (datalen)
613  releaseWaiters(name, o);
614  }
615  return true;
616 
617  case DQM_REPLY_NONE: {
618  uint32_t words[3];
619  if (len < sizeof(words)) {
620  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
621  return false;
622  }
623 
624  memcpy(&words[0], data, sizeof(words));
625  uint32_t &namelen = words[2];
626 
627  if (len != sizeof(words) + namelen) {
628  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
629  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
630  return false;
631  }
632 
633  unsigned char *namedata = data + sizeof(words);
634  std::string name((char *)namedata, namelen);
635 
636  if (debug_)
637  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
638  << std::endl;
639 
640  // Mark the peer as a known object source.
641  p->source = true;
642 
643  // If this was a known object, kill it.
644  if (Object *o = findObject(p, name)) {
645  o->flags |= DQM_PROP_DEAD;
646  purgeDeadObjects(p);
647  }
648 
649  // If someone was waiting for this, let them go.
650  releaseWaiters(name, nullptr);
651  }
652  return true;
653 
654  default:
655  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
656  << std::endl;
657  return false;
658  }
659 }
660 
663 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
664  lock();
665  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
666 
667  // If there is a problem with the peer socket, discard the peer
668  // and tell the selector to stop prcessing events for it. If
669  // this is a server connection, we will eventually recreate
670  // everything if/when the data server comes back.
671  if (ev->events & IOUrgent) {
672  if (p->automatic) {
673  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
674  << " lost (will attempt to reconnect in 15 seconds)\n";
675  losePeer(nullptr, p, ev);
676  } else
677  losePeer("WARNING: lost peer connection ", p, ev);
678 
679  unlock();
680  return true;
681  }
682 
683  // If we can write to the peer socket, pump whatever we can into it.
684  if (ev->events & IOWrite) {
685  while (Bucket *b = p->sendq) {
686  IOSize len = b->data.size() - p->sendpos;
687  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
688  IOSize done;
689 
690  try {
691  done = (len ? ev->source->write(data, len) : 0);
692  if (debug_ && len)
693  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
694  } catch (Error &e) {
695  losePeer("WARNING: unable to write to peer ", p, ev, &e);
696  unlock();
697  return true;
698  }
699 
700  p->sendpos += done;
701  if (p->sendpos == b->data.size()) {
702  Bucket *old = p->sendq;
703  p->sendq = old->next;
704  p->sendpos = 0;
705  old->next = nullptr;
706  discard(old);
707  }
708 
709  if (!done && len)
710  // Cannot write any more.
711  break;
712  }
713  }
714 
715  // If there is data to be read from the peer, first receive what we
716  // can get out the socket, the process all complete requests.
717  if (ev->events & IORead) {
718  // First build up the incoming buffer of data in the socket.
719  // Remember the last size returned by the socket; we need
720  // it to determine if the remote end closed the connection.
721  IOSize sz;
722  try {
723  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
724  do
725  if ((sz = ev->source->read(&buf[0], buf.size()))) {
726  if (debug_)
727  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
728  DataBlob &data = p->incoming;
729  if (data.capacity() < data.size() + sz)
730  data.reserve(data.size() + SOCKET_READ_GROWTH);
731  data.insert(data.end(), &buf[0], &buf[0] + sz);
732  }
733  while (sz == sizeof(buf));
734  } catch (Error &e) {
735  auto *next = dynamic_cast<SystemError *>(e.next());
736  if (next && next->portable() == SysErr::ErrTryAgain)
737  sz = 1; // Ignore it, and fake no end of data.
738  else {
739  // Houston we have a problem.
740  losePeer("WARNING: failed to read from peer ", p, ev, &e);
741  unlock();
742  return true;
743  }
744  }
745 
746  // Process fully received messages as long as we can.
747  size_t consumed = 0;
748  DataBlob &data = p->incoming;
749  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
750  uint32_t msglen;
751  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
752 
753  if (msglen >= MESSAGE_SIZE_LIMIT) {
754  losePeer("WARNING: excessively large message from ", p, ev);
755  unlock();
756  return true;
757  }
758 
759  if (data.size() - consumed >= msglen) {
760  bool valid = true;
761  if (msglen < 2 * sizeof(uint32_t)) {
762  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
763  valid = false;
764  } else {
765  // Decode and process this message.
766  Bucket msg;
767  msg.next = nullptr;
768  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
769 
770  // If we created a response, chain it to the write queue.
771  if (!msg.data.empty()) {
772  Bucket **prev = &p->sendq;
773  while (*prev)
774  prev = &(*prev)->next;
775 
776  *prev = new Bucket;
777  (*prev)->next = nullptr;
778  (*prev)->data.swap(msg.data);
779  }
780  }
781 
782  if (!valid) {
783  losePeer("WARNING: data stream error with ", p, ev);
784  unlock();
785  return true;
786  }
787 
788  consumed += msglen;
789  } else
790  break;
791  }
792 
793  data.erase(data.begin(), data.begin() + consumed);
794 
795  // If the client has closed the connection, shut down our end. If
796  // we have something to send back still, leave the write direction
797  // open. Otherwise close the shop for this client.
798  if (sz == 0)
799  sel_.setMask(p->socket, p->mask &= ~IORead);
800  }
801 
802  // Yes, please keep processing events for this socket.
803  unlock();
804  return false;
805 }
806 
811 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
812  // Recover the server socket.
813  assert(ev->source == server_);
814 
815  // Accept the connection.
816  Socket *s = server_->accept();
817  assert(s);
818  assert(!s->isBlocking());
819 
820  // Record it to our list of peers.
821  lock();
822  Peer *p = createPeer(s);
823  std::string localaddr;
824  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
825  InetAddress peeraddr = inet->peername();
826  InetAddress myaddr = inet->sockname();
827  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
828  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
829  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
830  p->peeraddr = local->peername().path();
831  localaddr = local->sockname().path();
832  } else
833  assert(false);
834 
835  p->mask = IORead | IOUrgent;
836  p->socket = s;
837 
838  // Report the new connection.
839  if (debug_)
840  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
841 
842  // Attach it to the listener.
843  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
844  unlock();
845 
846  // We are never done.
847  return false;
848 }
849 
856 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
857  // Discard the data in the pipe, we care only about the wakeup.
858  try {
859  IOSize sz;
860  unsigned char buf[1024];
861  while ((sz = ev->source->read(buf, sizeof(buf))))
862  ;
863  } catch (Error &e) {
864  auto *next = dynamic_cast<SystemError *>(e.next());
865  if (next && next->portable() == SysErr::ErrTryAgain)
866  ; // Ignore it
867  else
868  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
869  }
870 
871  // Tell the main event pump to send an update in a little while.
872  flush_ = true;
873 
874  // We are never done, always keep going.
875  return false;
876 }
877 
881  if (!p->socket)
882  return;
883 
884  // Listen to writes iff we have data to send.
885  unsigned oldmask = p->mask;
886  if (!p->sendq && (p->mask & IOWrite))
887  sel_.setMask(p->socket, p->mask &= ~IOWrite);
888 
889  if (p->sendq && !(p->mask & IOWrite))
890  sel_.setMask(p->socket, p->mask |= IOWrite);
891 
892  if (debug_ && oldmask != p->mask)
893  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
894 
895  // If we have nothing more to send and are no longer listening
896  // for reads, close up the shop for this peer.
897  if (p->mask == IOUrgent && !p->waiting) {
898  assert(!p->sendq);
899  if (debug_)
900  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
901  losePeer(nullptr, p, nullptr);
902  }
903 }
904 
906 DQMNet::DQMNet(const std::string &appname /* = "" */)
907  : debug_(false),
908  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
909  pid_(getpid()),
910  server_(nullptr),
911  version_(Time::current()),
912  communicate_((pthread_t)-1),
913  shutdown_(0),
914  delay_(1000),
915  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
916  waitMax_(0, 0, 0, 5 /* seconds */, 0),
917  flush_(false) {
918  // Create a pipe for the local DQM to tell the communicator
919  // thread that local DQM data has changed and that the peers
920  // should be notified.
921  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
922  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
923 
924  // Initialise the upstream and downstream to empty.
925  upstream_.peer = downstream_.peer = nullptr;
929 }
930 
932  // FIXME
933 }
934 
937 void DQMNet::debug(bool doit) { debug_ = doit; }
938 
941 void DQMNet::delay(int delay) { delay_ = delay; }
942 
947 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
948 
953  if (server_) {
954  logme() << "ERROR: DQM server was already started.\n";
955  return;
956  }
957 
958  try {
959  InetAddress addr("0.0.0.0", port);
960  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
961  s->bind(addr);
962  s->listen(10);
963  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
964  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
965  s->setBlocking(false);
966  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
967  } catch (Error &e) {
968  // FIXME: Do we need to do this when we throw an exception anyway?
969  // FIXME: Abort instead?
970  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
971 
972  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
973 
974  port << ": " << e.explain().c_str();
975  }
976 
977  logme() << "INFO: DQM server started at port " << port << std::endl;
978 }
979 
983 void DQMNet::startLocalServer(const char *path) {
984  if (server_) {
985  logme() << "ERROR: DQM server was already started.\n";
986  return;
987  }
988 
989  try {
990  server_ = new LocalServerSocket(path, 10);
991  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
992  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
993  server_->setBlocking(false);
994  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
995  } catch (Error &e) {
996  // FIXME: Do we need to do this when we throw an exception anyway?
997  // FIXME: Abort instead?
998  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
999 
1000  throw cms::Exception("DQMNet::startLocalServer")
1001  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1002  }
1003 
1004  logme() << "INFO: DQM server started at path " << path << std::endl;
1005 }
1006 
1011  if (!downstream_.host.empty()) {
1012  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1013  << std::endl;
1014  return;
1015  }
1016 
1017  downstream_.update = true;
1018  downstream_.host = host;
1019  downstream_.port = port;
1020 }
1021 
1026  if (!upstream_.host.empty()) {
1027  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1028  << std::endl;
1029  return;
1030  }
1031 
1032  upstream_.update = false;
1033  upstream_.host = host;
1034  upstream_.port = port;
1035 }
1036 
1039  shutdown_ = 1;
1040  if (communicate_ != (pthread_t)-1)
1041  pthread_join(communicate_, nullptr);
1042 }
1043 
1049 static void *communicate(void *obj) {
1050  sigset_t sigs;
1051  sigfillset(&sigs);
1052  pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1053  ((DQMNet *)obj)->run();
1054  return nullptr;
1055 }
1056 
1059  if (communicate_ != (pthread_t)-1)
1060  pthread_mutex_lock(&lock_);
1061 }
1062 
1065  if (communicate_ != (pthread_t)-1)
1066  pthread_mutex_unlock(&lock_);
1067 }
1068 
1073  if (communicate_ != (pthread_t)-1) {
1074  logme() << "ERROR: DQM networking thread has already been started\n";
1075  return;
1076  }
1077 
1078  pthread_mutex_init(&lock_, nullptr);
1079  pthread_create(&communicate_, nullptr, &communicate, this);
1080 }
1081 
1083 void DQMNet::run() {
1084  Time now;
1085  Time nextFlush = 0;
1086  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1087 
1088  // Perform I/O. Every once in a while flush updates to peers.
1089  while (!shouldStop()) {
1090  for (auto ap : automatic) {
1091  // If we need a server connection and don't have one yet,
1092  // initiate asynchronous connection creation. Swallow errors
1093  // in case the server won't talk to us.
1094  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1095  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1096  InetSocket *s = nullptr;
1097  try {
1098  InetAddress addr(ap->host.c_str(), ap->port);
1099  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1100  s->setBlocking(false);
1101  s->connect(addr);
1102  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1103  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1104  } catch (Error &e) {
1105  auto *sys = dynamic_cast<SystemError *>(e.next());
1106  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1107  // "In progress" just means the connection is in progress.
1108  // The connection is ready when the socket is writeable.
1109  // Anything else is a real problem.
1110  if (s)
1111  s->abort();
1112  delete s;
1113  s = nullptr;
1114  }
1115  }
1116 
1117  // Set up with the selector if we were successful. If this is
1118  // the upstream collector, queue a request for updates.
1119  if (s) {
1120  Peer *p = createPeer(s);
1121  ap->peer = p;
1122 
1123  InetAddress peeraddr = ((InetSocket *)s)->peername();
1124  InetAddress myaddr = ((InetSocket *)s)->sockname();
1125  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1126  p->mask = IORead | IOWrite | IOUrgent;
1127  p->update = ap->update;
1128  p->automatic = ap;
1129  p->socket = s;
1130  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1131  if (ap == &upstream_) {
1132  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1133  p->sendq = new Bucket;
1134  p->sendq->next = nullptr;
1135  copydata(p->sendq, words, sizeof(words));
1136  }
1137 
1138  // Report the new connection.
1139  if (debug_)
1140  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1141  << std::endl;
1142  }
1143  }
1144  }
1145 
1146  // Pump events for a while.
1147  sel_.dispatch(delay_);
1148  now = Time::current();
1149  lock();
1150 
1151  // Check if flush is required. Flush only if one is needed.
1152  // Always sends the full object list, but only rarely.
1153  if (flush_ && now > nextFlush) {
1154  flush_ = false;
1155  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1156  sendObjectListToPeers(true);
1157  }
1158 
1159  // Update the data server and peer selection masks. If we
1160  // have no more data to send and listening for writes, remove
1161  // the write mask. If we have something to write and aren't
1162  // listening for writes, start listening so we can send off
1163  // the data.
1164  updatePeerMasks();
1165 
1166  // Release peers that have been waiting for data for too long.
1167  Time waitold = now - waitMax_;
1168  Time waitstale = now - waitStale_;
1169  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1170  Object *o = findObject(nullptr, i->name);
1171 
1172  // If we have (stale) object data, wait only up to stale limit.
1173  // Otherwise if we have no data at all, wait up to the max limit.
1174  if (i->time < waitold) {
1175  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1176  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1177  releaseFromWait(i++, o);
1178  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1179  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1180  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1181  releaseFromWait(i++, o);
1182  }
1183 
1184  // Keep it for now.
1185  else
1186  ++i;
1187  }
1188 
1189  unlock();
1190  }
1191 }
1192 
1193 // Tell the network cache that there have been local changes that
1194 // should be advertised to the downstream listeners.
1196  char byte = 0;
1197  wakeup_.sink()->write(&byte, 1);
1198 }
1199 
1203 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1204  local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1205 }
1206 
1208 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.resize(size); }
1209 
1213  o.dirname = *local_->dirs.insert(o.dirname).first;
1214  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1215  if (!info.second) {
1216  // Somewhat hackish. Sets are supposedly immutable, but we
1217  // need to change the non-key parts of the object. Erasing
1218  // and re-inserting would produce too much memory churn.
1219  auto &old = const_cast<Object &>(*info.first);
1220  std::swap(old.flags, o.flags);
1221  std::swap(old.tag, o.tag);
1222  std::swap(old.version, o.version);
1223  std::swap(old.qreports, o.qreports);
1224  std::swap(old.rawdata, o.rawdata);
1225  std::swap(old.scalar, o.scalar);
1226  std::swap(old.qdata, o.qdata);
1227  }
1228 }
1229 
1233 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1234  size_t removed = 0;
1235  std::string path;
1236  ObjectMap::iterator i, e;
1237  for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1238  path.clear();
1239  path.reserve(i->dirname.size() + i->objname.size() + 2);
1240  path += i->dirname;
1241  if (!path.empty())
1242  path += '/';
1243  path += i->objname;
1244 
1245  if (!known.count(path))
1246  ++removed, local_->objs.erase(i++);
1247  else
1248  ++i;
1249  }
1250 
1251  return removed > 0;
1252 }
edm::ErrorSummaryEntry Error
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:173
AutoPeer downstream_
Definition: DQMNet.h:361
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:906
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:50
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
DataBlob incoming
Definition: DQMNet.h:125
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:80
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:856
const edm::EventSetup & c
QReports qreports
Definition: DQMNet.h:98
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:399
pthread_t communicate_
Definition: DQMNet.h:364
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:219
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:91
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:130
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1058
std::ostream & logme()
Definition: DQMNet.cc:50
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:162
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:26
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:367
pthread_mutex_t lock_
Definition: DQMNet.h:340
void delay(int delay)
Definition: DQMNet.cc:941
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:89
void setCurrentFolder(std::string const &fullpath) override
Definition: DQMStore.h:569
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
std::string peeraddr
Definition: DQMNet.h:123
static const int WARNING
lat::Socket * socket
Definition: DQMNet.h:124
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:947
bool ev
uint32_t tag
Definition: DQMNet.h:90
AutoPeer * automatic
Definition: DQMNet.h:135
AutoPeer upstream_
Definition: DQMNet.h:360
static void * communicate(void *obj)
Definition: DQMNet.cc:1049
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1038
assert(be >=bs)
A arg
Definition: Factorize.h:31
std::string qdata
Definition: DQMNet.h:106
int port
Definition: query.py:116
void debug(bool doit)
Definition: DQMNet.cc:937
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
Definition: DQMStore.h:87
void Fill(long long x)
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:155
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
virtual bool shouldStop()
Definition: DQMNet.cc:368
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
uint64_t lastreq
Definition: DQMNet.h:103
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
Definition: DQMStore.h:322
std::string name
Definition: DQMNet.h:116
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:433
DataBlob data
Definition: DQMNet.h:111
void start()
Definition: DQMNet.cc:1072
Peer * peer
Definition: DQMNet.h:139
if(conf_.getParameter< bool >("UseStripCablingDB"))
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:663
void sendLocalChanges()
Definition: DQMNet.cc:1195
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:133
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:49
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
std::string dirname
Definition: DQMNet.h:96
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:51
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:372
sig_atomic_t shutdown_
Definition: DQMNet.h:365
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:486
std::string objname
Definition: DQMNet.h:97
bool update
Definition: DQMNet.h:131
std::string scalar
Definition: DQMNet.h:105
lat::Time next
Definition: DQMNet.h:140
string host
Definition: query.py:115
size_t IOSize
Definition: IOTypes.h:15
unsigned mask
Definition: DQMNet.h:129
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:390
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1233
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:811
size_t updates
Definition: DQMNet.h:133
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:177
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
Definition: DQMStore.h:73
std::string host
Definition: DQMNet.h:141
double b
Definition: hdecay.h:118
void run()
Definition: DQMNet.cc:1083
void startLocalServer(int port)
Definition: DQMNet.cc:952
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:158
tuple msg
Definition: mps_check.py:285
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
Definition: DQMStore.h:261
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
lat::Pipe wakeup_
Definition: DQMNet.h:357
bool debug_
Definition: DQMNet.h:339
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
size_t waiting
Definition: DQMNet.h:134
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1064
virtual ~DQMNet()
Definition: DQMNet.cc:931
size_t sendpos
Definition: DQMNet.h:127
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1025
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
tuple cout
Definition: gather_cfg.py:144
void updateMask(Peer *p)
Definition: DQMNet.cc:880
bool flush_
Definition: DQMNet.h:370
lat::Socket * server_
Definition: DQMNet.h:356
#define O_NONBLOCK
Definition: SysFile.h:23
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1212
lat::TimeSpan waitMax_
Definition: DQMNet.h:369
Bucket * next
Definition: DQMNet.h:110
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
Definition: DQMStore.h:98
Bucket * sendq
Definition: DQMNet.h:126
std::vector< QValue > QReports
Definition: DQMNet.h:84
lat::TimeSpan waitStale_
Definition: DQMNet.h:368
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1208
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
Definition: DQMStore.h:290
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1010
WaitList waiting_
Definition: DQMNet.h:362
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:83
tuple size
Write out results.
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1203
DataBlob rawdata
Definition: DQMNet.h:104
static const int ERROR
ImplPeer * local_
Definition: DQMNet.h:600
lat::IOSelector sel_
Definition: DQMNet.h:355