CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/DQMServices/Core/src/DQMNet.cc

Go to the documentation of this file.
00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMDefinitions.h"
00003 #include "DQMServices/Core/src/DQMError.h"
00004 #include "classlib/iobase/InetServerSocket.h"
00005 #include "classlib/iobase/LocalServerSocket.h"
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
00008 #include "classlib/utils/TimeInfo.h"
00009 #include "classlib/utils/StringList.h"
00010 #include "classlib/utils/StringFormat.h"
00011 #include "classlib/utils/StringOps.h"
00012 #include "classlib/utils/SystemError.h"
00013 #include "classlib/utils/Regexp.h"
00014 #include <unistd.h>
00015 #include <fcntl.h>
00016 #include <sys/wait.h>
00017 #include <stdio.h>
00018 #include <stdint.h>
00019 #include <iostream>
00020 #include <sstream>
00021 #include <cassert>
00022 #include <cfloat>
00023 #include <inttypes.h>
00024 
// Upper bound on the length word of a peer message: onPeerData() drops a
// peer whose message announces a length at or above this value.
00025 #define MESSAGE_SIZE_LIMIT      (8*1024*1024)
// Base size from which the two socket read parameters below are derived.
00026 #define SOCKET_BUF_SIZE         (8*1024*1024)
// Size of the temporary buffer onPeerData() allocates for socket reads.
00027 #define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE/8)
// Extra capacity reserved for a peer's incoming buffer when it must grow.
00028 #define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00029
00030 using namespace lat;
00031
// Matches text of the form "<NAME>TYPE=VALUE</NAME>", where TYPE is one of
// i, f, s or qr and the closing tag must repeat the opening tag's name.
00032 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
00033 
00035 // Generate log prefix.
00036 std::ostream &
00037 DQMNet::logme (void)
00038 {
00039   Time now = Time::current();
00040   return std::cout
00041     << now.format(true, "%Y-%m-%d %H:%M:%S.")
00042     << now.nanoformat(3, 3)
00043     << " " << appname_ << "[" << pid_ << "]: ";
00044 }
00045 
00046 // Append data into a bucket.
00047 void
00048 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00049 {
00050   b->data.insert(b->data.end(),
00051                  (const unsigned char *)data,
00052                  (const unsigned char *)data + len);
00053 }
00054 
00055 // Discard a bucket chain.
00056 void
00057 DQMNet::discard (Bucket *&b)
00058 {
00059   while (b)
00060   {
00061     Bucket *next = b->next;
00062     delete b;
00063     b = next;
00064   }
00065 }
00066 
00068 
00071 void
00072 DQMNet::losePeer(const char *reason,
00073                  Peer *peer,
00074                  IOSelectEvent *ev,
00075                  Error *err)
00076 {
00077   if (reason)
00078     logme ()
00079       << reason << peer->peeraddr
00080       << (err ? "; error was: " + err->explain() : std::string(""))
00081       << std::endl;
00082 
00083   Socket *s = peer->socket;
00084 
00085   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00086     if (i->peer == peer)
00087       waiting_.erase(i++);
00088     else
00089       ++i;
00090 
00091   if (ev)
00092     ev->source = 0;
00093 
00094   discard(peer->sendq);
00095   if (peer->automatic)
00096     peer->automatic->peer = 0;
00097 
00098   sel_.detach (s);
00099   s->close();
00100   removePeer(peer, s);
00101   delete s;
00102 }
00103 
00105 void
00106 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
00107 {
00108   // Issue request to peer.
00109   Bucket **msg = &p->sendq;
00110   while (*msg)
00111     msg = &(*msg)->next;
00112   *msg = new Bucket;
00113   (*msg)->next = 0;
00114 
00115   uint32_t words[3];
00116   words[0] = sizeof(words) + len;
00117   words[1] = DQM_MSG_GET_OBJECT;
00118   words[2] = len;
00119   copydata(*msg, words, sizeof(words));
00120   copydata(*msg, name, len);
00121 }
00122 
00125 void
00126 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00127 {
00128   // FIXME: Should we automatically record which exact peer the waiter
00129   // is expecting to deliver data so we know to release the waiter if
00130   // the other peer vanishes?  The current implementation stands a
00131   // chance for the waiter to wait indefinitely -- although we do
00132   // force terminate the wait after a while.
00133   requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
00134   WaitObject wo = { Time::current(), name, info, p };
00135   waiting_.push_back(wo);
00136   p->waiting++;
00137 }
00138 
00139 // Once an object has been updated, this is invoked for all waiting
00140 // peers.  Send the object back to the peer in suitable form.
00141 void
00142 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00143 {
00144   Bucket **msg = &i->peer->sendq;
00145   while (*msg)
00146     msg = &(*msg)->next;
00147   *msg = new Bucket;
00148   (*msg)->next = 0;
00149 
00150   releaseFromWait(*msg, *i, o);
00151 
00152   assert(i->peer->waiting > 0);
00153   i->peer->waiting--;
00154   waiting_.erase(i);
00155 }
00156 
00157 // Release everyone waiting for the object @a o.
00158 void
00159 DQMNet::releaseWaiters(const std::string &name, Object *o)
00160 {
00161   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00162     if (i->name == name)
00163       releaseFromWait(i++, o);
00164     else
00165       ++i;
00166 }
00167 
00171 void
00172 DQMNet::packQualityData(std::string &into, const QReports &qr)
00173 {
00174   char buf[64];
00175   std::ostringstream qrs;
00176   QReports::const_iterator qi, qe;
00177   for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
00178   {
00179     int pos = 0;
00180     sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
00181     qrs << buf << '\0'
00182         << buf+pos << '\0'
00183         << qi->qtname << '\0'
00184         << qi->algorithm << '\0'
00185         << qi->message << '\0'
00186         << '\0';
00187   }
00188   into = qrs.str();
00189 }
00190 
00193 void
00194 DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
00195 {
00196   const char *qdata = from;
00197 
00198   // Count how many qresults there are.
00199   size_t nqv = 0;
00200   while (*qdata)
00201   {
00202     ++nqv;
00203     while (*qdata) ++qdata; ++qdata;
00204     while (*qdata) ++qdata; ++qdata;
00205     while (*qdata) ++qdata; ++qdata;
00206     while (*qdata) ++qdata; ++qdata;
00207     while (*qdata) ++qdata; ++qdata;
00208   }
00209 
00210   // Now extract the qreports.
00211   qdata = from;
00212   qr.reserve(nqv);
00213   while (*qdata)
00214   {
00215     qr.push_back(DQMNet::QValue());
00216     DQMNet::QValue &qv = qr.back();
00217 
00218     qv.code = atoi(qdata);
00219     while (*qdata) ++qdata;
00220     switch (qv.code)
00221     {
00222     case dqm::qstatus::STATUS_OK:
00223       break;
00224     case dqm::qstatus::WARNING:
00225       flags |= DQMNet::DQM_PROP_REPORT_WARN;
00226       break;
00227     case dqm::qstatus::ERROR:
00228       flags |= DQMNet::DQM_PROP_REPORT_ERROR;
00229       break;
00230     default:
00231       flags |= DQMNet::DQM_PROP_REPORT_OTHER;
00232       break;
00233     }
00234 
00235     qv.qtresult = atof(++qdata);
00236     while (*qdata) ++qdata;
00237 
00238     qv.qtname = ++qdata;
00239     while (*qdata) ++qdata;
00240 
00241     qv.algorithm = ++qdata;
00242     while (*qdata) ++qdata;
00243 
00244     qv.message = ++qdata;
00245     while (*qdata) ++qdata;
00246     ++qdata;
00247   }
00248 }
00249 
00250 #if 0
00251 // Deserialise a ROOT object from a buffer at the current position.
00252 static TObject *
00253 extractNextObject(TBufferFile &buf)
00254 {
00255   if (buf.Length() == buf.BufferSize())
00256     return 0;
00257 
00258   buf.InitMap();
00259   Int_t pos = buf.Length();
00260   TClass *c = buf.ReadClass();
00261   buf.SetBufferOffset(pos);
00262   buf.ResetMap();
00263   return c ? buf.ReadObject(c) : 0;
00264 }
00265 
00266 // Reconstruct an object from the raw data.
00267 bool
00268 DQMNet::reconstructObject(Object &o)
00269 {
00270   TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00271   buf.Reset();
00272 
00273   // Extract the main object.
00274   if (! (o.object = extractNextObject(buf)))
00275     return false;
00276   
00277   // Extract the reference object.
00278   o.reference = extractNextObject(buf);
00279 
00280   // Extract quality reports.
00281   unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
00282   return true;
00283 }
00284 #endif
00285 
00286 #if 0
00287 bool
00288 DQMNet::reinstateObject(DQMStore *store, Object &o)
00289 {
00290   if (! reconstructObject (o))
00291     return false;
00292 
00293   // Reconstruct the main object
00294   MonitorElement *obj = 0;
00295   store->setCurrentFolder(*o.dirname);
00296   switch (o.flags & DQM_PROP_TYPE_MASK)
00297   {
00298   case DQM_PROP_TYPE_INT:
00299     obj = store->bookInt(o.objname);
00300     obj->Fill(atoll(o.scalar.c_str()));
00301     break;
00302 
00303   case DQM_PROP_TYPE_REAL:
00304     obj = store->bookFloat(name);
00305     obj->Fill(atof(o.scalar.c_str()));
00306     break;
00307 
00308   case DQM_PROP_TYPE_STRING:
00309     obj = store->bookString(name, o.scalar);
00310     break;
00311 
00312   case DQM_PROP_TYPE_TH1F:
00313     obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
00314     break;
00315 
00316   case DQM_PROP_TYPE_TH1S:
00317     obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
00318     break;
00319 
00320   case DQM_PROP_TYPE_TH1D:
00321     obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
00322     break;
00323 
00324   case DQM_PROP_TYPE_TH2F:
00325     obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
00326     break;
00327 
00328   case DQM_PROP_TYPE_TH2S:
00329     obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
00330     break;
00331 
00332   case DQM_PROP_TYPE_TH2D:
00333     obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
00334     break;
00335 
00336   case DQM_PROP_TYPE_TH3F:
00337     obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
00338     break;
00339 
00340   case DQM_PROP_TYPE_TH3S:
00341     obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
00342     break;
00343 
00344   case DQM_PROP_TYPE_TH3D:
00345     obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
00346     break;
00347 
00348   case DQM_PROP_TYPE_PROF:
00349     obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
00350     break;
00351 
00352   case DQM_PROP_TYPE_PROF2D:
00353     obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
00354     break;
00355 
00356   default:
00357     logme()
00358       << "ERROR: unexpected monitor element of type "
00359       << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
00360       << *o.dirname << '/' << o.objname << "'\n";
00361     return false;
00362   }
00363 
00364   // Reconstruct tag and qreports.
00365   if (obj)
00366   {
00367     obj->data_.tag = o.tag;
00368     obj->data_.qreports = o.qreports;
00369   }
00370 
00371   // Inidicate success.
00372   return true;
00373 }
00374 #endif
00375 
00377 // Check if the network layer should stop.
00378 bool
00379 DQMNet::shouldStop(void)
00380 {
00381   return shutdown_;
00382 }
00383 
00384 // Once an object has been updated, this is invoked for all waiting
00385 // peers.  Send the requested object to the waiting peer.
00386 void
00387 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00388 {
00389   if (o)
00390     sendObjectToPeer(msg, *o, true);
00391   else
00392   {
00393     uint32_t words [3];
00394     words[0] = sizeof(words) + w.name.size();
00395     words[1] = DQM_REPLY_NONE;
00396     words[2] = w.name.size();
00397 
00398     msg->data.reserve(msg->data.size() + words[0]);
00399     copydata(msg, &words[0], sizeof(words));
00400     copydata(msg, &w.name[0], w.name.size());
00401   }
00402 }
00403 
00404 // Send an object to a peer.  If not @a data, only sends a summary
00405 // without the object data, except the data is always sent for scalar
00406 // objects.
00407 void
00408 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00409 {
00410   uint32_t flags = o.flags & ~DQM_PROP_DEAD;
00411   DataBlob objdata;
00412 
00413   if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00414     objdata.insert(objdata.end(),
00415                    &o.scalar[0],
00416                    &o.scalar[0] + o.scalar.size());
00417   else if (data)
00418     objdata.insert(objdata.end(),
00419                    &o.rawdata[0],
00420                    &o.rawdata[0] + o.rawdata.size());
00421 
00422   uint32_t words [9];
00423   uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
00424   uint32_t datalen = objdata.size();
00425   uint32_t qlen = o.qdata.size();
00426 
00427   if (o.dirname->empty())
00428     --namelen;
00429 
00430   words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
00431   words[1] = DQM_REPLY_OBJECT;
00432   words[2] = flags;
00433   words[3] = (o.version >> 0 ) & 0xffffffff;
00434   words[4] = (o.version >> 32) & 0xffffffff;
00435   words[5] = o.tag;
00436   words[6] = namelen;
00437   words[7] = datalen;
00438   words[8] = qlen;
00439 
00440   msg->data.reserve(msg->data.size() + words[0]);
00441   copydata(msg, &words[0], 9*sizeof(uint32_t));
00442   if (namelen)
00443   {
00444     copydata(msg, &(*o.dirname)[0], o.dirname->size());
00445     if (! o.dirname->empty())
00446       copydata(msg, "/", 1);
00447     copydata(msg, &o.objname[0], o.objname.size());
00448   }
00449   if (datalen)
00450     copydata(msg, &objdata[0], datalen);
00451   if (qlen)
00452     copydata(msg, &o.qdata[0], qlen);
00453 }
00454 
00456 // Handle peer messages.
00457 bool
00458 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00459 {
00460   // Decode and process this message.
00461   uint32_t type;
00462   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00463   switch (type)
00464   {
00465   case DQM_MSG_UPDATE_ME:
00466     {
00467       if (len != 2*sizeof(uint32_t))
00468       {
00469         logme()
00470           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00471           << " from peer " << p->peeraddr << std::endl;
00472         return false;
00473       }
00474 
00475       if (debug_)
00476         logme()
00477           << "DEBUG: received message 'UPDATE ME' from peer "
00478           << p->peeraddr << ", size " << len << std::endl;
00479 
00480       p->update = true;
00481     }
00482     return true;
00483 
00484   case DQM_MSG_LIST_OBJECTS:
00485     {
00486       if (debug_)
00487         logme()
00488           << "DEBUG: received message 'LIST OBJECTS' from peer "
00489           << p->peeraddr << ", size " << len << std::endl;
00490 
00491       // Send over current status: list of known objects.
00492       sendObjectListToPeer(msg, true, false);
00493     }
00494     return true;
00495 
00496   case DQM_MSG_GET_OBJECT:
00497     {
00498       if (debug_)
00499         logme()
00500           << "DEBUG: received message 'GET OBJECT' from peer "
00501           << p->peeraddr << ", size " << len << std::endl;
00502 
00503       if (len < 3*sizeof(uint32_t))
00504       {
00505         logme()
00506           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00507           << " from peer " << p->peeraddr << std::endl;
00508         return false;
00509       }
00510 
00511       uint32_t namelen;
00512       memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00513       if (len != 3*sizeof(uint32_t) + namelen)
00514       {
00515         logme()
00516           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00517           << " from peer " << p->peeraddr
00518           << ", expected length " << (3*sizeof(uint32_t))
00519           << " + " << namelen << std::endl;
00520         return false;
00521       }
00522 
00523       std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00524       Peer *owner = 0;
00525       Object *o = findObject(0, name, &owner);
00526       if (o)
00527       {
00528         o->lastreq = Time::current().ns();
00529         if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
00530             && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00531           waitForData(p, name, "", owner);
00532         else
00533           sendObjectToPeer(msg, *o, true);
00534       }
00535       else
00536       {
00537         uint32_t words [3];
00538         words[0] = sizeof(words) + name.size();
00539         words[1] = DQM_REPLY_NONE;
00540         words[2] = name.size();
00541 
00542         msg->data.reserve(msg->data.size() + words[0]);
00543         copydata(msg, &words[0], sizeof(words));
00544         copydata(msg, &name[0], name.size());
00545       }
00546     }
00547     return true;
00548 
00549   case DQM_REPLY_LIST_BEGIN:
00550     {
00551       if (len != 4*sizeof(uint32_t))
00552       {
00553         logme()
00554           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00555           << " from peer " << p->peeraddr << std::endl;
00556         return false;
00557       }
00558 
00559       // Get the update status: whether this is a full update.
00560       uint32_t flags;
00561       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00562 
00563       if (debug_)
00564         logme()
00565           << "DEBUG: received message 'LIST BEGIN "
00566           << (flags ? "FULL" : "INCREMENTAL")
00567           << "' from " << p->peeraddr
00568           << ", size " << len << std::endl;
00569 
00570       // If we are about to receive a full list of objects, flag all
00571       // objects as possibly dead.  Subsequent object notifications
00572       // will undo this for the live objects.  We cannot delete
00573       // objects quite yet, as we may get inquiry from another client
00574       // while we are processing the incoming list, so we keep the
00575       // objects tentatively alive as long as we've not seen the end.
00576       if (flags)
00577         markObjectsDead(p);
00578     }
00579     return true;
00580 
00581   case DQM_REPLY_LIST_END:
00582     {
00583       if (len != 4*sizeof(uint32_t))
00584       {
00585         logme()
00586           << "ERROR: corrupt 'LIST END' message of length " << len
00587           << " from peer " << p->peeraddr << std::endl;
00588         return false;
00589       }
00590 
00591       // Get the update status: whether this is a full update.
00592       uint32_t flags;
00593       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00594 
00595       // If we received a full list of objects, now purge all dead
00596       // objects. We need to do this in two stages in case we receive
00597       // updates in many parts, and end up sending updates to others in
00598       // between; this avoids us lying live objects are dead.
00599       if (flags)
00600         purgeDeadObjects(p);
00601 
00602       if (debug_)
00603         logme()
00604           << "DEBUG: received message 'LIST END "
00605           << (flags ? "FULL" : "INCREMENTAL")
00606           << "' from " << p->peeraddr
00607           << ", size " << len << std::endl;
00608 
00609       // Indicate we have received another update from this peer.
00610       // Also indicate we should flush to our clients.
00611       flush_ = true;
00612       p->updates++;
00613     }
00614     return true;
00615 
00616   case DQM_REPLY_OBJECT:
00617     {
00618       uint32_t words[9];
00619       if (len < sizeof(words))
00620       {
00621         logme()
00622           << "ERROR: corrupt 'OBJECT' message of length " << len
00623           << " from peer " << p->peeraddr << std::endl;
00624         return false;
00625       }
00626 
00627       memcpy (&words[0], data, sizeof(words));
00628       uint32_t &namelen = words[6];
00629       uint32_t &datalen = words[7];
00630       uint32_t &qlen = words[8];
00631 
00632       if (len != sizeof(words) + namelen + datalen + qlen)
00633       {
00634         logme()
00635           << "ERROR: corrupt 'OBJECT' message of length " << len
00636           << " from peer " << p->peeraddr
00637           << ", expected length " << sizeof(words)
00638           << " + " << namelen
00639           << " + " << datalen
00640           << " + " << qlen
00641           << std::endl;
00642         return false;
00643       }
00644 
00645       unsigned char *namedata = data + sizeof(words);
00646       unsigned char *objdata = namedata + namelen;
00647       unsigned char *qdata = objdata + datalen;
00648       unsigned char *enddata = qdata + qlen;
00649       std::string name ((char *) namedata, namelen);
00650       assert (enddata == data + len);
00651 
00652       if (debug_)
00653         logme()
00654           << "DEBUG: received message 'OBJECT " << name
00655           << "' from " << p->peeraddr
00656           << ", size " << len << std::endl;
00657 
00658       // Mark the peer as a known object source.
00659       p->source = true;
00660 
00661       // Initialise or update an object entry.
00662       Object *o = findObject(p, name);
00663       if (! o)
00664         o = makeObject(p, name);
00665 
00666       o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
00667       o->tag = words[5];
00668       o->version = ((uint64_t) words[4] << 32 | words[3]);
00669       o->scalar.clear();
00670       o->qdata.clear();
00671       if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00672       {
00673         o->rawdata.clear();
00674         o->scalar.insert(o->scalar.end(), objdata, qdata);
00675       }
00676       else if (datalen)
00677       {
00678         o->rawdata.clear();
00679         o->rawdata.insert(o->rawdata.end(), objdata, qdata);
00680       }
00681       else if (! o->rawdata.empty())
00682         o->flags |= DQM_PROP_STALE;
00683       o->qdata.insert(o->qdata.end(), qdata, enddata);
00684 
00685       // If we had an object for this one already and this is a list
00686       // update without data, issue an immediate data get request.
00687       if (o->lastreq
00688           && ! datalen
00689           && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00690         requestObjectData(p, (namelen ? &name[0] : 0), namelen);
00691 
00692       // If we have the object data, release from wait.
00693       if (datalen)
00694         releaseWaiters(name, o);
00695     }
00696     return true;
00697 
00698   case DQM_REPLY_NONE:
00699     {
00700       uint32_t words[3];
00701       if (len < sizeof(words))
00702       {
00703         logme()
00704           << "ERROR: corrupt 'NONE' message of length " << len
00705           << " from peer " << p->peeraddr << std::endl;
00706         return false;
00707       }
00708 
00709       memcpy (&words[0], data, sizeof(words));
00710       uint32_t &namelen = words[2];
00711 
00712       if (len != sizeof(words) + namelen)
00713       {
00714         logme()
00715           << "ERROR: corrupt 'NONE' message of length " << len
00716           << " from peer " << p->peeraddr
00717           << ", expected length " << sizeof(words)
00718           << " + " << namelen << std::endl;
00719         return false;
00720       }
00721 
00722       unsigned char *namedata = data + sizeof(words);
00723       std::string name((char *) namedata, namelen);
00724 
00725       if (debug_)
00726         logme()
00727           << "DEBUG: received message 'NONE " << name
00728           << "' from " << p->peeraddr
00729           << ", size " << len << std::endl;
00730 
00731       // Mark the peer as a known object source.
00732       p->source = true;
00733 
00734       // If this was a known object, kill it.
00735       if (Object *o = findObject(p, name))
00736       {
00737         o->flags |= DQM_PROP_DEAD;
00738         purgeDeadObjects(p);
00739       }
00740 
00741       // If someone was waiting for this, let them go.
00742       releaseWaiters(name, 0);
00743     }
00744     return true;
00745 
00746   default:
00747     logme()
00748       << "ERROR: unrecognised message of length " << len
00749       << " and type " << type << " from peer " << p->peeraddr
00750       << std::endl;
00751     return false;
00752   }
00753 }
00754 
00757 bool
00758 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00759 {
00760   lock();
00761   assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00762 
00763   // If there is a problem with the peer socket, discard the peer
00764   // and tell the selector to stop prcessing events for it.  If
00765   // this is a server connection, we will eventually recreate
00766   // everything if/when the data server comes back.
00767   if (ev->events & IOUrgent)
00768   {
00769     if (p->automatic)
00770     {
00771       logme()
00772         << "WARNING: connection to the DQM server at " << p->peeraddr
00773         << " lost (will attempt to reconnect in 15 seconds)\n";
00774       losePeer(0, p, ev);
00775     }
00776     else
00777       losePeer("WARNING: lost peer connection ", p, ev);
00778 
00779     unlock();
00780     return true;
00781   }
00782 
00783   // If we can write to the peer socket, pump whatever we can into it.
00784   if (ev->events & IOWrite)
00785   {
00786     while (Bucket *b = p->sendq)
00787     {
00788       IOSize len = b->data.size() - p->sendpos;
00789       const void *data = (len ? (const void *)&b->data[p->sendpos]
00790                           : (const void *)&data);
00791       IOSize done;
00792 
00793       try
00794       {
00795         done = (len ? ev->source->write (data, len) : 0);
00796         if (debug_ && len)
00797           logme()
00798             << "DEBUG: sent " << done << " bytes to peer "
00799             << p->peeraddr << std::endl;
00800       }
00801       catch (Error &e)
00802       {
00803         losePeer("WARNING: unable to write to peer ", p, ev, &e);
00804         unlock();
00805         return true;
00806       }
00807 
00808       p->sendpos += done;
00809       if (p->sendpos == b->data.size())
00810       {
00811         Bucket *old = p->sendq;
00812         p->sendq = old->next;
00813         p->sendpos = 0;
00814         old->next = 0;
00815         discard(old);
00816       }
00817 
00818       if (! done && len)
00819         // Cannot write any more.
00820         break;
00821     }
00822   }
00823 
00824   // If there is data to be read from the peer, first receive what we
00825   // can get out the socket, the process all complete requests.
00826   if (ev->events & IORead)
00827   {
00828     // First build up the incoming buffer of data in the socket.
00829     // Remember the last size returned by the socket; we need
00830     // it to determine if the remote end closed the connection.
00831     IOSize sz;
00832     try
00833     {
00834       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00835       do
00836         if ((sz = ev->source->read(&buf[0], buf.size())))
00837         {
00838           if (debug_)
00839             logme()
00840               << "DEBUG: received " << sz << " bytes from peer "
00841               << p->peeraddr << std::endl;
00842           DataBlob &data = p->incoming;
00843           if (data.capacity () < data.size () + sz)
00844             data.reserve (data.size() + SOCKET_READ_GROWTH);
00845           data.insert (data.end(), &buf[0], &buf[0] + sz);
00846         }
00847       while (sz == sizeof (buf));
00848     }
00849     catch (Error &e)
00850     {
00851       SystemError *next = dynamic_cast<SystemError *>(e.next());
00852       if (next && next->portable() == SysErr::ErrTryAgain)
00853         sz = 1; // Ignore it, and fake no end of data.
00854       else
00855       {
00856         // Houston we have a problem.
00857         losePeer("WARNING: failed to read from peer ", p, ev, &e);
00858         unlock();
00859         return true;
00860       }
00861     }
00862 
00863     // Process fully received messages as long as we can.
00864     size_t consumed = 0;
00865     DataBlob &data = p->incoming;
00866     while (data.size()-consumed >= sizeof(uint32_t)
00867            && p->waiting < MAX_PEER_WAITREQS)
00868     {
00869       uint32_t msglen;
00870       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00871 
00872       if (msglen >= MESSAGE_SIZE_LIMIT)
00873       {
00874         losePeer("WARNING: excessively large message from ", p, ev);
00875         unlock();
00876         return true;
00877       }
00878 
00879       if (data.size()-consumed >= msglen)
00880       {
00881         bool valid = true;
00882         if (msglen < 2*sizeof(uint32_t))
00883         {
00884           logme()
00885             << "ERROR: corrupt peer message of length " << msglen
00886             << " from peer " << p->peeraddr << std::endl;
00887           valid = false;
00888         }
00889         else
00890         {
00891           // Decode and process this message.
00892           Bucket msg;
00893           msg.next = 0;
00894           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00895 
00896           // If we created a response, chain it to the write queue.
00897           if (! msg.data.empty())
00898           {
00899             Bucket **prev = &p->sendq;
00900             while (*prev)
00901               prev = &(*prev)->next;
00902 
00903             *prev = new Bucket;
00904             (*prev)->next = 0;
00905             (*prev)->data.swap(msg.data);
00906           }
00907         }
00908 
00909         if (! valid)
00910         {
00911           losePeer("WARNING: data stream error with ", p, ev);
00912           unlock();
00913           return true;
00914         }
00915 
00916         consumed += msglen;
00917       }
00918       else
00919         break;
00920     }
00921 
00922     data.erase(data.begin(), data.begin()+consumed);
00923 
00924     // If the client has closed the connection, shut down our end.  If
00925     // we have something to send back still, leave the write direction
00926     // open.  Otherwise close the shop for this client.
00927     if (sz == 0)
00928       sel_.setMask(p->socket, p->mask &= ~IORead);
00929   }
00930 
00931   // Yes, please keep processing events for this socket.
00932   unlock();
00933   return false;
00934 }
00935 
00940 bool
00941 DQMNet::onPeerConnect(IOSelectEvent *ev)
00942 {
00943   // Recover the server socket.
00944   assert (ev->source == server_);
00945 
00946   // Accept the connection.
00947   Socket *s = server_->accept();
00948   assert (s);
00949   assert (! s->isBlocking());
00950 
00951   // Record it to our list of peers.
00952   lock();
00953   Peer *p = createPeer(s);
00954   std::string localaddr;
00955   if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
00956   {
00957     InetAddress peeraddr = inet->peername();
00958     InetAddress myaddr = inet->sockname();
00959     p->peeraddr = StringFormat("%1:%2")
00960                   .arg(peeraddr.hostname())
00961                   .arg(peeraddr.port());
00962     localaddr = StringFormat("%1:%2")
00963                 .arg(myaddr.hostname())
00964                 .arg(myaddr.port());
00965   }
00966   else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
00967   {
00968     p->peeraddr = local->peername().path();
00969     localaddr = local->sockname().path();
00970   }
00971   else
00972     assert(false);
00973 
00974   p->mask = IORead|IOUrgent;
00975   p->socket = s;
00976 
00977   // Report the new connection.
00978   if (debug_)
00979     logme()
00980       << "INFO: new peer " << p->peeraddr << " is now connected to "
00981       << localaddr << std::endl;
00982 
00983   // Attach it to the listener.
00984   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00985   unlock();
00986 
00987   // We are never done.
00988   return false;
00989 }
00990 
00997 bool
00998 DQMNet::onLocalNotify(IOSelectEvent *ev)
00999 {
01000   // Discard the data in the pipe, we care only about the wakeup.
01001   try
01002   {
01003     IOSize sz;
01004     unsigned char buf [1024];
01005     while ((sz = ev->source->read(buf, sizeof(buf))))
01006       ;
01007   }
01008   catch (Error &e)
01009   {
01010     SystemError *next = dynamic_cast<SystemError *>(e.next());
01011     if (next && next->portable() == SysErr::ErrTryAgain)
01012       ; // Ignore it
01013     else
01014       logme()
01015         << "WARNING: error reading from notification pipe: "
01016         << e.explain() << std::endl;
01017   }
01018 
01019   // Tell the main event pump to send an update in a little while.
01020   flush_ = true;
01021 
01022   // We are never done, always keep going.
01023   return false;
01024 }
01025 
01028 void
01029 DQMNet::updateMask(Peer *p)
01030 {
01031   if (! p->socket)
01032     return;
01033 
01034   // Listen to writes iff we have data to send.
01035   unsigned oldmask = p->mask;
01036   if (! p->sendq && (p->mask & IOWrite))
01037     sel_.setMask(p->socket, p->mask &= ~IOWrite);
01038 
01039   if (p->sendq && ! (p->mask & IOWrite))
01040     sel_.setMask(p->socket, p->mask |= IOWrite);
01041 
01042   if (debug_ && oldmask != p->mask)
01043     logme()
01044       << "DEBUG: updating mask for " << p->peeraddr << " to "
01045       << p->mask << " from " << oldmask << std::endl;
01046 
01047   // If we have nothing more to send and are no longer listening
01048   // for reads, close up the shop for this peer.
01049   if (p->mask == IOUrgent && ! p->waiting)
01050   {
01051     assert (! p->sendq);
01052     if (debug_)
01053       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01054     losePeer(0, p, 0);
01055   }
01056 }
01057 
01059 DQMNet::DQMNet (const std::string &appname /* = "" */)
01060   : debug_ (false),
01061     appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01062     pid_ (getpid()),
01063     server_ (0),
01064     version_ (Time::current()),
01065     communicate_ ((pthread_t) -1),
01066     shutdown_ (0),
01067     delay_ (1000),
01068     waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
01069     waitMax_ (0, 0, 0, 5 /* seconds */, 0),
01070     flush_ (false)
01071 {
01072   // Create a pipe for the local DQM to tell the communicator
01073   // thread that local DQM data has changed and that the peers
01074   // should be notified.
01075   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01076   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01077 
01078   // Initialise the upstream and downstream to empty.
01079   upstream_.peer   = downstream_.peer   = 0;
01080   upstream_.next   = downstream_.next   = 0;
01081   upstream_.port   = downstream_.port   = 0;
01082   upstream_.update = downstream_.update = false;
01083 }
01084 
// Destructor.  The body is empty: nothing is released explicitly
// here (see the FIXME below).
01085 DQMNet::~DQMNet(void)
01086 {
01087   // FIXME
01088 }
01089 
01092 void
01093 DQMNet::debug(bool doit)
01094 {
01095   debug_ = doit;
01096 }
01097 
01100 void
01101 DQMNet::delay(int delay)
01102 {
01103   delay_ = delay;
01104 }
01105 
01110 void
01111 DQMNet::staleObjectWaitLimit(lat::TimeSpan time)
01112 {
01113   waitStale_ = time;
01114 }
01115 
01119 void
01120 DQMNet::startLocalServer(int port)
01121 {
01122   if (server_)
01123   {
01124     logme() << "ERROR: DQM server was already started.\n";
01125     return;
01126   }
01127 
01128   try
01129   {
01130     InetAddress addr("0.0.0.0", port);
01131     InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
01132     s->bind(addr);
01133     s->listen(10);
01134     s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01135     s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01136     s->setBlocking(false);
01137     sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01138   }
01139   catch (Error &e)
01140   {
01141     // FIXME: Do we need to do this when we throw an exception anyway?
01142     // FIXME: Abort instead?
01143     logme()
01144       << "ERROR: Failed to start server at port " << port << ": "
01145       << e.explain() << std::endl;
01146 
01147     raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
01148                   " %d: %s", port, e.explain().c_str());
01149   }
01150   
01151   logme() << "INFO: DQM server started at port " << port << std::endl;
01152 }
01153 
01157 void
01158 DQMNet::startLocalServer(const char *path)
01159 {
01160   if (server_)
01161   {
01162     logme() << "ERROR: DQM server was already started.\n";
01163     return;
01164   }
01165 
01166   try
01167   {
01168     server_ = new LocalServerSocket(path, 10);
01169     server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01170     server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01171     server_->setBlocking(false);
01172     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01173   }
01174   catch (Error &e)
01175   {
01176     // FIXME: Do we need to do this when we throw an exception anyway?
01177     // FIXME: Abort instead?
01178     logme()
01179       << "ERROR: Failed to start server at path " << path << ": "
01180       << e.explain() << std::endl;
01181 
01182     raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
01183                   " %s: %s", path, e.explain().c_str());
01184   }
01185   
01186   logme() << "INFO: DQM server started at path " << path << std::endl;
01187 }
01188 
01192 void
01193 DQMNet::updateToCollector(const std::string &host, int port)
01194 {
01195   if (! downstream_.host.empty())
01196   {
01197     logme()
01198       << "ERROR: Already updating another collector at "
01199       << downstream_.host << ":" << downstream_.port << std::endl;
01200     return;
01201   }
01202 
01203   downstream_.update = true;
01204   downstream_.host = host;
01205   downstream_.port = port;
01206 }
01207 
01211 void
01212 DQMNet::listenToCollector(const std::string &host, int port)
01213 {
01214   if (! upstream_.host.empty())
01215   {
01216     logme()
01217       << "ERROR: Already receiving data from another collector at "
01218       << upstream_.host << ":" << upstream_.port << std::endl;
01219     return;
01220   }
01221 
01222   upstream_.update = false;
01223   upstream_.host = host;
01224   upstream_.port = port;
01225 }
01226 
01228 void
01229 DQMNet::shutdown(void)
01230 {
01231   shutdown_ = 1;
01232   if (communicate_ != (pthread_t) -1)
01233     pthread_join(communicate_, 0);
01234 }
01235 
01241 static void *communicate(void *obj)
01242 {
01243   sigset_t sigs;
01244   sigfillset (&sigs);
01245   pthread_sigmask (SIG_BLOCK, &sigs, 0);
01246   ((DQMNet *)obj)->run();
01247   return 0;
01248 }
01249 
01251 void
01252 DQMNet::lock(void)
01253 {
01254   if (communicate_ != (pthread_t) -1)
01255     pthread_mutex_lock(&lock_);
01256 }
01257 
01259 void
01260 DQMNet::unlock(void)
01261 {
01262   if (communicate_ != (pthread_t) -1)
01263     pthread_mutex_unlock(&lock_);
01264 }
01265 
01269 void
01270 DQMNet::start(void)
01271 {
01272   if (communicate_ != (pthread_t) -1)
01273   {
01274     logme()
01275       << "ERROR: DQM networking thread has already been started\n";
01276     return;
01277   }
01278 
01279   pthread_mutex_init(&lock_, 0);
01280   pthread_create (&communicate_, 0, &communicate, this);
01281 }
01282 
// Main loop of the networking layer; start() runs it on a separate
// thread through communicate().  It repeats until shouldStop()
// returns true.  Each pass:
//
//  1. For the upstream and the downstream collector, if a host is
//     configured, no peer exists yet and the retry time has passed,
//     starts a non-blocking connection and registers a peer for it.
//  2. Dispatches selector events, passing delay_ to dispatch().
//  3. Under lock(): sends the object list to the peers if a flush
//     was requested (at most once every 15 seconds), updates the
//     peer selector masks, and releases requests that have waited
//     longer than waitMax_, or longer than waitStale_ when stale
//     data is available for the object.
01284 void
01285 DQMNet::run(void)
01286 {
01287   Time now;
01288   Time nextFlush = 0;
01289   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01290 
01291   // Perform I/O.  Every once in a while flush updates to peers.
01292   while (! shouldStop())
01293   {
01294     for (int i = 0; i < 2; ++i)
01295     {
01296       AutoPeer *ap = automatic[i];
01297 
01298       // If we need a server connection and don't have one yet,
01299       // initiate asynchronous connection creation.  Swallow errors
01300       // in case the server won't talk to us.
01301       if (! ap->host.empty()
01302           && ! ap->peer
01303           && (now = Time::current()) > ap->next)
01304       {
        // Fix the earliest time of the next attempt up front, so the
        // 15 second back-off applies whether or not this one succeeds.
01305         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01306         InetSocket *s = 0;
01307         try
01308         {
01309           InetAddress addr(ap->host.c_str(), ap->port);
01310           s = new InetSocket (SOCK_STREAM, 0, addr.family());
01311           s->setBlocking(false);
01312           s->connect(addr);
01313           s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01314           s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01315         }
01316         catch (Error &e)
01317         {
01318           SystemError *sys = dynamic_cast<SystemError *>(e.next());
01319           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01320           {
01321             // "In progress" just means the connection is in progress.
01322             // The connection is ready when the socket is writeable.
01323             // Anything else is a real problem.
01324             if (s)
01325               s->abort();
01326             delete s;
01327             s = 0;
01328           }
01329         }
01330 
01331         // Set up with the selector if we were successful.  If this is
01332         // the upstream collector, queue a request for updates.
01333         if (s)
01334         {
          // NOTE(review): createPeer() is called here without lock(),
          // whereas onPeerConnect() calls it with the lock held --
          // confirm this is intentional.
01335           Peer *p = createPeer(s);
01336           ap->peer = p;
01337 
01338           InetAddress peeraddr = ((InetSocket *) s)->peername();
01339           InetAddress myaddr = ((InetSocket *) s)->sockname();
01340           p->peeraddr = StringFormat("%1:%2")
01341                         .arg(peeraddr.hostname())
01342                         .arg(peeraddr.port());
          // Unlike accepted peers, also listen for writes: per the
          // comment above, writeability signals a finished connect.
01343           p->mask = IORead|IOWrite|IOUrgent;
01344           p->update = ap->update;
01345           p->automatic = ap;
01346           p->socket = s;
01347           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01348           if (ap == &upstream_)
01349           {
            // Two messages queued back to back; presumably each pair
            // is (message length in bytes, message type) -- confirm
            // against the message reader.
01350             uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01351                                   2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
01352             p->sendq = new Bucket;
01353             p->sendq->next = 0;
01354             copydata(p->sendq, words, sizeof(words));
01355           }
01356 
01357           // Report the new connection.
01358           if (debug_)
01359             logme()
01360               << "INFO: now connected to " << p->peeraddr << " from "
01361               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01362         }
01363       }
01364     }
01365 
01366     // Pump events for a while.
01367     sel_.dispatch(delay_);
01368     now = Time::current();
01369     lock();
01370 
01371     // Check if flush is required.  Flush only if one is needed.
01372     // Always sends the full object list, but only rarely.
01373     if (flush_ && now > nextFlush)
01374     {
01375       flush_ = false;
01376       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01377       sendObjectListToPeers(true);
01378     }
01379 
01380     // Update the data server and peer selection masks.  If we
01381     // have no more data to send and listening for writes, remove
01382     // the write mask.  If we have something to write and aren't
01383     // listening for writes, start listening so we can send off
01384     // the data.
01385     updatePeerMasks();
01386 
01387     // Release peers that have been waiting for data for too long.
01388     Time waitold = now - waitMax_;
01389     Time waitstale = now - waitStale_;
01390     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01391     {
01392       Object *o = findObject(0, i->name);
01393 
01394       // If we have (stale) object data, wait only up to stale limit.
01395       // Otherwise if we have no data at all, wait up to the max limit.
01396       if (i->time < waitold)
01397       {
01398         logme ()
01399           << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
01400           << "s to retrieval, releasing '" << i->name << "' from wait, have "
01401           << (o ? o->rawdata.size() : 0) << " data available\n";
        // Post-increment: `i` has already moved on when the entry is
        // handed over (presumably releaseFromWait() removes it from
        // waiting_ -- confirm).
01402         releaseFromWait(i++, o);
01403       }
01404       else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
01405       {
01406         logme ()
01407           << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
01408           << "s to update, releasing '" << i->name << "' from wait, have "
01409           << o->rawdata.size() << " data available\n";
01410         releaseFromWait(i++, o);
01411       }
01412 
01413       // Keep it for now.
01414       else
01415         ++i;
01416     }
01417 
01418     unlock();
01419   }
01420 }
01421 
01422 // Tell the network cache that there have been local changes that
01423 // should be advertised to the downstream listeners.
01424 void
01425 DQMNet::sendLocalChanges(void)
01426 {
01427   char byte = 0;
01428   wakeup_.sink()->write(&byte, 1);
01429 }
01430 
01434 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */)
01435   : DQMImplNet<DQMNet::Object>(appname)
01436 {
01437   local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01438 }
01439 
01441 void
01442 DQMBasicNet::reserveLocalSpace(uint32_t size)
01443 {
01444   local_->objs.resize(size);
01445 }
01446 
01449 void
01450 DQMBasicNet::updateLocalObject(Object &o)
01451 {
01452   o.dirname = &*local_->dirs.insert(*o.dirname).first;
01453   std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
01454   if (! info.second)
01455   {
01456     // Somewhat hackish. Sets are supposedly immutable, but we
01457     // need to change the non-key parts of the object. Erasing
01458     // and re-inserting would produce too much memory churn.
01459     Object &old = const_cast<Object &>(*info.first);
01460     std::swap(old.flags,     o.flags);
01461     std::swap(old.tag,       o.tag);
01462     std::swap(old.version,   o.version);
01463     std::swap(old.qreports,  o.qreports);
01464     std::swap(old.rawdata,   o.rawdata);
01465     std::swap(old.scalar,    o.scalar);
01466     std::swap(old.qdata,     o.qdata);
01467   }
01468 }
01469 
01473 bool
01474 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
01475 {
01476   size_t removed = 0;
01477   std::string path;
01478   ObjectMap::iterator i, e;
01479   for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
01480   {
01481     path.clear();
01482     path.reserve(i->dirname->size() + i->objname.size() + 2);
01483     path += *i->dirname;
01484     if (! path.empty())
01485       path += '/';
01486     path += i->objname;
01487 
01488     if (! known.count(path))
01489       ++removed, local_->objs.erase(i++);
01490     else
01491       ++i;
01492   }
01493 
01494   return removed > 0;
01495 }