00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMDefinitions.h"
00003 #include "DQMServices/Core/src/DQMError.h"
00004 #include "classlib/iobase/InetServerSocket.h"
00005 #include "classlib/iobase/LocalServerSocket.h"
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/sysapi/InetSocket.h"
00008 #include "classlib/utils/TimeInfo.h"
00009 #include "classlib/utils/StringList.h"
00010 #include "classlib/utils/StringFormat.h"
00011 #include "classlib/utils/StringOps.h"
00012 #include "classlib/utils/SystemError.h"
00013 #include "classlib/utils/Regexp.h"
00014 #include <unistd.h>
00015 #include <fcntl.h>
00016 #include <sys/wait.h>
00017 #include <stdio.h>
00018 #include <stdint.h>
00019 #include <iostream>
00020 #include <sstream>
00021 #include <cassert>
00022 #include <cfloat>
00023 #include <inttypes.h>
00024
// Largest wire message accepted from a peer; onPeerData() drops a peer whose
// message length header is at or above this value.
#define MESSAGE_SIZE_LIMIT (8*1024*1024)
// Kernel send/receive buffer size requested (SO_SNDBUF / SO_RCVBUF) on every
// socket this file creates.
#define SOCKET_BUF_SIZE (8*1024*1024)
// Size of the scratch buffer used for each read() from a peer socket.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
// Amount of extra capacity reserved in a peer's incoming buffer when it is
// about to overflow its current capacity.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Regular expression for "<name>T=value</name>" where T is one of i, f, s or
// qr.  NOTE(review): not referenced by the code visible here; confirm whether
// it is still used.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
00033
00035
00036 std::ostream &
00037 DQMNet::logme (void)
00038 {
00039 Time now = Time::current();
00040 return std::cout
00041 << now.format(true, "%Y-%m-%d %H:%M:%S.")
00042 << now.nanoformat(3, 3)
00043 << " " << appname_ << "[" << pid_ << "]: ";
00044 }
00045
00046
00047 void
00048 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00049 {
00050 b->data.insert(b->data.end(),
00051 (const unsigned char *)data,
00052 (const unsigned char *)data + len);
00053 }
00054
00055
00056 void
00057 DQMNet::discard (Bucket *&b)
00058 {
00059 while (b)
00060 {
00061 Bucket *next = b->next;
00062 delete b;
00063 b = next;
00064 }
00065 }
00066
00068
00071 void
00072 DQMNet::losePeer(const char *reason,
00073 Peer *peer,
00074 IOSelectEvent *ev,
00075 Error *err)
00076 {
00077 if (reason)
00078 logme ()
00079 << reason << peer->peeraddr
00080 << (err ? "; error was: " + err->explain() : std::string(""))
00081 << std::endl;
00082
00083 Socket *s = peer->socket;
00084
00085 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00086 if (i->peer == peer)
00087 waiting_.erase(i++);
00088 else
00089 ++i;
00090
00091 if (ev)
00092 ev->source = 0;
00093
00094 discard(peer->sendq);
00095 if (peer->automatic)
00096 peer->automatic->peer = 0;
00097
00098 sel_.detach (s);
00099 s->close();
00100 removePeer(peer, s);
00101 delete s;
00102 }
00103
00105 void
00106 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
00107 {
00108
00109 Bucket **msg = &p->sendq;
00110 while (*msg)
00111 msg = &(*msg)->next;
00112 *msg = new Bucket;
00113 (*msg)->next = 0;
00114
00115 uint32_t words[3];
00116 words[0] = sizeof(words) + len;
00117 words[1] = DQM_MSG_GET_OBJECT;
00118 words[2] = len;
00119 copydata(*msg, words, sizeof(words));
00120 copydata(*msg, name, len);
00121 }
00122
00125 void
00126 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00127 {
00128
00129
00130
00131
00132
00133 requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
00134 WaitObject wo = { Time::current(), name, info, p };
00135 waiting_.push_back(wo);
00136 p->waiting++;
00137 }
00138
00139
00140
00141 void
00142 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00143 {
00144 Bucket **msg = &i->peer->sendq;
00145 while (*msg)
00146 msg = &(*msg)->next;
00147 *msg = new Bucket;
00148 (*msg)->next = 0;
00149
00150 releaseFromWait(*msg, *i, o);
00151
00152 assert(i->peer->waiting > 0);
00153 i->peer->waiting--;
00154 waiting_.erase(i);
00155 }
00156
00157
00158 void
00159 DQMNet::releaseWaiters(const std::string &name, Object *o)
00160 {
00161 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00162 if (i->name == name)
00163 releaseFromWait(i++, o);
00164 else
00165 ++i;
00166 }
00167
00171 void
00172 DQMNet::packQualityData(std::string &into, const QReports &qr)
00173 {
00174 char buf[64];
00175 std::ostringstream qrs;
00176 QReports::const_iterator qi, qe;
00177 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
00178 {
00179 int pos = 0;
00180 sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
00181 qrs << buf << '\0'
00182 << buf+pos << '\0'
00183 << qi->qtname << '\0'
00184 << qi->algorithm << '\0'
00185 << qi->message << '\0'
00186 << '\0';
00187 }
00188 into = qrs.str();
00189 }
00190
00193 void
00194 DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
00195 {
00196 const char *qdata = from;
00197
00198
00199 size_t nqv = 0;
00200 while (*qdata)
00201 {
00202 ++nqv;
00203 while (*qdata) ++qdata; ++qdata;
00204 while (*qdata) ++qdata; ++qdata;
00205 while (*qdata) ++qdata; ++qdata;
00206 while (*qdata) ++qdata; ++qdata;
00207 while (*qdata) ++qdata; ++qdata;
00208 }
00209
00210
00211 qdata = from;
00212 qr.reserve(nqv);
00213 while (*qdata)
00214 {
00215 qr.push_back(DQMNet::QValue());
00216 DQMNet::QValue &qv = qr.back();
00217
00218 qv.code = atoi(qdata);
00219 while (*qdata) ++qdata;
00220 switch (qv.code)
00221 {
00222 case dqm::qstatus::STATUS_OK:
00223 break;
00224 case dqm::qstatus::WARNING:
00225 flags |= DQMNet::DQM_PROP_REPORT_WARN;
00226 break;
00227 case dqm::qstatus::ERROR:
00228 flags |= DQMNet::DQM_PROP_REPORT_ERROR;
00229 break;
00230 default:
00231 flags |= DQMNet::DQM_PROP_REPORT_OTHER;
00232 break;
00233 }
00234
00235 qv.qtresult = atof(++qdata);
00236 while (*qdata) ++qdata;
00237
00238 qv.qtname = ++qdata;
00239 while (*qdata) ++qdata;
00240
00241 qv.algorithm = ++qdata;
00242 while (*qdata) ++qdata;
00243
00244 qv.message = ++qdata;
00245 while (*qdata) ++qdata;
00246 ++qdata;
00247 }
00248 }
00249
// Disabled code (#if 0).
//
// extractNextObject() reads the next streamed ROOT object from buf.  It
// returns 0 when the buffer is exhausted or no class can be read.
//
// reconstructObject() rebuilds o.object from o.rawdata, then an optional
// second object into o.reference, and unpacks o.qdata into o.qreports and
// o.flags.  It returns false if the first object cannot be read.
#if 0

static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}


bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // Main object first.
  if (! (o.object = extractNextObject(buf)))
    return false;

  // Optional reference object follows.
  o.reference = extractNextObject(buf);

  // Quality reports travel separately in qdata.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
00285
// Disabled code (#if 0): book o in store as a MonitorElement of the type
// given by o.flags, copying the tag and quality reports onto it.
// NOTE(review): every case except DQM_PROP_TYPE_INT uses an undeclared `name`
// (only INT uses o.objname), so this would not compile if re-enabled as-is.
#if 0
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;

  // Book the monitor element in the object's directory.
  MonitorElement *obj = 0;
  store->setCurrentFolder(*o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << *o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Carry over the tag and quality reports.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }

  return true;
}
#endif
00375
00377
00378 bool
00379 DQMNet::shouldStop(void)
00380 {
00381 return shutdown_;
00382 }
00383
00384
00385
00386 void
00387 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00388 {
00389 if (o)
00390 sendObjectToPeer(msg, *o, true);
00391 else
00392 {
00393 uint32_t words [3];
00394 words[0] = sizeof(words) + w.name.size();
00395 words[1] = DQM_REPLY_NONE;
00396 words[2] = w.name.size();
00397
00398 msg->data.reserve(msg->data.size() + words[0]);
00399 copydata(msg, &words[0], sizeof(words));
00400 copydata(msg, &w.name[0], w.name.size());
00401 }
00402 }
00403
00404
00405
00406
00407 void
00408 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00409 {
00410 uint32_t flags = o.flags & ~DQM_PROP_DEAD;
00411 DataBlob objdata;
00412
00413 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00414 objdata.insert(objdata.end(),
00415 &o.scalar[0],
00416 &o.scalar[0] + o.scalar.size());
00417 else if (data)
00418 objdata.insert(objdata.end(),
00419 &o.rawdata[0],
00420 &o.rawdata[0] + o.rawdata.size());
00421
00422 uint32_t words [9];
00423 uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
00424 uint32_t datalen = objdata.size();
00425 uint32_t qlen = o.qdata.size();
00426
00427 if (o.dirname->empty())
00428 --namelen;
00429
00430 words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
00431 words[1] = DQM_REPLY_OBJECT;
00432 words[2] = flags;
00433 words[3] = (o.version >> 0 ) & 0xffffffff;
00434 words[4] = (o.version >> 32) & 0xffffffff;
00435 words[5] = o.tag;
00436 words[6] = namelen;
00437 words[7] = datalen;
00438 words[8] = qlen;
00439
00440 msg->data.reserve(msg->data.size() + words[0]);
00441 copydata(msg, &words[0], 9*sizeof(uint32_t));
00442 if (namelen)
00443 {
00444 copydata(msg, &(*o.dirname)[0], o.dirname->size());
00445 if (! o.dirname->empty())
00446 copydata(msg, "/", 1);
00447 copydata(msg, &o.objname[0], o.objname.size());
00448 }
00449 if (datalen)
00450 copydata(msg, &objdata[0], datalen);
00451 if (qlen)
00452 copydata(msg, &o.qdata[0], qlen);
00453 }
00454
00456
00457 bool
00458 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00459 {
00460
00461 uint32_t type;
00462 memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00463 switch (type)
00464 {
00465 case DQM_MSG_UPDATE_ME:
00466 {
00467 if (len != 2*sizeof(uint32_t))
00468 {
00469 logme()
00470 << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00471 << " from peer " << p->peeraddr << std::endl;
00472 return false;
00473 }
00474
00475 if (debug_)
00476 logme()
00477 << "DEBUG: received message 'UPDATE ME' from peer "
00478 << p->peeraddr << ", size " << len << std::endl;
00479
00480 p->update = true;
00481 }
00482 return true;
00483
00484 case DQM_MSG_LIST_OBJECTS:
00485 {
00486 if (debug_)
00487 logme()
00488 << "DEBUG: received message 'LIST OBJECTS' from peer "
00489 << p->peeraddr << ", size " << len << std::endl;
00490
00491
00492 sendObjectListToPeer(msg, true, false);
00493 }
00494 return true;
00495
00496 case DQM_MSG_GET_OBJECT:
00497 {
00498 if (debug_)
00499 logme()
00500 << "DEBUG: received message 'GET OBJECT' from peer "
00501 << p->peeraddr << ", size " << len << std::endl;
00502
00503 if (len < 3*sizeof(uint32_t))
00504 {
00505 logme()
00506 << "ERROR: corrupt 'GET IMAGE' message of length " << len
00507 << " from peer " << p->peeraddr << std::endl;
00508 return false;
00509 }
00510
00511 uint32_t namelen;
00512 memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00513 if (len != 3*sizeof(uint32_t) + namelen)
00514 {
00515 logme()
00516 << "ERROR: corrupt 'GET OBJECT' message of length " << len
00517 << " from peer " << p->peeraddr
00518 << ", expected length " << (3*sizeof(uint32_t))
00519 << " + " << namelen << std::endl;
00520 return false;
00521 }
00522
00523 std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00524 Peer *owner = 0;
00525 Object *o = findObject(0, name, &owner);
00526 if (o)
00527 {
00528 o->lastreq = Time::current().ns();
00529 if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
00530 && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00531 waitForData(p, name, "", owner);
00532 else
00533 sendObjectToPeer(msg, *o, true);
00534 }
00535 else
00536 {
00537 uint32_t words [3];
00538 words[0] = sizeof(words) + name.size();
00539 words[1] = DQM_REPLY_NONE;
00540 words[2] = name.size();
00541
00542 msg->data.reserve(msg->data.size() + words[0]);
00543 copydata(msg, &words[0], sizeof(words));
00544 copydata(msg, &name[0], name.size());
00545 }
00546 }
00547 return true;
00548
00549 case DQM_REPLY_LIST_BEGIN:
00550 {
00551 if (len != 4*sizeof(uint32_t))
00552 {
00553 logme()
00554 << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00555 << " from peer " << p->peeraddr << std::endl;
00556 return false;
00557 }
00558
00559
00560 uint32_t flags;
00561 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00562
00563 if (debug_)
00564 logme()
00565 << "DEBUG: received message 'LIST BEGIN "
00566 << (flags ? "FULL" : "INCREMENTAL")
00567 << "' from " << p->peeraddr
00568 << ", size " << len << std::endl;
00569
00570
00571
00572
00573
00574
00575
00576 if (flags)
00577 markObjectsDead(p);
00578 }
00579 return true;
00580
00581 case DQM_REPLY_LIST_END:
00582 {
00583 if (len != 4*sizeof(uint32_t))
00584 {
00585 logme()
00586 << "ERROR: corrupt 'LIST END' message of length " << len
00587 << " from peer " << p->peeraddr << std::endl;
00588 return false;
00589 }
00590
00591
00592 uint32_t flags;
00593 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00594
00595
00596
00597
00598
00599 if (flags)
00600 purgeDeadObjects(p);
00601
00602 if (debug_)
00603 logme()
00604 << "DEBUG: received message 'LIST END "
00605 << (flags ? "FULL" : "INCREMENTAL")
00606 << "' from " << p->peeraddr
00607 << ", size " << len << std::endl;
00608
00609
00610
00611 flush_ = true;
00612 p->updates++;
00613 }
00614 return true;
00615
00616 case DQM_REPLY_OBJECT:
00617 {
00618 uint32_t words[9];
00619 if (len < sizeof(words))
00620 {
00621 logme()
00622 << "ERROR: corrupt 'OBJECT' message of length " << len
00623 << " from peer " << p->peeraddr << std::endl;
00624 return false;
00625 }
00626
00627 memcpy (&words[0], data, sizeof(words));
00628 uint32_t &namelen = words[6];
00629 uint32_t &datalen = words[7];
00630 uint32_t &qlen = words[8];
00631
00632 if (len != sizeof(words) + namelen + datalen + qlen)
00633 {
00634 logme()
00635 << "ERROR: corrupt 'OBJECT' message of length " << len
00636 << " from peer " << p->peeraddr
00637 << ", expected length " << sizeof(words)
00638 << " + " << namelen
00639 << " + " << datalen
00640 << " + " << qlen
00641 << std::endl;
00642 return false;
00643 }
00644
00645 unsigned char *namedata = data + sizeof(words);
00646 unsigned char *objdata = namedata + namelen;
00647 unsigned char *qdata = objdata + datalen;
00648 unsigned char *enddata = qdata + qlen;
00649 std::string name ((char *) namedata, namelen);
00650 assert (enddata == data + len);
00651
00652 if (debug_)
00653 logme()
00654 << "DEBUG: received message 'OBJECT " << name
00655 << "' from " << p->peeraddr
00656 << ", size " << len << std::endl;
00657
00658
00659 p->source = true;
00660
00661
00662 Object *o = findObject(p, name);
00663 if (! o)
00664 o = makeObject(p, name);
00665
00666 o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
00667 o->tag = words[5];
00668 o->version = ((uint64_t) words[4] << 32 | words[3]);
00669 o->scalar.clear();
00670 o->qdata.clear();
00671 if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00672 {
00673 o->rawdata.clear();
00674 o->scalar.insert(o->scalar.end(), objdata, qdata);
00675 }
00676 else if (datalen)
00677 {
00678 o->rawdata.clear();
00679 o->rawdata.insert(o->rawdata.end(), objdata, qdata);
00680 }
00681 else if (! o->rawdata.empty())
00682 o->flags |= DQM_PROP_STALE;
00683 o->qdata.insert(o->qdata.end(), qdata, enddata);
00684
00685
00686
00687 if (o->lastreq
00688 && ! datalen
00689 && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00690 requestObjectData(p, (namelen ? &name[0] : 0), namelen);
00691
00692
00693 if (datalen)
00694 releaseWaiters(name, o);
00695 }
00696 return true;
00697
00698 case DQM_REPLY_NONE:
00699 {
00700 uint32_t words[3];
00701 if (len < sizeof(words))
00702 {
00703 logme()
00704 << "ERROR: corrupt 'NONE' message of length " << len
00705 << " from peer " << p->peeraddr << std::endl;
00706 return false;
00707 }
00708
00709 memcpy (&words[0], data, sizeof(words));
00710 uint32_t &namelen = words[2];
00711
00712 if (len != sizeof(words) + namelen)
00713 {
00714 logme()
00715 << "ERROR: corrupt 'NONE' message of length " << len
00716 << " from peer " << p->peeraddr
00717 << ", expected length " << sizeof(words)
00718 << " + " << namelen << std::endl;
00719 return false;
00720 }
00721
00722 unsigned char *namedata = data + sizeof(words);
00723 std::string name((char *) namedata, namelen);
00724
00725 if (debug_)
00726 logme()
00727 << "DEBUG: received message 'NONE " << name
00728 << "' from " << p->peeraddr
00729 << ", size " << len << std::endl;
00730
00731
00732 p->source = true;
00733
00734
00735 if (Object *o = findObject(p, name))
00736 {
00737 o->flags |= DQM_PROP_DEAD;
00738 purgeDeadObjects(p);
00739 }
00740
00741
00742 releaseWaiters(name, 0);
00743 }
00744 return true;
00745
00746 default:
00747 logme()
00748 << "ERROR: unrecognised message of length " << len
00749 << " and type " << type << " from peer " << p->peeraddr
00750 << std::endl;
00751 return false;
00752 }
00753 }
00754
00757 bool
00758 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00759 {
00760 lock();
00761 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00762
00763
00764
00765
00766
00767 if (ev->events & IOUrgent)
00768 {
00769 if (p->automatic)
00770 {
00771 logme()
00772 << "WARNING: connection to the DQM server at " << p->peeraddr
00773 << " lost (will attempt to reconnect in 15 seconds)\n";
00774 losePeer(0, p, ev);
00775 }
00776 else
00777 losePeer("WARNING: lost peer connection ", p, ev);
00778
00779 unlock();
00780 return true;
00781 }
00782
00783
00784 if (ev->events & IOWrite)
00785 {
00786 while (Bucket *b = p->sendq)
00787 {
00788 IOSize len = b->data.size() - p->sendpos;
00789 const void *data = (len ? (const void *)&b->data[p->sendpos]
00790 : (const void *)&data);
00791 IOSize done;
00792
00793 try
00794 {
00795 done = (len ? ev->source->write (data, len) : 0);
00796 if (debug_ && len)
00797 logme()
00798 << "DEBUG: sent " << done << " bytes to peer "
00799 << p->peeraddr << std::endl;
00800 }
00801 catch (Error &e)
00802 {
00803 losePeer("WARNING: unable to write to peer ", p, ev, &e);
00804 unlock();
00805 return true;
00806 }
00807
00808 p->sendpos += done;
00809 if (p->sendpos == b->data.size())
00810 {
00811 Bucket *old = p->sendq;
00812 p->sendq = old->next;
00813 p->sendpos = 0;
00814 old->next = 0;
00815 discard(old);
00816 }
00817
00818 if (! done && len)
00819
00820 break;
00821 }
00822 }
00823
00824
00825
00826 if (ev->events & IORead)
00827 {
00828
00829
00830
00831 IOSize sz;
00832 try
00833 {
00834 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00835 do
00836 if ((sz = ev->source->read(&buf[0], buf.size())))
00837 {
00838 if (debug_)
00839 logme()
00840 << "DEBUG: received " << sz << " bytes from peer "
00841 << p->peeraddr << std::endl;
00842 DataBlob &data = p->incoming;
00843 if (data.capacity () < data.size () + sz)
00844 data.reserve (data.size() + SOCKET_READ_GROWTH);
00845 data.insert (data.end(), &buf[0], &buf[0] + sz);
00846 }
00847 while (sz == sizeof (buf));
00848 }
00849 catch (Error &e)
00850 {
00851 SystemError *next = dynamic_cast<SystemError *>(e.next());
00852 if (next && next->portable() == SysErr::ErrTryAgain)
00853 sz = 1;
00854 else
00855 {
00856
00857 losePeer("WARNING: failed to read from peer ", p, ev, &e);
00858 unlock();
00859 return true;
00860 }
00861 }
00862
00863
00864 size_t consumed = 0;
00865 DataBlob &data = p->incoming;
00866 while (data.size()-consumed >= sizeof(uint32_t)
00867 && p->waiting < MAX_PEER_WAITREQS)
00868 {
00869 uint32_t msglen;
00870 memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00871
00872 if (msglen >= MESSAGE_SIZE_LIMIT)
00873 {
00874 losePeer("WARNING: excessively large message from ", p, ev);
00875 unlock();
00876 return true;
00877 }
00878
00879 if (data.size()-consumed >= msglen)
00880 {
00881 bool valid = true;
00882 if (msglen < 2*sizeof(uint32_t))
00883 {
00884 logme()
00885 << "ERROR: corrupt peer message of length " << msglen
00886 << " from peer " << p->peeraddr << std::endl;
00887 valid = false;
00888 }
00889 else
00890 {
00891
00892 Bucket msg;
00893 msg.next = 0;
00894 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00895
00896
00897 if (! msg.data.empty())
00898 {
00899 Bucket **prev = &p->sendq;
00900 while (*prev)
00901 prev = &(*prev)->next;
00902
00903 *prev = new Bucket;
00904 (*prev)->next = 0;
00905 (*prev)->data.swap(msg.data);
00906 }
00907 }
00908
00909 if (! valid)
00910 {
00911 losePeer("WARNING: data stream error with ", p, ev);
00912 unlock();
00913 return true;
00914 }
00915
00916 consumed += msglen;
00917 }
00918 else
00919 break;
00920 }
00921
00922 data.erase(data.begin(), data.begin()+consumed);
00923
00924
00925
00926
00927 if (sz == 0)
00928 sel_.setMask(p->socket, p->mask &= ~IORead);
00929 }
00930
00931
00932 unlock();
00933 return false;
00934 }
00935
00940 bool
00941 DQMNet::onPeerConnect(IOSelectEvent *ev)
00942 {
00943
00944 assert (ev->source == server_);
00945
00946
00947 Socket *s = server_->accept();
00948 assert (s);
00949 assert (! s->isBlocking());
00950
00951
00952 lock();
00953 Peer *p = createPeer(s);
00954 std::string localaddr;
00955 if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
00956 {
00957 InetAddress peeraddr = inet->peername();
00958 InetAddress myaddr = inet->sockname();
00959 p->peeraddr = StringFormat("%1:%2")
00960 .arg(peeraddr.hostname())
00961 .arg(peeraddr.port());
00962 localaddr = StringFormat("%1:%2")
00963 .arg(myaddr.hostname())
00964 .arg(myaddr.port());
00965 }
00966 else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
00967 {
00968 p->peeraddr = local->peername().path();
00969 localaddr = local->sockname().path();
00970 }
00971 else
00972 assert(false);
00973
00974 p->mask = IORead|IOUrgent;
00975 p->socket = s;
00976
00977
00978 if (debug_)
00979 logme()
00980 << "INFO: new peer " << p->peeraddr << " is now connected to "
00981 << localaddr << std::endl;
00982
00983
00984 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00985 unlock();
00986
00987
00988 return false;
00989 }
00990
00997 bool
00998 DQMNet::onLocalNotify(IOSelectEvent *ev)
00999 {
01000
01001 try
01002 {
01003 IOSize sz;
01004 unsigned char buf [1024];
01005 while ((sz = ev->source->read(buf, sizeof(buf))))
01006 ;
01007 }
01008 catch (Error &e)
01009 {
01010 SystemError *next = dynamic_cast<SystemError *>(e.next());
01011 if (next && next->portable() == SysErr::ErrTryAgain)
01012 ;
01013 else
01014 logme()
01015 << "WARNING: error reading from notification pipe: "
01016 << e.explain() << std::endl;
01017 }
01018
01019
01020 flush_ = true;
01021
01022
01023 return false;
01024 }
01025
01028 void
01029 DQMNet::updateMask(Peer *p)
01030 {
01031 if (! p->socket)
01032 return;
01033
01034
01035 unsigned oldmask = p->mask;
01036 if (! p->sendq && (p->mask & IOWrite))
01037 sel_.setMask(p->socket, p->mask &= ~IOWrite);
01038
01039 if (p->sendq && ! (p->mask & IOWrite))
01040 sel_.setMask(p->socket, p->mask |= IOWrite);
01041
01042 if (debug_ && oldmask != p->mask)
01043 logme()
01044 << "DEBUG: updating mask for " << p->peeraddr << " to "
01045 << p->mask << " from " << oldmask << std::endl;
01046
01047
01048
01049 if (p->mask == IOUrgent && ! p->waiting)
01050 {
01051 assert (! p->sendq);
01052 if (debug_)
01053 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01054 losePeer(0, p, 0);
01055 }
01056 }
01057
01059 DQMNet::DQMNet (const std::string &appname )
01060 : debug_ (false),
01061 appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01062 pid_ (getpid()),
01063 server_ (0),
01064 version_ (Time::current()),
01065 communicate_ ((pthread_t) -1),
01066 shutdown_ (0),
01067 delay_ (1000),
01068 waitStale_ (0, 0, 0, 0, 500000000 ),
01069 waitMax_ (0, 0, 0, 5 , 0),
01070 flush_ (false)
01071 {
01072
01073
01074
01075 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01076 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01077
01078
01079 upstream_.peer = downstream_.peer = 0;
01080 upstream_.next = downstream_.next = 0;
01081 upstream_.port = downstream_.port = 0;
01082 upstream_.update = downstream_.update = false;
01083 }
01084
01085 DQMNet::~DQMNet(void)
01086 {
01087
01088 }
01089
01092 void
01093 DQMNet::debug(bool doit)
01094 {
01095 debug_ = doit;
01096 }
01097
01100 void
01101 DQMNet::delay(int delay)
01102 {
01103 delay_ = delay;
01104 }
01105
01110 void
01111 DQMNet::staleObjectWaitLimit(lat::TimeSpan time)
01112 {
01113 waitStale_ = time;
01114 }
01115
01119 void
01120 DQMNet::startLocalServer(int port)
01121 {
01122 if (server_)
01123 {
01124 logme() << "ERROR: DQM server was already started.\n";
01125 return;
01126 }
01127
01128 try
01129 {
01130 InetAddress addr("0.0.0.0", port);
01131 InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
01132 s->bind(addr);
01133 s->listen(10);
01134 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01135 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01136 s->setBlocking(false);
01137 sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01138 }
01139 catch (Error &e)
01140 {
01141
01142
01143 logme()
01144 << "ERROR: Failed to start server at port " << port << ": "
01145 << e.explain() << std::endl;
01146
01147 raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
01148 " %d: %s", port, e.explain().c_str());
01149 }
01150
01151 logme() << "INFO: DQM server started at port " << port << std::endl;
01152 }
01153
01157 void
01158 DQMNet::startLocalServer(const char *path)
01159 {
01160 if (server_)
01161 {
01162 logme() << "ERROR: DQM server was already started.\n";
01163 return;
01164 }
01165
01166 try
01167 {
01168 server_ = new LocalServerSocket(path, 10);
01169 server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01170 server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01171 server_->setBlocking(false);
01172 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01173 }
01174 catch (Error &e)
01175 {
01176
01177
01178 logme()
01179 << "ERROR: Failed to start server at path " << path << ": "
01180 << e.explain() << std::endl;
01181
01182 raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
01183 " %s: %s", path, e.explain().c_str());
01184 }
01185
01186 logme() << "INFO: DQM server started at path " << path << std::endl;
01187 }
01188
01192 void
01193 DQMNet::updateToCollector(const std::string &host, int port)
01194 {
01195 if (! downstream_.host.empty())
01196 {
01197 logme()
01198 << "ERROR: Already updating another collector at "
01199 << downstream_.host << ":" << downstream_.port << std::endl;
01200 return;
01201 }
01202
01203 downstream_.update = true;
01204 downstream_.host = host;
01205 downstream_.port = port;
01206 }
01207
01211 void
01212 DQMNet::listenToCollector(const std::string &host, int port)
01213 {
01214 if (! upstream_.host.empty())
01215 {
01216 logme()
01217 << "ERROR: Already receiving data from another collector at "
01218 << upstream_.host << ":" << upstream_.port << std::endl;
01219 return;
01220 }
01221
01222 upstream_.update = false;
01223 upstream_.host = host;
01224 upstream_.port = port;
01225 }
01226
01228 void
01229 DQMNet::shutdown(void)
01230 {
01231 shutdown_ = 1;
01232 if (communicate_ != (pthread_t) -1)
01233 pthread_join(communicate_, 0);
01234 }
01235
01241 static void *communicate(void *obj)
01242 {
01243 sigset_t sigs;
01244 sigfillset (&sigs);
01245 pthread_sigmask (SIG_BLOCK, &sigs, 0);
01246 ((DQMNet *)obj)->run();
01247 return 0;
01248 }
01249
01251 void
01252 DQMNet::lock(void)
01253 {
01254 if (communicate_ != (pthread_t) -1)
01255 pthread_mutex_lock(&lock_);
01256 }
01257
01259 void
01260 DQMNet::unlock(void)
01261 {
01262 if (communicate_ != (pthread_t) -1)
01263 pthread_mutex_unlock(&lock_);
01264 }
01265
01269 void
01270 DQMNet::start(void)
01271 {
01272 if (communicate_ != (pthread_t) -1)
01273 {
01274 logme()
01275 << "ERROR: DQM networking thread has already been started\n";
01276 return;
01277 }
01278
01279 pthread_mutex_init(&lock_, 0);
01280 pthread_create (&communicate_, 0, &communicate, this);
01281 }
01282
// Main loop of the networking layer; repeats until shouldStop() is true.
// Each iteration:
//  1. Starts non-blocking connections to the configured upstream/downstream
//     collectors that are not connected, retrying at most every 15 seconds
//     per collector.
//  2. Dispatches socket events (timeout delay_), then takes the lock.
//  3. Sends object lists to peers if flush_ is set, at most every 15 seconds.
//  4. Updates peer select masks.
//  5. Releases wait requests older than waitMax_, or older than waitStale_
//     when the object is marked stale.
// NOTE(review): step 1 runs without holding lock(), yet it calls createPeer()
// and sel_.attach(); confirm this is safe against other threads.
void
DQMNet::run(void)
{
  Time now;
  Time nextFlush = 0;
  AutoPeer *automatic[2] = { &upstream_, &downstream_ };

  while (! shouldStop())
  {
    for (int i = 0; i < 2; ++i)
    {
      AutoPeer *ap = automatic[i];

      // Connect only if a host is configured, no peer exists yet, and the
      // retry time has passed; `now` is refreshed only in that case.
      if (! ap->host.empty()
          && ! ap->peer
          && (now = Time::current()) > ap->next)
      {
        ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
        InetSocket *s = 0;
        try
        {
          InetAddress addr(ap->host.c_str(), ap->port);
          s = new InetSocket (SOCK_STREAM, 0, addr.family());
          s->setBlocking(false);
          s->connect(addr);
          s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
          s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
        }
        catch (Error &e)
        {
          // "Operation in progress" is the normal outcome of a non-blocking
          // connect; any other failure discards the socket until the next
          // retry.
          SystemError *sys = dynamic_cast<SystemError *>(e.next());
          if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
          {
            if (s)
              s->abort();
            delete s;
            s = 0;
          }
        }

        // Register the (possibly still connecting) socket as a peer.
        if (s)
        {
          Peer *p = createPeer(s);
          ap->peer = p;

          // NOTE(review): peername() is called while connect() may still be
          // in progress; confirm it cannot throw here.
          InetAddress peeraddr = ((InetSocket *) s)->peername();
          InetAddress myaddr = ((InetSocket *) s)->sockname();
          p->peeraddr = StringFormat("%1:%2")
                        .arg(peeraddr.hostname())
                        .arg(peeraddr.port());
          p->mask = IORead|IOWrite|IOUrgent;
          p->update = ap->update;
          p->automatic = ap;
          p->socket = s;
          sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
          // From the upstream collector, request the object list and
          // subsequent updates.
          if (ap == &upstream_)
          {
            uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
                                  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
            p->sendq = new Bucket;
            p->sendq->next = 0;
            copydata(p->sendq, words, sizeof(words));
          }

          if (debug_)
            logme()
              << "INFO: now connected to " << p->peeraddr << " from "
              << myaddr.hostname() << ":" << myaddr.port() << std::endl;
        }
      }
    }

    // Wait for and handle socket events, then hold the lock for the rest of
    // the iteration.
    sel_.dispatch(delay_);
    now = Time::current();
    lock();

    // Push object lists to peers when something changed, rate-limited to
    // once every 15 seconds.
    if (flush_ && now > nextFlush)
    {
      flush_ = false;
      nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
      sendObjectListToPeers(true);
    }

    updatePeerMasks();

    // Expire wait requests.  releaseFromWait() erases the entry, hence i++.
    Time waitold = now - waitMax_;
    Time waitstale = now - waitStale_;
    for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
    {
      Object *o = findObject(0, i->name);

      // Waited longer than waitMax_: release with whatever we have.
      if (i->time < waitold)
      {
        logme ()
          << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
          << "s to retrieval, releasing '" << i->name << "' from wait, have "
          << (o ? o->rawdata.size() : 0) << " data available\n";
        releaseFromWait(i++, o);
      }
      // Waited longer than waitStale_ for a stale object: release the stale
      // data.
      else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
      {
        logme ()
          << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
          << "s to update, releasing '" << i->name << "' from wait, have "
          << o->rawdata.size() << " data available\n";
        releaseFromWait(i++, o);
      }

      // Otherwise keep waiting.
      else
        ++i;
    }

    unlock();
  }
}
01421
01422
01423
01424 void
01425 DQMNet::sendLocalChanges(void)
01426 {
01427 char byte = 0;
01428 wakeup_.sink()->write(&byte, 1);
01429 }
01430
01434 DQMBasicNet::DQMBasicNet(const std::string &appname )
01435 : DQMImplNet<DQMNet::Object>(appname)
01436 {
01437 local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01438 }
01439
01441 void
01442 DQMBasicNet::reserveLocalSpace(uint32_t size)
01443 {
01444 local_->objs.resize(size);
01445 }
01446
01449 void
01450 DQMBasicNet::updateLocalObject(Object &o)
01451 {
01452 o.dirname = &*local_->dirs.insert(*o.dirname).first;
01453 std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
01454 if (! info.second)
01455 {
01456
01457
01458
01459 Object &old = const_cast<Object &>(*info.first);
01460 std::swap(old.flags, o.flags);
01461 std::swap(old.tag, o.tag);
01462 std::swap(old.version, o.version);
01463 std::swap(old.qreports, o.qreports);
01464 std::swap(old.rawdata, o.rawdata);
01465 std::swap(old.scalar, o.scalar);
01466 std::swap(old.qdata, o.qdata);
01467 }
01468 }
01469
01473 bool
01474 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
01475 {
01476 size_t removed = 0;
01477 std::string path;
01478 ObjectMap::iterator i, e;
01479 for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
01480 {
01481 path.clear();
01482 path.reserve(i->dirname->size() + i->objname.size() + 2);
01483 path += *i->dirname;
01484 if (! path.empty())
01485 path += '/';
01486 path += i->objname;
01487
01488 if (! known.count(path))
01489 ++removed, local_->objs.erase(i++);
01490 else
01491 ++i;
01492 }
01493
01494 return removed > 0;
01495 }