CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMNet.cc
Go to the documentation of this file.
4 #include "classlib/iobase/InetServerSocket.h"
5 #include "classlib/iobase/LocalServerSocket.h"
6 #include "classlib/iobase/Filename.h"
7 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
8 #include "classlib/utils/TimeInfo.h"
9 #include "classlib/utils/StringList.h"
10 #include "classlib/utils/StringFormat.h"
11 #include "classlib/utils/StringOps.h"
12 #include "classlib/utils/SystemError.h"
13 #include "classlib/utils/Regexp.h"
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <sys/wait.h>
17 #include <stdio.h>
18 #include <stdint.h>
19 #include <iostream>
20 #include <sstream>
21 #include <cassert>
22 #include <cfloat>
23 #include <inttypes.h>
24 
25 #if __APPLE__
26 # define MESSAGE_SIZE_LIMIT (1*1024*1024)
27 # define SOCKET_BUF_SIZE (1*1024*1024)
28 #else
29 # define MESSAGE_SIZE_LIMIT (8*1024*1024)
30 # define SOCKET_BUF_SIZE (8*1024*1024)
31 #endif
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
34 
35 using namespace lat;
36 
37 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
38 
40 // Generate log prefix.
41 std::ostream &
43 {
44  Time now = Time::current();
45  return std::cout
46  << now.format(true, "%Y-%m-%d %H:%M:%S.")
47  << now.nanoformat(3, 3)
48  << " " << appname_ << "[" << pid_ << "]: ";
49 }
50 
51 // Append data into a bucket.
52 void
53 DQMNet::copydata(Bucket *b, const void *data, size_t len)
54 {
55  b->data.insert(b->data.end(),
56  (const unsigned char *)data,
57  (const unsigned char *)data + len);
58 }
59 
60 // Discard a bucket chain.
61 void
63 {
64  while (b)
65  {
66  Bucket *next = b->next;
67  delete b;
68  b = next;
69  }
70 }
71 
73 
76 void
77 DQMNet::losePeer(const char *reason,
78  Peer *peer,
79  IOSelectEvent *ev,
80  Error *err)
81 {
82  if (reason)
83  logme ()
84  << reason << peer->peeraddr
85  << (err ? "; error was: " + err->explain() : std::string(""))
86  << std::endl;
87 
88  Socket *s = peer->socket;
89 
90  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
91  if (i->peer == peer)
92  waiting_.erase(i++);
93  else
94  ++i;
95 
96  if (ev)
97  ev->source = 0;
98 
99  discard(peer->sendq);
100  if (peer->automatic)
101  peer->automatic->peer = 0;
102 
103  sel_.detach (s);
104  s->close();
105  removePeer(peer, s);
106  delete s;
107 }
108 
110 void
111 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
112 {
113  // Issue request to peer.
114  Bucket **msg = &p->sendq;
115  while (*msg)
116  msg = &(*msg)->next;
117  *msg = new Bucket;
118  (*msg)->next = 0;
119 
120  uint32_t words[3];
121  words[0] = sizeof(words) + len;
122  words[1] = DQM_MSG_GET_OBJECT;
123  words[2] = len;
124  copydata(*msg, words, sizeof(words));
125  copydata(*msg, name, len);
126 }
127 
130 void
132 {
133  // FIXME: Should we automatically record which exact peer the waiter
134  // is expecting to deliver data so we know to release the waiter if
135  // the other peer vanishes? The current implementation stands a
136  // chance for the waiter to wait indefinitely -- although we do
137  // force terminate the wait after a while.
138  requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
139  WaitObject wo = { Time::current(), name, info, p };
140  waiting_.push_back(wo);
141  p->waiting++;
142 }
143 
144 // Once an object has been updated, this is invoked for all waiting
145 // peers. Send the object back to the peer in suitable form.
146 void
147 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
148 {
149  Bucket **msg = &i->peer->sendq;
150  while (*msg)
151  msg = &(*msg)->next;
152  *msg = new Bucket;
153  (*msg)->next = 0;
154 
155  releaseFromWait(*msg, *i, o);
156 
157  assert(i->peer->waiting > 0);
158  i->peer->waiting--;
159  waiting_.erase(i);
160 }
161 
162 // Release everyone waiting for the object @a o.
163 void
165 {
166  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
167  if (i->name == name)
168  releaseFromWait(i++, o);
169  else
170  ++i;
171 }
172 
176 void
178 {
179  char buf[64];
180  std::ostringstream qrs;
181  QReports::const_iterator qi, qe;
182  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
183  {
184  int pos = 0;
185  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
186  qrs << buf << '\0'
187  << buf+pos << '\0'
188  << qi->qtname << '\0'
189  << qi->algorithm << '\0'
190  << qi->message << '\0'
191  << '\0';
192  }
193  into = qrs.str();
194 }
195 
198 void
199 DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
200 {
201  const char *qdata = from;
202 
203  // Count how many qresults there are.
204  size_t nqv = 0;
205  while (*qdata)
206  {
207  ++nqv;
208  while (*qdata) ++qdata; ++qdata;
209  while (*qdata) ++qdata; ++qdata;
210  while (*qdata) ++qdata; ++qdata;
211  while (*qdata) ++qdata; ++qdata;
212  while (*qdata) ++qdata; ++qdata;
213  }
214 
215  // Now extract the qreports.
216  qdata = from;
217  qr.reserve(nqv);
218  while (*qdata)
219  {
220  qr.push_back(DQMNet::QValue());
221  DQMNet::QValue &qv = qr.back();
222 
223  qv.code = atoi(qdata);
224  while (*qdata) ++qdata;
225  switch (qv.code)
226  {
228  break;
231  break;
232  case dqm::qstatus::ERROR:
234  break;
235  default:
237  break;
238  }
239 
240  qv.qtresult = atof(++qdata);
241  while (*qdata) ++qdata;
242 
243  qv.qtname = ++qdata;
244  while (*qdata) ++qdata;
245 
246  qv.algorithm = ++qdata;
247  while (*qdata) ++qdata;
248 
249  qv.message = ++qdata;
250  while (*qdata) ++qdata;
251  ++qdata;
252  }
253 }
254 
255 #if 0
256 // Deserialise a ROOT object from a buffer at the current position.
257 static TObject *
258 extractNextObject(TBufferFile &buf)
259 {
260  if (buf.Length() == buf.BufferSize())
261  return 0;
262 
263  buf.InitMap();
264  Int_t pos = buf.Length();
265  TClass *c = buf.ReadClass();
266  buf.SetBufferOffset(pos);
267  buf.ResetMap();
268  return c ? buf.ReadObject(c) : 0;
269 }
270 
271 // Reconstruct an object from the raw data.
272 bool
273 DQMNet::reconstructObject(Object &o)
274 {
275  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
276  buf.Reset();
277 
278  // Extract the main object.
279  if (! (o.object = extractNextObject(buf)))
280  return false;
281 
282  // Extract the reference object.
283  o.reference = extractNextObject(buf);
284 
285  // Extract quality reports.
286  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
287  return true;
288 }
289 #endif
290 
291 #if 0
292 bool
293 DQMNet::reinstateObject(DQMStore *store, Object &o)
294 {
295  if (! reconstructObject (o))
296  return false;
297 
298  // Reconstruct the main object
299  MonitorElement *obj = 0;
300  store->setCurrentFolder(*o.dirname);
301  switch (o.flags & DQM_PROP_TYPE_MASK)
302  {
303  case DQM_PROP_TYPE_INT:
304  obj = store->bookInt(o.objname);
305  obj->Fill(atoll(o.scalar.c_str()));
306  break;
307 
308  case DQM_PROP_TYPE_REAL:
309  obj = store->bookFloat(name);
310  obj->Fill(atof(o.scalar.c_str()));
311  break;
312 
313  case DQM_PROP_TYPE_STRING:
314  obj = store->bookString(name, o.scalar);
315  break;
316 
317  case DQM_PROP_TYPE_TH1F:
318  obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
319  break;
320 
321  case DQM_PROP_TYPE_TH1S:
322  obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
323  break;
324 
325  case DQM_PROP_TYPE_TH1D:
326  obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
327  break;
328 
329  case DQM_PROP_TYPE_TH2F:
330  obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
331  break;
332 
333  case DQM_PROP_TYPE_TH2S:
334  obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
335  break;
336 
337  case DQM_PROP_TYPE_TH2D:
338  obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
339  break;
340 
341  case DQM_PROP_TYPE_TH3F:
342  obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
343  break;
344 
345  case DQM_PROP_TYPE_TH3S:
346  obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
347  break;
348 
349  case DQM_PROP_TYPE_TH3D:
350  obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
351  break;
352 
353  case DQM_PROP_TYPE_PROF:
354  obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
355  break;
356 
357  case DQM_PROP_TYPE_PROF2D:
358  obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
359  break;
360 
361  default:
362  logme()
363  << "ERROR: unexpected monitor element of type "
364  << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
365  << *o.dirname << '/' << o.objname << "'\n";
366  return false;
367  }
368 
369  // Reconstruct tag and qreports.
370  if (obj)
371  {
372  obj->data_.tag = o.tag;
373  obj->data_.qreports = o.qreports;
374  }
375 
376  // Inidicate success.
377  return true;
378 }
379 #endif
380 
382 // Check if the network layer should stop.
383 bool
385 {
386  return shutdown_;
387 }
388 
389 // Once an object has been updated, this is invoked for all waiting
390 // peers. Send the requested object to the waiting peer.
391 void
393 {
394  if (o)
395  sendObjectToPeer(msg, *o, true);
396  else
397  {
398  uint32_t words [3];
399  words[0] = sizeof(words) + w.name.size();
400  words[1] = DQM_REPLY_NONE;
401  words[2] = w.name.size();
402 
403  msg->data.reserve(msg->data.size() + words[0]);
404  copydata(msg, &words[0], sizeof(words));
405  copydata(msg, &w.name[0], w.name.size());
406  }
407 }
408 
409 // Send an object to a peer. If not @a data, only sends a summary
410 // without the object data, except the data is always sent for scalar
411 // objects.
412 void
414 {
415  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
416  DataBlob objdata;
417 
418  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
419  objdata.insert(objdata.end(),
420  &o.scalar[0],
421  &o.scalar[0] + o.scalar.size());
422  else if (data)
423  objdata.insert(objdata.end(),
424  &o.rawdata[0],
425  &o.rawdata[0] + o.rawdata.size());
426 
427  uint32_t words [9];
428  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
429  uint32_t datalen = objdata.size();
430  uint32_t qlen = o.qdata.size();
431 
432  if (o.dirname->empty())
433  --namelen;
434 
435  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
436  words[1] = DQM_REPLY_OBJECT;
437  words[2] = flags;
438  words[3] = (o.version >> 0 ) & 0xffffffff;
439  words[4] = (o.version >> 32) & 0xffffffff;
440  words[5] = o.tag;
441  words[6] = namelen;
442  words[7] = datalen;
443  words[8] = qlen;
444 
445  msg->data.reserve(msg->data.size() + words[0]);
446  copydata(msg, &words[0], 9*sizeof(uint32_t));
447  if (namelen)
448  {
449  copydata(msg, &(*o.dirname)[0], o.dirname->size());
450  if (! o.dirname->empty())
451  copydata(msg, "/", 1);
452  copydata(msg, &o.objname[0], o.objname.size());
453  }
454  if (datalen)
455  copydata(msg, &objdata[0], datalen);
456  if (qlen)
457  copydata(msg, &o.qdata[0], qlen);
458 }
459 
461 // Handle peer messages.
462 bool
463 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
464 {
465  // Decode and process this message.
466  uint32_t type;
467  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
468  switch (type)
469  {
470  case DQM_MSG_UPDATE_ME:
471  {
472  if (len != 2*sizeof(uint32_t))
473  {
474  logme()
475  << "ERROR: corrupt 'UPDATE_ME' message of length " << len
476  << " from peer " << p->peeraddr << std::endl;
477  return false;
478  }
479 
480  if (debug_)
481  logme()
482  << "DEBUG: received message 'UPDATE ME' from peer "
483  << p->peeraddr << ", size " << len << std::endl;
484 
485  p->update = true;
486  }
487  return true;
488 
489  case DQM_MSG_LIST_OBJECTS:
490  {
491  if (debug_)
492  logme()
493  << "DEBUG: received message 'LIST OBJECTS' from peer "
494  << p->peeraddr << ", size " << len << std::endl;
495 
496  // Send over current status: list of known objects.
497  sendObjectListToPeer(msg, true, false);
498  }
499  return true;
500 
501  case DQM_MSG_GET_OBJECT:
502  {
503  if (debug_)
504  logme()
505  << "DEBUG: received message 'GET OBJECT' from peer "
506  << p->peeraddr << ", size " << len << std::endl;
507 
508  if (len < 3*sizeof(uint32_t))
509  {
510  logme()
511  << "ERROR: corrupt 'GET IMAGE' message of length " << len
512  << " from peer " << p->peeraddr << std::endl;
513  return false;
514  }
515 
516  uint32_t namelen;
517  memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
518  if (len != 3*sizeof(uint32_t) + namelen)
519  {
520  logme()
521  << "ERROR: corrupt 'GET OBJECT' message of length " << len
522  << " from peer " << p->peeraddr
523  << ", expected length " << (3*sizeof(uint32_t))
524  << " + " << namelen << std::endl;
525  return false;
526  }
527 
528  std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
529  Peer *owner = 0;
530  Object *o = findObject(0, name, &owner);
531  if (o)
532  {
533  o->lastreq = Time::current().ns();
534  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
535  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
536  waitForData(p, name, "", owner);
537  else
538  sendObjectToPeer(msg, *o, true);
539  }
540  else
541  {
542  uint32_t words [3];
543  words[0] = sizeof(words) + name.size();
544  words[1] = DQM_REPLY_NONE;
545  words[2] = name.size();
546 
547  msg->data.reserve(msg->data.size() + words[0]);
548  copydata(msg, &words[0], sizeof(words));
549  copydata(msg, &name[0], name.size());
550  }
551  }
552  return true;
553 
554  case DQM_REPLY_LIST_BEGIN:
555  {
556  if (len != 4*sizeof(uint32_t))
557  {
558  logme()
559  << "ERROR: corrupt 'LIST BEGIN' message of length " << len
560  << " from peer " << p->peeraddr << std::endl;
561  return false;
562  }
563 
564  // Get the update status: whether this is a full update.
565  uint32_t flags;
566  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
567 
568  if (debug_)
569  logme()
570  << "DEBUG: received message 'LIST BEGIN "
571  << (flags ? "FULL" : "INCREMENTAL")
572  << "' from " << p->peeraddr
573  << ", size " << len << std::endl;
574 
575  // If we are about to receive a full list of objects, flag all
576  // objects as possibly dead. Subsequent object notifications
577  // will undo this for the live objects. We cannot delete
578  // objects quite yet, as we may get inquiry from another client
579  // while we are processing the incoming list, so we keep the
580  // objects tentatively alive as long as we've not seen the end.
581  if (flags)
582  markObjectsDead(p);
583  }
584  return true;
585 
586  case DQM_REPLY_LIST_END:
587  {
588  if (len != 4*sizeof(uint32_t))
589  {
590  logme()
591  << "ERROR: corrupt 'LIST END' message of length " << len
592  << " from peer " << p->peeraddr << std::endl;
593  return false;
594  }
595 
596  // Get the update status: whether this is a full update.
597  uint32_t flags;
598  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
599 
600  // If we received a full list of objects, now purge all dead
601  // objects. We need to do this in two stages in case we receive
602  // updates in many parts, and end up sending updates to others in
603  // between; this avoids us lying live objects are dead.
604  if (flags)
605  purgeDeadObjects(p);
606 
607  if (debug_)
608  logme()
609  << "DEBUG: received message 'LIST END "
610  << (flags ? "FULL" : "INCREMENTAL")
611  << "' from " << p->peeraddr
612  << ", size " << len << std::endl;
613 
614  // Indicate we have received another update from this peer.
615  // Also indicate we should flush to our clients.
616  flush_ = true;
617  p->updates++;
618  }
619  return true;
620 
621  case DQM_REPLY_OBJECT:
622  {
623  uint32_t words[9];
624  if (len < sizeof(words))
625  {
626  logme()
627  << "ERROR: corrupt 'OBJECT' message of length " << len
628  << " from peer " << p->peeraddr << std::endl;
629  return false;
630  }
631 
632  memcpy (&words[0], data, sizeof(words));
633  uint32_t &namelen = words[6];
634  uint32_t &datalen = words[7];
635  uint32_t &qlen = words[8];
636 
637  if (len != sizeof(words) + namelen + datalen + qlen)
638  {
639  logme()
640  << "ERROR: corrupt 'OBJECT' message of length " << len
641  << " from peer " << p->peeraddr
642  << ", expected length " << sizeof(words)
643  << " + " << namelen
644  << " + " << datalen
645  << " + " << qlen
646  << std::endl;
647  return false;
648  }
649 
650  unsigned char *namedata = data + sizeof(words);
651  unsigned char *objdata = namedata + namelen;
652  unsigned char *qdata = objdata + datalen;
653  unsigned char *enddata = qdata + qlen;
654  std::string name ((char *) namedata, namelen);
655  assert (enddata == data + len);
656 
657  if (debug_)
658  logme()
659  << "DEBUG: received message 'OBJECT " << name
660  << "' from " << p->peeraddr
661  << ", size " << len << std::endl;
662 
663  // Mark the peer as a known object source.
664  p->source = true;
665 
666  // Initialise or update an object entry.
667  Object *o = findObject(p, name);
668  if (! o)
669  o = makeObject(p, name);
670 
671  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
672  o->tag = words[5];
673  o->version = ((uint64_t) words[4] << 32 | words[3]);
674  o->scalar.clear();
675  o->qdata.clear();
676  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
677  {
678  o->rawdata.clear();
679  o->scalar.insert(o->scalar.end(), objdata, qdata);
680  }
681  else if (datalen)
682  {
683  o->rawdata.clear();
684  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
685  }
686  else if (! o->rawdata.empty())
687  o->flags |= DQM_PROP_STALE;
688  o->qdata.insert(o->qdata.end(), qdata, enddata);
689 
690  // If we had an object for this one already and this is a list
691  // update without data, issue an immediate data get request.
692  if (o->lastreq
693  && ! datalen
694  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
695  requestObjectData(p, (namelen ? &name[0] : 0), namelen);
696 
697  // If we have the object data, release from wait.
698  if (datalen)
699  releaseWaiters(name, o);
700  }
701  return true;
702 
703  case DQM_REPLY_NONE:
704  {
705  uint32_t words[3];
706  if (len < sizeof(words))
707  {
708  logme()
709  << "ERROR: corrupt 'NONE' message of length " << len
710  << " from peer " << p->peeraddr << std::endl;
711  return false;
712  }
713 
714  memcpy (&words[0], data, sizeof(words));
715  uint32_t &namelen = words[2];
716 
717  if (len != sizeof(words) + namelen)
718  {
719  logme()
720  << "ERROR: corrupt 'NONE' message of length " << len
721  << " from peer " << p->peeraddr
722  << ", expected length " << sizeof(words)
723  << " + " << namelen << std::endl;
724  return false;
725  }
726 
727  unsigned char *namedata = data + sizeof(words);
728  std::string name((char *) namedata, namelen);
729 
730  if (debug_)
731  logme()
732  << "DEBUG: received message 'NONE " << name
733  << "' from " << p->peeraddr
734  << ", size " << len << std::endl;
735 
736  // Mark the peer as a known object source.
737  p->source = true;
738 
739  // If this was a known object, kill it.
740  if (Object *o = findObject(p, name))
741  {
742  o->flags |= DQM_PROP_DEAD;
743  purgeDeadObjects(p);
744  }
745 
746  // If someone was waiting for this, let them go.
747  releaseWaiters(name, 0);
748  }
749  return true;
750 
751  default:
752  logme()
753  << "ERROR: unrecognised message of length " << len
754  << " and type " << type << " from peer " << p->peeraddr
755  << std::endl;
756  return false;
757  }
758 }
759 
762 bool
763 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
764 {
765  lock();
766  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
767 
768  // If there is a problem with the peer socket, discard the peer
769  // and tell the selector to stop prcessing events for it. If
770  // this is a server connection, we will eventually recreate
771  // everything if/when the data server comes back.
772  if (ev->events & IOUrgent)
773  {
774  if (p->automatic)
775  {
776  logme()
777  << "WARNING: connection to the DQM server at " << p->peeraddr
778  << " lost (will attempt to reconnect in 15 seconds)\n";
779  losePeer(0, p, ev);
780  }
781  else
782  losePeer("WARNING: lost peer connection ", p, ev);
783 
784  unlock();
785  return true;
786  }
787 
788  // If we can write to the peer socket, pump whatever we can into it.
789  if (ev->events & IOWrite)
790  {
791  while (Bucket *b = p->sendq)
792  {
793  IOSize len = b->data.size() - p->sendpos;
794  const void *data = (len ? (const void *)&b->data[p->sendpos]
795  : (const void *)&data);
796  IOSize done;
797 
798  try
799  {
800  done = (len ? ev->source->write (data, len) : 0);
801  if (debug_ && len)
802  logme()
803  << "DEBUG: sent " << done << " bytes to peer "
804  << p->peeraddr << std::endl;
805  }
806  catch (Error &e)
807  {
808  losePeer("WARNING: unable to write to peer ", p, ev, &e);
809  unlock();
810  return true;
811  }
812 
813  p->sendpos += done;
814  if (p->sendpos == b->data.size())
815  {
816  Bucket *old = p->sendq;
817  p->sendq = old->next;
818  p->sendpos = 0;
819  old->next = 0;
820  discard(old);
821  }
822 
823  if (! done && len)
824  // Cannot write any more.
825  break;
826  }
827  }
828 
829  // If there is data to be read from the peer, first receive what we
830  // can get out the socket, the process all complete requests.
831  if (ev->events & IORead)
832  {
833  // First build up the incoming buffer of data in the socket.
834  // Remember the last size returned by the socket; we need
835  // it to determine if the remote end closed the connection.
836  IOSize sz;
837  try
838  {
839  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
840  do
841  if ((sz = ev->source->read(&buf[0], buf.size())))
842  {
843  if (debug_)
844  logme()
845  << "DEBUG: received " << sz << " bytes from peer "
846  << p->peeraddr << std::endl;
847  DataBlob &data = p->incoming;
848  if (data.capacity () < data.size () + sz)
849  data.reserve (data.size() + SOCKET_READ_GROWTH);
850  data.insert (data.end(), &buf[0], &buf[0] + sz);
851  }
852  while (sz == sizeof (buf));
853  }
854  catch (Error &e)
855  {
856  SystemError *next = dynamic_cast<SystemError *>(e.next());
857  if (next && next->portable() == SysErr::ErrTryAgain)
858  sz = 1; // Ignore it, and fake no end of data.
859  else
860  {
861  // Houston we have a problem.
862  losePeer("WARNING: failed to read from peer ", p, ev, &e);
863  unlock();
864  return true;
865  }
866  }
867 
868  // Process fully received messages as long as we can.
869  size_t consumed = 0;
870  DataBlob &data = p->incoming;
871  while (data.size()-consumed >= sizeof(uint32_t)
872  && p->waiting < MAX_PEER_WAITREQS)
873  {
874  uint32_t msglen;
875  memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
876 
877  if (msglen >= MESSAGE_SIZE_LIMIT)
878  {
879  losePeer("WARNING: excessively large message from ", p, ev);
880  unlock();
881  return true;
882  }
883 
884  if (data.size()-consumed >= msglen)
885  {
886  bool valid = true;
887  if (msglen < 2*sizeof(uint32_t))
888  {
889  logme()
890  << "ERROR: corrupt peer message of length " << msglen
891  << " from peer " << p->peeraddr << std::endl;
892  valid = false;
893  }
894  else
895  {
896  // Decode and process this message.
897  Bucket msg;
898  msg.next = 0;
899  valid = onMessage(&msg, p, &data[0]+consumed, msglen);
900 
901  // If we created a response, chain it to the write queue.
902  if (! msg.data.empty())
903  {
904  Bucket **prev = &p->sendq;
905  while (*prev)
906  prev = &(*prev)->next;
907 
908  *prev = new Bucket;
909  (*prev)->next = 0;
910  (*prev)->data.swap(msg.data);
911  }
912  }
913 
914  if (! valid)
915  {
916  losePeer("WARNING: data stream error with ", p, ev);
917  unlock();
918  return true;
919  }
920 
921  consumed += msglen;
922  }
923  else
924  break;
925  }
926 
927  data.erase(data.begin(), data.begin()+consumed);
928 
929  // If the client has closed the connection, shut down our end. If
930  // we have something to send back still, leave the write direction
931  // open. Otherwise close the shop for this client.
932  if (sz == 0)
933  sel_.setMask(p->socket, p->mask &= ~IORead);
934  }
935 
936  // Yes, please keep processing events for this socket.
937  unlock();
938  return false;
939 }
940 
945 bool
946 DQMNet::onPeerConnect(IOSelectEvent *ev)
947 {
948  // Recover the server socket.
949  assert (ev->source == server_);
950 
951  // Accept the connection.
952  Socket *s = server_->accept();
953  assert (s);
954  assert (! s->isBlocking());
955 
956  // Record it to our list of peers.
957  lock();
958  Peer *p = createPeer(s);
959  std::string localaddr;
960  if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
961  {
962  InetAddress peeraddr = inet->peername();
963  InetAddress myaddr = inet->sockname();
964  p->peeraddr = StringFormat("%1:%2")
965  .arg(peeraddr.hostname())
966  .arg(peeraddr.port());
967  localaddr = StringFormat("%1:%2")
968  .arg(myaddr.hostname())
969  .arg(myaddr.port());
970  }
971  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
972  {
973  p->peeraddr = local->peername().path();
974  localaddr = local->sockname().path();
975  }
976  else
977  assert(false);
978 
979  p->mask = IORead|IOUrgent;
980  p->socket = s;
981 
982  // Report the new connection.
983  if (debug_)
984  logme()
985  << "INFO: new peer " << p->peeraddr << " is now connected to "
986  << localaddr << std::endl;
987 
988  // Attach it to the listener.
989  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
990  unlock();
991 
992  // We are never done.
993  return false;
994 }
995 
1002 bool
1003 DQMNet::onLocalNotify(IOSelectEvent *ev)
1004 {
1005  // Discard the data in the pipe, we care only about the wakeup.
1006  try
1007  {
1008  IOSize sz;
1009  unsigned char buf [1024];
1010  while ((sz = ev->source->read(buf, sizeof(buf))))
1011  ;
1012  }
1013  catch (Error &e)
1014  {
1015  SystemError *next = dynamic_cast<SystemError *>(e.next());
1016  if (next && next->portable() == SysErr::ErrTryAgain)
1017  ; // Ignore it
1018  else
1019  logme()
1020  << "WARNING: error reading from notification pipe: "
1021  << e.explain() << std::endl;
1022  }
1023 
1024  // Tell the main event pump to send an update in a little while.
1025  flush_ = true;
1026 
1027  // We are never done, always keep going.
1028  return false;
1029 }
1030 
1033 void
1035 {
1036  if (! p->socket)
1037  return;
1038 
1039  // Listen to writes iff we have data to send.
1040  unsigned oldmask = p->mask;
1041  if (! p->sendq && (p->mask & IOWrite))
1042  sel_.setMask(p->socket, p->mask &= ~IOWrite);
1043 
1044  if (p->sendq && ! (p->mask & IOWrite))
1045  sel_.setMask(p->socket, p->mask |= IOWrite);
1046 
1047  if (debug_ && oldmask != p->mask)
1048  logme()
1049  << "DEBUG: updating mask for " << p->peeraddr << " to "
1050  << p->mask << " from " << oldmask << std::endl;
1051 
1052  // If we have nothing more to send and are no longer listening
1053  // for reads, close up the shop for this peer.
1054  if (p->mask == IOUrgent && ! p->waiting)
1055  {
1056  assert (! p->sendq);
1057  if (debug_)
1058  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
1059  losePeer(0, p, 0);
1060  }
1061 }
1062 
1064 DQMNet::DQMNet (const std::string &appname /* = "" */)
1065  : debug_ (false),
1066  appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
1067  pid_ (getpid()),
1068  server_ (0),
1069  version_ (Time::current()),
1070  communicate_ ((pthread_t) -1),
1071  shutdown_ (0),
1072  delay_ (1000),
1073  waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
1074  waitMax_ (0, 0, 0, 5 /* seconds */, 0),
1075  flush_ (false)
1076 {
1077  // Create a pipe for the local DQM to tell the communicator
1078  // thread that local DQM data has changed and that the peers
1079  // should be notified.
1080  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
1081  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
1082 
1083  // Initialise the upstream and downstream to empty.
1087  upstream_.update = downstream_.update = false;
1088 }
1089 
1091 {
1092  // FIXME
1093 }
1094 
1097 void
1098 DQMNet::debug(bool doit)
1099 {
1100  debug_ = doit;
1101 }
1102 
1105 void
1106 DQMNet::delay(int delay)
1107 {
1108  delay_ = delay;
1109 }
1110 
1115 void
1116 DQMNet::staleObjectWaitLimit(lat::TimeSpan time)
1117 {
1118  waitStale_ = time;
1119 }
1120 
1124 void
1126 {
1127  if (server_)
1128  {
1129  logme() << "ERROR: DQM server was already started.\n";
1130  return;
1131  }
1132 
1133  try
1134  {
1135  InetAddress addr("0.0.0.0", port);
1136  InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
1137  s->bind(addr);
1138  s->listen(10);
1139  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1140  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1141  s->setBlocking(false);
1142  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1143  }
1144  catch (Error &e)
1145  {
1146  // FIXME: Do we need to do this when we throw an exception anyway?
1147  // FIXME: Abort instead?
1148  logme()
1149  << "ERROR: Failed to start server at port " << port << ": "
1150  << e.explain() << std::endl;
1151 
1152  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
1153  " %d: %s", port, e.explain().c_str());
1154  }
1155 
1156  logme() << "INFO: DQM server started at port " << port << std::endl;
1157 }
1158 
1162 void
1164 {
1165  if (server_)
1166  {
1167  logme() << "ERROR: DQM server was already started.\n";
1168  return;
1169  }
1170 
1171  try
1172  {
1173  server_ = new LocalServerSocket(path, 10);
1174  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1175  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1176  server_->setBlocking(false);
1177  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1178  }
1179  catch (Error &e)
1180  {
1181  // FIXME: Do we need to do this when we throw an exception anyway?
1182  // FIXME: Abort instead?
1183  logme()
1184  << "ERROR: Failed to start server at path " << path << ": "
1185  << e.explain() << std::endl;
1186 
1187  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
1188  " %s: %s", path, e.explain().c_str());
1189  }
1190 
1191  logme() << "INFO: DQM server started at path " << path << std::endl;
1192 }
1193 
1197 void
1199 {
1200  if (! downstream_.host.empty())
1201  {
1202  logme()
1203  << "ERROR: Already updating another collector at "
1204  << downstream_.host << ":" << downstream_.port << std::endl;
1205  return;
1206  }
1207 
1208  downstream_.update = true;
1209  downstream_.host = host;
1210  downstream_.port = port;
1211 }
1212 
1216 void
1218 {
1219  if (! upstream_.host.empty())
1220  {
1221  logme()
1222  << "ERROR: Already receiving data from another collector at "
1223  << upstream_.host << ":" << upstream_.port << std::endl;
1224  return;
1225  }
1226 
1227  upstream_.update = false;
1228  upstream_.host = host;
1229  upstream_.port = port;
1230 }
1231 
1233 void
1235 {
1236  shutdown_ = 1;
1237  if (communicate_ != (pthread_t) -1)
1238  pthread_join(communicate_, 0);
1239 }
1240 
1246 static void *communicate(void *obj)
1247 {
1248  sigset_t sigs;
1249  sigfillset (&sigs);
1250  pthread_sigmask (SIG_BLOCK, &sigs, 0);
1251  ((DQMNet *)obj)->run();
1252  return 0;
1253 }
1254 
1256 void
1258 {
1259  if (communicate_ != (pthread_t) -1)
1260  pthread_mutex_lock(&lock_);
1261 }
1262 
1264 void
1266 {
1267  if (communicate_ != (pthread_t) -1)
1268  pthread_mutex_unlock(&lock_);
1269 }
1270 
1274 void
1276 {
1277  if (communicate_ != (pthread_t) -1)
1278  {
1279  logme()
1280  << "ERROR: DQM networking thread has already been started\n";
1281  return;
1282  }
1283 
1284  pthread_mutex_init(&lock_, 0);
1285  pthread_create (&communicate_, 0, &communicate, this);
1286 }
1287 
// Main loop of the network layer; invoked from the communicate() thread
// entry point.
//
// Each iteration, until shouldStop() returns true:
//  1. For upstream_ and downstream_: if a host is configured, no peer exists
//     and the retry time has passed, start a non-blocking connection and
//     register the new peer with the selector.
//  2. Dispatch selector events via sel_.dispatch(delay_).
//  3. Between lock() and unlock(): send the object list to peers if flush_
//     is set and the flush interval has elapsed, update peer masks, and
//     release waiters whose wait exceeded waitMax_ or, for objects flagged
//     DQM_PROP_STALE, waitStale_.
//
// NOTE(review): the signature line (1290) is missing from this listing; the
// symbol index gives it as "void run(void)".
1289 void
1291 {
1292  Time now;
1293  Time nextFlush = 0;
      // The two automatically maintained connections, handled identically
      // in the loop below.
1294  AutoPeer *automatic[2] = { &upstream_, &downstream_ };
1295 
1296  // Perform I/O. Every once in a while flush updates to peers.
1297  while (! shouldStop())
1298  {
1299  for (int i = 0; i < 2; ++i)
1300  {
1301  AutoPeer *ap = automatic[i];
1302 
1303  // If we need a server connection and don't have one yet,
1304  // initiate asynchronous connection creation. Swallow errors
1305  // in case the server won't talk to us.
1306  if (! ap->host.empty()
1307  && ! ap->peer
1308  && (now = Time::current()) > ap->next)
1309  {
      // Schedule the next attempt first, so a failure below is retried no
      // sooner than 15 seconds from now.
1310  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1311  InetSocket *s = 0;
1312  try
1313  {
1314  InetAddress addr(ap->host.c_str(), ap->port);
1315  s = new InetSocket (SOCK_STREAM, 0, addr.family());
1316  s->setBlocking(false);
1317  s->connect(addr);
      // NOTE(review): if connect() throws (including the "in progress" case
      // handled below), these two setopt calls are not reached -- confirm
      // whether the buffer sizes should be set before connecting.
1318  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1319  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1320  }
1321  catch (Error &e)
1322  {
1323  SystemError *sys = dynamic_cast<SystemError *>(e.next());
1324  if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1325  {
1326  // "In progress" just means the connection is in progress.
1327  // The connection is ready when the socket is writeable.
1328  // Anything else is a real problem.
      // s may still be null here if the InetAddress or InetSocket
      // construction threw; delete on a null pointer is harmless.
1329  if (s)
1330  s->abort();
1331  delete s;
1332  s = 0;
1333  }
1334  }
1335 
1336  // Set up with the selector if we were successful. If this is
1337  // the upstream collector, queue a request for updates.
1338  if (s)
1339  {
1340  Peer *p = createPeer(s);
1341  ap->peer = p;
1342 
      // NOTE(review): these address queries run outside the try block, on a
      // socket whose connection may still be in progress -- confirm they
      // cannot throw in that state.
1343  InetAddress peeraddr = ((InetSocket *) s)->peername();
1344  InetAddress myaddr = ((InetSocket *) s)->sockname();
1345  p->peeraddr = StringFormat("%1:%2")
1346  .arg(peeraddr.hostname())
1347  .arg(peeraddr.port());
1348  p->mask = IORead|IOWrite|IOUrgent;
1349  p->update = ap->update;
1350  p->automatic = ap;
1351  p->socket = s;
1352  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1353  if (ap == &upstream_)
1354  {
      // Two queued requests, DQM_MSG_LIST_OBJECTS then DQM_MSG_UPDATE_ME.
      // Each appears to be a (length, type) pair of 32-bit words -- confirm
      // against the message format.
1355  uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
1356  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
1357  p->sendq = new Bucket;
1358  p->sendq->next = 0;
1359  copydata(p->sendq, words, sizeof(words));
1360  }
1361 
1362  // Report the new connection.
1363  if (debug_)
1364  logme()
1365  << "INFO: now connected to " << p->peeraddr << " from "
1366  << myaddr.hostname() << ":" << myaddr.port() << std::endl;
1367  }
1368  }
1369  }
1370 
1371  // Pump events for a while.
1372  sel_.dispatch(delay_);
1373  now = Time::current();
      // Everything from here to unlock() runs with the net layer lock held.
1374  lock();
1375 
1376  // Check if flush is required. Flush only if one is needed.
1377  // Always sends the full object list, but only rarely.
1378  if (flush_ && now > nextFlush)
1379  {
1380  flush_ = false;
1381  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1382  sendObjectListToPeers(true);
1383  }
1384 
1385  // Update the data server and peer selection masks. If we
1386  // have no more data to send and listening for writes, remove
1387  // the write mask. If we have something to write and aren't
1388  // listening for writes, start listening so we can send off
1389  // the data.
1390  updatePeerMasks();
1391 
1392  // Release peers that have been waiting for data for too long.
      // Entries with a time earlier than these cut-offs have waited longer
      // than the corresponding limit.
1393  Time waitold = now - waitMax_;
1394  Time waitstale = now - waitStale_;
1395  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
1396  {
1397  Object *o = findObject(0, i->name);
1398 
1399  // If we have (stale) object data, wait only up to stale limit.
1400  // Otherwise if we have no data at all, wait up to the max limit.
1401  if (i->time < waitold)
1402  {
1403  logme ()
1404  << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
1405  << "s to retrieval, releasing '" << i->name << "' from wait, have "
1406  << (o ? o->rawdata.size() : 0) << " data available\n";
      // Post-increment: the iterator is advanced before the call, which
      // receives the old position -- presumably because releaseFromWait
      // removes that entry from waiting_; confirm.
1407  releaseFromWait(i++, o);
1408  }
1409  else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
1410  {
1411  logme ()
1412  << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
1413  << "s to update, releasing '" << i->name << "' from wait, have "
1414  << o->rawdata.size() << " data available\n";
1415  releaseFromWait(i++, o);
1416  }
1417 
1418  // Keep it for now.
1419  else
1420  ++i;
1421  }
1422 
1423  unlock();
1424  }
1425 }
1426 
1427 // Tell the network cache that there have been local changes that
1428 // should be advertised to the downstream listeners.
//
// Implemented by writing a single zero byte to the sink end of wakeup_.
// NOTE(review): the signature line (1430) is missing from this listing; the
// symbol index gives it as "void sendLocalChanges(void)".
// NOTE(review): presumably the byte wakes the selector loop and is consumed
// by a notification handler -- confirm, the reader is not visible here.
1429 void
1431 {
1432  char byte = 0;
1433  wakeup_.sink()->write(&byte, 1);
1434 }
1435 
// Construct a basic DQM network layer.
//
// appname is forwarded to the DQMImplNet<DQMNet::Object> base class. The
// constructor then creates the peer that holds locally owned objects and
// stores it in local_.
1439 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */)
1440  : DQMImplNet<DQMNet::Object>(appname)
1441 {
      // (Socket *) -1 is a sentinel rather than a real socket.
      // NOTE(review): presumably never dereferenced for the local peer --
      // confirm against createPeer and its users.
1442  local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
1443 }
1444 
// Give a hint of how much capacity to allocate for local objects.
//
// Forwards size to local_->objs.resize().
//
// NOTE(review): the signature line (1447) is missing from this listing; the
// symbol index gives it as reserveLocalSpace(uint32_t size).
// NOTE(review): the container type of objs is not visible here; confirm that
// resize() on it adjusts capacity rather than creating elements.
1446 void
1448 {
1449  local_->objs.resize(size);
1450 }
1451 
// Insert object o into the local peer's object set, or refresh the stored
// copy if an equal object is already present.
//
// o is modified: its dirname is repointed at the string stored in
// local_->dirs, and when an existing entry is refreshed the non-key fields
// are swapped, so o ends up holding the previously stored values.
//
// NOTE(review): the signature line (1455) is missing from this listing; the
// symbol index gives it as updateLocalObject(Object &o).
1454 void
1456 {
      // Store the directory name in local_->dirs (or find the existing
      // entry) and point o.dirname at that stored string.
1457  o.dirname = &*local_->dirs.insert(*o.dirname).first;
1458  std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
      // info.second is false when an equal object was already present and
      // nothing was inserted.
1459  if (! info.second)
1460  {
1461  // Somewhat hackish. Sets are supposedly immutable, but we
1462  // need to change the non-key parts of the object. Erasing
1463  // and re-inserting would produce too much memory churn.
1464  Object &old = const_cast<Object &>(*info.first);
1465  std::swap(old.flags, o.flags);
1466  std::swap(old.tag, o.tag);
1467  std::swap(old.version, o.version);
1468  std::swap(old.qreports, o.qreports);
1469  std::swap(old.rawdata, o.rawdata);
1470  std::swap(old.scalar, o.scalar);
1471  std::swap(old.qdata, o.qdata);
1472  }
1473 }
1474 
// Remove every local object whose full path is not listed in known.
//
// The full path of an object is "<dirname>/<objname>", or just "<objname>"
// when the directory name is empty.
//
// Returns true if at least one object was removed, false otherwise.
1478 bool
1479 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
1480 {
1481  size_t removed = 0;
      // Scratch buffer reused across iterations to build each object's path.
1482  std::string path;
1483  ObjectMap::iterator i, e;
1484  for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
1485  {
1486  path.clear();
1487  path.reserve(i->dirname->size() + i->objname.size() + 2);
1488  path += *i->dirname;
      // The separator is only added when there is a directory part.
1489  if (! path.empty())
1490  path += '/';
1491  path += i->objname;
1492 
1493  if (! known.count(path))
      // Post-increment advances i before the element it referred to is
      // erased, so the loop iterator never refers to the removed element.
1494  ++removed, local_->objs.erase(i++);
1495  else
1496  ++i;
1497  }
1498 
1499  return removed > 0;
1500 }
void run(void)
Definition: DQMNet.cc:1290
type
Definition: HCALResponse.h:21
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
Definition: DQMNet.cc:199
AutoPeer downstream_
Definition: DQMNet.h:342
DQMNet(const std::string &appname="")
Definition: DQMNet.cc:1064
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:67
int i
Definition: DBlmapReader.cc:9
DataBlob incoming
Definition: DQMNet.h:138
virtual bool shouldStop(void)
Definition: DQMNet.cc:384
void shutdown(void)
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1234
void sendLocalChanges(void)
Definition: DQMNet.cc:1430
static const TGPicture * info(bool iBackgroundIsBlack)
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1003
QReports qreports
Definition: DQMNet.h:107
pthread_t communicate_
Definition: DQMNet.h:345
std::string algorithm
Definition: DQMNet.h:93
const double w
Definition: UKUtility.cc:23
uint64_t version
Definition: DQMNet.h:100
virtual void sendObjectListToPeers(bool all)=0
bool source
Definition: DQMNet.h:143
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
TObject * extractNextObject(TBufferFile &buf)
Definition: fastHadd.cc:181
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
Definition: DQMNet.h:22
int delay_
Definition: DQMNet.h:348
pthread_mutex_t lock_
Definition: DQMNet.h:318
void delay(int delay)
Definition: DQMNet.cc:1106
virtual Peer * createPeer(lat::Socket *s)=0
uint32_t flags
Definition: DQMNet.h:98
assert(m_qm.get())
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::string peeraddr
Definition: DQMNet.h:136
static const int WARNING
lat::Socket * socket
Definition: DQMNet.h:137
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void staleObjectWaitLimit(lat::TimeSpan time)
Definition: DQMNet.cc:1116
bool ev
uint32_t tag
Definition: DQMNet.h:99
const std::string * dirname
Definition: DQMNet.h:105
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
AutoPeer * automatic
Definition: DQMNet.h:148
AutoPeer upstream_
Definition: DQMNet.h:341
static void * communicate(void *obj)
Definition: DQMNet.cc:1246
A arg
Definition: Factorize.h:36
std::string qdata
Definition: DQMNet.h:116
int port
Definition: query.py:115
void debug(bool doit)
Definition: DQMNet.cc:1098
void Fill(long long x)
virtual ~DQMNet(void)
Definition: DQMNet.cc:1090
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
uint64_t lastreq
Definition: DQMNet.h:113
std::string name
Definition: DQMNet.h:128
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:463
DataBlob data
Definition: DQMNet.h:122
Peer * peer
Definition: DQMNet.h:153
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
sig_atomic_t shutdown_
Definition: DQMNet.h:346
std::string objname
Definition: DQMNet.h:106
bool update
Definition: DQMNet.h:144
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
std::string qtname
Definition: DQMNet.h:92
std::string scalar
Definition: DQMNet.h:115
DQMNet::CoreObject data_
lat::Time next
Definition: DQMNet.h:154
string host
Definition: query.py:114
unsigned mask
Definition: DQMNet.h:142
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
void start(void)
Definition: DQMNet.cc:1275
Definition: IOTypes.h:26
unsigned long long uint64_t
Definition: Time.h:15
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1479
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
size_t updates
Definition: DQMNet.h:146
std::string host
Definition: DQMNet.h:155
double b
Definition: hdecay.h:120
void startLocalServer(int port)
Definition: DQMNet.cc:1125
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
lat::Pipe wakeup_
Definition: DQMNet.h:338
bool debug_
Definition: DQMNet.h:317
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t waiting
Definition: DQMNet.h:147
if(dp >Float(M_PI)) dp-
std::string message
Definition: DQMNet.h:91
size_t sendpos
Definition: DQMNet.h:140
void listenToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1217
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:68
tuple cout
Definition: gather_cfg.py:145
void updateMask(Peer *p)
Definition: DQMNet.cc:1034
bool flush_
Definition: DQMNet.h:351
lat::Socket * server_
Definition: DQMNet.h:337
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
volatile std::atomic< bool > shutdown_flag false
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1455
lat::TimeSpan waitMax_
Definition: DQMNet.h:350
Bucket * next
Definition: DQMNet.h:121
std::vector< QValue > QReports
Definition: DQMNet.h:83
Bucket * sendq
Definition: DQMNet.h:139
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
Definition: DQMNet.cc:1447
float qtresult
Definition: DQMNet.h:90
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1198
WaitList waiting_
Definition: DQMNet.h:343
tuple size
Write out results.
DQMBasicNet(const std::string &appname="")
Definition: DQMNet.cc:1439
virtual Peer * createPeer(lat::Socket *s)
Definition: DQMNet.h:490
DataBlob rawdata
Definition: DQMNet.h:114
static const int ERROR
ImplPeer * local_
Definition: DQMNet.h:620
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11