CMS 3D CMS Logo

DQMNet.cc
Go to the documentation of this file.
4 #include "classlib/iobase/InetServerSocket.h"
5 #include "classlib/iobase/LocalServerSocket.h"
6 #include "classlib/iobase/Filename.h"
7 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
8 #include "classlib/utils/TimeInfo.h"
9 #include "classlib/utils/StringList.h"
10 #include "classlib/utils/StringFormat.h"
11 #include "classlib/utils/StringOps.h"
12 #include "classlib/utils/SystemError.h"
13 #include "classlib/utils/Regexp.h"
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <sys/wait.h>
17 #include <stdio.h>
18 #include <stdint.h>
19 #include <iostream>
20 #include <sstream>
21 #include <cassert>
22 #include <cfloat>
23 #include <inttypes.h>
24 
25 #if __APPLE__
26 # define MESSAGE_SIZE_LIMIT (1*1024*1024)
27 # define SOCKET_BUF_SIZE (1*1024*1024)
28 #else
29 # define MESSAGE_SIZE_LIMIT (8*1024*1024)
30 # define SOCKET_BUF_SIZE (8*1024*1024)
31 #endif
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
34 
35 using namespace lat;
36 
37 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
38 
40 // Generate log prefix.
41 std::ostream &
43 {
44  Time now = Time::current();
45  return std::cout
46  << now.format(true, "%Y-%m-%d %H:%M:%S.")
47  << now.nanoformat(3, 3)
48  << " " << appname_ << "[" << pid_ << "]: ";
49 }
50 
51 // Append data into a bucket.
52 void
53 DQMNet::copydata(Bucket *b, const void *data, size_t len)
54 {
55  b->data.insert(b->data.end(),
56  (const unsigned char *)data,
57  (const unsigned char *)data + len);
58 }
59 
60 // Discard a bucket chain.
61 void
63 {
64  while (b)
65  {
66  Bucket *next = b->next;
67  delete b;
68  b = next;
69  }
70 }
71 
73 
76 void
77 DQMNet::losePeer(const char *reason,
78  Peer *peer,
79  IOSelectEvent *ev,
80  Error *err)
81 {
82  if (reason)
83  logme ()
84  << reason << peer->peeraddr
85  << (err ? "; error was: " + err->explain() : std::string(""))
86  << std::endl;
87 
88  Socket *s = peer->socket;
89 
90  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
91  if (i->peer == peer)
92  waiting_.erase(i++);
93  else
94  ++i;
95 
96  if (ev)
97  ev->source = 0;
98 
99  discard(peer->sendq);
100  if (peer->automatic)
101  peer->automatic->peer = 0;
102 
103  sel_.detach (s);
104  s->close();
105  removePeer(peer, s);
106  delete s;
107 }
108 
110 void
111 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
112 {
113  // Issue request to peer.
114  Bucket **msg = &p->sendq;
115  while (*msg)
116  msg = &(*msg)->next;
117  *msg = new Bucket;
118  (*msg)->next = 0;
119 
120  uint32_t words[3];
121  words[0] = sizeof(words) + len;
122  words[1] = DQM_MSG_GET_OBJECT;
123  words[2] = len;
124  copydata(*msg, words, sizeof(words));
125  copydata(*msg, name, len);
126 }
127 
130 void
132 {
133  // FIXME: Should we automatically record which exact peer the waiter
134  // is expecting to deliver data so we know to release the waiter if
135  // the other peer vanishes? The current implementation stands a
136  // chance for the waiter to wait indefinitely -- although we do
137  // force terminate the wait after a while.
138  requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
139  WaitObject wo = { Time::current(), name, info, p };
140  waiting_.push_back(wo);
141  p->waiting++;
142 }
143 
144 // Once an object has been updated, this is invoked for all waiting
145 // peers. Send the object back to the peer in suitable form.
146 void
147 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
148 {
149  Bucket **msg = &i->peer->sendq;
150  while (*msg)
151  msg = &(*msg)->next;
152  *msg = new Bucket;
153  (*msg)->next = 0;
154 
155  releaseFromWait(*msg, *i, o);
156 
157  assert(i->peer->waiting > 0);
158  i->peer->waiting--;
159  waiting_.erase(i);
160 }
161 
162 // Release everyone waiting for the object @a o.
163 void
165 {
166  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
167  if (i->name == name)
168  releaseFromWait(i++, o);
169  else
170  ++i;
171 }
172 
176 void
178 {
179  char buf[64];
180  std::ostringstream qrs;
181  QReports::const_iterator qi, qe;
182  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
183  {
184  int pos = 0;
185  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
186  qrs << buf << '\0'
187  << buf+pos << '\0'
188  << qi->qtname << '\0'
189  << qi->algorithm << '\0'
190  << qi->message << '\0'
191  << '\0';
192  }
193  into = qrs.str();
194 }
195 
198 void
199 DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
200 {
201  const char *qdata = from;
202 
203  // Count how many qresults there are.
204  size_t nqv = 0;
205  while (*qdata)
206  {
207  ++nqv;
208  while (*qdata) ++qdata;
209  ++qdata;
210  while (*qdata) ++qdata;
211  ++qdata;
212  while (*qdata) ++qdata;
213  ++qdata;
214  while (*qdata) ++qdata;
215  ++qdata;
216  while (*qdata) ++qdata;
217  ++qdata;
218  }
219 
220  // Now extract the qreports.
221  qdata = from;
222  qr.reserve(nqv);
223  while (*qdata)
224  {
225  qr.push_back(DQMNet::QValue());
226  DQMNet::QValue &qv = qr.back();
227 
228  qv.code = atoi(qdata);
229  while (*qdata) ++qdata;
230  switch (qv.code)
231  {
233  break;
236  break;
237  case dqm::qstatus::ERROR:
239  break;
240  default:
242  break;
243  }
244 
245  qv.qtresult = atof(++qdata);
246  while (*qdata) ++qdata;
247 
248  qv.qtname = ++qdata;
249  while (*qdata) ++qdata;
250 
251  qv.algorithm = ++qdata;
252  while (*qdata) ++qdata;
253 
254  qv.message = ++qdata;
255  while (*qdata) ++qdata;
256  ++qdata;
257  }
258 }
259 
260 #if 0
261 // Deserialise a ROOT object from a buffer at the current position.
262 static TObject *
263 extractNextObject(TBufferFile &buf)
264 {
265  if (buf.Length() == buf.BufferSize())
266  return 0;
267 
268  buf.InitMap();
269  Int_t pos = buf.Length();
270  TClass *c = buf.ReadClass();
271  buf.SetBufferOffset(pos);
272  buf.ResetMap();
273  return c ? buf.ReadObject(c) : 0;
274 }
275 
276 // Reconstruct an object from the raw data.
277 bool
278 DQMNet::reconstructObject(Object &o)
279 {
280  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
281  buf.Reset();
282 
283  // Extract the main object.
284  if (! (o.object = extractNextObject(buf)))
285  return false;
286 
287  // Extract the reference object.
288  o.reference = extractNextObject(buf);
289 
290  // Extract quality reports.
291  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
292  return true;
293 }
294 #endif
295 
296 #if 0
297 bool
298 DQMNet::reinstateObject(DQMStore *store, Object &o)
299 {
300  if (! reconstructObject (o))
301  return false;
302 
303  // Reconstruct the main object
304  MonitorElement *obj = 0;
305  store->setCurrentFolder(*o.dirname);
306  switch (o.flags & DQM_PROP_TYPE_MASK)
307  {
308  case DQM_PROP_TYPE_INT:
309  obj = store->bookInt(o.objname);
310  obj->Fill(atoll(o.scalar.c_str()));
311  break;
312 
313  case DQM_PROP_TYPE_REAL:
314  obj = store->bookFloat(name);
315  obj->Fill(atof(o.scalar.c_str()));
316  break;
317 
318  case DQM_PROP_TYPE_STRING:
319  obj = store->bookString(name, o.scalar);
320  break;
321 
322  case DQM_PROP_TYPE_TH1F:
323  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
324  break;
325 
326  case DQM_PROP_TYPE_TH1S:
327  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
328  break;
329 
330  case DQM_PROP_TYPE_TH1D:
331  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
332  break;
333 
334  case DQM_PROP_TYPE_TH2F:
335  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
336  break;
337 
338  case DQM_PROP_TYPE_TH2S:
339  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
340  break;
341 
342  case DQM_PROP_TYPE_TH2D:
343  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
344  break;
345 
346  case DQM_PROP_TYPE_TH3F:
347  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
348  break;
349 
350  case DQM_PROP_TYPE_TH3S:
351  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
352  break;
353 
354  case DQM_PROP_TYPE_TH3D:
355  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
356  break;
357 
358  case DQM_PROP_TYPE_PROF:
359  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
360  break;
361 
362  case DQM_PROP_TYPE_PROF2D:
363  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
364  break;
365 
366  default:
367  logme()
368  << "ERROR: unexpected monitor element of type "
369  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
370  << *o.dirname << '/' << o.objname << "'\n";
371  return false;
372  }
373 
374  // Reconstruct tag and qreports.
375  if (obj)
376  {
377  obj->data_.tag = o.tag;
378  obj->data_.qreports = o.qreports;
379  }
380 
381  // Inidicate success.
382  return true;
383 }
384 #endif
385 
387 // Check if the network layer should stop.
388 bool
390 {
391  return shutdown_;
392 }
393 
394 // Once an object has been updated, this is invoked for all waiting
395 // peers. Send the requested object to the waiting peer.
396 void
398 {
399  if (o)
400  sendObjectToPeer(msg, *o, true);
401  else
402  {
403  uint32_t words [3];
404  words[0] = sizeof(words) + w.name.size();
405  words[1] = DQM_REPLY_NONE;
406  words[2] = w.name.size();
407 
408  msg->data.reserve(msg->data.size() + words[0]);
409  copydata(msg, &words[0], sizeof(words));
410  copydata(msg, &w.name[0], w.name.size());
411  }
412 }
413 
414 // Send an object to a peer. If not @a data, only sends a summary
415 // without the object data, except the data is always sent for scalar
416 // objects.
417 void
419 {
420  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
421  DataBlob objdata;
422 
423  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
424  objdata.insert(objdata.end(),
425  &o.scalar[0],
426  &o.scalar[0] + o.scalar.size());
427  else if (data)
428  objdata.insert(objdata.end(),
429  &o.rawdata[0],
430  &o.rawdata[0] + o.rawdata.size());
431 
432  uint32_t words [9];
433  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
434  uint32_t datalen = objdata.size();
435  uint32_t qlen = o.qdata.size();
436 
437  if (o.dirname->empty())
438  --namelen;
439 
440  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
441  words[1] = DQM_REPLY_OBJECT;
442  words[2] = flags;
443  words[3] = (o.version >> 0 ) & 0xffffffff;
444  words[4] = (o.version >> 32) & 0xffffffff;
445  words[5] = o.tag;
446  words[6] = namelen;
447  words[7] = datalen;
448  words[8] = qlen;
449 
450  msg->data.reserve(msg->data.size() + words[0]);
451  copydata(msg, &words[0], 9*sizeof(uint32_t));
452  if (namelen)
453  {
454  copydata(msg, &(*o.dirname)[0], o.dirname->size());
455  if (! o.dirname->empty())
456  copydata(msg, "/", 1);
457  copydata(msg, &o.objname[0], o.objname.size());
458  }
459  if (datalen)
460  copydata(msg, &objdata[0], datalen);
461  if (qlen)
462  copydata(msg, &o.qdata[0], qlen);
463 }
464 
466 // Handle peer messages.
467 bool
468 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
469 {
470  // Decode and process this message.
471  uint32_t type;
472  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
473  switch (type)
474  {
475  case DQM_MSG_UPDATE_ME:
476  {
477  if (len != 2*sizeof(uint32_t))
478  {
479  logme()
480  << "ERROR: corrupt 'UPDATE_ME' message of length " << len
481  << " from peer " << p->peeraddr << std::endl;
482  return false;
483  }
484 
485  if (debug_)
486  logme()
487  << "DEBUG: received message 'UPDATE ME' from peer "
488  << p->peeraddr << ", size " << len << std::endl;
489 
490  p->update = true;
491  }
492  return true;
493 
494  case DQM_MSG_LIST_OBJECTS:
495  {
496  if (debug_)
497  logme()
498  << "DEBUG: received message 'LIST OBJECTS' from peer "
499  << p->peeraddr << ", size " << len << std::endl;
500 
501  // Send over current status: list of known objects.
502  sendObjectListToPeer(msg, true, false);
503  }
504  return true;
505 
506  case DQM_MSG_GET_OBJECT:
507  {
508  if (debug_)
509  logme()
510  << "DEBUG: received message 'GET OBJECT' from peer "
511  << p->peeraddr << ", size " << len << std::endl;
512 
513  if (len < 3*sizeof(uint32_t))
514  {
515  logme()
516  << "ERROR: corrupt 'GET IMAGE' message of length " << len
517  << " from peer " << p->peeraddr << std::endl;
518  return false;
519  }
520 
521  uint32_t namelen;
522  memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
523  if (len != 3*sizeof(uint32_t) + namelen)
524  {
525  logme()
526  << "ERROR: corrupt 'GET OBJECT' message of length " << len
527  << " from peer " << p->peeraddr
528  << ", expected length " << (3*sizeof(uint32_t))
529  << " + " << namelen << std::endl;
530  return false;
531  }
532 
533  std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
534  Peer *owner = 0;
535  Object *o = findObject(0, name, &owner);
536  if (o)
537  {
538  o->lastreq = Time::current().ns();
539  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
540  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
541  waitForData(p, name, "", owner);
542  else
543  sendObjectToPeer(msg, *o, true);
544  }
545  else
546  {
547  uint32_t words [3];
548  words[0] = sizeof(words) + name.size();
549  words[1] = DQM_REPLY_NONE;
550  words[2] = name.size();
551 
552  msg->data.reserve(msg->data.size() + words[0]);
553  copydata(msg, &words[0], sizeof(words));
554  copydata(msg, &name[0], name.size());
555  }
556  }
557  return true;
558 
559  case DQM_REPLY_LIST_BEGIN:
560  {
561  if (len != 4*sizeof(uint32_t))
562  {
563  logme()
564  << "ERROR: corrupt 'LIST BEGIN' message of length " << len
565  << " from peer " << p->peeraddr << std::endl;
566  return false;
567  }
568 
569  // Get the update status: whether this is a full update.
570  uint32_t flags;
571  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
572 
573  if (debug_)
574  logme()
575  << "DEBUG: received message 'LIST BEGIN "
576  << (flags ? "FULL" : "INCREMENTAL")
577  << "' from " << p->peeraddr
578  << ", size " << len << std::endl;
579 
580  // If we are about to receive a full list of objects, flag all
581  // objects as possibly dead. Subsequent object notifications
582  // will undo this for the live objects. We cannot delete
583  // objects quite yet, as we may get inquiry from another client
584  // while we are processing the incoming list, so we keep the
585  // objects tentatively alive as long as we've not seen the end.
586  if (flags)
587  markObjectsDead(p);
588  }
589  return true;
590 
591  case DQM_REPLY_LIST_END:
592  {
593  if (len != 4*sizeof(uint32_t))
594  {
595  logme()
596  << "ERROR: corrupt 'LIST END' message of length " << len
597  << " from peer " << p->peeraddr << std::endl;
598  return false;
599  }
600 
601  // Get the update status: whether this is a full update.
602  uint32_t flags;
603  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
604 
605  // If we received a full list of objects, now purge all dead
606  // objects. We need to do this in two stages in case we receive
607  // updates in many parts, and end up sending updates to others in
608  // between; this avoids us lying live objects are dead.
609  if (flags)
610  purgeDeadObjects(p);
611 
612  if (debug_)
613  logme()
614  << "DEBUG: received message 'LIST END "
615  << (flags ? "FULL" : "INCREMENTAL")
616  << "' from " << p->peeraddr
617  << ", size " << len << std::endl;
618 
619  // Indicate we have received another update from this peer.
620  // Also indicate we should flush to our clients.
621  flush_ = true;
622  p->updates++;
623  }
624  return true;
625 
626  case DQM_REPLY_OBJECT:
627  {
628  uint32_t words[9];
629  if (len < sizeof(words))
630  {
631  logme()
632  << "ERROR: corrupt 'OBJECT' message of length " << len
633  << " from peer " << p->peeraddr << std::endl;
634  return false;
635  }
636 
637  memcpy (&words[0], data, sizeof(words));
638  uint32_t &namelen = words[6];
639  uint32_t &datalen = words[7];
640  uint32_t &qlen = words[8];
641 
642  if (len != sizeof(words) + namelen + datalen + qlen)
643  {
644  logme()
645  << "ERROR: corrupt 'OBJECT' message of length " << len
646  << " from peer " << p->peeraddr
647  << ", expected length " << sizeof(words)
648  << " + " << namelen
649  << " + " << datalen
650  << " + " << qlen
651  << std::endl;
652  return false;
653  }
654 
655  unsigned char *namedata = data + sizeof(words);
656  unsigned char *objdata = namedata + namelen;
657  unsigned char *qdata = objdata + datalen;
658  unsigned char *enddata = qdata + qlen;
659  std::string name ((char *) namedata, namelen);
660  assert (enddata == data + len);
661 
662  if (debug_)
663  logme()
664  << "DEBUG: received message 'OBJECT " << name
665  << "' from " << p->peeraddr
666  << ", size " << len << std::endl;
667 
668  // Mark the peer as a known object source.
669  p->source = true;
670 
671  // Initialise or update an object entry.
672  Object *o = findObject(p, name);
673  if (! o)
674  o = makeObject(p, name);
675 
676  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
677  o->tag = words[5];
678  o->version = ((uint64_t) words[4] << 32 | words[3]);
679  o->scalar.clear();
680  o->qdata.clear();
681  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
682  {
683  o->rawdata.clear();
684  o->scalar.insert(o->scalar.end(), objdata, qdata);
685  }
686  else if (datalen)
687  {
688  o->rawdata.clear();
689  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
690  }
691  else if (! o->rawdata.empty())
692  o->flags |= DQM_PROP_STALE;
693  o->qdata.insert(o->qdata.end(), qdata, enddata);
694 
695  // If we had an object for this one already and this is a list
696  // update without data, issue an immediate data get request.
697  if (o->lastreq
698  && ! datalen
699  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
700  requestObjectData(p, (namelen ? &name[0] : 0), namelen);
701 
702  // If we have the object data, release from wait.
703  if (datalen)
704  releaseWaiters(name, o);
705  }
706  return true;
707 
708  case DQM_REPLY_NONE:
709  {
710  uint32_t words[3];
711  if (len < sizeof(words))
712  {
713  logme()
714  << "ERROR: corrupt 'NONE' message of length " << len
715  << " from peer " << p->peeraddr << std::endl;
716  return false;
717  }
718 
719  memcpy (&words[0], data, sizeof(words));
720  uint32_t &namelen = words[2];
721 
722  if (len != sizeof(words) + namelen)
723  {
724  logme()
725  << "ERROR: corrupt 'NONE' message of length " << len
726  << " from peer " << p->peeraddr
727  << ", expected length " << sizeof(words)
728  << " + " << namelen << std::endl;
729  return false;
730  }
731 
732  unsigned char *namedata = data + sizeof(words);
733  std::string name((char *) namedata, namelen);
734 
735  if (debug_)
736  logme()
737  << "DEBUG: received message 'NONE " << name
738  << "' from " << p->peeraddr
739  << ", size " << len << std::endl;
740 
741  // Mark the peer as a known object source.
742  p->source = true;
743 
744  // If this was a known object, kill it.
745  if (Object *o = findObject(p, name))
746  {
747  o->flags |= DQM_PROP_DEAD;
748  purgeDeadObjects(p);
749  }
750 
751  // If someone was waiting for this, let them go.
752  releaseWaiters(name, 0);
753  }
754  return true;
755 
756  default:
757  logme()
758  << "ERROR: unrecognised message of length " << len
759  << " and type " << type << " from peer " << p->peeraddr
760  << std::endl;
761  return false;
762  }
763 }
764 
767 bool
768 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
769 {
770  lock();
771  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
772 
773  // If there is a problem with the peer socket, discard the peer
774  // and tell the selector to stop prcessing events for it. If
775  // this is a server connection, we will eventually recreate
776  // everything if/when the data server comes back.
777  if (ev->events & IOUrgent)
778  {
779  if (p->automatic)
780  {
781  logme()
782  << "WARNING: connection to the DQM server at " << p->peeraddr
783  << " lost (will attempt to reconnect in 15 seconds)\n";
784  losePeer(0, p, ev);
785  }
786  else
787  losePeer("WARNING: lost peer connection ", p, ev);
788 
789  unlock();
790  return true;
791  }
792 
793  // If we can write to the peer socket, pump whatever we can into it.
794  if (ev->events & IOWrite)
795  {
796  while (Bucket *b = p->sendq)
797  {
798  IOSize len = b->data.size() - p->sendpos;
799  const void *data = (len ? (const void *)&b->data[p->sendpos]
800  : (const void *)&data);
801  IOSize done;
802 
803  try
804  {
805  done = (len ? ev->source->write (data, len) : 0);
806  if (debug_ && len)
807  logme()
808  << "DEBUG: sent " << done << " bytes to peer "
809  << p->peeraddr << std::endl;
810  }
811  catch (Error &e)
812  {
813  losePeer("WARNING: unable to write to peer ", p, ev, &e);
814  unlock();
815  return true;
816  }
817 
818  p->sendpos += done;
819  if (p->sendpos == b->data.size())
820  {
821  Bucket *old = p->sendq;
822  p->sendq = old->next;
823  p->sendpos = 0;
824  old->next = 0;
825  discard(old);
826  }
827 
828  if (! done && len)
829  // Cannot write any more.
830  break;
831  }
832  }
833 
834  // If there is data to be read from the peer, first receive what we
835  // can get out the socket, the process all complete requests.
836  if (ev->events & IORead)
837  {
838  // First build up the incoming buffer of data in the socket.
839  // Remember the last size returned by the socket; we need
840  // it to determine if the remote end closed the connection.
841  IOSize sz;
842  try
843  {
844  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
845  do
846  if ((sz = ev->source->read(&buf[0], buf.size())))
847  {
848  if (debug_)
849  logme()
850  << "DEBUG: received " << sz << " bytes from peer "
851  << p->peeraddr << std::endl;
852  DataBlob &data = p->incoming;
853  if (data.capacity () < data.size () + sz)
854  data.reserve (data.size() + SOCKET_READ_GROWTH);
855  data.insert (data.end(), &buf[0], &buf[0] + sz);
856  }
857  while (sz == sizeof (buf));
858  }
859  catch (Error &e)
860  {
861  SystemError *next = dynamic_cast<SystemError *>(e.next());
862  if (next && next->portable() == SysErr::ErrTryAgain)
863  sz = 1; // Ignore it, and fake no end of data.
864  else
865  {
866  // Houston we have a problem.
867  losePeer("WARNING: failed to read from peer ", p, ev, &e);
868  unlock();
869  return true;
870  }
871  }
872 
873  // Process fully received messages as long as we can.
874  size_t consumed = 0;
875  DataBlob &data = p->incoming;
876  while (data.size()-consumed >= sizeof(uint32_t)
877  && p->waiting < MAX_PEER_WAITREQS)
878  {
879  uint32_t msglen;
880  memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
881 
882  if (msglen >= MESSAGE_SIZE_LIMIT)
883  {
884  losePeer("WARNING: excessively large message from ", p, ev);
885  unlock();
886  return true;
887  }
888 
889  if (data.size()-consumed >= msglen)
890  {
891  bool valid = true;
892  if (msglen < 2*sizeof(uint32_t))
893  {
894  logme()
895  << "ERROR: corrupt peer message of length " << msglen
896  << " from peer " << p->peeraddr << std::endl;
897  valid = false;
898  }
899  else
900  {
901  // Decode and process this message.
902  Bucket msg;
903  msg.next = 0;
904  valid = onMessage(&msg, p, &data[0]+consumed, msglen);
905 
906  // If we created a response, chain it to the write queue.
907  if (! msg.data.empty())
908  {
909  Bucket **prev = &p->sendq;
910  while (*prev)
911  prev = &(*prev)->next;
912 
913  *prev = new Bucket;
914  (*prev)->next = 0;
915  (*prev)->data.swap(msg.data);
916  }
917  }
918 
919  if (! valid)
920  {
921  losePeer("WARNING: data stream error with ", p, ev);
922  unlock();
923  return true;
924  }
925 
926  consumed += msglen;
927  }
928  else
929  break;
930  }
931 
932  data.erase(data.begin(), data.begin()+consumed);
933 
934  // If the client has closed the connection, shut down our end. If
935  // we have something to send back still, leave the write direction
936  // open. Otherwise close the shop for this client.
937  if (sz == 0)
938  sel_.setMask(p->socket, p->mask &= ~IORead);
939  }
940 
941  // Yes, please keep processing events for this socket.
942  unlock();
943  return false;
944 }
945 
950 bool
951 DQMNet::onPeerConnect(IOSelectEvent *ev)
952 {
953  // Recover the server socket.
954  assert (ev->source == server_);
955 
956  // Accept the connection.
957  Socket *s = server_->accept();
958  assert (s);
959  assert (! s->isBlocking());
960 
961  // Record it to our list of peers.
962  lock();
963  Peer *p = createPeer(s);
964  std::string localaddr;
965  if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
966  {
967  InetAddress peeraddr = inet->peername();
968  InetAddress myaddr = inet->sockname();
969  p->peeraddr = StringFormat("%1:%2")
970  .arg(peeraddr.hostname())
971  .arg(peeraddr.port()).value();
972  localaddr = StringFormat("%1:%2")
973  .arg(myaddr.hostname())
974  .arg(myaddr.port()).value();
975  }
976  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
977  {
978  p->peeraddr = local->peername().path();
979  localaddr = local->sockname().path();
980  }
981  else
982  assert(false);
983 
984  p->mask = IORead|IOUrgent;
985  p->socket = s;
986 
987  // Report the new connection.
988  if (debug_)
989  logme()
990  << "INFO: new peer " << p->peeraddr << " is now connected to "
991  << localaddr << std::endl;
992 
993  // Attach it to the listener.
994  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
995  unlock();
996 
997  // We are never done.
998  return false;
999 }
1000 
1007 bool
1008 DQMNet::onLocalNotify(IOSelectEvent *ev)
1009 {
1010  // Discard the data in the pipe, we care only about the wakeup.
1011  try
1012  {
1013  IOSize sz;
1014  unsigned char buf [1024];
1015  while ((sz = ev->source->read(buf, sizeof(buf))))
1016  ;
1017  }
1018  catch (Error &e)
1019  {
1020  SystemError *next = dynamic_cast<SystemError *>(e.next());
1021  if (next && next->portable() == SysErr::ErrTryAgain)
1022  ; // Ignore it
1023  else
1024  logme()
1025  << "WARNING: error reading from notification pipe: "
1026  << e.explain() << std::endl;
1027  }
1028 
1029  // Tell the main event pump to send an update in a little while.
1030  flush_ = true;
1031 
1032  // We are never done, always keep going.
1033  return false;
1034 }
1035 
1038 void
1040 {
1041  if (! p->socket)
1042  return;
1043 
1044  // Listen to writes iff we have data to send.
1045  unsigned oldmask = p->mask;
1046  if (! p->sendq && (p->mask & IOWrite))
1047  sel_.setMask(p->socket, p->mask &= ~IOWrite);
1048 
1049  if (p->sendq && ! (p->mask & IOWrite))
1050  sel_.setMask(p->socket, p->mask |= IOWrite);
1051 
1052  if (debug_ && oldmask != p->mask)
1053  logme()
1054  << "DEBUG: updating mask for " << p->peeraddr << " to "
1055  << p->mask << " from " << oldmask << std::endl;
1056 
1057  // If we have nothing more to send and are no longer listening
1058  // for reads, close up the shop for this peer.
1059  if (p->mask == IOUrgent && ! p->waiting)
1060  {
1061  assert (! p->sendq);
1062  if (debug_)
1063  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
1064  losePeer(0, p, 0);
1065  }
1066 }
1067 
1069 DQMNet::DQMNet (const std::string &appname /* = "" */)
1070  : debug_ (false),
1071  appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
1072  pid_ (getpid()),
1073  server_ (0),
1074  version_ (Time::current()),
1075  communicate_ ((pthread_t) -1),
1076  shutdown_ (0),
1077  delay_ (1000),
1078  waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
1079  waitMax_ (0, 0, 0, 5 /* seconds */, 0),
1080  flush_ (false)
1081 {
1082  // Create a pipe for the local DQM to tell the communicator
1083  // thread that local DQM data has changed and that the peers
1084  // should be notified.
1085  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
1086  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
1087 
1088  // Initialise the upstream and downstream to empty.
1092  upstream_.update = downstream_.update = false;
1093 }
1094 
1096 {
1097  // FIXME
1098 }
1099 
1102 void
1103 DQMNet::debug(bool doit)
1104 {
1105  debug_ = doit;
1106 }
1107 
1110 void
1112 {
1113  delay_ = delay;
1114 }
1115 
1120 void
1122 {
1123  waitStale_ = time;
1124 }
1125 
1129 void
1131 {
1132  if (server_)
1133  {
1134  logme() << "ERROR: DQM server was already started.\n";
1135  return;
1136  }
1137 
1138  try
1139  {
1140  InetAddress addr("0.0.0.0", port);
1141  InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
1142  s->bind(addr);
1143  s->listen(10);
1144  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1145  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1146  s->setBlocking(false);
1147  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1148  }
1149  catch (Error &e)
1150  {
1151  // FIXME: Do we need to do this when we throw an exception anyway?
1152  // FIXME: Abort instead?
1153  logme()
1154  << "ERROR: Failed to start server at port " << port << ": "
1155  << e.explain() << std::endl;
1156 
1157  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
1158  " %d: %s", port, e.explain().c_str());
1159  }
1160 
1161  logme() << "INFO: DQM server started at port " << port << std::endl;
1162 }
1163 
1167 void
1169 {
1170  if (server_)
1171  {
1172  logme() << "ERROR: DQM server was already started.\n";
1173  return;
1174  }
1175 
1176  try
1177  {
1178  server_ = new LocalServerSocket(path, 10);
1179  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1180  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1181  server_->setBlocking(false);
1182  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1183  }
1184  catch (Error &e)
1185  {
1186  // FIXME: Do we need to do this when we throw an exception anyway?
1187  // FIXME: Abort instead?
1188  logme()
1189  << "ERROR: Failed to start server at path " << path << ": "
1190  << e.explain() << std::endl;
1191 
1192  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
1193  " %s: %s", path, e.explain().c_str());
1194  }
1195 
1196  logme() << "INFO: DQM server started at path " << path << std::endl;
1197 }
1198 
1202 void
1204 {
1205  if (! downstream_.host.empty())
1206  {
1207  logme()
1208  << "ERROR: Already updating another collector at "
1209  << downstream_.host << ":" << downstream_.port << std::endl;
1210  return;
1211  }
1212 
1213  downstream_.update = true;
1214  downstream_.host = host;
1215  downstream_.port = port;
1216 }
1217 
1221 void
1223 {
1224  if (! upstream_.host.empty())
1225  {
1226  logme()
1227  << "ERROR: Already receiving data from another collector at "
1228  << upstream_.host << ":" << upstream_.port << std::endl;
1229  return;
1230  }
1231 
1232  upstream_.update = false;
1233  upstream_.host = host;
1234  upstream_.port = port;
1235 }
1236 
1238 void
1240 {
1241  shutdown_ = 1;
1242  if (communicate_ != (pthread_t) -1)
1243  pthread_join(communicate_, 0);
1244 }
1245 
1251 static void *communicate(void *obj)
1252 {
1253  sigset_t sigs;
1254  sigfillset (&sigs);
1255  pthread_sigmask (SIG_BLOCK, &sigs, 0);
1256  ((DQMNet *)obj)->run();
1257  return 0;
1258 }
1259 
1261 void
1263 {
1264  if (communicate_ != (pthread_t) -1)
1265  pthread_mutex_lock(&lock_);
1266 }
1267 
1269 void
1271 {
1272  if (communicate_ != (pthread_t) -1)
1273  pthread_mutex_unlock(&lock_);
1274 }
1275 
1279 void
1281 {
1282  if (communicate_ != (pthread_t) -1)
1283  {
1284  logme()
1285  << "ERROR: DQM networking thread has already been started\n";
1286  return;
1287  }
1288 
1289  pthread_mutex_init(&lock_, 0);
1290  pthread_create (&communicate_, 0, &communicate, this);
1291 }
1292 
1294 void
1296 {
1297  Time now;
1298  Time nextFlush = 0;
1299  AutoPeer *automatic[2] = { &upstream_, &downstream_ };
1300 
1301  // Perform I/O. Every once in a while flush updates to peers.
1302  while (! shouldStop())
1303  {
1304  for (int i = 0; i < 2; ++i)
1305  {
1306  AutoPeer *ap = automatic[i];
1307 
1308  // If we need a server connection and don't have one yet,
1309  // initiate asynchronous connection creation. Swallow errors
1310  // in case the server won't talk to us.
1311  if (! ap->host.empty()
1312  && ! ap->peer
1313  && (now = Time::current()) > ap->next)
1314  {
1315  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1316  InetSocket *s = 0;
1317  try
1318  {
1319  InetAddress addr(ap->host.c_str(), ap->port);
1320  s = new InetSocket (SOCK_STREAM, 0, addr.family());
1321  s->setBlocking(false);
1322  s->connect(addr);
1323  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1324  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1325  }
1326  catch (Error &e)
1327  {
1328  SystemError *sys = dynamic_cast<SystemError *>(e.next());
1329  if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1330  {
1331  // "In progress" just means the connection is in progress.
1332  // The connection is ready when the socket is writeable.
1333  // Anything else is a real problem.
1334  if (s)
1335  s->abort();
1336  delete s;
1337  s = 0;
1338  }
1339  }
1340 
1341  // Set up with the selector if we were successful. If this is
1342  // the upstream collector, queue a request for updates.
1343  if (s)
1344  {
1345  Peer *p = createPeer(s);
1346  ap->peer = p;
1347 
1348  InetAddress peeraddr = ((InetSocket *) s)->peername();
1349  InetAddress myaddr = ((InetSocket *) s)->sockname();
1350  p->peeraddr = StringFormat("%1:%2")
1351  .arg(peeraddr.hostname())
1352  .arg(peeraddr.port()).value();
1353  p->mask = IORead|IOWrite|IOUrgent;
1354  p->update = ap->update;
1355  p->automatic = ap;
1356  p->socket = s;
1357  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1358  if (ap == &upstream_)
1359  {
1360  uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
1361  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
1362  p->sendq = new Bucket;
1363  p->sendq->next = 0;
1364  copydata(p->sendq, words, sizeof(words));
1365  }
1366 
1367  // Report the new connection.
1368  if (debug_)
1369  logme()
1370  << "INFO: now connected to " << p->peeraddr << " from "
1371  << myaddr.hostname() << ":" << myaddr.port() << std::endl;
1372  }
1373  }
1374  }
1375 
1376  // Pump events for a while.
1377  sel_.dispatch(delay_);
1378  now = Time::current();
1379  lock();
1380 
1381  // Check if flush is required. Flush only if one is needed.
1382  // Always sends the full object list, but only rarely.
1383  if (flush_ && now > nextFlush)
1384  {
1385  flush_ = false;
1386  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1387  sendObjectListToPeers(true);
1388  }
1389 
1390  // Update the data server and peer selection masks. If we
1391  // have no more data to send and listening for writes, remove
1392  // the write mask. If we have something to write and aren't
1393  // listening for writes, start listening so we can send off
1394  // the data.
1395  updatePeerMasks();
1396 
1397  // Release peers that have been waiting for data for too long.
1398  Time waitold = now - waitMax_;
1399  Time waitstale = now - waitStale_;
1400  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
1401  {
1402  Object *o = findObject(0, i->name);
1403 
1404  // If we have (stale) object data, wait only up to stale limit.
1405  // Otherwise if we have no data at all, wait up to the max limit.
1406  if (i->time < waitold)
1407  {
1408  logme ()
1409  << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
1410  << "s to retrieval, releasing '" << i->name << "' from wait, have "
1411  << (o ? o->rawdata.size() : 0) << " data available\n";
1412  releaseFromWait(i++, o);
1413  }
1414  else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
1415  {
1416  logme ()
1417  << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
1418  << "s to update, releasing '" << i->name << "' from wait, have "
1419  << o->rawdata.size() << " data available\n";
1420  releaseFromWait(i++, o);
1421  }
1422 
1423  // Keep it for now.
1424  else
1425  ++i;
1426  }
1427 
1428  unlock();
1429  }
1430 }
1431 
1432 // Tell the network cache that there have been local changes that
1433 // should be advertised to the downstream listeners.
1434 void
1436 {
1437  char byte = 0;
1438  wakeup_.sink()->write(&byte, 1);
1439 }
1440 
1444 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */)
1445  : DQMImplNet<DQMNet::Object>(appname)
1446 {
1447  local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
1448 }
1449 
1451 void
1453 {
1454  local_->objs.resize(size);
1455 }
1456 
1459 void
1461 {
1462  o.dirname = &*local_->dirs.insert(*o.dirname).first;
1463  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1464  if (! info.second)
1465  {
1466  // Somewhat hackish. Sets are supposedly immutable, but we
1467  // need to change the non-key parts of the object. Erasing
1468  // and re-inserting would produce too much memory churn.
1469  Object &old = const_cast<Object &>(*info.first);
1470  std::swap(old.flags, o.flags);
1471  std::swap(old.tag, o.tag);
1472  std::swap(old.version, o.version);
1473  std::swap(old.qreports, o.qreports);
1474  std::swap(old.rawdata, o.rawdata);
1475  std::swap(old.scalar, o.scalar);
1476  std::swap(old.qdata, o.qdata);
1477  }
1478 }
1479 
1483 bool
1484 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
1485 {
1486  size_t removed = 0;
1487  std::string path;
1488  ObjectMap::iterator i, e;
1489  for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
1490  {
1491  path.clear();
1492  path.reserve(i->dirname->size() + i->objname.size() + 2);
1493  path += *i->dirname;
1494  if (! path.empty())
1495  path += '/';
1496  path += i->objname;
1497 
1498  if (! known.count(path))
1499  ++removed, local_->objs.erase(i++);
1500  else
1501  ++i;
1502  }
1503 
1504  return removed > 0;
1505 }
size
Write out results.
void run(void)
Definition: DQMNet.cc:1295
type
Definition: HCALResponse.h:21
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:343
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1069
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:48
host
Definition: query.py:114
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
def logme(msg, args)
DataBlob incoming
Definition: DQMNet.h:139
virtual bool shouldStop(void)
Definition: DQMNet.cc:389
void shutdown(void)
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1239
void sendLocalChanges(void)
Definition: DQMNet.cc:1435
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1008
QReports qreports
Definition: DQMNet.h:108
pthread_t communicate_
Definition: DQMNet.h:346
std::string algorithm
Definition: DQMNet.h:94
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:101
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:144
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:181
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:23
int delay_
Definition: DQMNet.h:349
pthread_mutex_t lock_
Definition: DQMNet.h:319
void delay(int delay)
Definition: DQMNet.cc:1111
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:99
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:137
static const int WARNING
lat::Socket * socket
Definition: DQMNet.h:138
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1121
bool ev
uint32_t tag
Definition: DQMNet.h:100
const std::string * dirname
Definition: DQMNet.h:106
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
AutoPeer * automatic
Definition: DQMNet.h:149
AutoPeer upstream_
Definition: DQMNet.h:342
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
Definition: DQMStore.h:25
A arg
Definition: Factorize.h:37
std::string qdata
Definition: DQMNet.h:117
port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1103
void Fill(long long x)
virtual ~DQMNet(void)
Definition: DQMNet.cc:1095
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:114
std::string name
Definition: DQMNet.h:129
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:468
DataBlob data
Definition: DQMNet.h:123
Peer * peer
Definition: DQMNet.h:154
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:81
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:47
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:49
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
sig_atomic_t shutdown_
Definition: DQMNet.h:347
std::string objname
Definition: DQMNet.h:107
bool update
Definition: DQMNet.h:145
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
std::string qtname
Definition: DQMNet.h:93
std::string scalar
Definition: DQMNet.h:116
DQMNet::CoreObject data_
lat::Time next
Definition: DQMNet.h:155
unsigned mask
Definition: DQMNet.h:143
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
void start(void)
Definition: DQMNet.cc:1280
Definition: IOTypes.h:26
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1484
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
size_t updates
Definition: DQMNet.h:147
std::string host
Definition: DQMNet.h:156
double b
Definition: hdecay.h:120
void startLocalServer(int port)
Definition: DQMNet.cc:1130
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
virtual void updatePeerMasks(void)=0
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
lat::Pipe wakeup_
Definition: DQMNet.h:339
bool debug_
Definition: DQMNet.h:318
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:148
if(dp >Float(M_PI)) dp-
std::string message
Definition: DQMNet.h:92
size_t sendpos
Definition: DQMNet.h:141
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1222
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
void updateMask(Peer *p)
Definition: DQMNet.cc:1039
bool flush_
Definition: DQMNet.h:352
lat::Socket * server_
Definition: DQMNet.h:338
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1460
lat::TimeSpan waitMax_
Definition: DQMNet.h:351
Bucket * next
Definition: DQMNet.h:122
std::vector< QValue > QReports
Definition: DQMNet.h:84
Bucket * sendq
Definition: DQMNet.h:140
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1452
float qtresult
Definition: DQMNet.h:91
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1203
WaitList waiting_
Definition: DQMNet.h:344
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1444
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:491
DataBlob rawdata
Definition: DQMNet.h:115
static const int ERROR
ImplPeer * local_
Definition: DQMNet.h:621
lat::IOSelector sel_
Definition: DQMNet.h:337
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11