CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/DQMServices/Core/src/DQMNet.cc

Go to the documentation of this file.
00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMDefinitions.h"
00003 #include "DQMServices/Core/src/DQMError.h"
00004 #include "classlib/iobase/InetServerSocket.h"
00005 #include "classlib/iobase/LocalServerSocket.h"
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
00008 #include "classlib/utils/TimeInfo.h"
00009 #include "classlib/utils/StringList.h"
00010 #include "classlib/utils/StringFormat.h"
00011 #include "classlib/utils/StringOps.h"
00012 #include "classlib/utils/SystemError.h"
00013 #include "classlib/utils/Regexp.h"
00014 #include <unistd.h>
00015 #include <fcntl.h>
00016 #include <sys/wait.h>
00017 #include <stdio.h>
00018 #include <stdint.h>
00019 #include <iostream>
00020 #include <sstream>
00021 #include <cassert>
00022 #include <cfloat>
00023 #include <inttypes.h>
00024 
// Socket buffer sizing: Apple platforms use 1 MiB, everything else 8 MiB.
#if __APPLE__
# define SOCKET_BUF_SIZE    (1*1024*1024)
#else
# define SOCKET_BUF_SIZE    (8*1024*1024)
#endif
// Largest single message accepted from a peer: one socket buffer.
#define MESSAGE_SIZE_LIMIT      (SOCKET_BUF_SIZE)
// Socket reads are done in chunks of one eighth of the buffer size, and
// the incoming data buffer grows by a whole socket buffer at a time.
#define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE/8)
#define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00034 
00035 using namespace lat;
00036 
00037 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
00038 
00040 // Generate log prefix.
00041 std::ostream &
00042 DQMNet::logme (void)
00043 {
00044   Time now = Time::current();
00045   return std::cout
00046     << now.format(true, "%Y-%m-%d %H:%M:%S.")
00047     << now.nanoformat(3, 3)
00048     << " " << appname_ << "[" << pid_ << "]: ";
00049 }
00050 
00051 // Append data into a bucket.
00052 void
00053 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00054 {
00055   b->data.insert(b->data.end(),
00056                  (const unsigned char *)data,
00057                  (const unsigned char *)data + len);
00058 }
00059 
00060 // Discard a bucket chain.
00061 void
00062 DQMNet::discard (Bucket *&b)
00063 {
00064   while (b)
00065   {
00066     Bucket *next = b->next;
00067     delete b;
00068     b = next;
00069   }
00070 }
00071 
00073 
00076 void
00077 DQMNet::losePeer(const char *reason,
00078                  Peer *peer,
00079                  IOSelectEvent *ev,
00080                  Error *err)
00081 {
00082   if (reason)
00083     logme ()
00084       << reason << peer->peeraddr
00085       << (err ? "; error was: " + err->explain() : std::string(""))
00086       << std::endl;
00087 
00088   Socket *s = peer->socket;
00089 
00090   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00091     if (i->peer == peer)
00092       waiting_.erase(i++);
00093     else
00094       ++i;
00095 
00096   if (ev)
00097     ev->source = 0;
00098 
00099   discard(peer->sendq);
00100   if (peer->automatic)
00101     peer->automatic->peer = 0;
00102 
00103   sel_.detach (s);
00104   s->close();
00105   removePeer(peer, s);
00106   delete s;
00107 }
00108 
00110 void
00111 DQMNet::requestObjectData(Peer *p, const char *name, size_t len)
00112 {
00113   // Issue request to peer.
00114   Bucket **msg = &p->sendq;
00115   while (*msg)
00116     msg = &(*msg)->next;
00117   *msg = new Bucket;
00118   (*msg)->next = 0;
00119 
00120   uint32_t words[3];
00121   words[0] = sizeof(words) + len;
00122   words[1] = DQM_MSG_GET_OBJECT;
00123   words[2] = len;
00124   copydata(*msg, words, sizeof(words));
00125   copydata(*msg, name, len);
00126 }
00127 
00130 void
00131 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00132 {
00133   // FIXME: Should we automatically record which exact peer the waiter
00134   // is expecting to deliver data so we know to release the waiter if
00135   // the other peer vanishes?  The current implementation stands a
00136   // chance for the waiter to wait indefinitely -- although we do
00137   // force terminate the wait after a while.
00138   requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
00139   WaitObject wo = { Time::current(), name, info, p };
00140   waiting_.push_back(wo);
00141   p->waiting++;
00142 }
00143 
00144 // Once an object has been updated, this is invoked for all waiting
00145 // peers.  Send the object back to the peer in suitable form.
00146 void
00147 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00148 {
00149   Bucket **msg = &i->peer->sendq;
00150   while (*msg)
00151     msg = &(*msg)->next;
00152   *msg = new Bucket;
00153   (*msg)->next = 0;
00154 
00155   releaseFromWait(*msg, *i, o);
00156 
00157   assert(i->peer->waiting > 0);
00158   i->peer->waiting--;
00159   waiting_.erase(i);
00160 }
00161 
00162 // Release everyone waiting for the object @a o.
00163 void
00164 DQMNet::releaseWaiters(const std::string &name, Object *o)
00165 {
00166   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00167     if (i->name == name)
00168       releaseFromWait(i++, o);
00169     else
00170       ++i;
00171 }
00172 
00176 void
00177 DQMNet::packQualityData(std::string &into, const QReports &qr)
00178 {
00179   char buf[64];
00180   std::ostringstream qrs;
00181   QReports::const_iterator qi, qe;
00182   for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
00183   {
00184     int pos = 0;
00185     sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
00186     qrs << buf << '\0'
00187         << buf+pos << '\0'
00188         << qi->qtname << '\0'
00189         << qi->algorithm << '\0'
00190         << qi->message << '\0'
00191         << '\0';
00192   }
00193   into = qrs.str();
00194 }
00195 
00198 void
00199 DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
00200 {
00201   const char *qdata = from;
00202 
00203   // Count how many qresults there are.
00204   size_t nqv = 0;
00205   while (*qdata)
00206   {
00207     ++nqv;
00208     while (*qdata) ++qdata; ++qdata;
00209     while (*qdata) ++qdata; ++qdata;
00210     while (*qdata) ++qdata; ++qdata;
00211     while (*qdata) ++qdata; ++qdata;
00212     while (*qdata) ++qdata; ++qdata;
00213   }
00214 
00215   // Now extract the qreports.
00216   qdata = from;
00217   qr.reserve(nqv);
00218   while (*qdata)
00219   {
00220     qr.push_back(DQMNet::QValue());
00221     DQMNet::QValue &qv = qr.back();
00222 
00223     qv.code = atoi(qdata);
00224     while (*qdata) ++qdata;
00225     switch (qv.code)
00226     {
00227     case dqm::qstatus::STATUS_OK:
00228       break;
00229     case dqm::qstatus::WARNING:
00230       flags |= DQMNet::DQM_PROP_REPORT_WARN;
00231       break;
00232     case dqm::qstatus::ERROR:
00233       flags |= DQMNet::DQM_PROP_REPORT_ERROR;
00234       break;
00235     default:
00236       flags |= DQMNet::DQM_PROP_REPORT_OTHER;
00237       break;
00238     }
00239 
00240     qv.qtresult = atof(++qdata);
00241     while (*qdata) ++qdata;
00242 
00243     qv.qtname = ++qdata;
00244     while (*qdata) ++qdata;
00245 
00246     qv.algorithm = ++qdata;
00247     while (*qdata) ++qdata;
00248 
00249     qv.message = ++qdata;
00250     while (*qdata) ++qdata;
00251     ++qdata;
00252   }
00253 }
00254 
#if 0
// Compiled out (#if 0): the ROOT-based helpers below are not built.

// Deserialise a ROOT object from a buffer at the current position.
// Returns 0 when the buffer has been fully consumed, or when no class
// could be read at the current position.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  // Remember where the object record starts, read its class, then rewind
  // to that position so ReadObject() consumes the complete record.
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}

// Reconstruct an object from the raw data.  Reads the main object
// (required) and then a reference object (left null if none follows)
// from o.rawdata, and unpacks the quality reports from o.qdata.
// Returns false if the main object cannot be extracted.
bool
DQMNet::reconstructObject(Object &o)
{
  // Read buffer over o.rawdata.  NOTE(review): the final kFALSE is
  // presumably ROOT's "adopt" flag (buffer does not take ownership);
  // confirm against the TBufferFile constructor documentation.
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // Extract the main object.
  if (! (o.object = extractNextObject(buf)))
    return false;

  // Extract the reference object.
  o.reference = extractNextObject(buf);

  // Extract quality reports.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
00290 
#if 0
// Compiled out (#if 0).  Rebuild object @a o inside @a store: decode it
// with reconstructObject() (also inside #if 0 above), book a monitor
// element of the matching type under o.dirname/o.objname, and copy the
// tag and quality reports onto it.  Returns false if decoding fails or
// the type is not recognised.
//
// Every booking call uses o.objname; earlier revisions referenced an
// undeclared identifier `name` in all non-INT cases, which would not
// compile if this block were re-enabled.
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;

  // Reconstruct the main object
  MonitorElement *obj = 0;
  store->setCurrentFolder(*o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(o.objname);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(o.objname, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(o.objname, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(o.objname, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(o.objname, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(o.objname, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(o.objname, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(o.objname, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(o.objname, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(o.objname, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(o.objname, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(o.objname, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(o.objname, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << *o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Reconstruct tag and qreports.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }

  // Indicate success.
  return true;
}
#endif
00380 
00382 // Check if the network layer should stop.
00383 bool
00384 DQMNet::shouldStop(void)
00385 {
00386   return shutdown_;
00387 }
00388 
00389 // Once an object has been updated, this is invoked for all waiting
00390 // peers.  Send the requested object to the waiting peer.
00391 void
00392 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00393 {
00394   if (o)
00395     sendObjectToPeer(msg, *o, true);
00396   else
00397   {
00398     uint32_t words [3];
00399     words[0] = sizeof(words) + w.name.size();
00400     words[1] = DQM_REPLY_NONE;
00401     words[2] = w.name.size();
00402 
00403     msg->data.reserve(msg->data.size() + words[0]);
00404     copydata(msg, &words[0], sizeof(words));
00405     copydata(msg, &w.name[0], w.name.size());
00406   }
00407 }
00408 
00409 // Send an object to a peer.  If not @a data, only sends a summary
00410 // without the object data, except the data is always sent for scalar
00411 // objects.
00412 void
00413 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00414 {
00415   uint32_t flags = o.flags & ~DQM_PROP_DEAD;
00416   DataBlob objdata;
00417 
00418   if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00419     objdata.insert(objdata.end(),
00420                    &o.scalar[0],
00421                    &o.scalar[0] + o.scalar.size());
00422   else if (data)
00423     objdata.insert(objdata.end(),
00424                    &o.rawdata[0],
00425                    &o.rawdata[0] + o.rawdata.size());
00426 
00427   uint32_t words [9];
00428   uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
00429   uint32_t datalen = objdata.size();
00430   uint32_t qlen = o.qdata.size();
00431 
00432   if (o.dirname->empty())
00433     --namelen;
00434 
00435   words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
00436   words[1] = DQM_REPLY_OBJECT;
00437   words[2] = flags;
00438   words[3] = (o.version >> 0 ) & 0xffffffff;
00439   words[4] = (o.version >> 32) & 0xffffffff;
00440   words[5] = o.tag;
00441   words[6] = namelen;
00442   words[7] = datalen;
00443   words[8] = qlen;
00444 
00445   msg->data.reserve(msg->data.size() + words[0]);
00446   copydata(msg, &words[0], 9*sizeof(uint32_t));
00447   if (namelen)
00448   {
00449     copydata(msg, &(*o.dirname)[0], o.dirname->size());
00450     if (! o.dirname->empty())
00451       copydata(msg, "/", 1);
00452     copydata(msg, &o.objname[0], o.objname.size());
00453   }
00454   if (datalen)
00455     copydata(msg, &objdata[0], datalen);
00456   if (qlen)
00457     copydata(msg, &o.qdata[0], qlen);
00458 }
00459 
00461 // Handle peer messages.
00462 bool
00463 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00464 {
00465   // Decode and process this message.
00466   uint32_t type;
00467   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00468   switch (type)
00469   {
00470   case DQM_MSG_UPDATE_ME:
00471     {
00472       if (len != 2*sizeof(uint32_t))
00473       {
00474         logme()
00475           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00476           << " from peer " << p->peeraddr << std::endl;
00477         return false;
00478       }
00479 
00480       if (debug_)
00481         logme()
00482           << "DEBUG: received message 'UPDATE ME' from peer "
00483           << p->peeraddr << ", size " << len << std::endl;
00484 
00485       p->update = true;
00486     }
00487     return true;
00488 
00489   case DQM_MSG_LIST_OBJECTS:
00490     {
00491       if (debug_)
00492         logme()
00493           << "DEBUG: received message 'LIST OBJECTS' from peer "
00494           << p->peeraddr << ", size " << len << std::endl;
00495 
00496       // Send over current status: list of known objects.
00497       sendObjectListToPeer(msg, true, false);
00498     }
00499     return true;
00500 
00501   case DQM_MSG_GET_OBJECT:
00502     {
00503       if (debug_)
00504         logme()
00505           << "DEBUG: received message 'GET OBJECT' from peer "
00506           << p->peeraddr << ", size " << len << std::endl;
00507 
00508       if (len < 3*sizeof(uint32_t))
00509       {
00510         logme()
00511           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00512           << " from peer " << p->peeraddr << std::endl;
00513         return false;
00514       }
00515 
00516       uint32_t namelen;
00517       memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00518       if (len != 3*sizeof(uint32_t) + namelen)
00519       {
00520         logme()
00521           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00522           << " from peer " << p->peeraddr
00523           << ", expected length " << (3*sizeof(uint32_t))
00524           << " + " << namelen << std::endl;
00525         return false;
00526       }
00527 
00528       std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00529       Peer *owner = 0;
00530       Object *o = findObject(0, name, &owner);
00531       if (o)
00532       {
00533         o->lastreq = Time::current().ns();
00534         if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
00535             && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00536           waitForData(p, name, "", owner);
00537         else
00538           sendObjectToPeer(msg, *o, true);
00539       }
00540       else
00541       {
00542         uint32_t words [3];
00543         words[0] = sizeof(words) + name.size();
00544         words[1] = DQM_REPLY_NONE;
00545         words[2] = name.size();
00546 
00547         msg->data.reserve(msg->data.size() + words[0]);
00548         copydata(msg, &words[0], sizeof(words));
00549         copydata(msg, &name[0], name.size());
00550       }
00551     }
00552     return true;
00553 
00554   case DQM_REPLY_LIST_BEGIN:
00555     {
00556       if (len != 4*sizeof(uint32_t))
00557       {
00558         logme()
00559           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00560           << " from peer " << p->peeraddr << std::endl;
00561         return false;
00562       }
00563 
00564       // Get the update status: whether this is a full update.
00565       uint32_t flags;
00566       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00567 
00568       if (debug_)
00569         logme()
00570           << "DEBUG: received message 'LIST BEGIN "
00571           << (flags ? "FULL" : "INCREMENTAL")
00572           << "' from " << p->peeraddr
00573           << ", size " << len << std::endl;
00574 
00575       // If we are about to receive a full list of objects, flag all
00576       // objects as possibly dead.  Subsequent object notifications
00577       // will undo this for the live objects.  We cannot delete
00578       // objects quite yet, as we may get inquiry from another client
00579       // while we are processing the incoming list, so we keep the
00580       // objects tentatively alive as long as we've not seen the end.
00581       if (flags)
00582         markObjectsDead(p);
00583     }
00584     return true;
00585 
00586   case DQM_REPLY_LIST_END:
00587     {
00588       if (len != 4*sizeof(uint32_t))
00589       {
00590         logme()
00591           << "ERROR: corrupt 'LIST END' message of length " << len
00592           << " from peer " << p->peeraddr << std::endl;
00593         return false;
00594       }
00595 
00596       // Get the update status: whether this is a full update.
00597       uint32_t flags;
00598       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00599 
00600       // If we received a full list of objects, now purge all dead
00601       // objects. We need to do this in two stages in case we receive
00602       // updates in many parts, and end up sending updates to others in
00603       // between; this avoids us lying live objects are dead.
00604       if (flags)
00605         purgeDeadObjects(p);
00606 
00607       if (debug_)
00608         logme()
00609           << "DEBUG: received message 'LIST END "
00610           << (flags ? "FULL" : "INCREMENTAL")
00611           << "' from " << p->peeraddr
00612           << ", size " << len << std::endl;
00613 
00614       // Indicate we have received another update from this peer.
00615       // Also indicate we should flush to our clients.
00616       flush_ = true;
00617       p->updates++;
00618     }
00619     return true;
00620 
00621   case DQM_REPLY_OBJECT:
00622     {
00623       uint32_t words[9];
00624       if (len < sizeof(words))
00625       {
00626         logme()
00627           << "ERROR: corrupt 'OBJECT' message of length " << len
00628           << " from peer " << p->peeraddr << std::endl;
00629         return false;
00630       }
00631 
00632       memcpy (&words[0], data, sizeof(words));
00633       uint32_t &namelen = words[6];
00634       uint32_t &datalen = words[7];
00635       uint32_t &qlen = words[8];
00636 
00637       if (len != sizeof(words) + namelen + datalen + qlen)
00638       {
00639         logme()
00640           << "ERROR: corrupt 'OBJECT' message of length " << len
00641           << " from peer " << p->peeraddr
00642           << ", expected length " << sizeof(words)
00643           << " + " << namelen
00644           << " + " << datalen
00645           << " + " << qlen
00646           << std::endl;
00647         return false;
00648       }
00649 
00650       unsigned char *namedata = data + sizeof(words);
00651       unsigned char *objdata = namedata + namelen;
00652       unsigned char *qdata = objdata + datalen;
00653       unsigned char *enddata = qdata + qlen;
00654       std::string name ((char *) namedata, namelen);
00655       assert (enddata == data + len);
00656 
00657       if (debug_)
00658         logme()
00659           << "DEBUG: received message 'OBJECT " << name
00660           << "' from " << p->peeraddr
00661           << ", size " << len << std::endl;
00662 
00663       // Mark the peer as a known object source.
00664       p->source = true;
00665 
00666       // Initialise or update an object entry.
00667       Object *o = findObject(p, name);
00668       if (! o)
00669         o = makeObject(p, name);
00670 
00671       o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
00672       o->tag = words[5];
00673       o->version = ((uint64_t) words[4] << 32 | words[3]);
00674       o->scalar.clear();
00675       o->qdata.clear();
00676       if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
00677       {
00678         o->rawdata.clear();
00679         o->scalar.insert(o->scalar.end(), objdata, qdata);
00680       }
00681       else if (datalen)
00682       {
00683         o->rawdata.clear();
00684         o->rawdata.insert(o->rawdata.end(), objdata, qdata);
00685       }
00686       else if (! o->rawdata.empty())
00687         o->flags |= DQM_PROP_STALE;
00688       o->qdata.insert(o->qdata.end(), qdata, enddata);
00689 
00690       // If we had an object for this one already and this is a list
00691       // update without data, issue an immediate data get request.
00692       if (o->lastreq
00693           && ! datalen
00694           && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
00695         requestObjectData(p, (namelen ? &name[0] : 0), namelen);
00696 
00697       // If we have the object data, release from wait.
00698       if (datalen)
00699         releaseWaiters(name, o);
00700     }
00701     return true;
00702 
00703   case DQM_REPLY_NONE:
00704     {
00705       uint32_t words[3];
00706       if (len < sizeof(words))
00707       {
00708         logme()
00709           << "ERROR: corrupt 'NONE' message of length " << len
00710           << " from peer " << p->peeraddr << std::endl;
00711         return false;
00712       }
00713 
00714       memcpy (&words[0], data, sizeof(words));
00715       uint32_t &namelen = words[2];
00716 
00717       if (len != sizeof(words) + namelen)
00718       {
00719         logme()
00720           << "ERROR: corrupt 'NONE' message of length " << len
00721           << " from peer " << p->peeraddr
00722           << ", expected length " << sizeof(words)
00723           << " + " << namelen << std::endl;
00724         return false;
00725       }
00726 
00727       unsigned char *namedata = data + sizeof(words);
00728       std::string name((char *) namedata, namelen);
00729 
00730       if (debug_)
00731         logme()
00732           << "DEBUG: received message 'NONE " << name
00733           << "' from " << p->peeraddr
00734           << ", size " << len << std::endl;
00735 
00736       // Mark the peer as a known object source.
00737       p->source = true;
00738 
00739       // If this was a known object, kill it.
00740       if (Object *o = findObject(p, name))
00741       {
00742         o->flags |= DQM_PROP_DEAD;
00743         purgeDeadObjects(p);
00744       }
00745 
00746       // If someone was waiting for this, let them go.
00747       releaseWaiters(name, 0);
00748     }
00749     return true;
00750 
00751   default:
00752     logme()
00753       << "ERROR: unrecognised message of length " << len
00754       << " and type " << type << " from peer " << p->peeraddr
00755       << std::endl;
00756     return false;
00757   }
00758 }
00759 
00762 bool
00763 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00764 {
00765   lock();
00766   assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00767 
00768   // If there is a problem with the peer socket, discard the peer
00769   // and tell the selector to stop prcessing events for it.  If
00770   // this is a server connection, we will eventually recreate
00771   // everything if/when the data server comes back.
00772   if (ev->events & IOUrgent)
00773   {
00774     if (p->automatic)
00775     {
00776       logme()
00777         << "WARNING: connection to the DQM server at " << p->peeraddr
00778         << " lost (will attempt to reconnect in 15 seconds)\n";
00779       losePeer(0, p, ev);
00780     }
00781     else
00782       losePeer("WARNING: lost peer connection ", p, ev);
00783 
00784     unlock();
00785     return true;
00786   }
00787 
00788   // If we can write to the peer socket, pump whatever we can into it.
00789   if (ev->events & IOWrite)
00790   {
00791     while (Bucket *b = p->sendq)
00792     {
00793       IOSize len = b->data.size() - p->sendpos;
00794       const void *data = (len ? (const void *)&b->data[p->sendpos]
00795                           : (const void *)&data);
00796       IOSize done;
00797 
00798       try
00799       {
00800         done = (len ? ev->source->write (data, len) : 0);
00801         if (debug_ && len)
00802           logme()
00803             << "DEBUG: sent " << done << " bytes to peer "
00804             << p->peeraddr << std::endl;
00805       }
00806       catch (Error &e)
00807       {
00808         losePeer("WARNING: unable to write to peer ", p, ev, &e);
00809         unlock();
00810         return true;
00811       }
00812 
00813       p->sendpos += done;
00814       if (p->sendpos == b->data.size())
00815       {
00816         Bucket *old = p->sendq;
00817         p->sendq = old->next;
00818         p->sendpos = 0;
00819         old->next = 0;
00820         discard(old);
00821       }
00822 
00823       if (! done && len)
00824         // Cannot write any more.
00825         break;
00826     }
00827   }
00828 
00829   // If there is data to be read from the peer, first receive what we
00830   // can get out the socket, the process all complete requests.
00831   if (ev->events & IORead)
00832   {
00833     // First build up the incoming buffer of data in the socket.
00834     // Remember the last size returned by the socket; we need
00835     // it to determine if the remote end closed the connection.
00836     IOSize sz;
00837     try
00838     {
00839       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00840       do
00841         if ((sz = ev->source->read(&buf[0], buf.size())))
00842         {
00843           if (debug_)
00844             logme()
00845               << "DEBUG: received " << sz << " bytes from peer "
00846               << p->peeraddr << std::endl;
00847           DataBlob &data = p->incoming;
00848           if (data.capacity () < data.size () + sz)
00849             data.reserve (data.size() + SOCKET_READ_GROWTH);
00850           data.insert (data.end(), &buf[0], &buf[0] + sz);
00851         }
00852       while (sz == sizeof (buf));
00853     }
00854     catch (Error &e)
00855     {
00856       SystemError *next = dynamic_cast<SystemError *>(e.next());
00857       if (next && next->portable() == SysErr::ErrTryAgain)
00858         sz = 1; // Ignore it, and fake no end of data.
00859       else
00860       {
00861         // Houston we have a problem.
00862         losePeer("WARNING: failed to read from peer ", p, ev, &e);
00863         unlock();
00864         return true;
00865       }
00866     }
00867 
00868     // Process fully received messages as long as we can.
00869     size_t consumed = 0;
00870     DataBlob &data = p->incoming;
00871     while (data.size()-consumed >= sizeof(uint32_t)
00872            && p->waiting < MAX_PEER_WAITREQS)
00873     {
00874       uint32_t msglen;
00875       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00876 
00877       if (msglen >= MESSAGE_SIZE_LIMIT)
00878       {
00879         losePeer("WARNING: excessively large message from ", p, ev);
00880         unlock();
00881         return true;
00882       }
00883 
00884       if (data.size()-consumed >= msglen)
00885       {
00886         bool valid = true;
00887         if (msglen < 2*sizeof(uint32_t))
00888         {
00889           logme()
00890             << "ERROR: corrupt peer message of length " << msglen
00891             << " from peer " << p->peeraddr << std::endl;
00892           valid = false;
00893         }
00894         else
00895         {
00896           // Decode and process this message.
00897           Bucket msg;
00898           msg.next = 0;
00899           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00900 
00901           // If we created a response, chain it to the write queue.
00902           if (! msg.data.empty())
00903           {
00904             Bucket **prev = &p->sendq;
00905             while (*prev)
00906               prev = &(*prev)->next;
00907 
00908             *prev = new Bucket;
00909             (*prev)->next = 0;
00910             (*prev)->data.swap(msg.data);
00911           }
00912         }
00913 
00914         if (! valid)
00915         {
00916           losePeer("WARNING: data stream error with ", p, ev);
00917           unlock();
00918           return true;
00919         }
00920 
00921         consumed += msglen;
00922       }
00923       else
00924         break;
00925     }
00926 
00927     data.erase(data.begin(), data.begin()+consumed);
00928 
00929     // If the client has closed the connection, shut down our end.  If
00930     // we have something to send back still, leave the write direction
00931     // open.  Otherwise close the shop for this client.
00932     if (sz == 0)
00933       sel_.setMask(p->socket, p->mask &= ~IORead);
00934   }
00935 
00936   // Yes, please keep processing events for this socket.
00937   unlock();
00938   return false;
00939 }
00940 
00945 bool
00946 DQMNet::onPeerConnect(IOSelectEvent *ev)
00947 {
00948   // Recover the server socket.
00949   assert (ev->source == server_);
00950 
00951   // Accept the connection.
00952   Socket *s = server_->accept();
00953   assert (s);
00954   assert (! s->isBlocking());
00955 
00956   // Record it to our list of peers.
00957   lock();
00958   Peer *p = createPeer(s);
00959   std::string localaddr;
00960   if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
00961   {
00962     InetAddress peeraddr = inet->peername();
00963     InetAddress myaddr = inet->sockname();
00964     p->peeraddr = StringFormat("%1:%2")
00965                   .arg(peeraddr.hostname())
00966                   .arg(peeraddr.port());
00967     localaddr = StringFormat("%1:%2")
00968                 .arg(myaddr.hostname())
00969                 .arg(myaddr.port());
00970   }
00971   else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
00972   {
00973     p->peeraddr = local->peername().path();
00974     localaddr = local->sockname().path();
00975   }
00976   else
00977     assert(false);
00978 
00979   p->mask = IORead|IOUrgent;
00980   p->socket = s;
00981 
00982   // Report the new connection.
00983   if (debug_)
00984     logme()
00985       << "INFO: new peer " << p->peeraddr << " is now connected to "
00986       << localaddr << std::endl;
00987 
00988   // Attach it to the listener.
00989   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00990   unlock();
00991 
00992   // We are never done.
00993   return false;
00994 }
00995 
01002 bool
01003 DQMNet::onLocalNotify(IOSelectEvent *ev)
01004 {
01005   // Discard the data in the pipe, we care only about the wakeup.
01006   try
01007   {
01008     IOSize sz;
01009     unsigned char buf [1024];
01010     while ((sz = ev->source->read(buf, sizeof(buf))))
01011       ;
01012   }
01013   catch (Error &e)
01014   {
01015     SystemError *next = dynamic_cast<SystemError *>(e.next());
01016     if (next && next->portable() == SysErr::ErrTryAgain)
01017       ; // Ignore it
01018     else
01019       logme()
01020         << "WARNING: error reading from notification pipe: "
01021         << e.explain() << std::endl;
01022   }
01023 
01024   // Tell the main event pump to send an update in a little while.
01025   flush_ = true;
01026 
01027   // We are never done, always keep going.
01028   return false;
01029 }
01030 
01033 void
01034 DQMNet::updateMask(Peer *p)
01035 {
01036   if (! p->socket)
01037     return;
01038 
01039   // Listen to writes iff we have data to send.
01040   unsigned oldmask = p->mask;
01041   if (! p->sendq && (p->mask & IOWrite))
01042     sel_.setMask(p->socket, p->mask &= ~IOWrite);
01043 
01044   if (p->sendq && ! (p->mask & IOWrite))
01045     sel_.setMask(p->socket, p->mask |= IOWrite);
01046 
01047   if (debug_ && oldmask != p->mask)
01048     logme()
01049       << "DEBUG: updating mask for " << p->peeraddr << " to "
01050       << p->mask << " from " << oldmask << std::endl;
01051 
01052   // If we have nothing more to send and are no longer listening
01053   // for reads, close up the shop for this peer.
01054   if (p->mask == IOUrgent && ! p->waiting)
01055   {
01056     assert (! p->sendq);
01057     if (debug_)
01058       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01059     losePeer(0, p, 0);
01060   }
01061 }
01062 
01064 DQMNet::DQMNet (const std::string &appname /* = "" */)
01065   : debug_ (false),
01066     appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01067     pid_ (getpid()),
01068     server_ (0),
01069     version_ (Time::current()),
01070     communicate_ ((pthread_t) -1),
01071     shutdown_ (0),
01072     delay_ (1000),
01073     waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
01074     waitMax_ (0, 0, 0, 5 /* seconds */, 0),
01075     flush_ (false)
01076 {
01077   // Create a pipe for the local DQM to tell the communicator
01078   // thread that local DQM data has changed and that the peers
01079   // should be notified.
01080   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01081   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01082 
01083   // Initialise the upstream and downstream to empty.
01084   upstream_.peer   = downstream_.peer   = 0;
01085   upstream_.next   = downstream_.next   = 0;
01086   upstream_.port   = downstream_.port   = 0;
01087   upstream_.update = downstream_.update = false;
01088 }
01089 
01090 DQMNet::~DQMNet(void)
01091 {
01092   // FIXME
01093 }
01094 
01097 void
01098 DQMNet::debug(bool doit)
01099 {
01100   debug_ = doit;
01101 }
01102 
01105 void
01106 DQMNet::delay(int delay)
01107 {
01108   delay_ = delay;
01109 }
01110 
01115 void
01116 DQMNet::staleObjectWaitLimit(lat::TimeSpan time)
01117 {
01118   waitStale_ = time;
01119 }
01120 
01124 void
01125 DQMNet::startLocalServer(int port)
01126 {
01127   if (server_)
01128   {
01129     logme() << "ERROR: DQM server was already started.\n";
01130     return;
01131   }
01132 
01133   try
01134   {
01135     InetAddress addr("0.0.0.0", port);
01136     InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
01137     s->bind(addr);
01138     s->listen(10);
01139     s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01140     s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01141     s->setBlocking(false);
01142     sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01143   }
01144   catch (Error &e)
01145   {
01146     // FIXME: Do we need to do this when we throw an exception anyway?
01147     // FIXME: Abort instead?
01148     logme()
01149       << "ERROR: Failed to start server at port " << port << ": "
01150       << e.explain() << std::endl;
01151 
01152     raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
01153                   " %d: %s", port, e.explain().c_str());
01154   }
01155   
01156   logme() << "INFO: DQM server started at port " << port << std::endl;
01157 }
01158 
01162 void
01163 DQMNet::startLocalServer(const char *path)
01164 {
01165   if (server_)
01166   {
01167     logme() << "ERROR: DQM server was already started.\n";
01168     return;
01169   }
01170 
01171   try
01172   {
01173     server_ = new LocalServerSocket(path, 10);
01174     server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01175     server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01176     server_->setBlocking(false);
01177     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01178   }
01179   catch (Error &e)
01180   {
01181     // FIXME: Do we need to do this when we throw an exception anyway?
01182     // FIXME: Abort instead?
01183     logme()
01184       << "ERROR: Failed to start server at path " << path << ": "
01185       << e.explain() << std::endl;
01186 
01187     raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
01188                   " %s: %s", path, e.explain().c_str());
01189   }
01190   
01191   logme() << "INFO: DQM server started at path " << path << std::endl;
01192 }
01193 
01197 void
01198 DQMNet::updateToCollector(const std::string &host, int port)
01199 {
01200   if (! downstream_.host.empty())
01201   {
01202     logme()
01203       << "ERROR: Already updating another collector at "
01204       << downstream_.host << ":" << downstream_.port << std::endl;
01205     return;
01206   }
01207 
01208   downstream_.update = true;
01209   downstream_.host = host;
01210   downstream_.port = port;
01211 }
01212 
01216 void
01217 DQMNet::listenToCollector(const std::string &host, int port)
01218 {
01219   if (! upstream_.host.empty())
01220   {
01221     logme()
01222       << "ERROR: Already receiving data from another collector at "
01223       << upstream_.host << ":" << upstream_.port << std::endl;
01224     return;
01225   }
01226 
01227   upstream_.update = false;
01228   upstream_.host = host;
01229   upstream_.port = port;
01230 }
01231 
01233 void
01234 DQMNet::shutdown(void)
01235 {
01236   shutdown_ = 1;
01237   if (communicate_ != (pthread_t) -1)
01238     pthread_join(communicate_, 0);
01239 }
01240 
01246 static void *communicate(void *obj)
01247 {
01248   sigset_t sigs;
01249   sigfillset (&sigs);
01250   pthread_sigmask (SIG_BLOCK, &sigs, 0);
01251   ((DQMNet *)obj)->run();
01252   return 0;
01253 }
01254 
01256 void
01257 DQMNet::lock(void)
01258 {
01259   if (communicate_ != (pthread_t) -1)
01260     pthread_mutex_lock(&lock_);
01261 }
01262 
01264 void
01265 DQMNet::unlock(void)
01266 {
01267   if (communicate_ != (pthread_t) -1)
01268     pthread_mutex_unlock(&lock_);
01269 }
01270 
01274 void
01275 DQMNet::start(void)
01276 {
01277   if (communicate_ != (pthread_t) -1)
01278   {
01279     logme()
01280       << "ERROR: DQM networking thread has already been started\n";
01281     return;
01282   }
01283 
01284   pthread_mutex_init(&lock_, 0);
01285   pthread_create (&communicate_, 0, &communicate, this);
01286 }
01287 
01289 void
01290 DQMNet::run(void)
01291 {
01292   Time now;
01293   Time nextFlush = 0;
01294   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01295 
01296   // Perform I/O.  Every once in a while flush updates to peers.
01297   while (! shouldStop())
01298   {
01299     for (int i = 0; i < 2; ++i)
01300     {
01301       AutoPeer *ap = automatic[i];
01302 
01303       // If we need a server connection and don't have one yet,
01304       // initiate asynchronous connection creation.  Swallow errors
01305       // in case the server won't talk to us.
01306       if (! ap->host.empty()
01307           && ! ap->peer
01308           && (now = Time::current()) > ap->next)
01309       {
01310         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01311         InetSocket *s = 0;
01312         try
01313         {
01314           InetAddress addr(ap->host.c_str(), ap->port);
01315           s = new InetSocket (SOCK_STREAM, 0, addr.family());
01316           s->setBlocking(false);
01317           s->connect(addr);
01318           s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
01319           s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
01320         }
01321         catch (Error &e)
01322         {
01323           SystemError *sys = dynamic_cast<SystemError *>(e.next());
01324           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01325           {
01326             // "In progress" just means the connection is in progress.
01327             // The connection is ready when the socket is writeable.
01328             // Anything else is a real problem.
01329             if (s)
01330               s->abort();
01331             delete s;
01332             s = 0;
01333           }
01334         }
01335 
01336         // Set up with the selector if we were successful.  If this is
01337         // the upstream collector, queue a request for updates.
01338         if (s)
01339         {
01340           Peer *p = createPeer(s);
01341           ap->peer = p;
01342 
01343           InetAddress peeraddr = ((InetSocket *) s)->peername();
01344           InetAddress myaddr = ((InetSocket *) s)->sockname();
01345           p->peeraddr = StringFormat("%1:%2")
01346                         .arg(peeraddr.hostname())
01347                         .arg(peeraddr.port());
01348           p->mask = IORead|IOWrite|IOUrgent;
01349           p->update = ap->update;
01350           p->automatic = ap;
01351           p->socket = s;
01352           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01353           if (ap == &upstream_)
01354           {
01355             uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01356                                   2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
01357             p->sendq = new Bucket;
01358             p->sendq->next = 0;
01359             copydata(p->sendq, words, sizeof(words));
01360           }
01361 
01362           // Report the new connection.
01363           if (debug_)
01364             logme()
01365               << "INFO: now connected to " << p->peeraddr << " from "
01366               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01367         }
01368       }
01369     }
01370 
01371     // Pump events for a while.
01372     sel_.dispatch(delay_);
01373     now = Time::current();
01374     lock();
01375 
01376     // Check if flush is required.  Flush only if one is needed.
01377     // Always sends the full object list, but only rarely.
01378     if (flush_ && now > nextFlush)
01379     {
01380       flush_ = false;
01381       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01382       sendObjectListToPeers(true);
01383     }
01384 
01385     // Update the data server and peer selection masks.  If we
01386     // have no more data to send and listening for writes, remove
01387     // the write mask.  If we have something to write and aren't
01388     // listening for writes, start listening so we can send off
01389     // the data.
01390     updatePeerMasks();
01391 
01392     // Release peers that have been waiting for data for too long.
01393     Time waitold = now - waitMax_;
01394     Time waitstale = now - waitStale_;
01395     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01396     {
01397       Object *o = findObject(0, i->name);
01398 
01399       // If we have (stale) object data, wait only up to stale limit.
01400       // Otherwise if we have no data at all, wait up to the max limit.
01401       if (i->time < waitold)
01402       {
01403         logme ()
01404           << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
01405           << "s to retrieval, releasing '" << i->name << "' from wait, have "
01406           << (o ? o->rawdata.size() : 0) << " data available\n";
01407         releaseFromWait(i++, o);
01408       }
01409       else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
01410       {
01411         logme ()
01412           << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
01413           << "s to update, releasing '" << i->name << "' from wait, have "
01414           << o->rawdata.size() << " data available\n";
01415         releaseFromWait(i++, o);
01416       }
01417 
01418       // Keep it for now.
01419       else
01420         ++i;
01421     }
01422 
01423     unlock();
01424   }
01425 }
01426 
01427 // Tell the network cache that there have been local changes that
01428 // should be advertised to the downstream listeners.
01429 void
01430 DQMNet::sendLocalChanges(void)
01431 {
01432   char byte = 0;
01433   wakeup_.sink()->write(&byte, 1);
01434 }
01435 
01439 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */)
01440   : DQMImplNet<DQMNet::Object>(appname)
01441 {
01442   local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01443 }
01444 
01446 void
01447 DQMBasicNet::reserveLocalSpace(uint32_t size)
01448 {
01449   local_->objs.resize(size);
01450 }
01451 
01454 void
01455 DQMBasicNet::updateLocalObject(Object &o)
01456 {
01457   o.dirname = &*local_->dirs.insert(*o.dirname).first;
01458   std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
01459   if (! info.second)
01460   {
01461     // Somewhat hackish. Sets are supposedly immutable, but we
01462     // need to change the non-key parts of the object. Erasing
01463     // and re-inserting would produce too much memory churn.
01464     Object &old = const_cast<Object &>(*info.first);
01465     std::swap(old.flags,     o.flags);
01466     std::swap(old.tag,       o.tag);
01467     std::swap(old.version,   o.version);
01468     std::swap(old.qreports,  o.qreports);
01469     std::swap(old.rawdata,   o.rawdata);
01470     std::swap(old.scalar,    o.scalar);
01471     std::swap(old.qdata,     o.qdata);
01472   }
01473 }
01474 
01478 bool
01479 DQMBasicNet::removeLocalExcept(const std::set<std::string> &known)
01480 {
01481   size_t removed = 0;
01482   std::string path;
01483   ObjectMap::iterator i, e;
01484   for (i = local_->objs.begin(), e = local_->objs.end(); i != e; )
01485   {
01486     path.clear();
01487     path.reserve(i->dirname->size() + i->objname.size() + 2);
01488     path += *i->dirname;
01489     if (! path.empty())
01490       path += '/';
01491     path += i->objname;
01492 
01493     if (! known.count(path))
01494       ++removed, local_->objs.erase(i++);
01495     else
01496       ++i;
01497   }
01498 
01499   return removed > 0;
01500 }