CMS 3D CMS Logo

DQMNet.cc

Go to the documentation of this file.
00001 #include "DQMServices/Core/interface/DQMNet.h"
00002 #include "DQMServices/Core/interface/DQMStore.h"
00003 #include "DQMServices/Core/interface/MonitorElement.h"
00004 #include "FWCore/Utilities/interface/EDMException.h"
00005 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
00006 #include "classlib/iobase/Filename.h"
00007 #include "classlib/utils/TimeInfo.h"
00008 #include "classlib/utils/StringList.h"
00009 #include "classlib/utils/StringFormat.h"
00010 #include "classlib/utils/StringOps.h"
00011 #include "classlib/utils/SystemError.h"
00012 #include "classlib/utils/Regexp.h"
00013 #include "DQMRootBuffer.h"
00014 #include "TObjString.h"
00015 #include "TObject.h"
00016 #include "TProfile2D.h"
00017 #include "TProfile.h"
00018 #include "TH3F.h"
00019 #include "TH2F.h"
00020 #include "TH2S.h"
00021 #include "TH1F.h"
00022 #include "TH1S.h"
00023 #include <unistd.h>
00024 #include <fcntl.h>
00025 #include <sys/wait.h>
00026 #include <stdio.h>
00027 #include <stdint.h>
00028 #include <iostream>
00029 #include <cassert>
00030 
00031 #define MESSAGE_SIZE_LIMIT      (2*1024*1024)
00032 #define SOCKET_BUF_SIZE         (8*1024*1024)
00033 #define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE/8)
00034 #define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00035 
00036 using namespace lat;
00037 
00038 static const Regexp s_rxmeval ("<(.*)>(i|f|s|qr)=(.*)</\\1>");
00039 static const Regexp s_rxmeqr  ("^st\\.(\\d+)\\.(.*)$");
00040 
00042 // Parse an integer parameter in an image spec.  If @ap starts with
00043 // the parameter prefix @a name of length @a len, extracts the number
00044 // that follows the prefix into @a value and advances @a p to the next
00045 // character after the extracted value.  Returns true if the parameter
00046 // was parsed, false otherwise.
00047 static bool parseInt (const char *&p, const char *name, size_t len, int &value)
00048 {
00049   if (! strncmp(p, name, len))
00050   {
00051     value = strtol(p+len, (char **) &p, 10);
00052     return true;
00053   }
00054   return false;
00055 }
00056 
00058 // Generate log prefix.
00059 std::ostream &
00060 DQMNet::logme (void)
00061 {
00062   return std::cerr
00063     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00064     << " " << appname_ << "[" << pid_ << "]: ";
00065 }
00066 
00067 // Append data into a bucket.
00068 void
00069 DQMNet::copydata(Bucket *b, const void *data, size_t len)
00070 {
00071   b->data.insert(b->data.end(),
00072                  (const unsigned char *)data,
00073                  (const unsigned char *)data + len);
00074 }
00075 
00076 // Discard a bucket chain.
00077 void
00078 DQMNet::discard (Bucket *&b)
00079 {
00080   while (b)
00081   {
00082     Bucket *next = b->next;
00083     delete b;
00084     b = next;
00085   }
00086 }
00087 
00089 
00092 bool
00093 DQMNet::losePeer(const char *reason,
00094                  Peer *peer,
00095                  IOSelectEvent *ev,
00096                  Error *err)
00097 {
00098   if (reason)
00099     logme ()
00100       << reason << peer->peeraddr
00101       << (err ? "; error was: " + err->explain() : std::string(""))
00102       << std::endl;
00103 
00104   Socket *s = peer->socket;
00105 
00106   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00107     if (i->peer == peer)
00108       waiting_.erase(i++);
00109     else
00110       ++i;
00111 
00112   if (ev)
00113     ev->source = 0;
00114 
00115   discard(peer->sendq);
00116   if (peer->automatic)
00117     peer->automatic->peer = 0;
00118 
00119   sel_.detach (s);
00120   s->close();
00121   removePeer (peer, s);
00122   delete s;
00123   return true;
00124 }
00125 
00127 void
00128 DQMNet::requestObject(Peer *p, const char *name, size_t len)
00129 {
00130   Bucket **msg = &p->sendq;
00131   while (*msg)
00132     msg = &(*msg)->next;
00133   *msg = new Bucket;
00134   (*msg)->next = 0;
00135 
00136   uint32_t words[3];
00137   words[0] = sizeof(words) + len;
00138   words[1] = DQM_MSG_GET_OBJECT;
00139   words[2] = len;
00140   copydata(*msg, words, sizeof(words));
00141   copydata(*msg, name, len);
00142 }
00143 
00146 void
00147 DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00148 {
00149   // FIXME: Should we automatically record which exact peer the waiter
00150   // is expecting to deliver data so we know to release the waiter if
00151   // the other peer vanishes?  The current implementation stands a
00152   // chance for the waiter to wait indefinitely -- although we do
00153   // force terminate the wait after a while.
00154   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00155   WaitObject wo = { Time::current(), name, info, p };
00156   waiting_.push_back(wo);
00157   p->waiting++;
00158 }
00159 
00160 // Once an object has been updated, this is invoked for all waiting
00161 // peers.  Send the object back to the peer in suitable form.
00162 void
00163 DQMNet::releaseFromWait(WaitList::iterator i, Object *o)
00164 {
00165   Bucket **msg = &i->peer->sendq;
00166   while (*msg)
00167     msg = &(*msg)->next;
00168   *msg = new Bucket;
00169   (*msg)->next = 0;
00170 
00171   releaseFromWait(*msg, *i, o);
00172 
00173   assert(i->peer->waiting > 0);
00174   i->peer->waiting--;
00175   waiting_.erase(i);
00176 }
00177 
00178 // Release everyone waiting for the object @a o.
00179 void
00180 DQMNet::releaseWaiters(Object *o)
00181 {
00182   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00183     if (i->name == o->name)
00184       releaseFromWait(i++, o);
00185     else
00186       ++i;
00187 }
00188 
00190 // Deserialise a ROOT object from a buffer at the current position.
00191 static TObject *
00192 extractNextObject(DQMRootBuffer &buf)
00193 {
00194   if (buf.Length() == buf.BufferSize())
00195     return 0;
00196 
00197   buf.InitMap();
00198   Int_t pos = buf.Length();
00199   TClass *c = buf.ReadClass();
00200   buf.SetBufferOffset(pos);
00201   buf.ResetMap();
00202   return c ? buf.ReadObject(c) : 0;
00203 }
00204 
00205 // Abort the construction of an object.
00206 static bool
00207 abortReconstructObject(DQMNet::Object &o)
00208 {
00209   o.qreports.clear();
00210   delete o.object;
00211   o.object = 0;
00212   return false;
00213 }
00214 
00215 // Reconstruct an object from the raw data.
00216 bool
00217 DQMNet::reconstructObject(Object &o)
00218 {
00219   DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00220   buf.Reset();
00221 
00222   // Extract the main object.
00223   if (! (o.object = extractNextObject(buf)))
00224     return false;
00225   
00226   // Extract the reference object.
00227   o.reference = extractNextObject(buf);
00228 
00229   // Calculate quality report base name.
00230   int slash = StringOps::rfind(o.name, '/');
00231   std::string qrbase;
00232   qrbase.reserve(o.name.size()+2);
00233   qrbase = (slash == -1 ? o.name : o.name.substr(slash+1, std::string::npos));
00234   qrbase += ".";
00235 
00236   // Extract quality reports.
00237   while (TObjString *qrstr = dynamic_cast<TObjString *>(extractNextObject(buf)))
00238   {
00239     RegexpMatch m;
00240     if (! s_rxmeval.match(qrstr->GetName(), 0, 0, &m))
00241     {
00242       logme()
00243         << "ERROR: unexpected quality report string '"
00244         << qrstr->GetName() << "' for object '"
00245         << o.name << "'\n";
00246       return abortReconstructObject(o);
00247     }
00248 
00249     std::string label = m.matchString(qrstr->GetName(), 1);
00250     std::string type = m.matchString(qrstr->GetName(), 2);
00251     std::string value = m.matchString(qrstr->GetName(), 3);
00252 
00253     if (type != "qr")
00254     {
00255       logme()
00256         << "ERROR: expected a 'qr' for a quality report '"
00257         << qrstr->GetName() << "' but found '" << type
00258         << "' instead\n";
00259       return abortReconstructObject(o);
00260     }
00261 
00262     std::string qrname = label;
00263     qrname.replace(0, qrbase.size(), "");
00264     if (qrname == label)
00265     {
00266       logme()
00267         << "ERROR: quality report label in '"
00268         << qrstr->GetName()
00269         << "' does not match object name '"
00270         << o.name << "'\n";
00271       return abortReconstructObject(o);
00272     }
00273 
00274     m.reset();
00275     if (! s_rxmeqr.match(value, 0, 0, &m))
00276     {
00277       logme()
00278         << "ERROR: quality test value '"
00279         << value << "' is incorrectly formatted\n";
00280       return abortReconstructObject(o);
00281     }
00282 
00283     QValue qval;
00284     qval.code = 0;
00285     qval.qtname = qrname;
00286     qval.message = m.matchString(value, 2);
00287     std::string strcode = m.matchString(value, 1);
00288     const char *p = strcode.c_str();
00289     if (! parseInt(p, "", 0, qval.code) || *p)
00290     {
00291       logme()
00292         << "ERROR: failed to determine quality test code from '"
00293         << value << "'\n";
00294       return abortReconstructObject(o);
00295     }
00296 
00297     o.qreports.push_back(qval);
00298   }
00299 
00300   return true;
00301 }
00302 
00303 bool
00304 DQMNet::reinstateObject(DQMStore *store, Object &o)
00305 {
00306   if (! reconstructObject (o))
00307     return false;
00308 
00309   // Reconstruct the main object
00310   std::string folder = o.name;
00311   std::string name = o.name;
00312   folder.erase(folder.rfind('/'), std::string::npos);
00313   name.erase(0, name.rfind('/')+1);
00314   store->setCurrentFolder(folder);
00315   if (TProfile2D *t = dynamic_cast<TProfile2D *>(o.object))
00316     store->bookProfile2D(name, t);
00317   else if (TProfile *t = dynamic_cast<TProfile *>(o.object))
00318     store->bookProfile(name, t);
00319   else if (TH3F *t = dynamic_cast<TH3F *>(o.object))
00320     store->book3D(name, t);
00321   else if (TH2F *t = dynamic_cast<TH2F *>(o.object))
00322     store->book2D(name, t);
00323   else if (TH2S *t = dynamic_cast<TH2S *>(o.object))
00324     store->book2S(name, t);
00325   else if (TH1F *t = dynamic_cast<TH1F *>(o.object))
00326     store->book1D(name, t);
00327   else if (TH1S *t = dynamic_cast<TH1S *>(o.object))
00328     store->book1S(name, t);
00329   else if (TObjString *t = dynamic_cast<TObjString *>(o.object))
00330   {
00331     RegexpMatch m;
00332     if (! s_rxmeval.match(t->GetName(), 0, 0, &m))
00333     {
00334       logme()
00335         << "ERROR: unexpected monitor element string '"
00336         << t->GetName() << "' for object '"
00337         << o.name << "'\n";
00338       return false;
00339     }
00340 
00341     // std::string label = m.matchString(t->GetName(), 1);
00342     std::string type = m.matchString(t->GetName(), 2);
00343     std::string value = m.matchString(t->GetName(), 3);
00344 
00345     if (type == "i")
00346       store->bookInt(name)->Fill(atoi(value.c_str()));
00347     else if (type == "f")
00348       store->bookFloat(name)->Fill(atof(value.c_str()));
00349     else if (type == "s")
00350       store->bookString(name, value);
00351     else
00352     {
00353       logme()
00354         << "ERROR: unexpected string monitor element of type '"
00355         << type << "' (from '" << t->GetName() << "') for object '"
00356         << o.name << "'\n";
00357       return false;
00358     }
00359   }
00360 
00361   // Reconstruct tags.  (FIXME: untag old tags first?)
00362   for (size_t i = 0, e = o.tags.size(); i < e; ++i)
00363     store->tag(o.name, o.tags[i]);
00364 
00365   // FIXME: Reference and quality reports?
00366 
00367   // Inidicate success.
00368   return true;
00369 }
00370 
00372 // Check if the network layer should stop.
00373 bool
00374 DQMNet::shouldStop(void)
00375 {
00376   return shutdown_;
00377 }
00378 
00379 // Once an object has been updated, this is invoked for all waiting
00380 // peers.  Send the requested object to the waiting peer.
00381 void
00382 DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00383 {
00384   if (o)
00385     sendObjectToPeer (msg, *o, true, sendScalarAsText_);
00386   else
00387   {
00388     uint32_t words [3];
00389     words[0] = sizeof(words) + w.name.size();
00390     words[1] = DQM_REPLY_NONE;
00391     words[2] = w.name.size();
00392 
00393     msg->data.reserve(msg->data.size() + words[0]);
00394     copydata(msg, &words[0], sizeof(words));
00395     copydata(msg, &w.name[0], w.name.size());
00396   }
00397 }
00398 
00399 // Extract data value of a scalar object @a o into @a objdata.
00400 bool
00401 DQMNet::extractScalarData(DataBlob &objdata, Object &o)
00402 {
00403   if (! o.flags & DQM_FLAG_SCALAR)
00404     return false;
00405 
00406   TObject *obj = o.object;
00407   if (! obj && o.rawdata.size())
00408   {
00409     DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00410     buf.InitMap();
00411     buf.Reset();
00412     obj = extractNextObject(buf);
00413   }
00414 
00415   if (TObjString *ostr = dynamic_cast<TObjString *>(obj))
00416   {
00417     const TString &s = ostr->String();
00418     objdata.insert(objdata.end(),
00419                    (unsigned char *) s.Data(),
00420                    (unsigned char *) s.Data() + s.Length());
00421     return true;
00422   }
00423 
00424   return false;
00425 }
00426 
00427 // Send an object to a peer.  If not @a data, only sends a summary
00428 // without the object data, except the data is always sent for scalar
00429 // objects.  If @a text is @c true, sends an ASCII text version of the
00430 // scalar value instead.
00431 void
00432 DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data, bool text)
00433 {
00434   uint32_t flags = o.flags & ~DQM_FLAG_ZOMBIE;
00435   DataBlob objdata;
00436 
00437   if (text && extractScalarData(objdata, o))
00438     flags |= DQM_FLAG_TEXT;
00439   else if (data || (flags & DQM_FLAG_SCALAR))
00440     objdata.insert(objdata.end(),
00441                    &o.rawdata[0],
00442                    &o.rawdata[0] + o.rawdata.size());
00443 
00444   uint32_t words [8];
00445   uint32_t namelen = o.name.size();
00446   uint32_t taglen  = o.tags.size() * sizeof(uint32_t);
00447   uint32_t datalen = objdata.size();
00448 
00449   words[0] = 8*sizeof(uint32_t) + namelen + taglen + datalen;
00450   words[1] = DQM_REPLY_OBJECT;
00451   words[2] = flags;
00452   words[3] = (o.version >> 0 ) & 0xffffffff;
00453   words[4] = (o.version >> 32) & 0xffffffff;
00454   words[5] = namelen;
00455   words[6] = taglen / sizeof(uint32_t);
00456   words[7] = datalen;
00457 
00458   msg->data.reserve(msg->data.size() + words[0]);
00459   copydata(msg, &words[0], 8*sizeof(uint32_t));
00460   if (namelen)
00461     copydata(msg, &o.name[0], namelen);
00462   if (taglen)
00463     copydata(msg, &o.tags[0], taglen);
00464   if (datalen)
00465     copydata(msg, &objdata[0], datalen);
00466 }
00467 
00469 // Handle peer messages.
00470 bool
00471 DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00472 {
00473   // Decode and process this message.
00474   uint32_t type;
00475   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00476   switch (type)
00477   {
00478   case DQM_MSG_UPDATE_ME:
00479     {
00480       if (len != 3*sizeof(uint32_t))
00481       {
00482         logme()
00483           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00484           << " from peer " << p->peeraddr << std::endl;
00485         return false;
00486       }
00487 
00488       // Get the update status: whether this is a full update.
00489       uint32_t full;
00490       memcpy(&full, data + 2*sizeof(uint32_t), sizeof(uint32_t));
00491 
00492       if (debug_)
00493         logme()
00494           << "DEBUG: received message 'UPDATE ME' from peer "
00495           << p->peeraddr << ", full = " << full << std::endl;
00496 
00497       p->update = true;
00498       p->updatefull = full;
00499 
00500       if (full && ! requestFullUpdates_)
00501       {
00502         if (debug_)
00503           logme()
00504             << "WARNING: forcing full update request mode on due to "
00505             << "request from " << p->peeraddr << std::endl;
00506         requestFullUpdates_ = true;
00507         requestFullUpdatesFromPeers();
00508       }
00509     }
00510     return true;
00511 
00512   case DQM_MSG_LIST_OBJECTS:
00513     {
00514       if (debug_)
00515         logme()
00516           << "DEBUG: received message 'LIST OBJECTS' from peer "
00517           << p->peeraddr << std::endl;
00518 
00519       // Send over current status: list of known objects.
00520       lock();
00521       sendObjectListToPeer(msg, p->updatefull, true, false);
00522       unlock();
00523     }
00524     return true;
00525 
00526   case DQM_MSG_GET_OBJECT:
00527     {
00528       if (debug_)
00529         logme()
00530           << "DEBUG: received message 'GET OBJECT' from peer "
00531           << p->peeraddr << std::endl;
00532 
00533       if (len < 3*sizeof(uint32_t))
00534       {
00535         logme()
00536           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00537           << " from peer " << p->peeraddr << std::endl;
00538         return false;
00539       }
00540 
00541       uint32_t namelen;
00542       memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00543       if (len != 3*sizeof(uint32_t) + namelen)
00544       {
00545         logme()
00546           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00547           << " from peer " << p->peeraddr
00548           << ", expected length " << (3*sizeof(uint32_t))
00549           << " + " << namelen << std::endl;
00550         return false;
00551       }
00552 
00553       lock();
00554       std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00555       Peer *owner = 0;
00556       Object *o = findObject(0, name, &owner);
00557       if (o)
00558       {
00559         o->lastreq = Time::current();
00560         if (o->rawdata.empty())
00561           waitForData(p, name, "", owner);
00562         else
00563           sendObjectToPeer(msg, *o, true, sendScalarAsText_);
00564       }
00565       else
00566       {
00567         uint32_t words [3];
00568         words[0] = sizeof(words) + name.size();
00569         words[1] = DQM_REPLY_NONE;
00570         words[2] = name.size();
00571 
00572         msg->data.reserve(msg->data.size() + words[0]);
00573         copydata(msg, &words[0], sizeof(words));
00574         copydata(msg, &name[0], name.size());
00575       }
00576       unlock();
00577     }
00578     return true;
00579 
00580   case DQM_REPLY_LIST_BEGIN:
00581     {
00582       if (len != 4*sizeof(uint32_t))
00583       {
00584         logme()
00585           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00586           << " from peer " << p->peeraddr << std::endl;
00587         return false;
00588       }
00589 
00590       if (debug_)
00591         logme()
00592           << "DEBUG: received message 'LIST BEGIN' from "
00593           << p->peeraddr << std::endl;
00594 
00595       // Get the update status: whether this is a full update.
00596       uint32_t flags;
00597       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00598 
00599       // If we are about to receive a full list of objects, flag all
00600       // objects dead.  Subsequent object notifications will undo this
00601       // for the live objects.  This tells us the object has been
00602       // removed, but we can keep making it available for a while if
00603       // there continues to be interest in it.
00604       if (flags)
00605       {
00606         lock();
00607         markObjectsZombies(p);
00608         unlock();
00609       }
00610     }
00611     return true;
00612 
00613   case DQM_REPLY_LIST_END:
00614     {
00615       if (len != 4*sizeof(uint32_t))
00616       {
00617         logme()
00618           << "ERROR: corrupt 'LIST END' message of length " << len
00619           << " from peer " << p->peeraddr << std::endl;
00620         return false;
00621       }
00622 
00623       // Get the update status: whether this is a full update.
00624       uint32_t flags;
00625       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00626 
00627       // If we received a full list of objects, flag all zombie objects
00628       // now dead. We need to do this in two stages in case we receive
00629       // updates in many parts, and end up sending updates to others in
00630       // between; this avoids us lying live objects are dead.
00631       if (flags)
00632       {
00633         lock();
00634         markObjectsDead(p);
00635         unlock();
00636       }
00637 
00638       if (debug_)
00639         logme()
00640           << "DEBUG: received message 'LIST END' from "
00641           << p->peeraddr << std::endl;
00642 
00643       // Indicate we have received another update from this peer.
00644       // Also indicate we should flush to our clients.
00645       flush_ = true;
00646       p->updates++;
00647     }
00648     return true;
00649 
00650   case DQM_REPLY_OBJECT:
00651     {
00652       uint32_t words[8];
00653       if (len < sizeof(words))
00654       {
00655         logme()
00656           << "ERROR: corrupt 'OBJECT' message of length " << len
00657           << " from peer " << p->peeraddr << std::endl;
00658         return false;
00659       }
00660 
00661       memcpy (&words[0], data, sizeof(words));
00662       uint32_t &namelen = words[5];
00663       uint32_t &taglen = words[6];
00664       uint32_t &datalen = words[7];
00665 
00666       if (len != sizeof(words) + namelen + taglen*sizeof(uint32_t) + datalen)
00667       {
00668         logme()
00669           << "ERROR: corrupt 'OBJECT' message of length " << len
00670           << " from peer " << p->peeraddr
00671           << ", expected length " << sizeof(words)
00672           << " + " << namelen
00673           << " + " << (taglen*sizeof(uint32_t))
00674           << " + " << datalen
00675           << std::endl;
00676         return false;
00677       }
00678 
00679       unsigned char *namedata = data + sizeof(words);
00680       unsigned char *tagdata = namedata + namelen;
00681       unsigned char *objdata = tagdata + taglen*sizeof(uint32_t);
00682       unsigned char *enddata = objdata + datalen;
00683       std::string name ((char *) namedata, namelen);
00684       assert (enddata == data + len);
00685 
00686       if (debug_)
00687         logme()
00688           << "DEBUG: received message 'OBJECT " << name
00689           << "' from " << p->peeraddr << std::endl;
00690 
00691       // Mark the peer as a known object source.
00692       p->source = true;
00693 
00694       // Initialise or update an object entry.
00695       lock();
00696       Object *o = findObject(p, name);
00697       if (! o)
00698         o = makeObject(p, name);
00699 
00700       o->flags = words[2] | DQM_FLAG_NEW | DQM_FLAG_RECEIVED;
00701       o->version = ((uint64_t) words[4] << 32 | words[3]);
00702       o->tags.clear();
00703       o->tags.insert(o->tags.end(), (uint32_t *) tagdata, (uint32_t *) objdata);
00704       o->rawdata.clear();
00705       o->rawdata.insert (o->rawdata.end(), objdata, enddata);
00706 
00707       bool hadobject = (o->object != 0);
00708       delete o->object;
00709       o->object = 0;
00710       delete o->reference;
00711       o->reference = 0;
00712 
00713       // If we had an object for this one already and this is a list
00714       // update without data, issue an immediate data get request.
00715       if (hadobject && ! datalen)
00716         requestObject(p, (namelen ? &name[0] : 0), namelen);
00717 
00718       // If we have the object data, release from wait.
00719       if (datalen)
00720         releaseWaiters(o);
00721       unlock();
00722     }
00723     return true;
00724 
00725   case DQM_REPLY_NONE:
00726     {
00727       uint32_t words[3];
00728       if (len < sizeof(words))
00729       {
00730         logme()
00731           << "ERROR: corrupt 'NONE' message of length " << len
00732           << " from peer " << p->peeraddr << std::endl;
00733         return false;
00734       }
00735 
00736       memcpy (&words[0], data, sizeof(words));
00737       uint32_t &namelen = words[2];
00738 
00739       if (len != sizeof(words) + namelen)
00740       {
00741         logme()
00742           << "ERROR: corrupt 'NONE' message of length " << len
00743           << " from peer " << p->peeraddr
00744           << ", expected length " << sizeof(words)
00745           << " + " << namelen << std::endl;
00746         return false;
00747       }
00748 
00749       unsigned char *namedata = data + sizeof(words);
00750       unsigned char *enddata = namedata + namelen;
00751       std::string name ((char *) namedata, namelen);
00752       assert (enddata == data + len);
00753 
00754       if (debug_)
00755         logme()
00756           << "DEBUG: received message 'NONE " << name
00757           << "' from " << p->peeraddr << std::endl;
00758 
00759       // Mark the peer as a known object source.
00760       p->source = true;
00761 
00762       // If this was a known object, update its entry.
00763       lock();
00764       Object *o = findObject(p, name);
00765       if (o)
00766         o->flags |= DQM_FLAG_DEAD;
00767 
00768       // If someone was waiting for this, let them go.
00769       releaseWaiters(o);
00770       unlock();
00771     }
00772     return true;
00773 
00774   default:
00775     logme()
00776       << "ERROR: unrecognised message of length " << len
00777       << " and type " << type << " from peer " << p->peeraddr
00778       << std::endl;
00779     return false;
00780   }
00781 }
00782 
00785 bool
00786 DQMNet::onPeerData(IOSelectEvent *ev, Peer *p)
00787 {
00788   assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00789 
00790   // If there is a problem with the peer socket, discard the peer
00791   // and tell the selector to stop prcessing events for it.  If
00792   // this is a server connection, we will eventually recreate
00793   // everything if/when the data server comes back.
00794   if (ev->events & IOUrgent)
00795   {
00796     if (p->automatic)
00797     {
00798       logme()
00799         << "WARNING: connection to the DQM server at " << p->peeraddr
00800         << " lost (will attempt to reconnect in 15 seconds)\n";
00801       return losePeer(0, p, ev);
00802     }
00803     else
00804       return losePeer("WARNING: lost peer connection ", p, ev);
00805   }
00806 
00807   // If we can write to the peer socket, pump whatever we can into it.
00808   if (ev->events & IOWrite)
00809   {
00810     while (Bucket *b = p->sendq)
00811     {
00812       IOSize len = b->data.size() - p->sendpos;
00813       const void *data = (len ? (const void *)&b->data[p->sendpos]
00814                           : (const void *)&data);
00815       IOSize done;
00816 
00817       try
00818       {
00819         done = (len ? ev->source->write (data, len) : 0);
00820         if (debug_ && len)
00821           logme()
00822             << "DEBUG: sent " << done << " bytes to peer "
00823             << p->peeraddr << std::endl;
00824       }
00825       catch (Error &e)
00826       {
00827         return losePeer("WARNING: unable to write to peer ",
00828                         p, ev, &e);
00829       }
00830 
00831       p->sendpos += done;
00832       if (p->sendpos == b->data.size())
00833       {
00834         Bucket *old = p->sendq;
00835         p->sendq = old->next;
00836         p->sendpos = 0;
00837         old->next = 0;
00838         discard(old);
00839       }
00840 
00841       if (! done && len)
00842         // Cannot write any more.
00843         break;
00844     }
00845   }
00846 
00847   // If there is data to be read from the peer, first receive what we
00848   // can get out the socket, the process all complete requests.
00849   if (ev->events & IORead)
00850   {
00851     // First build up the incoming buffer of data in the socket.
00852     // Remember the last size returned by the socket; we need
00853     // it to determine if the remote end closed the connection.
00854     IOSize sz;
00855     try
00856     {
00857       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00858       do
00859         if ((sz = ev->source->read(&buf[0], buf.size())))
00860         {
00861           if (debug_)
00862             logme()
00863               << "DEBUG: received " << sz << " bytes from peer "
00864               << p->peeraddr << std::endl;
00865           DataBlob &data = p->incoming;
00866           if (data.capacity () < data.size () + sz)
00867             data.reserve (data.size() + SOCKET_READ_GROWTH);
00868           data.insert (data.end(), &buf[0], &buf[0] + sz);
00869         }
00870       while (sz == sizeof (buf));
00871     }
00872     catch (Error &e)
00873     {
00874       SystemError *next = dynamic_cast<SystemError *>(e.next());
00875       if (next && next->portable() == SysErr::ErrTryAgain)
00876         sz = 1; // Ignore it, and fake no end of data.
00877       else
00878         // Houston we have a problem.
00879         return losePeer("WARNING: failed to read from peer ",
00880                         p, ev, &e);
00881     }
00882 
00883     // Process fully received messages as long as we can.
00884     size_t consumed = 0;
00885     DataBlob &data = p->incoming;
00886     while (data.size()-consumed >= sizeof(uint32_t)
00887            && p->waiting < MAX_PEER_WAITREQS)
00888     {
00889       uint32_t msglen;
00890       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00891 
00892       if (msglen >= MESSAGE_SIZE_LIMIT)
00893         return losePeer("WARNING: excessively large message from ", p, ev);
00894 
00895       if (data.size()-consumed >= msglen)
00896       {
00897         bool valid = true;
00898         if (msglen < 2*sizeof(uint32_t))
00899         {
00900           logme()
00901             << "ERROR: corrupt peer message of length " << msglen
00902             << " from peer " << p->peeraddr << std::endl;
00903           valid = false;
00904         }
00905         else
00906         {
00907           // Decode and process this message.
00908           Bucket msg;
00909           msg.next = 0;
00910           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00911 
00912           // If we created a response, chain it to the write queue.
00913           if (! msg.data.empty())
00914           {
00915             Bucket **prev = &p->sendq;
00916             while (*prev)
00917                prev = &(*prev)->next;
00918 
00919             *prev = new Bucket;
00920             (*prev)->next = 0;
00921             (*prev)->data.swap(msg.data);
00922           }
00923         }
00924 
00925         if (! valid)
00926           return losePeer("WARNING: data stream error with ", p, ev);
00927 
00928         consumed += msglen;
00929       }
00930       else
00931         break;
00932     }
00933 
00934     data.erase(data.begin(), data.begin()+consumed);
00935 
00936     // If the client has closed the connection, shut down our end.  If
00937     // we have something to send back still, leave the write direction
00938     // open.  Otherwise close the shop for this client.
00939     if (sz == 0)
00940       sel_.setMask(p->socket, p->mask &= ~IORead);
00941   }
00942 
00943   // Yes, please keep processing events for this socket.
00944   return false;
00945 }
00946 
00951 bool
00952 DQMNet::onPeerConnect(IOSelectEvent *ev)
00953 {
00954   // Recover the server socket.
00955   assert (ev->source == server_);
00956 
00957   // Accept the connection.
00958   Socket *s = server_->accept();
00959   assert (s);
00960   assert (! s->isBlocking());
00961 
00962   // Record it to our list of peers.
00963   Peer *p = createPeer(s);
00964   InetAddress peeraddr = ((InetSocket *) s)->peername();
00965   InetAddress myaddr = ((InetSocket *) s)->sockname();
00966   p->peeraddr = StringFormat("%1:%2")
00967                 .arg(peeraddr.hostname())
00968                 .arg(peeraddr.port());
00969   p->mask = IORead|IOUrgent;
00970   p->socket = s;
00971 
00972   // Report the new connection.
00973   if (debug_)
00974     logme()
00975       << "INFO: new peer " << p->peeraddr << " is now connected to "
00976       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00977 
00978   // Attach it to the listener.
00979   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00980 
00981   // We are never done.
00982   return false;
00983 }
00984 
00991 bool
00992 DQMNet::onLocalNotify(IOSelectEvent *ev)
00993 {
00994   // Discard the data in the pipe, we care only about the wakeup.
00995   try
00996   {
00997     IOSize sz;
00998     unsigned char buf [1024];
00999     while ((sz = ev->source->read(buf, sizeof(buf))))
01000       ;
01001   }
01002   catch (Error &e)
01003   {
01004     SystemError *next = dynamic_cast<SystemError *>(e.next());
01005     if (next && next->portable() == SysErr::ErrTryAgain)
01006       ; // Ignore it
01007     else
01008       logme()
01009         << "WARNING: error reading from notification pipe: "
01010         << e.explain() << std::endl;
01011   }
01012 
01013   // Tell the main event pump to send an update in a little while.
01014   flush_ = true;
01015 
01016   // We are never done, always keep going.
01017   return false;
01018 }
01019 
01022 void
01023 DQMNet::updateMask(Peer *p)
01024 {
01025   if (! p->socket)
01026     return;
01027 
01028   // Listen to writes iff we have data to send.
01029   unsigned oldmask = p->mask;
01030   if (! p->sendq && (p->mask & IOWrite))
01031     sel_.setMask(p->socket, p->mask &= ~IOWrite);
01032 
01033   if (p->sendq && ! (p->mask & IOWrite))
01034     sel_.setMask(p->socket, p->mask |= IOWrite);
01035 
01036   if (debug_ && oldmask != p->mask)
01037     logme()
01038       << "DEBUG: updating mask for " << p->peeraddr << " to "
01039       << p->mask << " from " << oldmask << std::endl;
01040 
01041   // If we have nothing more to send and are no longer listening
01042   // for reads, close up the shop for this peer.
01043   if (p->mask == IOUrgent && ! p->waiting)
01044   {
01045     assert (! p->sendq);
01046     if (debug_)
01047       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01048     losePeer(0, p, 0);
01049   }
01050 }
01051 
01053 DQMNet::DQMNet (const std::string &appname /* = "" */)
01054   : debug_ (false),
01055     sendScalarAsText_ (false),
01056     requestFullUpdates_ (false),
01057     appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01058     pid_ (getpid()),
01059     server_ (0),
01060     version_ (Time::current()),
01061     communicate_ ((pthread_t) -1),
01062     shutdown_ (0),
01063     delay_ (1000),
01064     flush_ (false)
01065 {
01066   // Create a pipe for the local DQM to tell the communicator
01067   // thread that local DQM data has changed and that the peers
01068   // should be notified.
01069   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01070   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01071 
01072   // Initialise the upstream and downstream to empty.
01073   upstream_.peer   = downstream_.peer   = 0;
01074   upstream_.next   = downstream_.next   = 0;
01075   upstream_.port   = downstream_.port   = 0;
01076   upstream_.update = downstream_.update = false;
01077   upstream_.warned = downstream_.warned = false;
01078 }
01079 
01080 DQMNet::~DQMNet(void)
01081 {
01082   // FIXME
01083 }
01084 
01087 void
01088 DQMNet::debug(bool doit)
01089 {
01090   debug_ = doit;
01091 }
01092 
01095 void
01096 DQMNet::delay(int delay)
01097 {
01098   delay_ = delay;
01099 }
01100 
01104 void
01105 DQMNet::sendScalarAsText(bool doit)
01106 {
01107   sendScalarAsText_ = doit;
01108 }
01109 
01115 void
01116 DQMNet::requestFullUpdates(bool doit)
01117 {
01118   requestFullUpdates_ = doit;
01119 }
01120 
01124 void
01125 DQMNet::startLocalServer(int port)
01126 {
01127   if (server_)
01128   {
01129     logme() << "ERROR: DQM server was already started.\n";
01130     return;
01131   }
01132 
01133   try
01134   {
01135     server_ = new InetServerSocket(InetAddress (port), 10);
01136     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01137     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01138     server_->setBlocking(false);
01139     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01140   }
01141   catch (Error &e)
01142   {
01143     // FIXME: Do we need to do this when we throw an exception anyway?
01144     // FIXME: Abort instead?
01145     logme()
01146       << "ERROR: Failed to start server at port " << port << ": "
01147       << e.explain() << std::endl;
01148 
01149     // FIXME: Throw something simpler that removes the dependency?
01150     throw cms::Exception("DQMNet::startLocalServer")
01151       << "Failed to start server at port " << port << ": "
01152       << e.explain();
01153   }
01154   
01155   logme() << "INFO: DQM server started at port " << port << std::endl;
01156 }
01157 
01161 void
01162 DQMNet::updateToCollector(const std::string &host, int port)
01163 {
01164   if (! downstream_.host.empty())
01165   {
01166     logme()
01167       << "ERROR: Already updating another collector at "
01168       << downstream_.host << ":" << downstream_.port << std::endl;
01169     return;
01170   }
01171 
01172   downstream_.update = true;
01173   downstream_.host = host;
01174   downstream_.port = port;
01175 }
01176 
01180 void
01181 DQMNet::listenToCollector(const std::string &host, int port)
01182 {
01183   if (! upstream_.host.empty())
01184   {
01185     logme()
01186       << "ERROR: Already receiving data from another collector at "
01187       << upstream_.host << ":" << upstream_.port << std::endl;
01188     return;
01189   }
01190 
01191   upstream_.update = false;
01192   upstream_.host = host;
01193   upstream_.port = port;
01194 }
01195 
01197 void
01198 DQMNet::shutdown(void)
01199 {
01200   shutdown_ = 1;
01201   if (communicate_ != (pthread_t) -1)
01202     pthread_join(communicate_, 0);
01203 }
01204 
01210 static void *communicate(void *obj)
01211 {
01212   sigset_t sigs;
01213   sigfillset (&sigs);
01214   pthread_sigmask (SIG_BLOCK, &sigs, 0);
01215   ((DQMNet *)obj)->run();
01216   return 0;
01217 }
01218 
01220 void
01221 DQMNet::lock(void)
01222 {
01223   if (communicate_ != (pthread_t) -1)
01224     pthread_mutex_lock(&lock_);
01225 }
01226 
01228 void
01229 DQMNet::unlock(void)
01230 {
01231   if (communicate_ != (pthread_t) -1)
01232     pthread_mutex_unlock(&lock_);
01233 }
01234 
01238 void
01239 DQMNet::start(void)
01240 {
01241   if (communicate_ != (pthread_t) -1)
01242   {
01243     logme()
01244       << "ERROR: DQM networking thread has already been started\n";
01245     return;
01246   }
01247 
01248   pthread_mutex_init(&lock_, 0);
01249   pthread_create (&communicate_, 0, &communicate, this);
01250 }
01251 
01253 void
01254 DQMNet::run(void)
01255 {
01256   Time now;
01257   Time nextFlush = 0;
01258   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01259 
01260   // Perform I/O.  Every once in a while flush updates to peers.
01261   while (! shouldStop())
01262   {
01263     for (int i = 0; i < 2; ++i)
01264     {
01265       AutoPeer *ap = automatic[i];
01266 
01267       // If we need a server connection and don't have one yet,
01268       // initiate asynchronous connection creation.  Swallow errors
01269       // in case the server won't talk to us.
01270       if (! ap->host.empty()
01271           && ! ap->peer
01272           && (now = Time::current()) > ap->next)
01273       {
01274         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01275         InetSocket *s = 0;
01276         try
01277         {
01278           s = new InetSocket (SocketConst::TypeStream);
01279           s->setBlocking (false);
01280           s->connect(InetAddress (ap->host.c_str(), ap->port));
01281           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01282           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01283         }
01284         catch (Error &e)
01285         {
01286           SystemError *sys = dynamic_cast<SystemError *>(e.next());
01287           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01288           {
01289             // "In progress" just means the connection is in progress.
01290             // The connection is ready when the socket is writeable.
01291             // Anything else is a real problem.
01292             if (! ap->warned)
01293             {
01294               logme()
01295                 << "NOTE: DQM server at " << ap->host << ":" << ap->port
01296                 << " is unavailable.  Connection will be established"
01297                 << " automatically on the background once the server"
01298                 << " becomes available.  Error from the attempt was: "
01299                 << e.explain() << '\n';
01300               ap->warned = true;
01301             }
01302 
01303             if (s)
01304               s->abort();
01305             delete s;
01306             s = 0;
01307           }
01308         }
01309 
01310         // Set up with the selector if we were successful.  If this is
01311         // the upstream collector, queue a request for updates.
01312         if (s)
01313         {
01314           lock();
01315           Peer *p = createPeer(s);
01316           ap->peer = p;
01317           ap->warned = false;
01318           unlock();
01319 
01320           InetAddress peeraddr = ((InetSocket *) s)->peername();
01321           InetAddress myaddr = ((InetSocket *) s)->sockname();
01322           p->peeraddr = StringFormat("%1:%2")
01323                         .arg(peeraddr.hostname())
01324                         .arg(peeraddr.port());
01325           p->mask = IORead|IOWrite|IOUrgent;
01326           p->update = ap->update;
01327           p->automatic = ap;
01328           p->socket = s;
01329           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01330           if (ap == &upstream_)
01331           {
01332             uint32_t words[5] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01333                                   3*sizeof(uint32_t), DQM_MSG_UPDATE_ME,
01334                                   requestFullUpdates_ };
01335             p->sendq = new Bucket;
01336             p->sendq->next = 0;
01337             copydata(p->sendq, words, sizeof(words));
01338           }
01339 
01340           // Report the new connection.
01341           if (debug_)
01342             logme()
01343               << "INFO: now connected to " << p->peeraddr << " from "
01344               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01345         }
01346       }
01347     }
01348 
01349     // Pump events for a while.
01350     sel_.dispatch(delay_);
01351     now = Time::current();
01352 
01353     // Check if flush is required.  Flush only if one is needed.
01354     // Always sends the full object list, but only rarely.
01355     // Compact objects no longer in active use before sending
01356     // out the update.
01357     if (flush_ && now > nextFlush)
01358     {
01359       flush_ = false;
01360       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01361 
01362       lock();
01363       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01364                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01365       sendObjectListToPeers(true);
01366       unlock();
01367     }
01368 
01369     // Update the data server and peer selection masks.  If we
01370     // have no more data to send and listening for writes, remove
01371     // the write mask.  If we have something to write and aren't
01372     // listening for writes, start listening so we can send off
01373     // the data.
01374     updatePeerMasks();
01375 
01376     // Release peers that have been waiting for data for too long.
01377     lock();
01378     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01379     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01380     {
01381       // If the peer has waited for too long, send something.
01382       if (i->time < waitold)
01383         releaseFromWait(i++, findObject(0, i->name));
01384 
01385       // Keep it for now.
01386       else
01387         ++i;
01388     }
01389     unlock();
01390   }
01391 }
01392 
01393 int
01394 DQMNet::receive(DQMStore *)
01395 {
01396   logme() << "ERROR: receive() method is not supported.\n";
01397   return 0;
01398 }
01399 
01400 void
01401 DQMNet::updateLocalObject(Object &o)
01402 {
01403   logme() << "ERROR: updateLocalObject() method is not supported.\n";
01404 }
01405 
01406 void
01407 DQMNet::removeLocalObject(const std::string &name)
01408 {
01409   logme() << "ERROR: removeLocalObject() method is not supported.\n";
01410 }
01411 
01412 // Tell the network cache that there have been local changes that
01413 // should be advertised to the downstream listeners.
01414 void
01415 DQMNet::sendLocalChanges(void)
01416 {
01417   char byte = 0;
01418   wakeup_.sink()->write(&byte, 1);
01419 }
01420 
01424 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */)
01425   : DQMImplNet<DQMNet::Object>(appname)
01426 {
01427   local_ = static_cast<ImplPeer *>(createPeer((Socket *) -1));
01428 }
01429 
01430 int
01431 DQMBasicNet::receive(DQMStore *store)
01432 {
01433   int updates = 0;
01434 
01435   lock();
01436   PeerMap::iterator pi, pe;
01437   ObjectMap::iterator oi, oe;
01438   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01439   {
01440     ImplPeer &p = pi->second;
01441     if (&p == local_)
01442       continue;
01443 
01444     updates += p.updates;
01445 
01446     for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01447     {
01448       Object &o = oi->second;
01449       if (o.flags & DQM_FLAG_DEAD)
01450       {
01451         std::string folder = o.name;
01452         std::string name = o.name;
01453         folder.erase(folder.rfind('/'), std::string::npos);
01454         name.erase(0, name.rfind('/')+1);
01455         store->setCurrentFolder(folder);
01456         store->removeElement(name);
01457         p.objs.erase(oi++);
01458       }
01459       else if ((o.flags & DQM_FLAG_RECEIVED) && reinstateObject(store, o))
01460       {
01461         o.flags &= ~DQM_FLAG_RECEIVED;
01462         ++oi;
01463       }
01464     }
01465   }
01466   unlock();
01467 
01468   return updates;
01469 }
01470 
01473 void
01474 DQMBasicNet::updateLocalObject(Object &o)
01475 {
01476   ObjectMap::iterator pos = local_->objs.find(o.name);
01477   if (pos == local_->objs.end())
01478     local_->objs.insert(ObjectMap::value_type(o.name, o));
01479   else
01480   {
01481     std::swap(pos->second.version,   o.version);
01482     std::swap(pos->second.tags,      o.tags);
01483     std::swap(pos->second.qreports,  o.qreports);
01484     std::swap(pos->second.flags,     o.flags);
01485     std::swap(pos->second.rawdata,   o.rawdata);
01486 
01487     delete pos->second.object;
01488     pos->second.object = 0;
01489     delete pos->second.reference;
01490     pos->second.reference = 0;
01491     pos->second.lastreq = 0;
01492   }
01493 }
01494 
01497 void
01498 DQMBasicNet::removeLocalObject(const std::string &path)
01499 {
01500   local_->objs.erase(path);
01501 }
01502 
01503 void
01504 DQMBasicNet::requestFullUpdatesFromPeers(void)
01505 {
01506   for (PeerMap::iterator i = peers_.begin(), e = peers_.end(); i != e; ++i)
01507   {
01508     ImplPeer &p = i->second;
01509     if (! p.source)
01510       continue;
01511 
01512     Bucket **msg = &p.sendq;
01513     while (*msg)
01514       msg = &(*msg)->next;
01515     *msg = new Bucket;
01516     (*msg)->next = 0;
01517 
01518     uint32_t words[3] = { 3*sizeof(uint32_t), DQM_MSG_UPDATE_ME, 1 };
01519     copydata(*msg, words, sizeof(words));
01520   }
01521 }

Generated on Tue Jun 9 17:34:14 2009 for CMSSW by  doxygen 1.5.4