00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMDefinitions.h"
00003 #include "DQMServices/Core/src/DQMError.h"
00004 #include "classlib/iobase/InetServerSocket.h"
00005 #include "classlib/iobase/LocalServerSocket.h"
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/sysapi/InetSocket.h"
00008 #include "classlib/utils/TimeInfo.h"
00009 #include "classlib/utils/StringList.h"
00010 #include "classlib/utils/StringFormat.h"
00011 #include "classlib/utils/StringOps.h"
00012 #include "classlib/utils/SystemError.h"
00013 #include "classlib/utils/Regexp.h"
00014 #include <unistd.h>
00015 #include <fcntl.h>
00016 #include <sys/wait.h>
00017 #include <stdio.h>
00018 #include <stdint.h>
00019 #include <iostream>
00020 #include <sstream>
00021 #include <cassert>
00022 #include <cfloat>
00023 #include <inttypes.h>
00024
// Network buffer limits (smaller on __APPLE__ builds).
//  MESSAGE_SIZE_LIMIT: an incoming message whose declared length is at or
//    above this value makes onPeerData() drop the peer.
//  SOCKET_BUF_SIZE: value passed to SO_SNDBUF / SO_RCVBUF on the sockets
//    this file creates.
#if __APPLE__
# define MESSAGE_SIZE_LIMIT (1*1024*1024)
# define SOCKET_BUF_SIZE (1*1024*1024)
#else
# define MESSAGE_SIZE_LIMIT (8*1024*1024)
# define SOCKET_BUF_SIZE (8*1024*1024)
#endif
// Size of each read() chunk in onPeerData(), and the amount added to a
// peer's incoming-buffer capacity when it is too small for new data.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
00034
00035 using namespace lat;
00036
// Matches "<name>t=value</name>" where t is one of i, f, s or qr; the
// back-reference \1 requires the closing tag to repeat the opening name.
// NOTE(review): not referenced in the code visible here — confirm it is
// still used elsewhere before relying on it.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
00038
00040
00041 std::ostream &
00042 DQMNet::logme (void)
00043 {
00044 Time now = Time::current();
00045 return std::cout
00046 << now.format(true, "%Y-%m-%d %H:%M:%S.")
00047 << now.nanoformat(3, 3)
00048 << " " << appname_ << "[" << pid_ << "]: ";
00049 }
00050
00051
00052 void
00053 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00054 {
00055 b->data.insert(b->data.end(),
00056 (const unsigned char *)data,
00057 (const unsigned char *)data + len);
00058 }
00059
00060
00061 void
00062 DQMNet::discard (Bucket *&b)
00063 {
00064 while (b)
00065 {
00066 Bucket *next = b->next;
00067 delete b;
00068 b = next;
00069 }
00070 }
00071
00073
00076 void
00077 DQMNet::losePeer(const char *reason,
00078 Peer *peer,
00079 IOSelectEvent *ev,
00080 Error *err)
00081 {
00082 if (reason)
00083 logme ()
00084 << reason << peer->peeraddr
00085 << (err ? "; error was: " + err->explain() : std::string(""))
00086 << std::endl;
00087
00088 Socket *s = peer->socket;
00089
00090 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00091 if (i->peer == peer)
00092 waiting_.erase(i++);
00093 else
00094 ++i;
00095
00096 if (ev)
00097 ev->source = 0;
00098
00099 discard(peer->sendq);
00100 if (peer->automatic)
00101 peer->automatic->peer = 0;
00102
00103 sel_.detach (s);
00104 s->close();
00105 removePeer(peer, s);
00106 delete s;
00107 }
00108
00110 void
00111 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
00112 {
00113
00114 Bucket **msg = &p->sendq;
00115 while (*msg)
00116 msg = &(*msg)->next;
00117 *msg = new Bucket;
00118 (*msg)->next = 0;
00119
00120 uint32_t words[3];
00121 words[0] = sizeof(words) + len;
00122 words[1] = DQM_MSG_GET_OBJECT;
00123 words[2] = len;
00124 copydata(*msg, words, sizeof(words));
00125 copydata(*msg, name, len);
00126 }
00127
00130 void
00131 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00132 {
00133
00134
00135
00136
00137
00138 requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
00139 WaitObject wo = { Time::current(), name, info, p };
00140 waiting_.push_back(wo);
00141 p->waiting++;
00142 }
00143
00144
00145
00146 void
00147 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00148 {
00149 Bucket **msg = &i->peer->sendq;
00150 while (*msg)
00151 msg = &(*msg)->next;
00152 *msg = new Bucket;
00153 (*msg)->next = 0;
00154
00155 releaseFromWait(*msg, *i, o);
00156
00157 assert(i->peer->waiting > 0);
00158 i->peer->waiting--;
00159 waiting_.erase(i);
00160 }
00161
00162
00163 void
00164 DQMNet::releaseWaiters(const std::string &name, Object *o)
00165 {
00166 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00167 if (i->name == name)
00168 releaseFromWait(i++, o);
00169 else
00170 ++i;
00171 }
00172
00176 void
00177 DQMNet::packQualityData(std::string &into, const QReports &qr)
00178 {
00179 char buf[64];
00180 std::ostringstream qrs;
00181 QReports::const_iterator qi, qe;
00182 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
00183 {
00184 int pos = 0;
00185 sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
00186 qrs << buf << '\0'
00187 << buf+pos << '\0'
00188 << qi->qtname << '\0'
00189 << qi->algorithm << '\0'
00190 << qi->message << '\0'
00191 << '\0';
00192 }
00193 into = qrs.str();
00194 }
00195
/// Decode quality reports from the NUL-separated string `from` and append
/// them to `qr` (existing entries are kept).
///
/// Each report is read as five consecutive NUL-terminated fields: code,
/// result, test name, algorithm and message.  Decoding stops at the first
/// report position whose first byte is NUL.
///
/// For every report, a bit is ORed into `flags` according to its code:
/// DQM_PROP_REPORT_WARN, DQM_PROP_REPORT_ERROR, or DQM_PROP_REPORT_OTHER
/// for unrecognised codes.  STATUS_OK adds nothing, and bits already set in
/// `flags` are not cleared.
///
/// NOTE(review): there is no bounds checking.  A report with fewer than five
/// fields makes the pointer step past the terminating NUL, so the input
/// must be well formed.
void
DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
{
  const char *qdata = from;

  // First pass: count the reports so that qr can be reserved up front.
  size_t nqv = 0;
  while (*qdata)
  {
    ++nqv;
    while (*qdata) ++qdata; ++qdata;   // code
    while (*qdata) ++qdata; ++qdata;   // result
    while (*qdata) ++qdata; ++qdata;   // test name
    while (*qdata) ++qdata; ++qdata;   // algorithm
    while (*qdata) ++qdata; ++qdata;   // message
  }

  // Second pass: decode the reports.
  qdata = from;
  qr.reserve(nqv);
  while (*qdata)
  {
    qr.push_back(DQMNet::QValue());
    DQMNet::QValue &qv = qr.back();

    // Field 1: status code, which also classifies the report into `flags`.
    qv.code = atoi(qdata);
    while (*qdata) ++qdata;
    switch (qv.code)
    {
    case dqm::qstatus::STATUS_OK:
      break;
    case dqm::qstatus::WARNING:
      flags |= DQMNet::DQM_PROP_REPORT_WARN;
      break;
    case dqm::qstatus::ERROR:
      flags |= DQMNet::DQM_PROP_REPORT_ERROR;
      break;
    default:
      flags |= DQMNet::DQM_PROP_REPORT_OTHER;
      break;
    }

    // Field 2: numeric test result.
    qv.qtresult = atof(++qdata);
    while (*qdata) ++qdata;

    // Field 3: test name.
    qv.qtname = ++qdata;
    while (*qdata) ++qdata;

    // Field 4: algorithm.
    qv.algorithm = ++qdata;
    while (*qdata) ++qdata;

    // Field 5: message; then step past its terminating NUL.
    qv.message = ++qdata;
    while (*qdata) ++qdata;
    ++qdata;
  }
}
00254
// Disabled code (#if 0): rebuilding an Object's ROOT objects from its
// serialized raw data.  It is not compiled and is kept for reference only.
#if 0

// Read the next object from `buf`.  Returns 0 when the buffer is exhausted
// (Length() == BufferSize()) or ReadClass() yields no class.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  // Rewind to the class record so ReadObject() reads it again.
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}


// Rebuild o.object (required) and o.reference (optional; the next object in
// the same buffer) from o.rawdata, then decode o.qdata into o.qreports and
// o.flags.  Returns false if no object could be read from o.rawdata.
bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();


  if (! (o.object = extractNextObject(buf)))
    return false;


  o.reference = extractNextObject(buf);


  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
00290
// Disabled code (#if 0): re-book a received Object as a MonitorElement in
// `store`, according to the type bits in o.flags.
// NOTE(review): every case except DQM_PROP_TYPE_INT passes `name`, which is
// not declared in this function (the INT case uses o.objname).  This would
// not compile if re-enabled; presumably o.objname was intended.
#if 0
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;


  MonitorElement *obj = 0;
  store->setCurrentFolder(*o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << *o.dirname << '/' << o.objname << "'\n";
    return false;
  }


  // Carry over the tag and quality reports into the booked element.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }


  return true;
}
#endif
00380
00382
00383 bool
00384 DQMNet::shouldStop(void)
00385 {
00386 return shutdown_;
00387 }
00388
00389
00390
00391 void
00392 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00393 {
00394 if (o)
00395 sendObjectToPeer(msg, *o, true);
00396 else
00397 {
00398 uint32_t words [3];
00399 words[0] = sizeof(words) + w.name.size();
00400 words[1] = DQM_REPLY_NONE;
00401 words[2] = w.name.size();
00402
00403 msg->data.reserve(msg->data.size() + words[0]);
00404 copydata(msg, &words[0], sizeof(words));
00405 copydata(msg, &w.name[0], w.name.size());
00406 }
00407 }
00408
00409
00410
00411
00412 void
00413 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00414 {
00415 uint32_t flags = o.flags & ~DQM_PROP_DEAD;
00416 DataBlob objdata;
00417
00418 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00419 objdata.insert(objdata.end(),
00420 &o.scalar[0],
00421 &o.scalar[0] + o.scalar.size());
00422 else if (data)
00423 objdata.insert(objdata.end(),
00424 &o.rawdata[0],
00425 &o.rawdata[0] + o.rawdata.size());
00426
00427 uint32_t words [9];
00428 uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
00429 uint32_t datalen = objdata.size();
00430 uint32_t qlen = o.qdata.size();
00431
00432 if (o.dirname->empty())
00433 --namelen;
00434
00435 words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
00436 words[1] = DQM_REPLY_OBJECT;
00437 words[2] = flags;
00438 words[3] = (o.version >> 0 ) & 0xffffffff;
00439 words[4] = (o.version >> 32) & 0xffffffff;
00440 words[5] = o.tag;
00441 words[6] = namelen;
00442 words[7] = datalen;
00443 words[8] = qlen;
00444
00445 msg->data.reserve(msg->data.size() + words[0]);
00446 copydata(msg, &words[0], 9*sizeof(uint32_t));
00447 if (namelen)
00448 {
00449 copydata(msg, &(*o.dirname)[0], o.dirname->size());
00450 if (! o.dirname->empty())
00451 copydata(msg, "/", 1);
00452 copydata(msg, &o.objname[0], o.objname.size());
00453 }
00454 if (datalen)
00455 copydata(msg, &objdata[0], datalen);
00456 if (qlen)
00457 copydata(msg, &o.qdata[0], qlen);
00458 }
00459
00461
00462 bool
00463 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00464 {
00465
00466 uint32_t type;
00467 memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00468 switch (type)
00469 {
00470 case DQM_MSG_UPDATE_ME:
00471 {
00472 if (len != 2*sizeof(uint32_t))
00473 {
00474 logme()
00475 << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00476 << " from peer " << p->peeraddr << std::endl;
00477 return false;
00478 }
00479
00480 if (debug_)
00481 logme()
00482 << "DEBUG: received message 'UPDATE ME' from peer "
00483 << p->peeraddr << ", size " << len << std::endl;
00484
00485 p->update = true;
00486 }
00487 return true;
00488
00489 case DQM_MSG_LIST_OBJECTS:
00490 {
00491 if (debug_)
00492 logme()
00493 << "DEBUG: received message 'LIST OBJECTS' from peer "
00494 << p->peeraddr << ", size " << len << std::endl;
00495
00496
00497 sendObjectListToPeer(msg, true, false);
00498 }
00499 return true;
00500
00501 case DQM_MSG_GET_OBJECT:
00502 {
00503 if (debug_)
00504 logme()
00505 << "DEBUG: received message 'GET OBJECT' from peer "
00506 << p->peeraddr << ", size " << len << std::endl;
00507
00508 if (len < 3*sizeof(uint32_t))
00509 {
00510 logme()
00511 << "ERROR: corrupt 'GET IMAGE' message of length " << len
00512 << " from peer " << p->peeraddr << std::endl;
00513 return false;
00514 }
00515
00516 uint32_t namelen;
00517 memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00518 if (len != 3*sizeof(uint32_t) + namelen)
00519 {
00520 logme()
00521 << "ERROR: corrupt 'GET OBJECT' message of length " << len
00522 << " from peer " << p->peeraddr
00523 << ", expected length " << (3*sizeof(uint32_t))
00524 << " + " << namelen << std::endl;
00525 return false;
00526 }
00527
00528 std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00529 Peer *owner = 0;
00530 Object *o = findObject(0, name, &owner);
00531 if (o)
00532 {
00533 o->lastreq = Time::current().ns();
00534 if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
00535 && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00536 waitForData(p, name, "", owner);
00537 else
00538 sendObjectToPeer(msg, *o, true);
00539 }
00540 else
00541 {
00542 uint32_t words [3];
00543 words[0] = sizeof(words) + name.size();
00544 words[1] = DQM_REPLY_NONE;
00545 words[2] = name.size();
00546
00547 msg->data.reserve(msg->data.size() + words[0]);
00548 copydata(msg, &words[0], sizeof(words));
00549 copydata(msg, &name[0], name.size());
00550 }
00551 }
00552 return true;
00553
00554 case DQM_REPLY_LIST_BEGIN:
00555 {
00556 if (len != 4*sizeof(uint32_t))
00557 {
00558 logme()
00559 << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00560 << " from peer " << p->peeraddr << std::endl;
00561 return false;
00562 }
00563
00564
00565 uint32_t flags;
00566 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00567
00568 if (debug_)
00569 logme()
00570 << "DEBUG: received message 'LIST BEGIN "
00571 << (flags ? "FULL" : "INCREMENTAL")
00572 << "' from " << p->peeraddr
00573 << ", size " << len << std::endl;
00574
00575
00576
00577
00578
00579
00580
00581 if (flags)
00582 markObjectsDead(p);
00583 }
00584 return true;
00585
00586 case DQM_REPLY_LIST_END:
00587 {
00588 if (len != 4*sizeof(uint32_t))
00589 {
00590 logme()
00591 << "ERROR: corrupt 'LIST END' message of length " << len
00592 << " from peer " << p->peeraddr << std::endl;
00593 return false;
00594 }
00595
00596
00597 uint32_t flags;
00598 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00599
00600
00601
00602
00603
00604 if (flags)
00605 purgeDeadObjects(p);
00606
00607 if (debug_)
00608 logme()
00609 << "DEBUG: received message 'LIST END "
00610 << (flags ? "FULL" : "INCREMENTAL")
00611 << "' from " << p->peeraddr
00612 << ", size " << len << std::endl;
00613
00614
00615
00616 flush_ = true;
00617 p->updates++;
00618 }
00619 return true;
00620
00621 case DQM_REPLY_OBJECT:
00622 {
00623 uint32_t words[9];
00624 if (len < sizeof(words))
00625 {
00626 logme()
00627 << "ERROR: corrupt 'OBJECT' message of length " << len
00628 << " from peer " << p->peeraddr << std::endl;
00629 return false;
00630 }
00631
00632 memcpy (&words[0], data, sizeof(words));
00633 uint32_t &namelen = words[6];
00634 uint32_t &datalen = words[7];
00635 uint32_t &qlen = words[8];
00636
00637 if (len != sizeof(words) + namelen + datalen + qlen)
00638 {
00639 logme()
00640 << "ERROR: corrupt 'OBJECT' message of length " << len
00641 << " from peer " << p->peeraddr
00642 << ", expected length " << sizeof(words)
00643 << " + " << namelen
00644 << " + " << datalen
00645 << " + " << qlen
00646 << std::endl;
00647 return false;
00648 }
00649
00650 unsigned char *namedata = data + sizeof(words);
00651 unsigned char *objdata = namedata + namelen;
00652 unsigned char *qdata = objdata + datalen;
00653 unsigned char *enddata = qdata + qlen;
00654 std::string name ((char *) namedata, namelen);
00655 assert (enddata == data + len);
00656
00657 if (debug_)
00658 logme()
00659 << "DEBUG: received message 'OBJECT " << name
00660 << "' from " << p->peeraddr
00661 << ", size " << len << std::endl;
00662
00663
00664 p->source = true;
00665
00666
00667 Object *o = findObject(p, name);
00668 if (! o)
00669 o = makeObject(p, name);
00670
00671 o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
00672 o->tag = words[5];
00673 o->version = ((uint64_t) words[4] << 32 | words[3]);
00674 o->scalar.clear();
00675 o->qdata.clear();
00676 if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00677 {
00678 o->rawdata.clear();
00679 o->scalar.insert(o->scalar.end(), objdata, qdata);
00680 }
00681 else if (datalen)
00682 {
00683 o->rawdata.clear();
00684 o->rawdata.insert(o->rawdata.end(), objdata, qdata);
00685 }
00686 else if (! o->rawdata.empty())
00687 o->flags |= DQM_PROP_STALE;
00688 o->qdata.insert(o->qdata.end(), qdata, enddata);
00689
00690
00691
00692 if (o->lastreq
00693 && ! datalen
00694 && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00695 requestObjectData(p, (namelen ? &name[0] : 0), namelen);
00696
00697
00698 if (datalen)
00699 releaseWaiters(name, o);
00700 }
00701 return true;
00702
00703 case DQM_REPLY_NONE:
00704 {
00705 uint32_t words[3];
00706 if (len < sizeof(words))
00707 {
00708 logme()
00709 << "ERROR: corrupt 'NONE' message of length " << len
00710 << " from peer " << p->peeraddr << std::endl;
00711 return false;
00712 }
00713
00714 memcpy (&words[0], data, sizeof(words));
00715 uint32_t &namelen = words[2];
00716
00717 if (len != sizeof(words) + namelen)
00718 {
00719 logme()
00720 << "ERROR: corrupt 'NONE' message of length " << len
00721 << " from peer " << p->peeraddr
00722 << ", expected length " << sizeof(words)
00723 << " + " << namelen << std::endl;
00724 return false;
00725 }
00726
00727 unsigned char *namedata = data + sizeof(words);
00728 std::string name((char *) namedata, namelen);
00729
00730 if (debug_)
00731 logme()
00732 << "DEBUG: received message 'NONE " << name
00733 << "' from " << p->peeraddr
00734 << ", size " << len << std::endl;
00735
00736
00737 p->source = true;
00738
00739
00740 if (Object *o = findObject(p, name))
00741 {
00742 o->flags |= DQM_PROP_DEAD;
00743 purgeDeadObjects(p);
00744 }
00745
00746
00747 releaseWaiters(name, 0);
00748 }
00749 return true;
00750
00751 default:
00752 logme()
00753 << "ERROR: unrecognised message of length " << len
00754 << " and type " << type << " from peer " << p->peeraddr
00755 << std::endl;
00756 return false;
00757 }
00758 }
00759
00762 bool
00763 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00764 {
00765 lock();
00766 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00767
00768
00769
00770
00771
00772 if (ev->events & IOUrgent)
00773 {
00774 if (p->automatic)
00775 {
00776 logme()
00777 << "WARNING: connection to the DQM server at " << p->peeraddr
00778 << " lost (will attempt to reconnect in 15 seconds)\n";
00779 losePeer(0, p, ev);
00780 }
00781 else
00782 losePeer("WARNING: lost peer connection ", p, ev);
00783
00784 unlock();
00785 return true;
00786 }
00787
00788
00789 if (ev->events & IOWrite)
00790 {
00791 while (Bucket *b = p->sendq)
00792 {
00793 IOSize len = b->data.size() - p->sendpos;
00794 const void *data = (len ? (const void *)&b->data[p->sendpos]
00795 : (const void *)&data);
00796 IOSize done;
00797
00798 try
00799 {
00800 done = (len ? ev->source->write (data, len) : 0);
00801 if (debug_ && len)
00802 logme()
00803 << "DEBUG: sent " << done << " bytes to peer "
00804 << p->peeraddr << std::endl;
00805 }
00806 catch (Error &e)
00807 {
00808 losePeer("WARNING: unable to write to peer ", p, ev, &e);
00809 unlock();
00810 return true;
00811 }
00812
00813 p->sendpos += done;
00814 if (p->sendpos == b->data.size())
00815 {
00816 Bucket *old = p->sendq;
00817 p->sendq = old->next;
00818 p->sendpos = 0;
00819 old->next = 0;
00820 discard(old);
00821 }
00822
00823 if (! done && len)
00824
00825 break;
00826 }
00827 }
00828
00829
00830
00831 if (ev->events & IORead)
00832 {
00833
00834
00835
00836 IOSize sz;
00837 try
00838 {
00839 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00840 do
00841 if ((sz = ev->source->read(&buf[0], buf.size())))
00842 {
00843 if (debug_)
00844 logme()
00845 << "DEBUG: received " << sz << " bytes from peer "
00846 << p->peeraddr << std::endl;
00847 DataBlob &data = p->incoming;
00848 if (data.capacity () < data.size () + sz)
00849 data.reserve (data.size() + SOCKET_READ_GROWTH);
00850 data.insert (data.end(), &buf[0], &buf[0] + sz);
00851 }
00852 while (sz == sizeof (buf));
00853 }
00854 catch (Error &e)
00855 {
00856 SystemError *next = dynamic_cast<SystemError *>(e.next());
00857 if (next && next->portable() == SysErr::ErrTryAgain)
00858 sz = 1;
00859 else
00860 {
00861
00862 losePeer("WARNING: failed to read from peer ", p, ev, &e);
00863 unlock();
00864 return true;
00865 }
00866 }
00867
00868
00869 size_t consumed = 0;
00870 DataBlob &data = p->incoming;
00871 while (data.size()-consumed >= sizeof(uint32_t)
00872 && p->waiting < MAX_PEER_WAITREQS)
00873 {
00874 uint32_t msglen;
00875 memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00876
00877 if (msglen >= MESSAGE_SIZE_LIMIT)
00878 {
00879 losePeer("WARNING: excessively large message from ", p, ev);
00880 unlock();
00881 return true;
00882 }
00883
00884 if (data.size()-consumed >= msglen)
00885 {
00886 bool valid = true;
00887 if (msglen < 2*sizeof(uint32_t))
00888 {
00889 logme()
00890 << "ERROR: corrupt peer message of length " << msglen
00891 << " from peer " << p->peeraddr << std::endl;
00892 valid = false;
00893 }
00894 else
00895 {
00896
00897 Bucket msg;
00898 msg.next = 0;
00899 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00900
00901
00902 if (! msg.data.empty())
00903 {
00904 Bucket **prev = &p->sendq;
00905 while (*prev)
00906 prev = &(*prev)->next;
00907
00908 *prev = new Bucket;
00909 (*prev)->next = 0;
00910 (*prev)->data.swap(msg.data);
00911 }
00912 }
00913
00914 if (! valid)
00915 {
00916 losePeer("WARNING: data stream error with ", p, ev);
00917 unlock();
00918 return true;
00919 }
00920
00921 consumed += msglen;
00922 }
00923 else
00924 break;
00925 }
00926
00927 data.erase(data.begin(), data.begin()+consumed);
00928
00929
00930
00931
00932 if (sz == 0)
00933 sel_.setMask(p->socket, p->mask &= ~IORead);
00934 }
00935
00936
00937 unlock();
00938 return false;
00939 }
00940
00945 bool
00946 DQMNet::onPeerConnect(IOSelectEvent *ev)
00947 {
00948
00949 assert (ev->source == server_);
00950
00951
00952 Socket *s = server_->accept();
00953 assert (s);
00954 assert (! s->isBlocking());
00955
00956
00957 lock();
00958 Peer *p = createPeer(s);
00959 std::string localaddr;
00960 if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
00961 {
00962 InetAddress peeraddr = inet->peername();
00963 InetAddress myaddr = inet->sockname();
00964 p->peeraddr = StringFormat("%1:%2")
00965 .arg(peeraddr.hostname())
00966 .arg(peeraddr.port());
00967 localaddr = StringFormat("%1:%2")
00968 .arg(myaddr.hostname())
00969 .arg(myaddr.port());
00970 }
00971 else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
00972 {
00973 p->peeraddr = local->peername().path();
00974 localaddr = local->sockname().path();
00975 }
00976 else
00977 assert(false);
00978
00979 p->mask = IORead|IOUrgent;
00980 p->socket = s;
00981
00982
00983 if (debug_)
00984 logme()
00985 << "INFO: new peer " << p->peeraddr << " is now connected to "
00986 << localaddr << std::endl;
00987
00988
00989 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00990 unlock();
00991
00992
00993 return false;
00994 }
00995
01002 bool
01003 DQMNet::onLocalNotify(IOSelectEvent *ev)
01004 {
01005
01006 try
01007 {
01008 IOSize sz;
01009 unsigned char buf [1024];
01010 while ((sz = ev->source->read(buf, sizeof(buf))))
01011 ;
01012 }
01013 catch (Error &e)
01014 {
01015 SystemError *next = dynamic_cast<SystemError *>(e.next());
01016 if (next && next->portable() == SysErr::ErrTryAgain)
01017 ;
01018 else
01019 logme()
01020 << "WARNING: error reading from notification pipe: "
01021 << e.explain() << std::endl;
01022 }
01023
01024
01025 flush_ = true;
01026
01027
01028 return false;
01029 }
01030
01033 void
01034 DQMNet::updateMask(Peer *p)
01035 {
01036 if (! p->socket)
01037 return;
01038
01039
01040 unsigned oldmask = p->mask;
01041 if (! p->sendq && (p->mask & IOWrite))
01042 sel_.setMask(p->socket, p->mask &= ~IOWrite);
01043
01044 if (p->sendq && ! (p->mask & IOWrite))
01045 sel_.setMask(p->socket, p->mask |= IOWrite);
01046
01047 if (debug_ && oldmask != p->mask)
01048 logme()
01049 << "DEBUG: updating mask for " << p->peeraddr << " to "
01050 << p->mask << " from " << oldmask << std::endl;
01051
01052
01053
01054 if (p->mask == IOUrgent && ! p->waiting)
01055 {
01056 assert (! p->sendq);
01057 if (debug_)
01058 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01059 losePeer(0, p, 0);
01060 }
01061 }
01062
01064 DQMNet::DQMNet (const std::string &appname )
01065 : debug_ (false),
01066 appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01067 pid_ (getpid()),
01068 server_ (0),
01069 version_ (Time::current()),
01070 communicate_ ((pthread_t) -1),
01071 shutdown_ (0),
01072 delay_ (1000),
01073 waitStale_ (0, 0, 0, 0, 500000000 ),
01074 waitMax_ (0, 0, 0, 5 , 0),
01075 flush_ (false)
01076 {
01077
01078
01079
01080 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01081 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01082
01083
01084 upstream_.peer = downstream_.peer = 0;
01085 upstream_.next = downstream_.next = 0;
01086 upstream_.port = downstream_.port = 0;
01087 upstream_.update = downstream_.update = false;
01088 }
01089
/// Destructor.
/// NOTE(review): the body is empty.  It does not stop or join the thread
/// started by start(), destroy lock_, or release server_.  Presumably
/// callers must call shutdown() first; confirm.
DQMNet::~DQMNet(void)
{

}
01094
01097 void
01098 DQMNet::debug(bool doit)
01099 {
01100 debug_ = doit;
01101 }
01102
01105 void
01106 DQMNet::delay(int delay)
01107 {
01108 delay_ = delay;
01109 }
01110
01115 void
01116 DQMNet::staleObjectWaitLimit(lat::TimeSpan time)
01117 {
01118 waitStale_ = time;
01119 }
01120
01124 void
01125 DQMNet::startLocalServer(int port)
01126 {
01127 if (server_)
01128 {
01129 logme() << "ERROR: DQM server was already started.\n";
01130 return;
01131 }
01132
01133 try
01134 {
01135 InetAddress addr("0.0.0.0", port);
01136 InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
01137 s->bind(addr);
01138 s->listen(10);
01139 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01140 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01141 s->setBlocking(false);
01142 sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01143 }
01144 catch (Error &e)
01145 {
01146
01147
01148 logme()
01149 << "ERROR: Failed to start server at port " << port << ": "
01150 << e.explain() << std::endl;
01151
01152 raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
01153 " %d: %s", port, e.explain().c_str());
01154 }
01155
01156 logme() << "INFO: DQM server started at port " << port << std::endl;
01157 }
01158
01162 void
01163 DQMNet::startLocalServer(const char *path)
01164 {
01165 if (server_)
01166 {
01167 logme() << "ERROR: DQM server was already started.\n";
01168 return;
01169 }
01170
01171 try
01172 {
01173 server_ = new LocalServerSocket(path, 10);
01174 server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01175 server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01176 server_->setBlocking(false);
01177 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01178 }
01179 catch (Error &e)
01180 {
01181
01182
01183 logme()
01184 << "ERROR: Failed to start server at path " << path << ": "
01185 << e.explain() << std::endl;
01186
01187 raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
01188 " %s: %s", path, e.explain().c_str());
01189 }
01190
01191 logme() << "INFO: DQM server started at path " << path << std::endl;
01192 }
01193
01197 void
01198 DQMNet::updateToCollector(const std::string &host, int port)
01199 {
01200 if (! downstream_.host.empty())
01201 {
01202 logme()
01203 << "ERROR: Already updating another collector at "
01204 << downstream_.host << ":" << downstream_.port << std::endl;
01205 return;
01206 }
01207
01208 downstream_.update = true;
01209 downstream_.host = host;
01210 downstream_.port = port;
01211 }
01212
01216 void
01217 DQMNet::listenToCollector(const std::string &host, int port)
01218 {
01219 if (! upstream_.host.empty())
01220 {
01221 logme()
01222 << "ERROR: Already receiving data from another collector at "
01223 << upstream_.host << ":" << upstream_.port << std::endl;
01224 return;
01225 }
01226
01227 upstream_.update = false;
01228 upstream_.host = host;
01229 upstream_.port = port;
01230 }
01231
01233 void
01234 DQMNet::shutdown(void)
01235 {
01236 shutdown_ = 1;
01237 if (communicate_ != (pthread_t) -1)
01238 pthread_join(communicate_, 0);
01239 }
01240
01246 static void *communicate(void *obj)
01247 {
01248 sigset_t sigs;
01249 sigfillset (&sigs);
01250 pthread_sigmask (SIG_BLOCK, &sigs, 0);
01251 ((DQMNet *)obj)->run();
01252 return 0;
01253 }
01254
01256 void
01257 DQMNet::lock(void)
01258 {
01259 if (communicate_ != (pthread_t) -1)
01260 pthread_mutex_lock(&lock_);
01261 }
01262
01264 void
01265 DQMNet::unlock(void)
01266 {
01267 if (communicate_ != (pthread_t) -1)
01268 pthread_mutex_unlock(&lock_);
01269 }
01270
01274 void
01275 DQMNet::start(void)
01276 {
01277 if (communicate_ != (pthread_t) -1)
01278 {
01279 logme()
01280 << "ERROR: DQM networking thread has already been started\n";
01281 return;
01282 }
01283
01284 pthread_mutex_init(&lock_, 0);
01285 pthread_create (&communicate_, 0, &communicate, this);
01286 }
01287
01289 void
01290 DQMNet::run(void)
01291 {
01292 Time now;
01293 Time nextFlush = 0;
01294 AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01295
01296
01297 while (! shouldStop())
01298 {
01299 for (int i = 0; i < 2; ++i)
01300 {
01301 AutoPeer *ap = automatic[i];
01302
01303
01304
01305
01306 if (! ap->host.empty()
01307 && ! ap->peer
01308 && (now = Time::current()) > ap->next)
01309 {
01310 ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
01311 InetSocket *s = 0;
01312 try
01313 {
01314 InetAddress addr(ap->host.c_str(), ap->port);
01315 s = new InetSocket (SOCK_STREAM, 0, addr.family());
01316 s->setBlocking(false);
01317 s->connect(addr);
01318 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01319 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01320 }
01321 catch (Error &e)
01322 {
01323 SystemError *sys = dynamic_cast<SystemError *>(e.next());
01324 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01325 {
01326
01327
01328
01329 if (s)
01330 s->abort();
01331 delete s;
01332 s = 0;
01333 }
01334 }
01335
01336
01337
01338 if (s)
01339 {
01340 Peer *p = createPeer(s);
01341 ap->peer = p;
01342
01343 InetAddress peeraddr = ((InetSocket *) s)->peername();
01344 InetAddress myaddr = ((InetSocket *) s)->sockname();
01345 p->peeraddr = StringFormat("%1:%2")
01346 .arg(peeraddr.hostname())
01347 .arg(peeraddr.port());
01348 p->mask = IORead|IOWrite|IOUrgent;
01349 p->update = ap->update;
01350 p->automatic = ap;
01351 p->socket = s;
01352 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01353 if (ap == &upstream_)
01354 {
01355 uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01356 2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
01357 p->sendq = new Bucket;
01358 p->sendq->next = 0;
01359 copydata(p->sendq, words, sizeof(words));
01360 }
01361
01362
01363 if (debug_)
01364 logme()
01365 << "INFO: now connected to " << p->peeraddr << " from "
01366 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01367 }
01368 }
01369 }
01370
01371
01372 sel_.dispatch(delay_);
01373 now = Time::current();
01374 lock();
01375
01376
01377
01378 if (flush_ && now > nextFlush)
01379 {
01380 flush_ = false;
01381 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
01382 sendObjectListToPeers(true);
01383 }
01384
01385
01386
01387
01388
01389
01390 updatePeerMasks();
01391
01392
01393 Time waitold = now - waitMax_;
01394 Time waitstale = now - waitStale_;
01395 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01396 {
01397 Object *o = findObject(0, i->name);
01398
01399
01400
01401 if (i->time < waitold)
01402 {
01403 logme ()
01404 << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
01405 << "s to retrieval, releasing '" << i->name << "' from wait, have "
01406 << (o ? o->rawdata.size() : 0) << " data available\n";
01407 releaseFromWait(i++, o);
01408 }
01409 else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
01410 {
01411 logme ()
01412 << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
01413 << "s to update, releasing '" << i->name << "' from wait, have "
01414 << o->rawdata.size() << " data available\n";
01415 releaseFromWait(i++, o);
01416 }
01417
01418
01419 else
01420 ++i;
01421 }
01422
01423 unlock();
01424 }
01425 }
01426
01427
01428
01429 void
01430 DQMNet::sendLocalChanges(void)
01431 {
01432 char byte = 0;
01433 wakeup_.sink()->write(&byte, 1);
01434 }
01435
01439 DQMBasicNet::DQMBasicNet(const std::string &appname )
01440 : DQMImplNet<DQMNet::Object>(appname)
01441 {
01442 local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01443 }
01444
01446 void
01447 DQMBasicNet::reserveLocalSpace(uint32_t size)
01448 {
01449 local_->objs.resize(size);
01450 }
01451
01454 void
01455 DQMBasicNet::updateLocalObject(Object &o)
01456 {
01457 o.dirname = &*local_->dirs.insert(*o.dirname).first;
01458 std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
01459 if (! info.second)
01460 {
01461
01462
01463
01464 Object &old = const_cast<Object &>(*info.first);
01465 std::swap(old.flags, o.flags);
01466 std::swap(old.tag, o.tag);
01467 std::swap(old.version, o.version);
01468 std::swap(old.qreports, o.qreports);
01469 std::swap(old.rawdata, o.rawdata);
01470 std::swap(old.scalar, o.scalar);
01471 std::swap(old.qdata, o.qdata);
01472 }
01473 }
01474
01478 bool
01479 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
01480 {
01481 size_t removed = 0;
01482 std::string path;
01483 ObjectMap::iterator i, e;
01484 for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
01485 {
01486 path.clear();
01487 path.reserve(i->dirname->size() + i->objname.size() + 2);
01488 path += *i->dirname;
01489 if (! path.empty())
01490 path += '/';
01491 path += i->objname;
01492
01493 if (! known.count(path))
01494 ++removed, local_->objs.erase(i++);
01495 else
01496 ++i;
01497 }
01498
01499 return removed > 0;
01500 }