CMS 3D CMS Logo

DQMNet.cc
Go to the documentation of this file.
4 #include "classlib/iobase/InetServerSocket.h"
5 #include "classlib/iobase/LocalServerSocket.h"
6 #include "classlib/iobase/Filename.h"
7 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
8 #include "classlib/utils/TimeInfo.h"
9 #include "classlib/utils/StringList.h"
10 #include "classlib/utils/StringFormat.h"
11 #include "classlib/utils/StringOps.h"
12 #include "classlib/utils/SystemError.h"
13 #include "classlib/utils/Regexp.h"
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <sys/wait.h>
17 #include <cstdio>
18 #include <cstdint>
19 #include <iostream>
20 #include <sstream>
21 #include <cassert>
22 #include <cfloat>
23 #include <cinttypes>
24 
25 #if __APPLE__
26 #define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
27 #define SOCKET_BUF_SIZE (1 * 1024 * 1024)
28 #else
29 #define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
30 #define SOCKET_BUF_SIZE (8 * 1024 * 1024)
31 #endif
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
34 
35 using namespace lat;
36 
37 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
38 
40 // Generate log prefix.
41 std::ostream &DQMNet::logme() {
42  Time now = Time::current();
43  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
44  << "]: ";
45 }
46 
47 // Append data into a bucket.
48 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
49  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
50 }
51 
52 // Discard a bucket chain.
54  while (b) {
55  Bucket *next = b->next;
56  delete b;
57  b = next;
58  }
59 }
60 
62 
65 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
66  if (reason)
67  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
68 
69  Socket *s = peer->socket;
70 
71  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
72  if (i->peer == peer)
73  waiting_.erase(i++);
74  else
75  ++i;
76 
77  if (ev)
78  ev->source = nullptr;
79 
80  discard(peer->sendq);
81  if (peer->automatic)
82  peer->automatic->peer = nullptr;
83 
84  sel_.detach(s);
85  s->close();
86  removePeer(peer, s);
87  delete s;
88 }
89 
91 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
92  // Issue request to peer.
93  Bucket **msg = &p->sendq;
94  while (*msg)
95  msg = &(*msg)->next;
96  *msg = new Bucket;
97  (*msg)->next = nullptr;
98 
99  uint32_t words[3];
100  words[0] = sizeof(words) + len;
101  words[1] = DQM_MSG_GET_OBJECT;
102  words[2] = len;
103  copydata(*msg, words, sizeof(words));
104  copydata(*msg, name, len);
105 }
106 
109 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
110  // FIXME: Should we automatically record which exact peer the waiter
111  // is expecting to deliver data so we know to release the waiter if
112  // the other peer vanishes? The current implementation stands a
113  // chance for the waiter to wait indefinitely -- although we do
114  // force terminate the wait after a while.
115  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
116  WaitObject wo = {Time::current(), name, info, p};
117  waiting_.push_back(wo);
118  p->waiting++;
119 }
120 
121 // Once an object has been updated, this is invoked for all waiting
122 // peers. Send the object back to the peer in suitable form.
123 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
124  Bucket **msg = &i->peer->sendq;
125  while (*msg)
126  msg = &(*msg)->next;
127  *msg = new Bucket;
128  (*msg)->next = nullptr;
129 
130  releaseFromWait(*msg, *i, o);
131 
132  assert(i->peer->waiting > 0);
133  i->peer->waiting--;
134  waiting_.erase(i);
135 }
136 
137 // Release everyone waiting for the object @a o.
139  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
140  if (i->name == name)
141  releaseFromWait(i++, o);
142  else
143  ++i;
144 }
145 
150  char buf[64];
151  std::ostringstream qrs;
152  QReports::const_iterator qi, qe;
153  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
154  int pos = 0;
155  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
156  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
157  << '\0';
158  }
159  into = qrs.str();
160 }
161 
164 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
165  const char *qdata = from;
166 
167  // Count how many qresults there are.
168  size_t nqv = 0;
169  while (*qdata) {
170  ++nqv;
171  while (*qdata)
172  ++qdata;
173  ++qdata;
174  while (*qdata)
175  ++qdata;
176  ++qdata;
177  while (*qdata)
178  ++qdata;
179  ++qdata;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  }
187 
188  // Now extract the qreports.
189  qdata = from;
190  qr.reserve(nqv);
191  while (*qdata) {
192  qr.emplace_back();
193  DQMNet::QValue &qv = qr.back();
194 
195  qv.code = atoi(qdata);
196  while (*qdata)
197  ++qdata;
198  switch (qv.code) {
200  break;
203  break;
204  case dqm::qstatus::ERROR:
206  break;
207  default:
209  break;
210  }
211 
212  qv.qtresult = atof(++qdata);
213  while (*qdata)
214  ++qdata;
215 
216  qv.qtname = ++qdata;
217  while (*qdata)
218  ++qdata;
219 
220  qv.algorithm = ++qdata;
221  while (*qdata)
222  ++qdata;
223 
224  qv.message = ++qdata;
225  while (*qdata)
226  ++qdata;
227  ++qdata;
228  }
229 }
230 
231 #if 0
232 // Deserialise a ROOT object from a buffer at the current position.
233 static TObject *
234 extractNextObject(TBufferFile &buf)
235 {
236  if (buf.Length() == buf.BufferSize())
237  return 0;
238 
239  buf.InitMap();
240  Int_t pos = buf.Length();
241  TClass *c = buf.ReadClass();
242  buf.SetBufferOffset(pos);
243  buf.ResetMap();
244  return c ? buf.ReadObject(c) : 0;
245 }
246 
247 // Reconstruct an object from the raw data.
248 bool
249 DQMNet::reconstructObject(Object &o)
250 {
251  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
252  buf.Reset();
253 
254  // Extract the main object.
255  if (! (o.object = extractNextObject(buf)))
256  return false;
257 
258  // Extract the reference object.
259  o.reference = extractNextObject(buf);
260 
261  // Extract quality reports.
262  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
263  return true;
264 }
265 #endif
266 
267 #if 0
268 bool
269 DQMNet::reinstateObject(DQMStore *store, Object &o)
270 {
271  if (! reconstructObject (o))
272  return false;
273 
274  // Reconstruct the main object
275  MonitorElement *obj = 0;
276  store->setCurrentFolder(*o.dirname);
277  switch (o.flags & DQM_PROP_TYPE_MASK)
278  {
279  case DQM_PROP_TYPE_INT:
280  obj = store->bookInt(o.objname);
281  obj->Fill(atoll(o.scalar.c_str()));
282  break;
283 
284  case DQM_PROP_TYPE_REAL:
285  obj = store->bookFloat(name);
286  obj->Fill(atof(o.scalar.c_str()));
287  break;
288 
289  case DQM_PROP_TYPE_STRING:
290  obj = store->bookString(name, o.scalar);
291  break;
292 
293  case DQM_PROP_TYPE_TH1F:
294  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
295  break;
296 
297  case DQM_PROP_TYPE_TH1S:
298  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
299  break;
300 
301  case DQM_PROP_TYPE_TH1D:
302  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
303  break;
304 
305  case DQM_PROP_TYPE_TH2F:
306  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
307  break;
308 
309  case DQM_PROP_TYPE_TH2S:
310  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
311  break;
312 
313  case DQM_PROP_TYPE_TH2D:
314  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
315  break;
316 
317  case DQM_PROP_TYPE_TH3F:
318  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
319  break;
320 
321  case DQM_PROP_TYPE_TH3S:
322  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
323  break;
324 
325  case DQM_PROP_TYPE_TH3D:
326  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
327  break;
328 
329  case DQM_PROP_TYPE_PROF:
330  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
331  break;
332 
333  case DQM_PROP_TYPE_PROF2D:
334  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
335  break;
336 
337  default:
338  logme()
339  << "ERROR: unexpected monitor element of type "
340  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
341  << *o.dirname << '/' << o.objname << "'\n";
342  return false;
343  }
344 
345  // Reconstruct tag and qreports.
346  if (obj)
347  {
348  obj->data_.tag = o.tag;
349  obj->data_.qreports = o.qreports;
350  }
351 
352  // Indicate success.
353  return true;
354 }
355 #endif
356 
358 // Check if the network layer should stop.
359 bool DQMNet::shouldStop() { return shutdown_; }
360 
361 // Once an object has been updated, this is invoked for all waiting
362 // peers. Send the requested object to the waiting peer.
364  if (o)
365  sendObjectToPeer(msg, *o, true);
366  else {
367  uint32_t words[3];
368  words[0] = sizeof(words) + w.name.size();
369  words[1] = DQM_REPLY_NONE;
370  words[2] = w.name.size();
371 
372  msg->data.reserve(msg->data.size() + words[0]);
373  copydata(msg, &words[0], sizeof(words));
374  copydata(msg, &w.name[0], w.name.size());
375  }
376 }
377 
378 // Send an object to a peer. If not @a data, only sends a summary
379 // without the object data, except the data is always sent for scalar
380 // objects.
382  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
383  DataBlob objdata;
384 
385  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
386  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
387  else if (data)
388  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
389 
390  uint32_t words[9];
391  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
392  uint32_t datalen = objdata.size();
393  uint32_t qlen = o.qdata.size();
394 
395  if (o.dirname->empty())
396  --namelen;
397 
398  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
399  words[1] = DQM_REPLY_OBJECT;
400  words[2] = flags;
401  words[3] = (o.version >> 0) & 0xffffffff;
402  words[4] = (o.version >> 32) & 0xffffffff;
403  words[5] = o.tag;
404  words[6] = namelen;
405  words[7] = datalen;
406  words[8] = qlen;
407 
408  msg->data.reserve(msg->data.size() + words[0]);
409  copydata(msg, &words[0], 9 * sizeof(uint32_t));
410  if (namelen) {
411  copydata(msg, &(*o.dirname)[0], o.dirname->size());
412  if (!o.dirname->empty())
413  copydata(msg, "/", 1);
414  copydata(msg, &o.objname[0], o.objname.size());
415  }
416  if (datalen)
417  copydata(msg, &objdata[0], datalen);
418  if (qlen)
419  copydata(msg, &o.qdata[0], qlen);
420 }
421 
423 // Handle peer messages.
424 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
425  // Decode and process this message.
426  uint32_t type;
427  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
428  switch (type) {
429  case DQM_MSG_UPDATE_ME: {
430  if (len != 2 * sizeof(uint32_t)) {
431  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
432  return false;
433  }
434 
435  if (debug_)
436  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
437 
438  p->update = true;
439  }
440  return true;
441 
442  case DQM_MSG_LIST_OBJECTS: {
443  if (debug_)
444  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
445 
446  // Send over current status: list of known objects.
447  sendObjectListToPeer(msg, true, false);
448  }
449  return true;
450 
451  case DQM_MSG_GET_OBJECT: {
452  if (debug_)
453  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  if (len < 3 * sizeof(uint32_t)) {
456  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
457  return false;
458  }
459 
460  uint32_t namelen;
461  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
462  if (len != 3 * sizeof(uint32_t) + namelen) {
463  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
464  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
465  return false;
466  }
467 
468  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
469  Peer *owner = nullptr;
470  Object *o = findObject(nullptr, name, &owner);
471  if (o) {
472  o->lastreq = Time::current().ns();
473  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
474  (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
475  waitForData(p, name, "", owner);
476  else
477  sendObjectToPeer(msg, *o, true);
478  } else {
479  uint32_t words[3];
480  words[0] = sizeof(words) + name.size();
481  words[1] = DQM_REPLY_NONE;
482  words[2] = name.size();
483 
484  msg->data.reserve(msg->data.size() + words[0]);
485  copydata(msg, &words[0], sizeof(words));
486  copydata(msg, &name[0], name.size());
487  }
488  }
489  return true;
490 
491  case DQM_REPLY_LIST_BEGIN: {
492  if (len != 4 * sizeof(uint32_t)) {
493  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
494  return false;
495  }
496 
497  // Get the update status: whether this is a full update.
498  uint32_t flags;
499  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
500 
501  if (debug_)
502  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
503  << p->peeraddr << ", size " << len << std::endl;
504 
505  // If we are about to receive a full list of objects, flag all
506  // objects as possibly dead. Subsequent object notifications
507  // will undo this for the live objects. We cannot delete
508  // objects quite yet, as we may get inquiry from another client
509  // while we are processing the incoming list, so we keep the
510  // objects tentatively alive as long as we've not seen the end.
511  if (flags)
512  markObjectsDead(p);
513  }
514  return true;
515 
516  case DQM_REPLY_LIST_END: {
517  if (len != 4 * sizeof(uint32_t)) {
518  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
519  return false;
520  }
521 
522  // Get the update status: whether this is a full update.
523  uint32_t flags;
524  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
525 
526  // If we received a full list of objects, now purge all dead
527  // objects. We need to do this in two stages in case we receive
528  // updates in many parts, and end up sending updates to others in
529  // between; this avoids us lying live objects are dead.
530  if (flags)
531  purgeDeadObjects(p);
532 
533  if (debug_)
534  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
535  << ", size " << len << std::endl;
536 
537  // Indicate we have received another update from this peer.
538  // Also indicate we should flush to our clients.
539  flush_ = true;
540  p->updates++;
541  }
542  return true;
543 
544  case DQM_REPLY_OBJECT: {
545  uint32_t words[9];
546  if (len < sizeof(words)) {
547  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
548  return false;
549  }
550 
551  memcpy(&words[0], data, sizeof(words));
552  uint32_t &namelen = words[6];
553  uint32_t &datalen = words[7];
554  uint32_t &qlen = words[8];
555 
556  if (len != sizeof(words) + namelen + datalen + qlen) {
557  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
558  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
559  << std::endl;
560  return false;
561  }
562 
563  unsigned char *namedata = data + sizeof(words);
564  unsigned char *objdata = namedata + namelen;
565  unsigned char *qdata = objdata + datalen;
566  unsigned char *enddata = qdata + qlen;
567  std::string name((char *)namedata, namelen);
568  assert(enddata == data + len);
569 
570  if (debug_)
571  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
572  << std::endl;
573 
574  // Mark the peer as a known object source.
575  p->source = true;
576 
577  // Initialise or update an object entry.
578  Object *o = findObject(p, name);
579  if (!o)
580  o = makeObject(p, name);
581 
582  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
583  o->tag = words[5];
584  o->version = ((uint64_t)words[4] << 32 | words[3]);
585  o->scalar.clear();
586  o->qdata.clear();
587  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
588  o->rawdata.clear();
589  o->scalar.insert(o->scalar.end(), objdata, qdata);
590  } else if (datalen) {
591  o->rawdata.clear();
592  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
593  } else if (!o->rawdata.empty())
594  o->flags |= DQM_PROP_STALE;
595  o->qdata.insert(o->qdata.end(), qdata, enddata);
596 
597  // If we had an object for this one already and this is a list
598  // update without data, issue an immediate data get request.
599  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
600  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
601 
602  // If we have the object data, release from wait.
603  if (datalen)
604  releaseWaiters(name, o);
605  }
606  return true;
607 
608  case DQM_REPLY_NONE: {
609  uint32_t words[3];
610  if (len < sizeof(words)) {
611  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
612  return false;
613  }
614 
615  memcpy(&words[0], data, sizeof(words));
616  uint32_t &namelen = words[2];
617 
618  if (len != sizeof(words) + namelen) {
619  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
620  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
621  return false;
622  }
623 
624  unsigned char *namedata = data + sizeof(words);
625  std::string name((char *)namedata, namelen);
626 
627  if (debug_)
628  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
629  << std::endl;
630 
631  // Mark the peer as a known object source.
632  p->source = true;
633 
634  // If this was a known object, kill it.
635  if (Object *o = findObject(p, name)) {
636  o->flags |= DQM_PROP_DEAD;
637  purgeDeadObjects(p);
638  }
639 
640  // If someone was waiting for this, let them go.
641  releaseWaiters(name, nullptr);
642  }
643  return true;
644 
645  default:
646  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
647  << std::endl;
648  return false;
649  }
650 }
651 
654 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
655  lock();
656  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
657 
658  // If there is a problem with the peer socket, discard the peer
659  // and tell the selector to stop prcessing events for it. If
660  // this is a server connection, we will eventually recreate
661  // everything if/when the data server comes back.
662  if (ev->events & IOUrgent) {
663  if (p->automatic) {
664  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
665  << " lost (will attempt to reconnect in 15 seconds)\n";
666  losePeer(nullptr, p, ev);
667  } else
668  losePeer("WARNING: lost peer connection ", p, ev);
669 
670  unlock();
671  return true;
672  }
673 
674  // If we can write to the peer socket, pump whatever we can into it.
675  if (ev->events & IOWrite) {
676  while (Bucket *b = p->sendq) {
677  IOSize len = b->data.size() - p->sendpos;
678  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
679  IOSize done;
680 
681  try {
682  done = (len ? ev->source->write(data, len) : 0);
683  if (debug_ && len)
684  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
685  } catch (Error &e) {
686  losePeer("WARNING: unable to write to peer ", p, ev, &e);
687  unlock();
688  return true;
689  }
690 
691  p->sendpos += done;
692  if (p->sendpos == b->data.size()) {
693  Bucket *old = p->sendq;
694  p->sendq = old->next;
695  p->sendpos = 0;
696  old->next = nullptr;
697  discard(old);
698  }
699 
700  if (!done && len)
701  // Cannot write any more.
702  break;
703  }
704  }
705 
706  // If there is data to be read from the peer, first receive what we
707  // can get out the socket, the process all complete requests.
708  if (ev->events & IORead) {
709  // First build up the incoming buffer of data in the socket.
710  // Remember the last size returned by the socket; we need
711  // it to determine if the remote end closed the connection.
712  IOSize sz;
713  try {
714  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
715  do
716  if ((sz = ev->source->read(&buf[0], buf.size()))) {
717  if (debug_)
718  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
719  DataBlob &data = p->incoming;
720  if (data.capacity() < data.size() + sz)
721  data.reserve(data.size() + SOCKET_READ_GROWTH);
722  data.insert(data.end(), &buf[0], &buf[0] + sz);
723  }
724  while (sz == sizeof(buf));
725  } catch (Error &e) {
726  auto *next = dynamic_cast<SystemError *>(e.next());
727  if (next && next->portable() == SysErr::ErrTryAgain)
728  sz = 1; // Ignore it, and fake no end of data.
729  else {
730  // Houston we have a problem.
731  losePeer("WARNING: failed to read from peer ", p, ev, &e);
732  unlock();
733  return true;
734  }
735  }
736 
737  // Process fully received messages as long as we can.
738  size_t consumed = 0;
739  DataBlob &data = p->incoming;
740  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
741  uint32_t msglen;
742  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
743 
744  if (msglen >= MESSAGE_SIZE_LIMIT) {
745  losePeer("WARNING: excessively large message from ", p, ev);
746  unlock();
747  return true;
748  }
749 
750  if (data.size() - consumed >= msglen) {
751  bool valid = true;
752  if (msglen < 2 * sizeof(uint32_t)) {
753  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
754  valid = false;
755  } else {
756  // Decode and process this message.
757  Bucket msg;
758  msg.next = nullptr;
759  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
760 
761  // If we created a response, chain it to the write queue.
762  if (!msg.data.empty()) {
763  Bucket **prev = &p->sendq;
764  while (*prev)
765  prev = &(*prev)->next;
766 
767  *prev = new Bucket;
768  (*prev)->next = nullptr;
769  (*prev)->data.swap(msg.data);
770  }
771  }
772 
773  if (!valid) {
774  losePeer("WARNING: data stream error with ", p, ev);
775  unlock();
776  return true;
777  }
778 
779  consumed += msglen;
780  } else
781  break;
782  }
783 
784  data.erase(data.begin(), data.begin() + consumed);
785 
786  // If the client has closed the connection, shut down our end. If
787  // we have something to send back still, leave the write direction
788  // open. Otherwise close the shop for this client.
789  if (sz == 0)
790  sel_.setMask(p->socket, p->mask &= ~IORead);
791  }
792 
793  // Yes, please keep processing events for this socket.
794  unlock();
795  return false;
796 }
797 
802 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
803  // Recover the server socket.
804  assert(ev->source == server_);
805 
806  // Accept the connection.
807  Socket *s = server_->accept();
808  assert(s);
809  assert(!s->isBlocking());
810 
811  // Record it to our list of peers.
812  lock();
813  Peer *p = createPeer(s);
814  std::string localaddr;
815  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
816  InetAddress peeraddr = inet->peername();
817  InetAddress myaddr = inet->sockname();
818  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
819  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
820  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
821  p->peeraddr = local->peername().path();
822  localaddr = local->sockname().path();
823  } else
824  assert(false);
825 
826  p->mask = IORead | IOUrgent;
827  p->socket = s;
828 
829  // Report the new connection.
830  if (debug_)
831  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
832 
833  // Attach it to the listener.
834  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
835  unlock();
836 
837  // We are never done.
838  return false;
839 }
840 
847 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
848  // Discard the data in the pipe, we care only about the wakeup.
849  try {
850  IOSize sz;
851  unsigned char buf[1024];
852  while ((sz = ev->source->read(buf, sizeof(buf))))
853  ;
854  } catch (Error &e) {
855  auto *next = dynamic_cast<SystemError *>(e.next());
856  if (next && next->portable() == SysErr::ErrTryAgain)
857  ; // Ignore it
858  else
859  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
860  }
861 
862  // Tell the main event pump to send an update in a little while.
863  flush_ = true;
864 
865  // We are never done, always keep going.
866  return false;
867 }
868 
872  if (!p->socket)
873  return;
874 
875  // Listen to writes iff we have data to send.
876  unsigned oldmask = p->mask;
877  if (!p->sendq && (p->mask & IOWrite))
878  sel_.setMask(p->socket, p->mask &= ~IOWrite);
879 
880  if (p->sendq && !(p->mask & IOWrite))
881  sel_.setMask(p->socket, p->mask |= IOWrite);
882 
883  if (debug_ && oldmask != p->mask)
884  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
885 
886  // If we have nothing more to send and are no longer listening
887  // for reads, close up the shop for this peer.
888  if (p->mask == IOUrgent && !p->waiting) {
889  assert(!p->sendq);
890  if (debug_)
891  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
892  losePeer(nullptr, p, nullptr);
893  }
894 }
895 
897 DQMNet::DQMNet(const std::string &appname /* = "" */)
898  : debug_(false),
899  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
900  pid_(getpid()),
901  server_(nullptr),
902  version_(Time::current()),
903  communicate_((pthread_t)-1),
904  shutdown_(0),
905  delay_(1000),
906  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
907  waitMax_(0, 0, 0, 5 /* seconds */, 0),
908  flush_(false) {
909  // Create a pipe for the local DQM to tell the communicator
910  // thread that local DQM data has changed and that the peers
911  // should be notified.
912  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
913  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
914 
915  // Initialise the upstream and downstream to empty.
916  upstream_.peer = downstream_.peer = nullptr;
920 }
921 
923  // FIXME
924 }
925 
928 void DQMNet::debug(bool doit) { debug_ = doit; }
929 
932 void DQMNet::delay(int delay) { delay_ = delay; }
933 
939 
944  if (server_) {
945  logme() << "ERROR: DQM server was already started.\n";
946  return;
947  }
948 
949  try {
950  InetAddress addr("0.0.0.0", port);
951  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
952  s->bind(addr);
953  s->listen(10);
954  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
955  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
956  s->setBlocking(false);
957  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
958  } catch (Error &e) {
959  // FIXME: Do we need to do this when we throw an exception anyway?
960  // FIXME: Abort instead?
961  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
962 
963  raiseDQMError("DQMNet::startLocalServer",
964  "Failed to start server at port"
965  " %d: %s",
966  port,
967  e.explain().c_str());
968  }
969 
970  logme() << "INFO: DQM server started at port " << port << std::endl;
971 }
972 
976 void DQMNet::startLocalServer(const char *path) {
977  if (server_) {
978  logme() << "ERROR: DQM server was already started.\n";
979  return;
980  }
981 
982  try {
983  server_ = new LocalServerSocket(path, 10);
984  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
985  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
986  server_->setBlocking(false);
987  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
988  } catch (Error &e) {
989  // FIXME: Do we need to do this when we throw an exception anyway?
990  // FIXME: Abort instead?
991  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
992 
993  raiseDQMError("DQMNet::startLocalServer",
994  "Failed to start server at path"
995  " %s: %s",
996  path,
997  e.explain().c_str());
998  }
999 
1000  logme() << "INFO: DQM server started at path " << path << std::endl;
1001 }
1002 
1007  if (!downstream_.host.empty()) {
1008  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1009  << std::endl;
1010  return;
1011  }
1012 
1013  downstream_.update = true;
1014  downstream_.host = host;
1015  downstream_.port = port;
1016 }
1017 
1022  if (!upstream_.host.empty()) {
1023  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1024  << std::endl;
1025  return;
1026  }
1027 
1028  upstream_.update = false;
1029  upstream_.host = host;
1030  upstream_.port = port;
1031 }
1032 
1035  shutdown_ = 1;
1036  if (communicate_ != (pthread_t)-1)
1037  pthread_join(communicate_, nullptr);
1038 }
1039 
1045 static void *communicate(void *obj) {
1046  sigset_t sigs;
1047  sigfillset(&sigs);
1048  pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1049  ((DQMNet *)obj)->run();
1050  return nullptr;
1051 }
1052 
1055  if (communicate_ != (pthread_t)-1)
1056  pthread_mutex_lock(&lock_);
1057 }
1058 
1061  if (communicate_ != (pthread_t)-1)
1062  pthread_mutex_unlock(&lock_);
1063 }
1064 
1069  if (communicate_ != (pthread_t)-1) {
1070  logme() << "ERROR: DQM networking thread has already been started\n";
1071  return;
1072  }
1073 
1074  pthread_mutex_init(&lock_, nullptr);
1075  pthread_create(&communicate_, nullptr, &communicate, this);
1076 }
1077 
1079 void DQMNet::run() {
1080  Time now;
1081  Time nextFlush = 0;
1082  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1083 
1084  // Perform I/O. Every once in a while flush updates to peers.
1085  while (!shouldStop()) {
1086  for (auto ap : automatic) {
1087  // If we need a server connection and don't have one yet,
1088  // initiate asynchronous connection creation. Swallow errors
1089  // in case the server won't talk to us.
1090  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1091  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1092  InetSocket *s = nullptr;
1093  try {
1094  InetAddress addr(ap->host.c_str(), ap->port);
1095  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1096  s->setBlocking(false);
1097  s->connect(addr);
1098  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1099  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1100  } catch (Error &e) {
1101  auto *sys = dynamic_cast<SystemError *>(e.next());
1102  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1103  // "In progress" just means the connection is in progress.
1104  // The connection is ready when the socket is writeable.
1105  // Anything else is a real problem.
1106  if (s)
1107  s->abort();
1108  delete s;
1109  s = nullptr;
1110  }
1111  }
1112 
1113  // Set up with the selector if we were successful. If this is
1114  // the upstream collector, queue a request for updates.
1115  if (s) {
1116  Peer *p = createPeer(s);
1117  ap->peer = p;
1118 
1119  InetAddress peeraddr = ((InetSocket *)s)->peername();
1120  InetAddress myaddr = ((InetSocket *)s)->sockname();
1121  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1122  p->mask = IORead | IOWrite | IOUrgent;
1123  p->update = ap->update;
1124  p->automatic = ap;
1125  p->socket = s;
1126  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1127  if (ap == &upstream_) {
1128  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1129  p->sendq = new Bucket;
1130  p->sendq->next = nullptr;
1131  copydata(p->sendq, words, sizeof(words));
1132  }
1133 
1134  // Report the new connection.
1135  if (debug_)
1136  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1137  << std::endl;
1138  }
1139  }
1140  }
1141 
1142  // Pump events for a while.
1143  sel_.dispatch(delay_);
1144  now = Time::current();
1145  lock();
1146 
1147  // Check if flush is required. Flush only if one is needed.
1148  // Always sends the full object list, but only rarely.
1149  if (flush_ && now > nextFlush) {
1150  flush_ = false;
1151  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1152  sendObjectListToPeers(true);
1153  }
1154 
1155  // Update the data server and peer selection masks. If we
1156  // have no more data to send and listening for writes, remove
1157  // the write mask. If we have something to write and aren't
1158  // listening for writes, start listening so we can send off
1159  // the data.
1160  updatePeerMasks();
1161 
1162  // Release peers that have been waiting for data for too long.
1163  Time waitold = now - waitMax_;
1164  Time waitstale = now - waitStale_;
1165  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1166  Object *o = findObject(nullptr, i->name);
1167 
1168  // If we have (stale) object data, wait only up to stale limit.
1169  // Otherwise if we have no data at all, wait up to the max limit.
1170  if (i->time < waitold) {
1171  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1172  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1173  releaseFromWait(i++, o);
1174  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1175  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1176  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1177  releaseFromWait(i++, o);
1178  }
1179 
1180  // Keep it for now.
1181  else
1182  ++i;
1183  }
1184 
1185  unlock();
1186  }
1187 }
1188 
1189 // Tell the network cache that there have been local changes that
1190 // should be advertised to the downstream listeners.
1192  char byte = 0;
1193  wakeup_.sink()->write(&byte, 1);
1194 }
1195 
1199 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1200  local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1201 }
1202 
1204 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.resize(size); }
1205 
1209  o.dirname = &*local_->dirs.insert(*o.dirname).first;
1210  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1211  if (!info.second) {
1212  // Somewhat hackish. Sets are supposedly immutable, but we
1213  // need to change the non-key parts of the object. Erasing
1214  // and re-inserting would produce too much memory churn.
1215  auto &old = const_cast<Object &>(*info.first);
1216  std::swap(old.flags, o.flags);
1217  std::swap(old.tag, o.tag);
1218  std::swap(old.version, o.version);
1219  std::swap(old.qreports, o.qreports);
1220  std::swap(old.rawdata, o.rawdata);
1221  std::swap(old.scalar, o.scalar);
1222  std::swap(old.qdata, o.qdata);
1223  }
1224 }
1225 
1229 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1230  size_t removed = 0;
1231  std::string path;
1232  ObjectMap::iterator i, e;
1233  for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1234  path.clear();
1235  path.reserve(i->dirname->size() + i->objname.size() + 2);
1236  path += *i->dirname;
1237  if (!path.empty())
1238  path += '/';
1239  path += i->objname;
1240 
1241  if (!known.count(path))
1242  ++removed, local_->objs.erase(i++);
1243  else
1244  ++i;
1245  }
1246 
1247  return removed > 0;
1248 }
size
Write out results.
type
Definition: HCALResponse.h:21
edm::ErrorSummaryEntry Error
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:164
AutoPeer downstream_
Definition: DQMNet.h:366
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:897
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
host
Definition: query.py:115
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
def logme(msg, args)
DataBlob incoming
Definition: DQMNet.h:130
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:847
QReports qreports
Definition: DQMNet.h:103
pthread_t communicate_
Definition: DQMNet.h:369
std::string algorithm
Definition: DQMNet.h:90
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:96
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:135
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1054
DQMNet::CoreObject data_
std::ostream & logme()
Definition: DQMNet.cc:41
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:162
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:53
Definition: DQMNet.h:23
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:372
pthread_mutex_t lock_
Definition: DQMNet.h:345
void delay(int delay)
Definition: DQMNet.cc:932
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:94
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:138
#define nullptr
std::string peeraddr
Definition: DQMNet.h:128
static const int WARNING
lat::Socket * socket
Definition: DQMNet.h:129
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:938
bool ev
uint32_t tag
Definition: DQMNet.h:95
const std::string * dirname
Definition: DQMNet.h:101
AutoPeer * automatic
Definition: DQMNet.h:140
AutoPeer upstream_
Definition: DQMNet.h:365
static void * communicate(void *obj)
Definition: DQMNet.cc:1045
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1034
Definition: DQMStore.h:35
A arg
Definition: Factorize.h:36
std::string qdata
Definition: DQMNet.h:111
port
Definition: query.py:116
void debug(bool doit)
Definition: DQMNet.cc:928
dqm::dqmstoreimpl::DQMStore DQMStore
Definition: DQMStore.h:706
void Fill(long long x)
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:109
virtual bool shouldStop()
Definition: DQMNet.cc:359
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:91
uint64_t lastreq
Definition: DQMNet.h:108
std::string name
Definition: DQMNet.h:121
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:424
DataBlob data
Definition: DQMNet.h:116
void start()
Definition: DQMNet.cc:1068
Peer * peer
Definition: DQMNet.h:144
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:654
void sendLocalChanges()
Definition: DQMNet.cc:1191
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:363
sig_atomic_t shutdown_
Definition: DQMNet.h:370
Peer * createPeer(lat::Socket *s) override
Definition: DQMNet.h:491
std::string objname
Definition: DQMNet.h:102
bool update
Definition: DQMNet.h:136
std::string qtname
Definition: DQMNet.h:89
std::string scalar
Definition: DQMNet.h:110
lat::Time next
Definition: DQMNet.h:145
unsigned mask
Definition: DQMNet.h:134
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:381
Definition: IOTypes.h:26
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1229
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:802
size_t updates
Definition: DQMNet.h:138
std::string host
Definition: DQMNet.h:146
double b
Definition: hdecay.h:118
void run()
Definition: DQMNet.cc:1079
void startLocalServer(int port)
Definition: DQMNet.cc:943
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:149
tuple msg
Definition: mps_check.py:285
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:60
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:65
lat::Pipe wakeup_
Definition: DQMNet.h:362
bool debug_
Definition: DQMNet.h:344
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
size_t waiting
Definition: DQMNet.h:139
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1060
virtual ~DQMNet()
Definition: DQMNet.cc:922
std::string message
Definition: DQMNet.h:88
size_t sendpos
Definition: DQMNet.h:132
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1021
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
void updateMask(Peer *p)
Definition: DQMNet.cc:871
bool flush_
Definition: DQMNet.h:375
lat::Socket * server_
Definition: DQMNet.h:361
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1208
lat::TimeSpan waitMax_
Definition: DQMNet.h:374
Bucket * next
Definition: DQMNet.h:115
Bucket * sendq
Definition: DQMNet.h:131
std::vector< QValue > QReports
Definition: DQMNet.h:81
lat::TimeSpan waitStale_
Definition: DQMNet.h:373
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1204
float qtresult
Definition: DQMNet.h:87
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1006
WaitList waiting_
Definition: DQMNet.h:367
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1199
DataBlob rawdata
Definition: DQMNet.h:109
static const int ERROR
ImplPeer * local_
Definition: DQMNet.h:605
lat::IOSelector sel_
Definition: DQMNet.h:360
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:10