CMS 3D CMS Logo

DQMNet.h

Go to the documentation of this file.
00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003 
00004 # include "classlib/iobase/InetServerSocket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 
00018 class TObject;
00019 class DQMStore;
00020 
// DQMNet: base class of the DQM network layer.  It owns the listening
// socket (server_), the I/O selector and the configuration of the
// automatic upstream/downstream connections, and exchanges monitor
// objects with peers.  How peers and objects are stored is left to
// subclasses through the pure virtual hooks below (DQMImplNet in this
// header provides a std::map based implementation).
00021 class DQMNet
00022 {
00023 public:
        // Message type codes.  In the messages built by
        // DQMImplNet::sendObjectListToPeer the first 32-bit word is the
        // record length and the second one is one of these codes.
        // DQM_MSG_* are presumably requests sent to a peer — confirm in
        // DQMNet.cc.
00024   static const uint32_t DQM_MSG_HELLO           = 0;
00025   static const uint32_t DQM_MSG_UPDATE_ME       = 1;
00026   static const uint32_t DQM_MSG_LIST_OBJECTS    = 2;
00027   static const uint32_t DQM_MSG_GET_OBJECT      = 3;
00028 
        // Reply codes.  DQM_REPLY_LIST_BEGIN / DQM_REPLY_LIST_END bracket
        // an object listing (see DQMImplNet::sendObjectListToPeer).
00029   static const uint32_t DQM_REPLY_LIST_BEGIN    = 101;
00030   static const uint32_t DQM_REPLY_LIST_END      = 102;
00031   static const uint32_t DQM_REPLY_NONE          = 103;
00032   static const uint32_t DQM_REPLY_OBJECT        = 104;
00033 
        // Bits for CoreObject::flags.
        //  SCALAR: never compacted by DQMImplNet::purgeDeadObjects.
        //  ZOMBIE: set by markObjectsZombies(), turned into DEAD by
        //          markObjectsDead() if still set.
        //  DEAD:   eligible for removal in purgeDeadObjects.
        //  NEW:    included in incremental listings; cleared by
        //          sendObjectListToPeer when its 'clear' argument is set.
        //  REPORT_*, TEXT, RECEIVED: not used in this header — see
        //          DQMNet.cc for their meaning.
00034   static const uint32_t DQM_FLAG_REPORT_ERROR   = 0x1;
00035   static const uint32_t DQM_FLAG_REPORT_WARNING = 0x2;
00036   static const uint32_t DQM_FLAG_REPORT_OTHER   = 0x4;
00037   static const uint32_t DQM_FLAG_SCALAR         = 0x8;
00038   static const uint32_t DQM_FLAG_ZOMBIE         = 0x08000000;
00039   static const uint32_t DQM_FLAG_TEXT           = 0x10000000;
00040   static const uint32_t DQM_FLAG_RECEIVED       = 0x20000000;
00041   static const uint32_t DQM_FLAG_NEW            = 0x40000000;
00042   static const uint32_t DQM_FLAG_DEAD           = 0x80000000;
00043 
        // NOTE(review): presumably the cap on outstanding wait requests
        // per peer (Peer::waiting) — confirm in DQMNet.cc.
00044   static const uint32_t MAX_PEER_WAITREQS       = 128;
00045 
00046   struct Peer;
00047   struct QValue;
00048   struct WaitObject;
00049 
        // Raw byte buffer: used for message buffers (Bucket::data),
        // unparsed input (Peer::incoming) and serialized objects
        // (Object::rawdata).
00050   typedef std::vector<unsigned char>    DataBlob;
00051   typedef std::vector<uint32_t>         TagList;
00052   typedef std::vector<QValue>           QReports;
00053   typedef std::list<WaitObject>         WaitList;
00054 
        // One entry of an object's QReports (presumably a quality-test
        // result, judging by the field names).
00055   struct QValue
00056   {
00057     int                 code;
00058     float               qtresult;
00059     std::string         message;
00060     std::string         qtname;
00061     std::string         algorithm;
00062   };
00063 
        // Fields common to every object record.  'object' and 'reference'
        // are raw TObject pointers owned by the record: DQMImplNet deletes
        // them when it compacts, removes or destroys records.
00064   struct CoreObject
00065   {
00066     uint64_t            version;
00067     std::string         name;
00068     TagList             tags;
00069     TObject             *object;
00070     TObject             *reference;
00071     QReports            qreports;
00072     uint32_t            flags;
00073   };
00074   
        // Object record as kept by the network layer: CoreObject plus
        // its serialized bytes and the time it was last requested
        // (purgeDeadObjects compares lastreq against its age limits).
00075   struct Object : CoreObject
00076   {
00077     DataBlob            rawdata;
00078     lat::Time           lastreq;
00079   };
00080 
        // Node of a singly linked list of outgoing buffers; Peer::sendq
        // points at the head of a peer's queue.
00081   struct Bucket
00082   {
00083     Bucket              *next;
00084     DataBlob            data;
00085   };
00086 
        // A pending wait for object 'name' on behalf of 'peer'
        // (inferred from waitForData()/releaseFromWait() — confirm in
        // DQMNet.cc).
00087   struct WaitObject
00088   {
00089     lat::Time           time;
00090     std::string         name;
00091     std::string         info;
00092     Peer                *peer;
00093   };
00094 
00095   struct AutoPeer;
        // Per-connection state.
        //  sendq:      queue of outgoing buffers (sendpos presumably is
        //              the write offset into its head bucket).
        //  update:     peer receives change notifications
        //              (sendObjectListToPeers skips it otherwise).
        //  updatefull: notifications carry object data.
        //  updated:    a notification has already been queued, so later
        //              ones may be incremental.
        //  automatic:  the AutoPeer this connection belongs to, if any.
00096   struct Peer
00097   {
00098     std::string         peeraddr;
00099     lat::Socket         *socket;
00100     DataBlob            incoming;
00101     Bucket              *sendq;
00102     size_t              sendpos;
00103 
00104     unsigned            mask;
00105     bool                source;
00106     bool                update;
00107     bool                updated;
00108     bool                updatefull;
00109     size_t              updates;
00110     size_t              waiting;
00111     AutoPeer            *automatic;
00112   };
00113 
        // Configuration of an automatically managed connection (see
        // upstream_/downstream_).  NOTE(review): 'next' is presumably
        // the next (re)connect time and 'warned' suppresses repeated
        // warnings — confirm in DQMNet.cc.
00114   struct AutoPeer
00115   {
00116     Peer                *peer;
00117     lat::Time           next;
00118     std::string         host;
00119     int                 port;
00120     bool                update;
00121     bool                warned;
00122   };
00123 
00124   DQMNet(const std::string &appname = "");
00125   virtual ~DQMNet(void);
00126 
        // Configuration and control.  NOTE(review): presumably the
        // setters are meant to be called before start().
00127   void                  debug(bool doit);
00128   void                  delay(int delay);
00129   void                  sendScalarAsText(bool doit);
00130   void                  requestFullUpdates(bool doit);
00131   void                  startLocalServer(int port);
00132   void                  updateToCollector(const std::string &host, int port);
00133   void                  listenToCollector(const std::string &host, int port);
00134   void                  shutdown(void);
00135   void                  lock(void);
00136   void                  unlock(void);
00137 
        // NOTE(review): start() presumably spawns the communicate_ thread
        // and run() executes the I/O loop — confirm in DQMNet.cc.
00138   void                  start(void);
00139   void                  run(void);
00140 
00141   virtual int           receive(DQMStore *store);
00142   virtual void          updateLocalObject(Object &o);
00143   virtual void          removeLocalObject(const std::string &name);
00144   void                  sendLocalChanges(void);
00145 
00146 protected:
00147   std::ostream &        logme(void);
00148   static void           copydata(Bucket *b, const void *data, size_t len);
00149   bool                  extractScalarData(DataBlob &objdata, Object &o);
00150   void                  sendObjectToPeer(Bucket *msg, Object &o, bool data, bool text);
00151 
00152   virtual bool          shouldStop(void);
00153   void                  waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00154   virtual void          releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00155   virtual bool          onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00156 
00157   bool                  reconstructObject(Object &o);
00158   bool                  reinstateObject(DQMStore *store, Object &o);
        // Storage hooks for objects and peers; DQMImplNet implements them
        // on top of std::map.
00159   virtual Object *      findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00160   virtual Object *      makeObject(Peer *p, const std::string &name) = 0;
00161   virtual void          markObjectsZombies(Peer *p) = 0;
00162   virtual void          markObjectsDead(Peer *p) = 0;
00163   virtual void          purgeDeadObjects(lat::Time oldobj, lat::Time deadobj) = 0;
00164 
00165   virtual Peer *        getPeer(lat::Socket *s) = 0;
00166   virtual Peer *        createPeer(lat::Socket *s) = 0;
00167   virtual void          removePeer(Peer *p, lat::Socket *s) = 0;
00168   virtual void          sendObjectListToPeer(Bucket *msg, bool data, bool all, bool clear) = 0;
00169   virtual void          sendObjectListToPeers(bool all) = 0;
00170   virtual void          requestFullUpdatesFromPeers(void) = 0;
00171 
00172   void                  updateMask(Peer *p);
00173   virtual void          updatePeerMasks(void) = 0;
00174 
00175   bool                  debug_;
00176   bool                  sendScalarAsText_;
00177   bool                  requestFullUpdates_;
00178 
00179 private:
00180   static void           discard(Bucket *&b);
00181   bool                  losePeer(const char *reason,
00182                                  Peer *peer,
00183                                  lat::IOSelectEvent *event,
00184                                  lat::Error *err = 0);
00185   void                  requestObject(Peer *p, const char *name, size_t len);
00186   void                  releaseFromWait(WaitList::iterator i, Object *o);
00187   void                  releaseWaiters(Object *o);
00188 
00189   bool                  onPeerData(lat::IOSelectEvent *ev, Peer *p);
00190   bool                  onPeerConnect(lat::IOSelectEvent *ev);
00191   bool                  onLocalNotify(lat::IOSelectEvent *ev);
00192 
00193   std::string           appname_;
00194   int                   pid_;
00195 
00196   lat::IOSelector       sel_;
00197   lat::InetServerSocket *server_;
00198   lat::Pipe             wakeup_;
00199   lat::Time             version_;
00200 
00201   AutoPeer              upstream_;
00202   AutoPeer              downstream_;
00203   WaitList              waiting_;
00204 
        // Thread and synchronization state.  NOTE(review): lock_ is
        // presumably what lock()/unlock() take, and shutdown_ is the stop
        // flag checked by the communication loop — confirm in DQMNet.cc.
00205   pthread_mutex_t       lock_;
00206   pthread_t             communicate_;
00207   sig_atomic_t          shutdown_;
00208 
00209   int                   delay_;
00210   bool                  flush_;
00211 
00212   // copying is not available
00213   DQMNet(const DQMNet &);
00214   DQMNet &operator=(const DQMNet &);
00215 };
00216 
00217 template <class ObjType>
00218 class DQMImplNet : public DQMNet
00219 {
00220 public:
00221   struct ImplPeer;
00222   typedef std::map<std::string, ObjType> ObjectMap;
00223   typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00224   struct ImplPeer : Peer
00225   {
00226     ObjectMap objs;
00227   };
00228 
00229   DQMImplNet(const std::string &appname = "")
00230     : DQMNet(appname)
00231     {}
00232   
00233   ~DQMImplNet(void)
00234     {
00235       typename PeerMap::iterator pi, pe;
00236       typename ObjectMap::iterator oi, oe;
00237       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00238         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00239         {
00240           ObjType &o = oi->second;
00241           delete o.object;
00242           delete o.reference;
00243           o.object = 0;
00244           o.reference = 0;
00245         }
00246     }
00247 
00248 protected:
00249   virtual Object *
00250   findObject(Peer *p, const std::string &name, Peer **owner = 0)
00251     {
00252       typename ObjectMap::iterator pos;
00253       typename PeerMap::iterator i, e;
00254       if (owner)
00255         *owner = 0;
00256       if (p)
00257       {
00258         ImplPeer *ip = static_cast<ImplPeer *>(p);
00259         pos = ip->objs.find(name);
00260         if (pos == ip->objs.end())
00261           return 0;
00262         else
00263         {
00264           if (owner) *owner = ip;
00265           return &pos->second;
00266         }
00267       }
00268       else
00269       {
00270         for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00271         {
00272           pos = i->second.objs.find(name);
00273           if (pos != i->second.objs.end())
00274           {
00275             if (owner) *owner = &i->second;
00276             return &pos->second;
00277           }
00278         }
00279         return 0;
00280       }
00281     }
00282 
00283   virtual Object *
00284   makeObject(Peer *p, const std::string &name)
00285     {
00286       ImplPeer *ip = static_cast<ImplPeer *>(p);
00287       ObjType *o = &ip->objs[name];
00288       o->version = 0;
00289       o->name = name;
00290       o->object = 0;
00291       o->reference = 0;
00292       o->flags = 0;
00293       o->lastreq = 0;
00294       return o;
00295     }
00296 
00297   // Mark all the objects as zombies.  This is intended to be used
00298   // when starting to process a complete list of objects, in order
00299   // to flag the objects that need to be killed at the end.  After
00300   // call to this method, revive all live objects by removing the
00301   // DQM_FLAG_ZOMBIE flag, then call markObjectsDead() at the end
00302   // to flag dead as all remaining zombies.
00303   virtual void
00304   markObjectsZombies(Peer *p)
00305     {
00306       ImplPeer *ip = static_cast<ImplPeer *>(p);
00307       typename ObjectMap::iterator i, e;
00308       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00309         i->second.flags |= DQM_FLAG_ZOMBIE;
00310     }
00311 
00312   // Mark remaining zombie objects as dead.  See markObjectsZombies().
00313   virtual void
00314   markObjectsDead(Peer *p)
00315     {
00316       ImplPeer *ip = static_cast<ImplPeer *>(p);
00317       typename ObjectMap::iterator i, e;
00318       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00319         if (i->second.flags & DQM_FLAG_ZOMBIE)
00320           i->second.flags = (i->second.flags & ~DQM_FLAG_ZOMBIE) | DQM_FLAG_DEAD;
00321     }
00322 
00323   // Purge all old and dead objects.
00324   virtual void
00325   purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
00326     {
00327       typename PeerMap::iterator pi, pe;
00328       typename ObjectMap::iterator oi, oe;
00329       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00330         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
00331         {
00332           ObjType &o = oi->second;
00333 
00334           // Compact non-scalar objects that are unused.  We send scalar
00335           // objects to the web server so we keep them around.
00336           if (o.lastreq < oldobj && o.object && ! (o.flags & DQM_FLAG_SCALAR))
00337           {
00338             if (debug_)
00339               logme()
00340                 << "DEBUG: compacting idle '" << o.name
00341                 << "' from " << pi->second.peeraddr << std::endl;
00342 
00343             delete o.object;
00344             delete o.reference;
00345             o.object = 0;
00346             o.reference = 0;
00347             purgeDeadObject(o);
00348           }
00349 
00350           // Remove if dead, old and unused.
00351           if (o.lastreq < deadobj
00352               && o.version < deadobj
00353               && (o.flags & DQM_FLAG_DEAD))
00354           {
00355             if (debug_)
00356               logme()
00357                 << "DEBUG: removing dead '" << o.name
00358                 << "' from " << pi->second.peeraddr << std::endl;
00359 
00360             pi->second.objs.erase(oi++);
00361           }
00362           else
00363             ++oi;
00364         }
00365     }
00366 
00367   virtual void
00368   purgeDeadObject(ObjType &o)
00369     {}
00370 
00371   virtual Peer *
00372   getPeer(lat::Socket *s)
00373     {
00374       typename PeerMap::iterator pos = peers_.find(s);
00375       return pos == peers_.end() ? 0 : &pos->second;
00376     }
00377 
00378   virtual Peer *
00379   createPeer(lat::Socket *s)
00380     {
00381       ImplPeer *ip = &peers_[s];
00382       ip->socket = 0;
00383       ip->sendq = 0;
00384       ip->sendpos = 0;
00385       ip->mask = 0;
00386       ip->source = false;
00387       ip->update = false;
00388       ip->updated = false;
00389       ip->updatefull = false;
00390       ip->updates = 0;
00391       ip->waiting = 0;
00392       ip->automatic = 0;
00393       return ip;
00394     }
00395 
00396   virtual void
00397   removePeer(Peer *p, lat::Socket *s)
00398     {
00399       ImplPeer *ip = static_cast<ImplPeer *>(p);
00400       bool needflush = ! ip->objs.empty();
00401 
00402       typename ObjectMap::iterator i, e;
00403       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00404       {
00405         ObjType &o = i->second;
00406         delete o.object;
00407         delete o.reference;
00408         ip->objs.erase(i++);
00409       }
00410     
00411       peers_.erase(s);
00412 
00413       // If we removed a peer with objects, our list of objects
00414       // has changed and we need to update downstream peers.
00415       if (needflush)
00416         sendLocalChanges();
00417     }
00418 
00420   virtual void
00421   sendObjectListToPeer(Bucket *msg, bool data, bool all, bool clear)
00422     {
00423       typename PeerMap::iterator pi, pe;
00424       typename ObjectMap::iterator oi, oe;
00425       uint32_t numobjs = 0;
00426       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00427         numobjs += pi->second.objs.size();
00428 
00429       msg->data.reserve(msg->data.size() + 300*numobjs);
00430 
00431       uint32_t nupdates = 0;
00432       uint32_t words [4];
00433       words[0] = sizeof(words);
00434       words[1] = DQM_REPLY_LIST_BEGIN;
00435       words[2] = numobjs;
00436       words[3] = all;
00437       copydata(msg, &words[0], sizeof(words));
00438 
00439       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00440         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00441           if (all || (oi->second.flags & DQM_FLAG_NEW))
00442           {
00443             sendObjectToPeer(msg, oi->second, data, sendScalarAsText_);
00444             if (clear)
00445               oi->second.flags &= ~DQM_FLAG_NEW;
00446             ++nupdates;
00447           }
00448 
00449       words[1] = DQM_REPLY_LIST_END;
00450       words[2] = nupdates;
00451       copydata(msg, &words[0], sizeof(words));
00452     }
00453 
00454   virtual void
00455   sendObjectListToPeers(bool all)
00456     {
00457       typename PeerMap::iterator i, e;
00458       for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00459       {
00460         ImplPeer &p = i->second;
00461         if (! p.update)
00462           continue;
00463 
00464         if (debug_)
00465           logme()
00466             << "DEBUG: notifying " << p.peeraddr
00467             << ", full = " << p.updatefull << std::endl;
00468 
00469         Bucket msg;
00470         msg.next = 0;
00471         sendObjectListToPeer(&msg, p.updatefull, !p.updated || all, true);
00472 
00473         if (! msg.data.empty())
00474         {
00475           Bucket **prev = &p.sendq;
00476           while (*prev)
00477              prev = &(*prev)->next;
00478 
00479           *prev = new Bucket;
00480           (*prev)->next = 0;
00481           (*prev)->data.swap(msg.data);
00482         }
00483         p.updated = true;
00484       }
00485     }
00486 
00487   virtual void
00488   requestFullUpdatesFromPeers(void)
00489     {
00490       logme()
00491         << "ERROR: invalid request for full updates from peers.\n";
00492     }
00493 
00494   virtual void
00495   updatePeerMasks(void)
00496     {
00497       typename PeerMap::iterator i, e;
00498       for (i = peers_.begin(), e = peers_.end(); i != e; )
00499         updateMask(&(i++)->second);
00500     }
00501 
00502 protected:
00503   PeerMap               peers_;
00504 };
00505   
00506 
// Concrete network layer that stores plain DQMNet::Object records via
// DQMImplNet.  It overrides the local-object hooks and replaces
// DQMImplNet::requestFullUpdatesFromPeers (which only logs an error).
00507 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00508 {
00509 public:
00510   DQMBasicNet(const std::string &appname = "");
00511 
00512   virtual int           receive(DQMStore *store);
00513 
00514 protected:
00515   virtual void          updateLocalObject(Object &o);
00516   virtual void          removeLocalObject(const std::string &name);
00517   virtual void          requestFullUpdatesFromPeers(void);
00518 
00519 private:
        // NOTE(review): presumably the entry in peers_ that holds this
        // process's own (local) objects — confirm in DQMNet.cc.
00520   ImplPeer              *local_;
00521 };
00522 
00523 
00524 #endif // DQMSERVICES_CORE_DQM_NET_H

Generated on Tue Jun 9 17:34:11 2009 for CMSSW by doxygen 1.5.4