00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMStore.h"
00003 #include "DQMServices/Core/interface/MonitorElement.h"
00004 #include "FWCore/Utilities/interface/EDMException.h"
00005 #include "classlib/sysapi/InetSocket.h"
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/utils/TimeInfo.h"
00008 #include "classlib/utils/StringList.h"
00009 #include "classlib/utils/StringFormat.h"
00010 #include "classlib/utils/StringOps.h"
00011 #include "classlib/utils/SystemError.h"
00012 #include "classlib/utils/Regexp.h"
00013 #include "DQMRootBuffer.h"
00014 #include "TObjString.h"
00015 #include "TObject.h"
00016 #include "TProfile2D.h"
00017 #include "TProfile.h"
00018 #include "TH3F.h"
00019 #include "TH2F.h"
00020 #include "TH2S.h"
00021 #include "TH1F.h"
00022 #include "TH1S.h"
00023 #include <unistd.h>
00024 #include <fcntl.h>
00025 #include <sys/wait.h>
00026 #include <stdio.h>
00027 #include <stdint.h>
00028 #include <iostream>
00029 #include <cassert>
00030
// Largest accepted wire message; onPeerData drops a peer that announces a
// message of this size or more.
#define MESSAGE_SIZE_LIMIT (2*1024*1024)
// Kernel send/receive buffer size requested for every socket we create.
#define SOCKET_BUF_SIZE (8*1024*1024)
// Size of the scratch buffer used for each read() from a peer socket.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
// Extra capacity reserved in a peer's incoming buffer when it runs out.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Serialised scalar / quality-report string "<label>type=value</label>".
// Capture groups: 1 = label, 2 = type (i, f, s or qr), 3 = value.
static const Regexp s_rxmeval ("<(.*)>(i|f|s|qr)=(.*)</\\1>");
// Quality-report value "st.<code>.<message>".
// Capture groups: 1 = numeric status code, 2 = message text.
static const Regexp s_rxmeqr ("^st\\.(\\d+)\\.(.*)$");
00040
00042
00043
00044
00045
00046
// If the text at p starts with the len-character prefix name, parse the
// base-10 integer that follows it into value, advance p past the parsed
// digits and return true.  Otherwise leave p and value untouched and
// return false.
static bool parseInt (const char *&p, const char *name, size_t len, int &value)
{
  if (strncmp(p, name, len) != 0)
    return false;

  char *end = 0;
  value = strtol(p + len, &end, 10);
  p = end;
  return true;
}
00056
00058
00059 std::ostream &
00060 DQMNet::logme (void)
00061 {
00062 return std::cerr
00063 << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00064 << " " << appname_ << "[" << pid_ << "]: ";
00065 }
00066
00067
00068 void
00069 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00070 {
00071 b->data.insert(b->data.end(),
00072 (const unsigned char *)data,
00073 (const unsigned char *)data + len);
00074 }
00075
00076
00077 void
00078 DQMNet::discard (Bucket *&b)
00079 {
00080 while (b)
00081 {
00082 Bucket *next = b->next;
00083 delete b;
00084 b = next;
00085 }
00086 }
00087
00089
00092 bool
00093 DQMNet::losePeer(const char *reason,
00094 Peer *peer,
00095 IOSelectEvent *ev,
00096 Error *err)
00097 {
00098 if (reason)
00099 logme ()
00100 << reason << peer->peeraddr
00101 << (err ? "; error was: " + err->explain() : std::string(""))
00102 << std::endl;
00103
00104 Socket *s = peer->socket;
00105
00106 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00107 if (i->peer == peer)
00108 waiting_.erase(i++);
00109 else
00110 ++i;
00111
00112 if (ev)
00113 ev->source = 0;
00114
00115 discard(peer->sendq);
00116 if (peer->automatic)
00117 peer->automatic->peer = 0;
00118
00119 sel_.detach (s);
00120 s->close();
00121 removePeer (peer, s);
00122 delete s;
00123 return true;
00124 }
00125
00127 void
00128 DQMNet::requestObject(Peer *p, const char *name, size_t len)
00129 {
00130 Bucket **msg = &p->sendq;
00131 while (*msg)
00132 msg = &(*msg)->next;
00133 *msg = new Bucket;
00134 (*msg)->next = 0;
00135
00136 uint32_t words[3];
00137 words[0] = sizeof(words) + len;
00138 words[1] = DQM_MSG_GET_OBJECT;
00139 words[2] = len;
00140 copydata(*msg, words, sizeof(words));
00141 copydata(*msg, name, len);
00142 }
00143
00146 void
00147 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00148 {
00149
00150
00151
00152
00153
00154 requestObject(owner, name.size() ? &name[0] : 0, name.size());
00155 WaitObject wo = { Time::current(), name, info, p };
00156 waiting_.push_back(wo);
00157 p->waiting++;
00158 }
00159
00160
00161
00162 void
00163 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00164 {
00165 Bucket **msg = &i->peer->sendq;
00166 while (*msg)
00167 msg = &(*msg)->next;
00168 *msg = new Bucket;
00169 (*msg)->next = 0;
00170
00171 releaseFromWait(*msg, *i, o);
00172
00173 assert(i->peer->waiting > 0);
00174 i->peer->waiting--;
00175 waiting_.erase(i);
00176 }
00177
00178
00179 void
00180 DQMNet::releaseWaiters(Object *o)
00181 {
00182 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00183 if (i->name == o->name)
00184 releaseFromWait(i++, o);
00185 else
00186 ++i;
00187 }
00188
00190
00191 static TObject *
00192 extractNextObject(DQMRootBuffer &buf)
00193 {
00194 if (buf.Length() == buf.BufferSize())
00195 return 0;
00196
00197 buf.InitMap();
00198 Int_t pos = buf.Length();
00199 TClass *c = buf.ReadClass();
00200 buf.SetBufferOffset(pos);
00201 buf.ResetMap();
00202 return c ? buf.ReadObject(c) : 0;
00203 }
00204
00205
00206 static bool
00207 abortReconstructObject(DQMNet::Object &o)
00208 {
00209 o.qreports.clear();
00210 delete o.object;
00211 o.object = 0;
00212 return false;
00213 }
00214
00215
00216 bool
00217 DQMNet::reconstructObject(Object &o)
00218 {
00219 DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00220 buf.Reset();
00221
00222
00223 if (! (o.object = extractNextObject(buf)))
00224 return false;
00225
00226
00227 o.reference = extractNextObject(buf);
00228
00229
00230 int slash = StringOps::rfind(o.name, '/');
00231 std::string qrbase;
00232 qrbase.reserve(o.name.size()+2);
00233 qrbase = (slash == -1 ? o.name : o.name.substr(slash+1, std::string::npos));
00234 qrbase += ".";
00235
00236
00237 while (TObjString *qrstr = dynamic_cast<TObjString *>(extractNextObject(buf)))
00238 {
00239 RegexpMatch m;
00240 if (! s_rxmeval.match(qrstr->GetName(), 0, 0, &m))
00241 {
00242 logme()
00243 << "ERROR: unexpected quality report string '"
00244 << qrstr->GetName() << "' for object '"
00245 << o.name << "'\n";
00246 return abortReconstructObject(o);
00247 }
00248
00249 std::string label = m.matchString(qrstr->GetName(), 1);
00250 std::string type = m.matchString(qrstr->GetName(), 2);
00251 std::string value = m.matchString(qrstr->GetName(), 3);
00252
00253 if (type != "qr")
00254 {
00255 logme()
00256 << "ERROR: expected a 'qr' for a quality report '"
00257 << qrstr->GetName() << "' but found '" << type
00258 << "' instead\n";
00259 return abortReconstructObject(o);
00260 }
00261
00262 std::string qrname = label;
00263 qrname.replace(0, qrbase.size(), "");
00264 if (qrname == label)
00265 {
00266 logme()
00267 << "ERROR: quality report label in '"
00268 << qrstr->GetName()
00269 << "' does not match object name '"
00270 << o.name << "'\n";
00271 return abortReconstructObject(o);
00272 }
00273
00274 m.reset();
00275 if (! s_rxmeqr.match(value, 0, 0, &m))
00276 {
00277 logme()
00278 << "ERROR: quality test value '"
00279 << value << "' is incorrectly formatted\n";
00280 return abortReconstructObject(o);
00281 }
00282
00283 QValue qval;
00284 qval.code = 0;
00285 qval.qtname = qrname;
00286 qval.message = m.matchString(value, 2);
00287 std::string strcode = m.matchString(value, 1);
00288 const char *p = strcode.c_str();
00289 if (! parseInt(p, "", 0, qval.code) || *p)
00290 {
00291 logme()
00292 << "ERROR: failed to determine quality test code from '"
00293 << value << "'\n";
00294 return abortReconstructObject(o);
00295 }
00296
00297 o.qreports.push_back(qval);
00298 }
00299
00300 return true;
00301 }
00302
00303 bool
00304 DQMNet::reinstateObject(DQMStore *store, Object &o)
00305 {
00306 if (! reconstructObject (o))
00307 return false;
00308
00309
00310 std::string folder = o.name;
00311 std::string name = o.name;
00312 folder.erase(folder.rfind('/'), std::string::npos);
00313 name.erase(0, name.rfind('/')+1);
00314 store->setCurrentFolder(folder);
00315 if (TProfile2D *t = dynamic_cast<TProfile2D *>(o.object))
00316 store->bookProfile2D(name, t);
00317 else if (TProfile *t = dynamic_cast<TProfile *>(o.object))
00318 store->bookProfile(name, t);
00319 else if (TH3F *t = dynamic_cast<TH3F *>(o.object))
00320 store->book3D(name, t);
00321 else if (TH2F *t = dynamic_cast<TH2F *>(o.object))
00322 store->book2D(name, t);
00323 else if (TH2S *t = dynamic_cast<TH2S *>(o.object))
00324 store->book2S(name, t);
00325 else if (TH1F *t = dynamic_cast<TH1F *>(o.object))
00326 store->book1D(name, t);
00327 else if (TH1S *t = dynamic_cast<TH1S *>(o.object))
00328 store->book1S(name, t);
00329 else if (TObjString *t = dynamic_cast<TObjString *>(o.object))
00330 {
00331 RegexpMatch m;
00332 if (! s_rxmeval.match(t->GetName(), 0, 0, &m))
00333 {
00334 logme()
00335 << "ERROR: unexpected monitor element string '"
00336 << t->GetName() << "' for object '"
00337 << o.name << "'\n";
00338 return false;
00339 }
00340
00341
00342 std::string type = m.matchString(t->GetName(), 2);
00343 std::string value = m.matchString(t->GetName(), 3);
00344
00345 if (type == "i")
00346 store->bookInt(name)->Fill(atoi(value.c_str()));
00347 else if (type == "f")
00348 store->bookFloat(name)->Fill(atof(value.c_str()));
00349 else if (type == "s")
00350 store->bookString(name, value);
00351 else
00352 {
00353 logme()
00354 << "ERROR: unexpected string monitor element of type '"
00355 << type << "' (from '" << t->GetName() << "') for object '"
00356 << o.name << "'\n";
00357 return false;
00358 }
00359 }
00360
00361
00362 for (size_t i = 0, e = o.tags.size(); i < e; ++i)
00363 store->tag(o.name, o.tags[i]);
00364
00365
00366
00367
00368 return true;
00369 }
00370
00372
00373 bool
00374 DQMNet::shouldStop(void)
00375 {
00376 return shutdown_;
00377 }
00378
00379
00380
00381 void
00382 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00383 {
00384 if (o)
00385 sendObjectToPeer (msg, *o, true, sendScalarAsText_);
00386 else
00387 {
00388 uint32_t words [3];
00389 words[0] = sizeof(words) + w.name.size();
00390 words[1] = DQM_REPLY_NONE;
00391 words[2] = w.name.size();
00392
00393 msg->data.reserve(msg->data.size() + words[0]);
00394 copydata(msg, &words[0], sizeof(words));
00395 copydata(msg, &w.name[0], w.name.size());
00396 }
00397 }
00398
00399
00400 bool
00401 DQMNet::extractScalarData(DataBlob &objdata, Object &o)
00402 {
00403 if (! o.flags & DQM_FLAG_SCALAR)
00404 return false;
00405
00406 TObject *obj = o.object;
00407 if (! obj && o.rawdata.size())
00408 {
00409 DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00410 buf.InitMap();
00411 buf.Reset();
00412 obj = extractNextObject(buf);
00413 }
00414
00415 if (TObjString *ostr = dynamic_cast<TObjString *>(obj))
00416 {
00417 const TString &s = ostr->String();
00418 objdata.insert(objdata.end(),
00419 (unsigned char *) s.Data(),
00420 (unsigned char *) s.Data() + s.Length());
00421 return true;
00422 }
00423
00424 return false;
00425 }
00426
00427
00428
00429
00430
00431 void
00432 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data, bool text)
00433 {
00434 uint32_t flags = o.flags & ~DQM_FLAG_ZOMBIE;
00435 DataBlob objdata;
00436
00437 if (text && extractScalarData(objdata, o))
00438 flags |= DQM_FLAG_TEXT;
00439 else if (data || (flags & DQM_FLAG_SCALAR))
00440 objdata.insert(objdata.end(),
00441 &o.rawdata[0],
00442 &o.rawdata[0] + o.rawdata.size());
00443
00444 uint32_t words [8];
00445 uint32_t namelen = o.name.size();
00446 uint32_t taglen = o.tags.size() * sizeof(uint32_t);
00447 uint32_t datalen = objdata.size();
00448
00449 words[0] = 8*sizeof(uint32_t) + namelen + taglen + datalen;
00450 words[1] = DQM_REPLY_OBJECT;
00451 words[2] = flags;
00452 words[3] = (o.version >> 0 ) & 0xffffffff;
00453 words[4] = (o.version >> 32) & 0xffffffff;
00454 words[5] = namelen;
00455 words[6] = taglen / sizeof(uint32_t);
00456 words[7] = datalen;
00457
00458 msg->data.reserve(msg->data.size() + words[0]);
00459 copydata(msg, &words[0], 8*sizeof(uint32_t));
00460 if (namelen)
00461 copydata(msg, &o.name[0], namelen);
00462 if (taglen)
00463 copydata(msg, &o.tags[0], taglen);
00464 if (datalen)
00465 copydata(msg, &objdata[0], datalen);
00466 }
00467
00469
00470 bool
00471 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00472 {
00473
00474 uint32_t type;
00475 memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00476 switch (type)
00477 {
00478 case DQM_MSG_UPDATE_ME:
00479 {
00480 if (len != 3*sizeof(uint32_t))
00481 {
00482 logme()
00483 << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00484 << " from peer " << p->peeraddr << std::endl;
00485 return false;
00486 }
00487
00488
00489 uint32_t full;
00490 memcpy(&full, data + 2*sizeof(uint32_t), sizeof(uint32_t));
00491
00492 if (debug_)
00493 logme()
00494 << "DEBUG: received message 'UPDATE ME' from peer "
00495 << p->peeraddr << ", full = " << full << std::endl;
00496
00497 p->update = true;
00498 p->updatefull = full;
00499
00500 if (full && ! requestFullUpdates_)
00501 {
00502 if (debug_)
00503 logme()
00504 << "WARNING: forcing full update request mode on due to "
00505 << "request from " << p->peeraddr << std::endl;
00506 requestFullUpdates_ = true;
00507 requestFullUpdatesFromPeers();
00508 }
00509 }
00510 return true;
00511
00512 case DQM_MSG_LIST_OBJECTS:
00513 {
00514 if (debug_)
00515 logme()
00516 << "DEBUG: received message 'LIST OBJECTS' from peer "
00517 << p->peeraddr << std::endl;
00518
00519
00520 lock();
00521 sendObjectListToPeer(msg, p->updatefull, true, false);
00522 unlock();
00523 }
00524 return true;
00525
00526 case DQM_MSG_GET_OBJECT:
00527 {
00528 if (debug_)
00529 logme()
00530 << "DEBUG: received message 'GET OBJECT' from peer "
00531 << p->peeraddr << std::endl;
00532
00533 if (len < 3*sizeof(uint32_t))
00534 {
00535 logme()
00536 << "ERROR: corrupt 'GET IMAGE' message of length " << len
00537 << " from peer " << p->peeraddr << std::endl;
00538 return false;
00539 }
00540
00541 uint32_t namelen;
00542 memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00543 if (len != 3*sizeof(uint32_t) + namelen)
00544 {
00545 logme()
00546 << "ERROR: corrupt 'GET OBJECT' message of length " << len
00547 << " from peer " << p->peeraddr
00548 << ", expected length " << (3*sizeof(uint32_t))
00549 << " + " << namelen << std::endl;
00550 return false;
00551 }
00552
00553 lock();
00554 std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00555 Peer *owner = 0;
00556 Object *o = findObject(0, name, &owner);
00557 if (o)
00558 {
00559 o->lastreq = Time::current();
00560 if (o->rawdata.empty())
00561 waitForData(p, name, "", owner);
00562 else
00563 sendObjectToPeer(msg, *o, true, sendScalarAsText_);
00564 }
00565 else
00566 {
00567 uint32_t words [3];
00568 words[0] = sizeof(words) + name.size();
00569 words[1] = DQM_REPLY_NONE;
00570 words[2] = name.size();
00571
00572 msg->data.reserve(msg->data.size() + words[0]);
00573 copydata(msg, &words[0], sizeof(words));
00574 copydata(msg, &name[0], name.size());
00575 }
00576 unlock();
00577 }
00578 return true;
00579
00580 case DQM_REPLY_LIST_BEGIN:
00581 {
00582 if (len != 4*sizeof(uint32_t))
00583 {
00584 logme()
00585 << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00586 << " from peer " << p->peeraddr << std::endl;
00587 return false;
00588 }
00589
00590 if (debug_)
00591 logme()
00592 << "DEBUG: received message 'LIST BEGIN' from "
00593 << p->peeraddr << std::endl;
00594
00595
00596 uint32_t flags;
00597 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00598
00599
00600
00601
00602
00603
00604 if (flags)
00605 {
00606 lock();
00607 markObjectsZombies(p);
00608 unlock();
00609 }
00610 }
00611 return true;
00612
00613 case DQM_REPLY_LIST_END:
00614 {
00615 if (len != 4*sizeof(uint32_t))
00616 {
00617 logme()
00618 << "ERROR: corrupt 'LIST END' message of length " << len
00619 << " from peer " << p->peeraddr << std::endl;
00620 return false;
00621 }
00622
00623
00624 uint32_t flags;
00625 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00626
00627
00628
00629
00630
00631 if (flags)
00632 {
00633 lock();
00634 markObjectsDead(p);
00635 unlock();
00636 }
00637
00638 if (debug_)
00639 logme()
00640 << "DEBUG: received message 'LIST END' from "
00641 << p->peeraddr << std::endl;
00642
00643
00644
00645 flush_ = true;
00646 p->updates++;
00647 }
00648 return true;
00649
00650 case DQM_REPLY_OBJECT:
00651 {
00652 uint32_t words[8];
00653 if (len < sizeof(words))
00654 {
00655 logme()
00656 << "ERROR: corrupt 'OBJECT' message of length " << len
00657 << " from peer " << p->peeraddr << std::endl;
00658 return false;
00659 }
00660
00661 memcpy (&words[0], data, sizeof(words));
00662 uint32_t &namelen = words[5];
00663 uint32_t &taglen = words[6];
00664 uint32_t &datalen = words[7];
00665
00666 if (len != sizeof(words) + namelen + taglen*sizeof(uint32_t) + datalen)
00667 {
00668 logme()
00669 << "ERROR: corrupt 'OBJECT' message of length " << len
00670 << " from peer " << p->peeraddr
00671 << ", expected length " << sizeof(words)
00672 << " + " << namelen
00673 << " + " << (taglen*sizeof(uint32_t))
00674 << " + " << datalen
00675 << std::endl;
00676 return false;
00677 }
00678
00679 unsigned char *namedata = data + sizeof(words);
00680 unsigned char *tagdata = namedata + namelen;
00681 unsigned char *objdata = tagdata + taglen*sizeof(uint32_t);
00682 unsigned char *enddata = objdata + datalen;
00683 std::string name ((char *) namedata, namelen);
00684 assert (enddata == data + len);
00685
00686 if (debug_)
00687 logme()
00688 << "DEBUG: received message 'OBJECT " << name
00689 << "' from " << p->peeraddr << std::endl;
00690
00691
00692 p->source = true;
00693
00694
00695 lock();
00696 Object *o = findObject(p, name);
00697 if (! o)
00698 o = makeObject(p, name);
00699
00700 o->flags = words[2] | DQM_FLAG_NEW | DQM_FLAG_RECEIVED;
00701 o->version = ((uint64_t) words[4] << 32 | words[3]);
00702 o->tags.clear();
00703 o->tags.insert(o->tags.end(), (uint32_t *) tagdata, (uint32_t *) objdata);
00704 o->rawdata.clear();
00705 o->rawdata.insert (o->rawdata.end(), objdata, enddata);
00706
00707 bool hadobject = (o->object != 0);
00708 delete o->object;
00709 o->object = 0;
00710 delete o->reference;
00711 o->reference = 0;
00712
00713
00714
00715 if (hadobject && ! datalen)
00716 requestObject(p, (namelen ? &name[0] : 0), namelen);
00717
00718
00719 if (datalen)
00720 releaseWaiters(o);
00721 unlock();
00722 }
00723 return true;
00724
00725 case DQM_REPLY_NONE:
00726 {
00727 uint32_t words[3];
00728 if (len < sizeof(words))
00729 {
00730 logme()
00731 << "ERROR: corrupt 'NONE' message of length " << len
00732 << " from peer " << p->peeraddr << std::endl;
00733 return false;
00734 }
00735
00736 memcpy (&words[0], data, sizeof(words));
00737 uint32_t &namelen = words[2];
00738
00739 if (len != sizeof(words) + namelen)
00740 {
00741 logme()
00742 << "ERROR: corrupt 'NONE' message of length " << len
00743 << " from peer " << p->peeraddr
00744 << ", expected length " << sizeof(words)
00745 << " + " << namelen << std::endl;
00746 return false;
00747 }
00748
00749 unsigned char *namedata = data + sizeof(words);
00750 unsigned char *enddata = namedata + namelen;
00751 std::string name ((char *) namedata, namelen);
00752 assert (enddata == data + len);
00753
00754 if (debug_)
00755 logme()
00756 << "DEBUG: received message 'NONE " << name
00757 << "' from " << p->peeraddr << std::endl;
00758
00759
00760 p->source = true;
00761
00762
00763 lock();
00764 Object *o = findObject(p, name);
00765 if (o)
00766 o->flags |= DQM_FLAG_DEAD;
00767
00768
00769 releaseWaiters(o);
00770 unlock();
00771 }
00772 return true;
00773
00774 default:
00775 logme()
00776 << "ERROR: unrecognised message of length " << len
00777 << " and type " << type << " from peer " << p->peeraddr
00778 << std::endl;
00779 return false;
00780 }
00781 }
00782
00785 bool
00786 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00787 {
00788 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00789
00790
00791
00792
00793
00794 if (ev->events & IOUrgent)
00795 {
00796 if (p->automatic)
00797 {
00798 logme()
00799 << "WARNING: connection to the DQM server at " << p->peeraddr
00800 << " lost (will attempt to reconnect in 15 seconds)\n";
00801 return losePeer(0, p, ev);
00802 }
00803 else
00804 return losePeer("WARNING: lost peer connection ", p, ev);
00805 }
00806
00807
00808 if (ev->events & IOWrite)
00809 {
00810 while (Bucket *b = p->sendq)
00811 {
00812 IOSize len = b->data.size() - p->sendpos;
00813 const void *data = (len ? (const void *)&b->data[p->sendpos]
00814 : (const void *)&data);
00815 IOSize done;
00816
00817 try
00818 {
00819 done = (len ? ev->source->write (data, len) : 0);
00820 if (debug_ && len)
00821 logme()
00822 << "DEBUG: sent " << done << " bytes to peer "
00823 << p->peeraddr << std::endl;
00824 }
00825 catch (Error &e)
00826 {
00827 return losePeer("WARNING: unable to write to peer ",
00828 p, ev, &e);
00829 }
00830
00831 p->sendpos += done;
00832 if (p->sendpos == b->data.size())
00833 {
00834 Bucket *old = p->sendq;
00835 p->sendq = old->next;
00836 p->sendpos = 0;
00837 old->next = 0;
00838 discard(old);
00839 }
00840
00841 if (! done && len)
00842
00843 break;
00844 }
00845 }
00846
00847
00848
00849 if (ev->events & IORead)
00850 {
00851
00852
00853
00854 IOSize sz;
00855 try
00856 {
00857 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00858 do
00859 if ((sz = ev->source->read(&buf[0], buf.size())))
00860 {
00861 if (debug_)
00862 logme()
00863 << "DEBUG: received " << sz << " bytes from peer "
00864 << p->peeraddr << std::endl;
00865 DataBlob &data = p->incoming;
00866 if (data.capacity () < data.size () + sz)
00867 data.reserve (data.size() + SOCKET_READ_GROWTH);
00868 data.insert (data.end(), &buf[0], &buf[0] + sz);
00869 }
00870 while (sz == sizeof (buf));
00871 }
00872 catch (Error &e)
00873 {
00874 SystemError *next = dynamic_cast<SystemError *>(e.next());
00875 if (next && next->portable() == SysErr::ErrTryAgain)
00876 sz = 1;
00877 else
00878
00879 return losePeer("WARNING: failed to read from peer ",
00880 p, ev, &e);
00881 }
00882
00883
00884 size_t consumed = 0;
00885 DataBlob &data = p->incoming;
00886 while (data.size()-consumed >= sizeof(uint32_t)
00887 && p->waiting < MAX_PEER_WAITREQS)
00888 {
00889 uint32_t msglen;
00890 memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00891
00892 if (msglen >= MESSAGE_SIZE_LIMIT)
00893 return losePeer("WARNING: excessively large message from ", p, ev);
00894
00895 if (data.size()-consumed >= msglen)
00896 {
00897 bool valid = true;
00898 if (msglen < 2*sizeof(uint32_t))
00899 {
00900 logme()
00901 << "ERROR: corrupt peer message of length " << msglen
00902 << " from peer " << p->peeraddr << std::endl;
00903 valid = false;
00904 }
00905 else
00906 {
00907
00908 Bucket msg;
00909 msg.next = 0;
00910 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00911
00912
00913 if (! msg.data.empty())
00914 {
00915 Bucket **prev = &p->sendq;
00916 while (*prev)
00917 prev = &(*prev)->next;
00918
00919 *prev = new Bucket;
00920 (*prev)->next = 0;
00921 (*prev)->data.swap(msg.data);
00922 }
00923 }
00924
00925 if (! valid)
00926 return losePeer("WARNING: data stream error with ", p, ev);
00927
00928 consumed += msglen;
00929 }
00930 else
00931 break;
00932 }
00933
00934 data.erase(data.begin(), data.begin()+consumed);
00935
00936
00937
00938
00939 if (sz == 0)
00940 sel_.setMask(p->socket, p->mask &= ~IORead);
00941 }
00942
00943
00944 return false;
00945 }
00946
00951 bool
00952 DQMNet::onPeerConnect(IOSelectEvent *ev)
00953 {
00954
00955 assert (ev->source == server_);
00956
00957
00958 Socket *s = server_->accept();
00959 assert (s);
00960 assert (! s->isBlocking());
00961
00962
00963 Peer *p = createPeer(s);
00964 InetAddress peeraddr = ((InetSocket *) s)->peername();
00965 InetAddress myaddr = ((InetSocket *) s)->sockname();
00966 p->peeraddr = StringFormat("%1:%2")
00967 .arg(peeraddr.hostname())
00968 .arg(peeraddr.port());
00969 p->mask = IORead|IOUrgent;
00970 p->socket = s;
00971
00972
00973 if (debug_)
00974 logme()
00975 << "INFO: new peer " << p->peeraddr << " is now connected to "
00976 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00977
00978
00979 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00980
00981
00982 return false;
00983 }
00984
00991 bool
00992 DQMNet::onLocalNotify(IOSelectEvent *ev)
00993 {
00994
00995 try
00996 {
00997 IOSize sz;
00998 unsigned char buf [1024];
00999 while ((sz = ev->source->read(buf, sizeof(buf))))
01000 ;
01001 }
01002 catch (Error &e)
01003 {
01004 SystemError *next = dynamic_cast<SystemError *>(e.next());
01005 if (next && next->portable() == SysErr::ErrTryAgain)
01006 ;
01007 else
01008 logme()
01009 << "WARNING: error reading from notification pipe: "
01010 << e.explain() << std::endl;
01011 }
01012
01013
01014 flush_ = true;
01015
01016
01017 return false;
01018 }
01019
01022 void
01023 DQMNet::updateMask(Peer *p)
01024 {
01025 if (! p->socket)
01026 return;
01027
01028
01029 unsigned oldmask = p->mask;
01030 if (! p->sendq && (p->mask & IOWrite))
01031 sel_.setMask(p->socket, p->mask &= ~IOWrite);
01032
01033 if (p->sendq && ! (p->mask & IOWrite))
01034 sel_.setMask(p->socket, p->mask |= IOWrite);
01035
01036 if (debug_ && oldmask != p->mask)
01037 logme()
01038 << "DEBUG: updating mask for " << p->peeraddr << " to "
01039 << p->mask << " from " << oldmask << std::endl;
01040
01041
01042
01043 if (p->mask == IOUrgent && ! p->waiting)
01044 {
01045 assert (! p->sendq);
01046 if (debug_)
01047 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01048 losePeer(0, p, 0);
01049 }
01050 }
01051
// Construct the networking object.  An empty appname is replaced by
// "DQMNet" (appname_ is used as the logme() prefix).  The read end of the
// wake-up pipe is made non-blocking and attached to the selector with
// onLocalNotify(), so sendLocalChanges() can wake the loop in run().
DQMNet::DQMNet (const std::string &appname )
  : debug_ (false),
    sendScalarAsText_ (false),
    requestFullUpdates_ (false),
    appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
    pid_ (getpid()),
    server_ (0),
    version_ (Time::current()),
    // (pthread_t) -1 means "networking thread not started"; see lock()/start().
    communicate_ ((pthread_t) -1),
    shutdown_ (0),
    // Timeout passed to sel_.dispatch() in run(); presumably milliseconds -- TODO confirm.
    delay_ (1000),
    flush_ (false)
{
  // Non-blocking so onLocalNotify() can drain the pipe until it is empty.
  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));

  // No automatic upstream/downstream collector configured yet.
  upstream_.peer = downstream_.peer = 0;
  upstream_.next = downstream_.next = 0;
  upstream_.port = downstream_.port = 0;
  upstream_.update = downstream_.update = false;
  upstream_.warned = downstream_.warned = false;
}
01079
01080 DQMNet::~DQMNet(void)
01081 {
01082
01083 }
01084
01087 void
01088 DQMNet::debug(bool doit)
01089 {
01090 debug_ = doit;
01091 }
01092
01095 void
01096 DQMNet::delay(int delay)
01097 {
01098 delay_ = delay;
01099 }
01100
01104 void
01105 DQMNet::sendScalarAsText(bool doit)
01106 {
01107 sendScalarAsText_ = doit;
01108 }
01109
01115 void
01116 DQMNet::requestFullUpdates(bool doit)
01117 {
01118 requestFullUpdates_ = doit;
01119 }
01120
01124 void
01125 DQMNet::startLocalServer(int port)
01126 {
01127 if (server_)
01128 {
01129 logme() << "ERROR: DQM server was already started.\n";
01130 return;
01131 }
01132
01133 try
01134 {
01135 server_ = new InetServerSocket(InetAddress (port), 10);
01136 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01137 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01138 server_->setBlocking(false);
01139 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01140 }
01141 catch (Error &e)
01142 {
01143
01144
01145 logme()
01146 << "ERROR: Failed to start server at port " << port << ": "
01147 << e.explain() << std::endl;
01148
01149
01150 throw cms::Exception("DQMNet::startLocalServer")
01151 << "Failed to start server at port " << port << ": "
01152 << e.explain();
01153 }
01154
01155 logme() << "INFO: DQM server started at port " << port << std::endl;
01156 }
01157
01161 void
01162 DQMNet::updateToCollector(const std::string &host, int port)
01163 {
01164 if (! downstream_.host.empty())
01165 {
01166 logme()
01167 << "ERROR: Already updating another collector at "
01168 << downstream_.host << ":" << downstream_.port << std::endl;
01169 return;
01170 }
01171
01172 downstream_.update = true;
01173 downstream_.host = host;
01174 downstream_.port = port;
01175 }
01176
01180 void
01181 DQMNet::listenToCollector(const std::string &host, int port)
01182 {
01183 if (! upstream_.host.empty())
01184 {
01185 logme()
01186 << "ERROR: Already receiving data from another collector at "
01187 << upstream_.host << ":" << upstream_.port << std::endl;
01188 return;
01189 }
01190
01191 upstream_.update = false;
01192 upstream_.host = host;
01193 upstream_.port = port;
01194 }
01195
01197 void
01198 DQMNet::shutdown(void)
01199 {
01200 shutdown_ = 1;
01201 if (communicate_ != (pthread_t) -1)
01202 pthread_join(communicate_, 0);
01203 }
01204
01210 static void *communicate(void *obj)
01211 {
01212 sigset_t sigs;
01213 sigfillset (&sigs);
01214 pthread_sigmask (SIG_BLOCK, &sigs, 0);
01215 ((DQMNet *)obj)->run();
01216 return 0;
01217 }
01218
01220 void
01221 DQMNet::lock(void)
01222 {
01223 if (communicate_ != (pthread_t) -1)
01224 pthread_mutex_lock(&lock_);
01225 }
01226
01228 void
01229 DQMNet::unlock(void)
01230 {
01231 if (communicate_ != (pthread_t) -1)
01232 pthread_mutex_unlock(&lock_);
01233 }
01234
01238 void
01239 DQMNet::start(void)
01240 {
01241 if (communicate_ != (pthread_t) -1)
01242 {
01243 logme()
01244 << "ERROR: DQM networking thread has already been started\n";
01245 return;
01246 }
01247
01248 pthread_mutex_init(&lock_, 0);
01249 pthread_create (&communicate_, 0, &communicate, this);
01250 }
01251
01253 void
01254 DQMNet::run(void)
01255 {
01256 Time now;
01257 Time nextFlush = 0;
01258 AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01259
01260
01261 while (! shouldStop())
01262 {
01263 for (int i = 0; i < 2; ++i)
01264 {
01265 AutoPeer *ap = automatic[i];
01266
01267
01268
01269
01270 if (! ap->host.empty()
01271 && ! ap->peer
01272 && (now = Time::current()) > ap->next)
01273 {
01274 ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
01275 InetSocket *s = 0;
01276 try
01277 {
01278 s = new InetSocket (SocketConst::TypeStream);
01279 s->setBlocking (false);
01280 s->connect(InetAddress (ap->host.c_str(), ap->port));
01281 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01282 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01283 }
01284 catch (Error &e)
01285 {
01286 SystemError *sys = dynamic_cast<SystemError *>(e.next());
01287 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01288 {
01289
01290
01291
01292 if (! ap->warned)
01293 {
01294 logme()
01295 << "NOTE: DQM server at " << ap->host << ":" << ap->port
01296 << " is unavailable. Connection will be established"
01297 << " automatically on the background once the server"
01298 << " becomes available. Error from the attempt was: "
01299 << e.explain() << '\n';
01300 ap->warned = true;
01301 }
01302
01303 if (s)
01304 s->abort();
01305 delete s;
01306 s = 0;
01307 }
01308 }
01309
01310
01311
01312 if (s)
01313 {
01314 lock();
01315 Peer *p = createPeer(s);
01316 ap->peer = p;
01317 ap->warned = false;
01318 unlock();
01319
01320 InetAddress peeraddr = ((InetSocket *) s)->peername();
01321 InetAddress myaddr = ((InetSocket *) s)->sockname();
01322 p->peeraddr = StringFormat("%1:%2")
01323 .arg(peeraddr.hostname())
01324 .arg(peeraddr.port());
01325 p->mask = IORead|IOWrite|IOUrgent;
01326 p->update = ap->update;
01327 p->automatic = ap;
01328 p->socket = s;
01329 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01330 if (ap == &upstream_)
01331 {
01332 uint32_t words[5] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01333 3*sizeof(uint32_t), DQM_MSG_UPDATE_ME,
01334 requestFullUpdates_ };
01335 p->sendq = new Bucket;
01336 p->sendq->next = 0;
01337 copydata(p->sendq, words, sizeof(words));
01338 }
01339
01340
01341 if (debug_)
01342 logme()
01343 << "INFO: now connected to " << p->peeraddr << " from "
01344 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01345 }
01346 }
01347 }
01348
01349
01350 sel_.dispatch(delay_);
01351 now = Time::current();
01352
01353
01354
01355
01356
01357 if (flush_ && now > nextFlush)
01358 {
01359 flush_ = false;
01360 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
01361
01362 lock();
01363 purgeDeadObjects(now - TimeSpan(0, 0, 2 , 0, 0),
01364 now - TimeSpan(0, 0, 20 , 0, 0));
01365 sendObjectListToPeers(true);
01366 unlock();
01367 }
01368
01369
01370
01371
01372
01373
01374 updatePeerMasks();
01375
01376
01377 lock();
01378 Time waitold = now - TimeSpan(0, 0, 2 , 0, 0);
01379 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01380 {
01381
01382 if (i->time < waitold)
01383 releaseFromWait(i++, findObject(0, i->name));
01384
01385
01386 else
01387 ++i;
01388 }
01389 unlock();
01390 }
01391 }
01392
01393 int
01394 DQMNet::receive(DQMStore *)
01395 {
01396 logme() << "ERROR: receive() method is not supported.\n";
01397 return 0;
01398 }
01399
01400 void
01401 DQMNet::updateLocalObject(Object &o)
01402 {
01403 logme() << "ERROR: updateLocalObject() method is not supported.\n";
01404 }
01405
01406 void
01407 DQMNet::removeLocalObject(const std::string &name)
01408 {
01409 logme() << "ERROR: removeLocalObject() method is not supported.\n";
01410 }
01411
01412
01413
01414 void
01415 DQMNet::sendLocalChanges(void)
01416 {
01417 char byte = 0;
01418 wakeup_.sink()->write(&byte, 1);
01419 }
01420
01424 DQMBasicNet::DQMBasicNet(const std::string &appname )
01425 : DQMImplNet<DQMNet::Object>(appname)
01426 {
01427 local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01428 }
01429
01430 int
01431 DQMBasicNet::receive(DQMStore *store)
01432 {
01433 int updates = 0;
01434
01435 lock();
01436 PeerMap::iterator pi, pe;
01437 ObjectMap::iterator oi, oe;
01438 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01439 {
01440 ImplPeer &p = pi->second;
01441 if (&p == local_)
01442 continue;
01443
01444 updates += p.updates;
01445
01446 for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01447 {
01448 Object &o = oi->second;
01449 if (o.flags & DQM_FLAG_DEAD)
01450 {
01451 std::string folder = o.name;
01452 std::string name = o.name;
01453 folder.erase(folder.rfind('/'), std::string::npos);
01454 name.erase(0, name.rfind('/')+1);
01455 store->setCurrentFolder(folder);
01456 store->removeElement(name);
01457 p.objs.erase(oi++);
01458 }
01459 else if ((o.flags & DQM_FLAG_RECEIVED) && reinstateObject(store, o))
01460 {
01461 o.flags &= ~DQM_FLAG_RECEIVED;
01462 ++oi;
01463 }
01464 }
01465 }
01466 unlock();
01467
01468 return updates;
01469 }
01470
01473 void
01474 DQMBasicNet::updateLocalObject(Object &o)
01475 {
01476 ObjectMap::iterator pos = local_->objs.find(o.name);
01477 if (pos == local_->objs.end())
01478 local_->objs.insert(ObjectMap::value_type(o.name, o));
01479 else
01480 {
01481 std::swap(pos->second.version, o.version);
01482 std::swap(pos->second.tags, o.tags);
01483 std::swap(pos->second.qreports, o.qreports);
01484 std::swap(pos->second.flags, o.flags);
01485 std::swap(pos->second.rawdata, o.rawdata);
01486
01487 delete pos->second.object;
01488 pos->second.object = 0;
01489 delete pos->second.reference;
01490 pos->second.reference = 0;
01491 pos->second.lastreq = 0;
01492 }
01493 }
01494
01497 void
01498 DQMBasicNet::removeLocalObject(const std::string &path)
01499 {
01500 local_->objs.erase(path);
01501 }
01502
01503 void
01504 DQMBasicNet::requestFullUpdatesFromPeers(void)
01505 {
01506 for (PeerMap::iterator i = peers_.begin(), e = peers_.end(); i != e; ++i)
01507 {
01508 ImplPeer &p = i->second;
01509 if (! p.source)
01510 continue;
01511
01512 Bucket **msg = &p.sendq;
01513 while (*msg)
01514 msg = &(*msg)->next;
01515 *msg = new Bucket;
01516 (*msg)->next = 0;
01517
01518 uint32_t words[3] = { 3*sizeof(uint32_t), DQM_MSG_UPDATE_ME, 1 };
01519 copydata(*msg, words, sizeof(words));
01520 }
01521 }