CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/DQMServices/Core/interface/DQMNet.h

Go to the documentation of this file.
00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003 
00004 # include "classlib/iobase/Socket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 # include <set>
00018 # include <ext/hash_set>
00019 
00020 //class DQMStore;
00021 
00022 class DQMNet
00023 {
00024 public:
00025   static const uint32_t DQM_PROP_TYPE_MASK      = 0x000000ff;
00026   static const uint32_t DQM_PROP_TYPE_SCALAR    = 0x0000000f;
00027   static const uint32_t DQM_PROP_TYPE_INVALID   = 0x00000000;
00028   static const uint32_t DQM_PROP_TYPE_INT       = 0x00000001;
00029   static const uint32_t DQM_PROP_TYPE_REAL      = 0x00000002;
00030   static const uint32_t DQM_PROP_TYPE_STRING    = 0x00000003;
00031   static const uint32_t DQM_PROP_TYPE_TH1F      = 0x00000010;
00032   static const uint32_t DQM_PROP_TYPE_TH1S      = 0x00000011;
00033   static const uint32_t DQM_PROP_TYPE_TH1D      = 0x00000012;
00034   static const uint32_t DQM_PROP_TYPE_TH2F      = 0x00000020;
00035   static const uint32_t DQM_PROP_TYPE_TH2S      = 0x00000021;
00036   static const uint32_t DQM_PROP_TYPE_TH2D      = 0x00000022;
00037   static const uint32_t DQM_PROP_TYPE_TH3F      = 0x00000030;
00038   static const uint32_t DQM_PROP_TYPE_TH3S      = 0x00000031;
00039   static const uint32_t DQM_PROP_TYPE_TH3D      = 0x00000032;
00040   static const uint32_t DQM_PROP_TYPE_TPROF     = 0x00000040;
00041   static const uint32_t DQM_PROP_TYPE_TPROF2D   = 0x00000041;
00042   static const uint32_t DQM_PROP_TYPE_DATABLOB  = 0x00000050;
00043   
00044   static const uint32_t DQM_PROP_REPORT_MASK    = 0x00000f00;
00045   static const uint32_t DQM_PROP_REPORT_CLEAR   = 0x00000000;
00046   static const uint32_t DQM_PROP_REPORT_ERROR   = 0x00000100;
00047   static const uint32_t DQM_PROP_REPORT_WARN    = 0x00000200;
00048   static const uint32_t DQM_PROP_REPORT_OTHER   = 0x00000400;
00049   static const uint32_t DQM_PROP_REPORT_ALARM   = (DQM_PROP_REPORT_ERROR
00050                                                    | DQM_PROP_REPORT_WARN
00051                                                    | DQM_PROP_REPORT_OTHER);
00052 
00053   static const uint32_t DQM_PROP_HAS_REFERENCE  = 0x00001000;
00054   static const uint32_t DQM_PROP_TAGGED         = 0x00002000;
00055   static const uint32_t DQM_PROP_ACCUMULATE     = 0x00004000;
00056   static const uint32_t DQM_PROP_RESET          = 0x00008000;
00057 
00058   static const uint32_t DQM_PROP_NEW            = 0x00010000;
00059   static const uint32_t DQM_PROP_RECEIVED       = 0x00020000;
00060   static const uint32_t DQM_PROP_LUMI           = 0x00040000;
00061   static const uint32_t DQM_PROP_DEAD           = 0x00080000;
00062   static const uint32_t DQM_PROP_STALE          = 0x00100000;
00063 
00064   static const uint32_t DQM_MSG_HELLO           = 0;
00065   static const uint32_t DQM_MSG_UPDATE_ME       = 1;
00066   static const uint32_t DQM_MSG_LIST_OBJECTS    = 2;
00067   static const uint32_t DQM_MSG_GET_OBJECT      = 3;
00068 
00069   static const uint32_t DQM_REPLY_LIST_BEGIN    = 101;
00070   static const uint32_t DQM_REPLY_LIST_END      = 102;
00071   static const uint32_t DQM_REPLY_NONE          = 103;
00072   static const uint32_t DQM_REPLY_OBJECT        = 104;
00073 
00074   static const uint32_t MAX_PEER_WAITREQS       = 128;
00075 
00076   struct Peer;
00077   struct QValue;
00078   struct WaitObject;
00079 
00080   typedef std::vector<unsigned char>    DataBlob;
00081   typedef std::vector<QValue>           QReports;
00082   typedef std::vector<uint32_t>         TagList; // DEPRECATED
00083   typedef std::list<WaitObject>         WaitList;
00084 
00085   struct QValue
00086   {
00087     int                 code;
00088     float               qtresult;
00089     std::string         message;
00090     std::string         qtname;
00091     std::string         algorithm;
00092   };
00093 
00094   struct CoreObject
00095   {
00096     uint32_t            flags;
00097     uint32_t            tag;
00098     uint64_t            version;
00099     const std::string   *dirname;
00100     std::string         objname;
00101     QReports            qreports;
00102   };
00103   
00104   struct Object : CoreObject
00105   {
00106     uint64_t            hash;
00107     uint64_t            lastreq;
00108     DataBlob            rawdata;
00109     std::string         scalar;
00110     std::string         qdata;
00111   };
00112 
00113   struct Bucket
00114   {
00115     Bucket              *next;
00116     DataBlob            data;
00117   };
00118 
00119   struct WaitObject
00120   {
00121     lat::Time           time;
00122     std::string         name;
00123     std::string         info;
00124     Peer                *peer;
00125   };
00126 
00127   struct AutoPeer;
00128   struct Peer
00129   {
00130     std::string         peeraddr;
00131     lat::Socket         *socket;
00132     DataBlob            incoming;
00133     Bucket              *sendq;
00134     size_t              sendpos;
00135 
00136     unsigned            mask;
00137     bool                source;
00138     bool                update;
00139     bool                updated;
00140     size_t              updates;
00141     size_t              waiting;
00142     AutoPeer            *automatic;
00143   };
00144 
00145   struct AutoPeer
00146   {
00147     Peer                *peer;
00148     lat::Time           next;
00149     std::string         host;
00150     int                 port;
00151     bool                update;
00152   };
00153 
00154   DQMNet(const std::string &appname = "");
00155   virtual ~DQMNet(void);
00156 
00157   void                  debug(bool doit);
00158   void                  delay(int delay);
00159   void                  startLocalServer(int port);
00160   void                  startLocalServer(const char *path);
00161   void                  staleObjectWaitLimit(lat::TimeSpan time);
00162   void                  updateToCollector(const std::string &host, int port);
00163   void                  listenToCollector(const std::string &host, int port);
00164   void                  shutdown(void);
00165   void                  lock(void);
00166   void                  unlock(void);
00167 
00168   void                  start(void);
00169   void                  run(void);
00170 
00171   void                  sendLocalChanges(void);
00172 
00173   static bool setOrder(const CoreObject &a, const CoreObject &b)
00174     {
00175       int diff = a.dirname->compare(*b.dirname);
00176       return (diff < 0 ? true
00177               : diff == 0 ? a.objname < b.objname
00178               : false);
00179     }
00180 
00181   struct HashOp
00182   {
00183     uint32_t operator()(const Object &a) const
00184       {
00185         return a.hash;
00186       }
00187   };
00188 
00189   struct HashEqual
00190   {
00191     bool operator()(const Object &a, const Object &b) const
00192       {
00193         return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
00194       }
00195   };
00196 
00197   static size_t
00198   dqmhash(const void *key, size_t keylen)
00199     {
00200       // Reduced version of Bob Jenkins' hash function at:
00201       //   http://www.burtleburtle.net/bob/c/lookup3.c
00202 #     define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
00203 #     define dqmhashmix(a,b,c) { \
00204         a -= c; a ^= dqmhashrot(c, 4); c += b; \
00205         b -= a; b ^= dqmhashrot(a, 6); a += c; \
00206         c -= b; c ^= dqmhashrot(b, 8); b += a; \
00207         a -= c; a ^= dqmhashrot(c,16); c += b; \
00208         b -= a; b ^= dqmhashrot(a,19); a += c; \
00209         c -= b; c ^= dqmhashrot(b, 4); b += a; }
00210 #     define dqmhashfinal(a,b,c) { \
00211         c ^= b; c -= dqmhashrot(b,14); \
00212         a ^= c; a -= dqmhashrot(c,11); \
00213         b ^= a; b -= dqmhashrot(a,25); \
00214         c ^= b; c -= dqmhashrot(b,16); \
00215         a ^= c; a -= dqmhashrot(c,4);  \
00216         b ^= a; b -= dqmhashrot(a,14); \
00217         c ^= b; c -= dqmhashrot(b,24); }
00218  
00219       uint32_t a, b, c;
00220       a = b = c = 0xdeadbeef + (uint32_t) keylen;
00221       const unsigned char *k = (const unsigned char *) key;
00222 
00223       // all but the last block: affect some bits of (a, b, c)
00224       while (keylen > 12)
00225       {
00226         a += k[0];
00227         a += ((uint32_t)k[1]) << 8;
00228         a += ((uint32_t)k[2]) << 16;
00229         a += ((uint32_t)k[3]) << 24;
00230         b += k[4];
00231         b += ((uint32_t)k[5]) << 8;
00232         b += ((uint32_t)k[6]) << 16;
00233         b += ((uint32_t)k[7]) << 24;
00234         c += k[8];
00235         c += ((uint32_t)k[9]) << 8;
00236         c += ((uint32_t)k[10]) << 16;
00237         c += ((uint32_t)k[11]) << 24;
00238         dqmhashmix(a,b,c);
00239         keylen -= 12;
00240         k += 12;
00241       }
00242 
00243       // last block: affect all 32 bits of (c); all case statements fall through
00244       switch (keylen)
00245       {
00246       case 12: c += ((uint32_t)k[11]) << 24;
00247       case 11: c += ((uint32_t)k[10]) << 16;
00248       case 10: c += ((uint32_t)k[9]) << 8;
00249       case 9 : c += k[8];
00250       case 8 : b += ((uint32_t)k[7]) << 24;
00251       case 7 : b += ((uint32_t)k[6]) << 16;
00252       case 6 : b += ((uint32_t)k[5]) << 8;
00253       case 5 : b += k[4];
00254       case 4 : a += ((uint32_t)k[3]) << 24;
00255       case 3 : a += ((uint32_t)k[2]) << 16;
00256       case 2 : a += ((uint32_t)k[1]) << 8;
00257       case 1 : a += k[0];
00258                break;
00259       case 0 : return c;
00260       }
00261 
00262       dqmhashfinal(a, b, c);
00263       return c;
00264 #     undef dqmhashrot
00265 #     undef dqmhashmix
00266 #     undef dqmhashfinal
00267     }
00268 
00269   static void           packQualityData(std::string &into, const QReports &qr);
00270   static void           unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
00271 
00272 protected:
00273   std::ostream &        logme(void);
00274   static void           copydata(Bucket *b, const void *data, size_t len);
00275   virtual void          sendObjectToPeer(Bucket *msg, Object &o, bool data);
00276 
00277   virtual bool          shouldStop(void);
00278   void                  waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00279   virtual void          releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00280   virtual bool          onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00281 
00282   // bool                       reconstructObject(Object &o);
00283   // bool                       reinstateObject(DQMStore *store, Object &o);
00284   virtual Object *      findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00285   virtual Object *      makeObject(Peer *p, const std::string &name) = 0;
00286   virtual void          markObjectsDead(Peer *p) = 0;
00287   virtual void          purgeDeadObjects(Peer *p) = 0;
00288 
00289   virtual Peer *        getPeer(lat::Socket *s) = 0;
00290   virtual Peer *        createPeer(lat::Socket *s) = 0;
00291   virtual void          removePeer(Peer *p, lat::Socket *s) = 0;
00292   virtual void          sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
00293   virtual void          sendObjectListToPeers(bool all) = 0;
00294 
00295   void                  updateMask(Peer *p);
00296   virtual void          updatePeerMasks(void) = 0;
00297   static void           discard(Bucket *&b);
00298 
00299   bool                  debug_;
00300   pthread_mutex_t       lock_;
00301 
00302 private:
00303   void                  losePeer(const char *reason,
00304                                  Peer *peer,
00305                                  lat::IOSelectEvent *event,
00306                                  lat::Error *err = 0);
00307   void                  requestObjectData(Peer *p, const char *name, size_t len);
00308   void                  releaseFromWait(WaitList::iterator i, Object *o);
00309   void                  releaseWaiters(const std::string &name, Object *o);
00310 
00311   bool                  onPeerData(lat::IOSelectEvent *ev, Peer *p);
00312   bool                  onPeerConnect(lat::IOSelectEvent *ev);
00313   bool                  onLocalNotify(lat::IOSelectEvent *ev);
00314 
00315   std::string           appname_;
00316   int                   pid_;
00317 
00318   lat::IOSelector       sel_;
00319   lat::Socket           *server_;
00320   lat::Pipe             wakeup_;
00321   lat::Time             version_;
00322 
00323   AutoPeer              upstream_;
00324   AutoPeer              downstream_;
00325   WaitList              waiting_;
00326 
00327   pthread_t             communicate_;
00328   sig_atomic_t          shutdown_;
00329 
00330   int                   delay_;
00331   lat::TimeSpan         waitStale_;
00332   lat::TimeSpan         waitMax_;
00333   bool                  flush_;
00334 
00335   // copying is not available
00336   DQMNet(const DQMNet &);
00337   DQMNet &operator=(const DQMNet &);
00338 };
00339 
00340 template <class ObjType>
00341 class DQMImplNet : public DQMNet
00342 {
00343 public:
00344   struct ImplPeer;
00345 
00346   typedef std::set<std::string> DirMap;
00347   typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
00348   typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00349   struct ImplPeer : Peer
00350   {
00351     ImplPeer(void) {}
00352     ObjectMap objs;
00353     DirMap dirs;
00354   };
00355 
00356   DQMImplNet(const std::string &appname = "")
00357     : DQMNet(appname)
00358     {}
00359   
00360   ~DQMImplNet(void)
00361     {}
00362 
00363 protected:
00364   virtual Object *
00365   findObject(Peer *p, const std::string &name, Peer **owner = 0)
00366     {
00367       size_t slash = name.rfind('/');
00368       size_t dirpos = (slash == std::string::npos ? 0 : slash);
00369       size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00370       std::string path(name, 0, dirpos);
00371       ObjType proto;
00372       proto.hash = dqmhash(name.c_str(), name.size());
00373       proto.dirname = &path;
00374       proto.objname.append(name, namepos, std::string::npos);
00375 
00376       typename ObjectMap::iterator pos;
00377       typename PeerMap::iterator i, e;
00378       if (owner)
00379         *owner = 0;
00380       if (p)
00381       {
00382         ImplPeer *ip = static_cast<ImplPeer *>(p);
00383         pos = ip->objs.find(proto);
00384         if (pos == ip->objs.end())
00385           return 0;
00386         else
00387         {
00388           if (owner) *owner = ip;
00389           return const_cast<ObjType *>(&*pos);
00390         }
00391       }
00392       else
00393       {
00394         for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00395         {
00396           pos = i->second.objs.find(proto);
00397           if (pos != i->second.objs.end())
00398           {
00399             if (owner) *owner = &i->second;
00400             return const_cast<ObjType *>(&*pos);
00401           }
00402         }
00403         return 0;
00404       }
00405     }
00406 
00407   virtual Object *
00408   makeObject(Peer *p, const std::string &name)
00409     {
00410       ImplPeer *ip = static_cast<ImplPeer *>(p);
00411       size_t slash = name.rfind('/');
00412       size_t dirpos = (slash == std::string::npos ? 0 : slash);
00413       size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00414       ObjType o;
00415       o.flags = 0;
00416       o.tag = 0;
00417       o.version = 0;
00418       o.lastreq = 0;
00419       o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
00420       o.objname.append(name, namepos, std::string::npos);
00421       o.hash = dqmhash(name.c_str(), name.size());
00422       return const_cast<ObjType *>(&*ip->objs.insert(o).first);
00423     }
00424 
00425   // Mark all the objects dead.  This is intended to be used when
00426   // starting to process a complete list of objects, in order to
00427   // flag the objects that need to be killed at the end.  After
00428   // call to this method, revive all live objects by removing the
00429   // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
00430   // to remove the dead ones.  This also turns off object request
00431   // for objects we've lost interest in.
00432   virtual void
00433   markObjectsDead(Peer *p)
00434     {
00435       uint64_t minreq
00436         = (lat::Time::current()
00437           - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
00438       ImplPeer *ip = static_cast<ImplPeer *>(p);
00439       typename ObjectMap::iterator i, e;
00440       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00441       {
00442         if (i->lastreq && i->lastreq < minreq)
00443           const_cast<ObjType &>(*i).lastreq = 0;
00444         const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
00445       }
00446     }
00447 
00448   // Mark remaining zombie objects as dead.  See markObjectsDead().
00449   virtual void
00450   purgeDeadObjects(Peer *p)
00451     {
00452       ImplPeer *ip = static_cast<ImplPeer *>(p);
00453       typename ObjectMap::iterator i, e;
00454       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00455       {
00456         if (i->flags & DQM_PROP_DEAD)
00457           ip->objs.erase(i++);
00458         else
00459           ++i;
00460       }
00461     }
00462 
00463   virtual Peer *
00464   getPeer(lat::Socket *s)
00465     {
00466       typename PeerMap::iterator pos = peers_.find(s);
00467       typename PeerMap::iterator end = peers_.end();
00468       return pos == end ? 0 : &pos->second;
00469     }
00470 
00471   virtual Peer *
00472   createPeer(lat::Socket *s)
00473     {
00474       ImplPeer *ip = &peers_[s];
00475       ip->socket = 0;
00476       ip->sendq = 0;
00477       ip->sendpos = 0;
00478       ip->mask = 0;
00479       ip->source = false;
00480       ip->update = false;
00481       ip->updated = false;
00482       ip->updates = 0;
00483       ip->waiting = 0;
00484       ip->automatic = 0;
00485       return ip;
00486     }
00487 
00488   virtual void
00489   removePeer(Peer *p, lat::Socket *s)
00490     {
00491       ImplPeer *ip = static_cast<ImplPeer *>(p);
00492       bool needflush = ! ip->objs.empty();
00493 
00494       typename ObjectMap::iterator i, e;
00495       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00496         ip->objs.erase(i++);
00497     
00498       peers_.erase(s);
00499 
00500       // If we removed a peer with objects, our list of objects
00501       // has changed and we need to update downstream peers.
00502       if (needflush)
00503         sendLocalChanges();
00504     }
00505 
00507   virtual void
00508   sendObjectListToPeer(Bucket *msg, bool all, bool clear)
00509     {
00510       typename PeerMap::iterator pi, pe;
00511       typename ObjectMap::iterator oi, oe;
00512       size_t size = 0;
00513       size_t numobjs = 0;
00514       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00515         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
00516           if (all || (oi->flags & DQM_PROP_NEW))
00517             size += 9*sizeof(uint32_t) + oi->dirname->size()
00518                     + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
00519                     + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
00520 
00521       msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
00522 
00523       uint32_t nupdates = 0;
00524       uint32_t words [4];
00525       words[0] = sizeof(words);
00526       words[1] = DQM_REPLY_LIST_BEGIN;
00527       words[2] = numobjs;
00528       words[3] = all;
00529       copydata(msg, &words[0], sizeof(words));
00530 
00531       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00532         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00533           if (all || (oi->flags & DQM_PROP_NEW))
00534           {
00535             sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
00536             if (clear)
00537               const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
00538             ++nupdates;
00539           }
00540 
00541       words[1] = DQM_REPLY_LIST_END;
00542       words[2] = nupdates;
00543       copydata(msg, &words[0], sizeof(words));
00544     }
00545 
00546   virtual void
00547   sendObjectListToPeers(bool all)
00548     {
00549       typename PeerMap::iterator i, e;
00550       typename ObjectMap::iterator oi, oe;
00551       for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00552       {
00553         ImplPeer &p = i->second;
00554         if (! p.update)
00555           continue;
00556 
00557         if (debug_)
00558           logme()
00559             << "DEBUG: notifying " << p.peeraddr << std::endl;
00560 
00561         Bucket msg;
00562         msg.next = 0;
00563         sendObjectListToPeer(&msg, !p.updated || all, true);
00564 
00565         if (! msg.data.empty())
00566         {
00567           Bucket **prev = &p.sendq;
00568           while (*prev)
00569             prev = &(*prev)->next;
00570 
00571           *prev = new Bucket;
00572           (*prev)->next = 0;
00573           (*prev)->data.swap(msg.data);
00574         }
00575         p.updated = true;
00576       }
00577     }
00578 
00579   virtual void
00580   updatePeerMasks(void)
00581     {
00582       typename PeerMap::iterator i, e;
00583       for (i = peers_.begin(), e = peers_.end(); i != e; )
00584         updateMask(&(i++)->second);
00585     }
00586 
00587 protected:
00588   PeerMap               peers_;
00589 };
00590   
00591 
00592 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00593 {
00594 public:
00595   DQMBasicNet(const std::string &appname = "");
00596 
00597   void                  reserveLocalSpace(uint32_t size);
00598   void                  updateLocalObject(Object &o);
00599   bool                  removeLocalExcept(const std::set<std::string> &known);
00600 
00601 private:
00602   ImplPeer              *local_;
00603 };
00604 
00605 
00606 #endif // DQMSERVICES_CORE_DQM_NET_H