CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/DQMServices/Core/interface/DQMNet.h

Go to the documentation of this file.
00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003 
00004 # include "classlib/iobase/Socket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 # include <set>
00018 # include <ext/hash_set>
00019 
00020 //class DQMStore;
00021 
00022 class DQMNet
00023 {
00024 public:
00025   static const uint32_t DQM_PROP_TYPE_MASK       = 0x000000ff;
00026   static const uint32_t DQM_PROP_TYPE_SCALAR     = 0x0000000f;
00027   static const uint32_t DQM_PROP_TYPE_INVALID    = 0x00000000;
00028   static const uint32_t DQM_PROP_TYPE_INT        = 0x00000001;
00029   static const uint32_t DQM_PROP_TYPE_REAL       = 0x00000002;
00030   static const uint32_t DQM_PROP_TYPE_STRING     = 0x00000003;
00031   static const uint32_t DQM_PROP_TYPE_TH1F       = 0x00000010;
00032   static const uint32_t DQM_PROP_TYPE_TH1S       = 0x00000011;
00033   static const uint32_t DQM_PROP_TYPE_TH1D       = 0x00000012;
00034   static const uint32_t DQM_PROP_TYPE_TH2F       = 0x00000020;
00035   static const uint32_t DQM_PROP_TYPE_TH2S       = 0x00000021;
00036   static const uint32_t DQM_PROP_TYPE_TH2D       = 0x00000022;
00037   static const uint32_t DQM_PROP_TYPE_TH3F       = 0x00000030;
00038   static const uint32_t DQM_PROP_TYPE_TH3S       = 0x00000031;
00039   static const uint32_t DQM_PROP_TYPE_TH3D       = 0x00000032;
00040   static const uint32_t DQM_PROP_TYPE_TPROF      = 0x00000040;
00041   static const uint32_t DQM_PROP_TYPE_TPROF2D    = 0x00000041;
00042   static const uint32_t DQM_PROP_TYPE_DATABLOB   = 0x00000050;
00043   
00044   static const uint32_t DQM_PROP_REPORT_MASK     = 0x00000f00;
00045   static const uint32_t DQM_PROP_REPORT_CLEAR    = 0x00000000;
00046   static const uint32_t DQM_PROP_REPORT_ERROR    = 0x00000100;
00047   static const uint32_t DQM_PROP_REPORT_WARN     = 0x00000200;
00048   static const uint32_t DQM_PROP_REPORT_OTHER    = 0x00000400;
00049   static const uint32_t DQM_PROP_REPORT_ALARM    = (DQM_PROP_REPORT_ERROR
00050                                                     | DQM_PROP_REPORT_WARN
00051                                                     | DQM_PROP_REPORT_OTHER);
00052 
00053   static const uint32_t DQM_PROP_HAS_REFERENCE   = 0x00001000;
00054   static const uint32_t DQM_PROP_TAGGED          = 0x00002000;
00055   static const uint32_t DQM_PROP_ACCUMULATE      = 0x00004000;
00056   static const uint32_t DQM_PROP_RESET           = 0x00008000;
00057 
00058   static const uint32_t DQM_PROP_NEW             = 0x00010000;
00059   static const uint32_t DQM_PROP_RECEIVED        = 0x00020000;
00060   static const uint32_t DQM_PROP_LUMI            = 0x00040000;
00061   static const uint32_t DQM_PROP_DEAD            = 0x00080000;
00062   static const uint32_t DQM_PROP_STALE           = 0x00100000;
00063   static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
00064 
00065   static const uint32_t DQM_MSG_HELLO            = 0;
00066   static const uint32_t DQM_MSG_UPDATE_ME        = 1;
00067   static const uint32_t DQM_MSG_LIST_OBJECTS     = 2;
00068   static const uint32_t DQM_MSG_GET_OBJECT       = 3;
00069 
00070   static const uint32_t DQM_REPLY_LIST_BEGIN     = 101;
00071   static const uint32_t DQM_REPLY_LIST_END       = 102;
00072   static const uint32_t DQM_REPLY_NONE           = 103;
00073   static const uint32_t DQM_REPLY_OBJECT         = 104;
00074 
00075   static const uint32_t MAX_PEER_WAITREQS        = 128;
00076 
00077   struct Peer;
00078   struct QValue;
00079   struct WaitObject;
00080 
00081   typedef std::vector<unsigned char>    DataBlob;
00082   typedef std::vector<QValue>           QReports;
00083   typedef std::vector<uint32_t>         TagList; // DEPRECATED
00084   typedef std::list<WaitObject>         WaitList;
00085 
00086   struct QValue
00087   {
00088     int                 code;
00089     float               qtresult;
00090     std::string         message;
00091     std::string         qtname;
00092     std::string         algorithm;
00093   };
00094 
00095   struct CoreObject
00096   {
00097     uint32_t            flags;
00098     uint32_t            tag;
00099     uint64_t            version;
00100     const std::string   *dirname;
00101     std::string         objname;
00102     QReports            qreports;
00103   };
00104   
00105   struct Object : CoreObject
00106   {
00107     uint64_t            hash;
00108     uint64_t            lastreq;
00109     DataBlob            rawdata;
00110     std::string         scalar;
00111     std::string         qdata;
00112   };
00113 
00114   struct Bucket
00115   {
00116     Bucket              *next;
00117     DataBlob            data;
00118   };
00119 
00120   struct WaitObject
00121   {
00122     lat::Time           time;
00123     std::string         name;
00124     std::string         info;
00125     Peer                *peer;
00126   };
00127 
00128   struct AutoPeer;
00129   struct Peer
00130   {
00131     std::string         peeraddr;
00132     lat::Socket         *socket;
00133     DataBlob            incoming;
00134     Bucket              *sendq;
00135     size_t              sendpos;
00136 
00137     unsigned            mask;
00138     bool                source;
00139     bool                update;
00140     bool                updated;
00141     size_t              updates;
00142     size_t              waiting;
00143     AutoPeer            *automatic;
00144   };
00145 
00146   struct AutoPeer
00147   {
00148     Peer                *peer;
00149     lat::Time           next;
00150     std::string         host;
00151     int                 port;
00152     bool                update;
00153   };
00154 
00155   DQMNet(const std::string &appname = "");
00156   virtual ~DQMNet(void);
00157 
00158   void                  debug(bool doit);
00159   void                  delay(int delay);
00160   void                  startLocalServer(int port);
00161   void                  startLocalServer(const char *path);
00162   void                  staleObjectWaitLimit(lat::TimeSpan time);
00163   void                  updateToCollector(const std::string &host, int port);
00164   void                  listenToCollector(const std::string &host, int port);
00165   void                  shutdown(void);
00166   void                  lock(void);
00167   void                  unlock(void);
00168 
00169   void                  start(void);
00170   void                  run(void);
00171 
00172   void                  sendLocalChanges(void);
00173 
00174   static bool setOrder(const CoreObject &a, const CoreObject &b)
00175     {
00176       int diff = a.dirname->compare(*b.dirname);
00177       return (diff < 0 ? true
00178               : diff == 0 ? a.objname < b.objname
00179               : false);
00180     }
00181 
00182   struct HashOp
00183   {
00184     uint32_t operator()(const Object &a) const
00185       {
00186         return a.hash;
00187       }
00188   };
00189 
00190   struct HashEqual
00191   {
00192     bool operator()(const Object &a, const Object &b) const
00193       {
00194         return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
00195       }
00196   };
00197 
00198   static size_t
00199   dqmhash(const void *key, size_t keylen)
00200     {
00201       // Reduced version of Bob Jenkins' hash function at:
00202       //   http://www.burtleburtle.net/bob/c/lookup3.c
00203 #     define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
00204 #     define dqmhashmix(a,b,c) { \
00205         a -= c; a ^= dqmhashrot(c, 4); c += b; \
00206         b -= a; b ^= dqmhashrot(a, 6); a += c; \
00207         c -= b; c ^= dqmhashrot(b, 8); b += a; \
00208         a -= c; a ^= dqmhashrot(c,16); c += b; \
00209         b -= a; b ^= dqmhashrot(a,19); a += c; \
00210         c -= b; c ^= dqmhashrot(b, 4); b += a; }
00211 #     define dqmhashfinal(a,b,c) { \
00212         c ^= b; c -= dqmhashrot(b,14); \
00213         a ^= c; a -= dqmhashrot(c,11); \
00214         b ^= a; b -= dqmhashrot(a,25); \
00215         c ^= b; c -= dqmhashrot(b,16); \
00216         a ^= c; a -= dqmhashrot(c,4);  \
00217         b ^= a; b -= dqmhashrot(a,14); \
00218         c ^= b; c -= dqmhashrot(b,24); }
00219  
00220       uint32_t a, b, c;
00221       a = b = c = 0xdeadbeef + (uint32_t) keylen;
00222       const unsigned char *k = (const unsigned char *) key;
00223 
00224       // all but the last block: affect some bits of (a, b, c)
00225       while (keylen > 12)
00226       {
00227         a += k[0];
00228         a += ((uint32_t)k[1]) << 8;
00229         a += ((uint32_t)k[2]) << 16;
00230         a += ((uint32_t)k[3]) << 24;
00231         b += k[4];
00232         b += ((uint32_t)k[5]) << 8;
00233         b += ((uint32_t)k[6]) << 16;
00234         b += ((uint32_t)k[7]) << 24;
00235         c += k[8];
00236         c += ((uint32_t)k[9]) << 8;
00237         c += ((uint32_t)k[10]) << 16;
00238         c += ((uint32_t)k[11]) << 24;
00239         dqmhashmix(a,b,c);
00240         keylen -= 12;
00241         k += 12;
00242       }
00243 
00244       // last block: affect all 32 bits of (c); all case statements fall through
00245       switch (keylen)
00246       {
00247       case 12: c += ((uint32_t)k[11]) << 24;
00248       case 11: c += ((uint32_t)k[10]) << 16;
00249       case 10: c += ((uint32_t)k[9]) << 8;
00250       case 9 : c += k[8];
00251       case 8 : b += ((uint32_t)k[7]) << 24;
00252       case 7 : b += ((uint32_t)k[6]) << 16;
00253       case 6 : b += ((uint32_t)k[5]) << 8;
00254       case 5 : b += k[4];
00255       case 4 : a += ((uint32_t)k[3]) << 24;
00256       case 3 : a += ((uint32_t)k[2]) << 16;
00257       case 2 : a += ((uint32_t)k[1]) << 8;
00258       case 1 : a += k[0];
00259                break;
00260       case 0 : return c;
00261       }
00262 
00263       dqmhashfinal(a, b, c);
00264       return c;
00265 #     undef dqmhashrot
00266 #     undef dqmhashmix
00267 #     undef dqmhashfinal
00268     }
00269 
00270   static void           packQualityData(std::string &into, const QReports &qr);
00271   static void           unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
00272 
00273 protected:
00274   std::ostream &        logme(void);
00275   static void           copydata(Bucket *b, const void *data, size_t len);
00276   virtual void          sendObjectToPeer(Bucket *msg, Object &o, bool data);
00277 
00278   virtual bool          shouldStop(void);
00279   void                  waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00280   virtual void          releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00281   virtual bool          onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00282 
00283   // bool                       reconstructObject(Object &o);
00284   // bool                       reinstateObject(DQMStore *store, Object &o);
00285   virtual Object *      findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00286   virtual Object *      makeObject(Peer *p, const std::string &name) = 0;
00287   virtual void          markObjectsDead(Peer *p) = 0;
00288   virtual void          purgeDeadObjects(Peer *p) = 0;
00289 
00290   virtual Peer *        getPeer(lat::Socket *s) = 0;
00291   virtual Peer *        createPeer(lat::Socket *s) = 0;
00292   virtual void          removePeer(Peer *p, lat::Socket *s) = 0;
00293   virtual void          sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
00294   virtual void          sendObjectListToPeers(bool all) = 0;
00295 
00296   void                  updateMask(Peer *p);
00297   virtual void          updatePeerMasks(void) = 0;
00298   static void           discard(Bucket *&b);
00299 
00300   bool                  debug_;
00301   pthread_mutex_t       lock_;
00302 
00303 private:
00304   void                  losePeer(const char *reason,
00305                                  Peer *peer,
00306                                  lat::IOSelectEvent *event,
00307                                  lat::Error *err = 0);
00308   void                  requestObjectData(Peer *p, const char *name, size_t len);
00309   void                  releaseFromWait(WaitList::iterator i, Object *o);
00310   void                  releaseWaiters(const std::string &name, Object *o);
00311 
00312   bool                  onPeerData(lat::IOSelectEvent *ev, Peer *p);
00313   bool                  onPeerConnect(lat::IOSelectEvent *ev);
00314   bool                  onLocalNotify(lat::IOSelectEvent *ev);
00315 
00316   std::string           appname_;
00317   int                   pid_;
00318 
00319   lat::IOSelector       sel_;
00320   lat::Socket           *server_;
00321   lat::Pipe             wakeup_;
00322   lat::Time             version_;
00323 
00324   AutoPeer              upstream_;
00325   AutoPeer              downstream_;
00326   WaitList              waiting_;
00327 
00328   pthread_t             communicate_;
00329   sig_atomic_t          shutdown_;
00330 
00331   int                   delay_;
00332   lat::TimeSpan         waitStale_;
00333   lat::TimeSpan         waitMax_;
00334   bool                  flush_;
00335 
00336   // copying is not available
00337   DQMNet(const DQMNet &);
00338   DQMNet &operator=(const DQMNet &);
00339 };
00340 
00341 template <class ObjType>
00342 class DQMImplNet : public DQMNet
00343 {
00344 public:
00345   struct ImplPeer;
00346 
00347   typedef std::set<std::string> DirMap;
00348   typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
00349   typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00350   struct ImplPeer : Peer
00351   {
00352     ImplPeer(void) {}
00353     ObjectMap objs;
00354     DirMap dirs;
00355   };
00356 
00357   DQMImplNet(const std::string &appname = "")
00358     : DQMNet(appname)
00359     {}
00360   
00361   ~DQMImplNet(void)
00362     {}
00363 
00364 protected:
00365   virtual Object *
00366   findObject(Peer *p, const std::string &name, Peer **owner = 0)
00367     {
00368       size_t slash = name.rfind('/');
00369       size_t dirpos = (slash == std::string::npos ? 0 : slash);
00370       size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00371       std::string path(name, 0, dirpos);
00372       ObjType proto;
00373       proto.hash = dqmhash(name.c_str(), name.size());
00374       proto.dirname = &path;
00375       proto.objname.append(name, namepos, std::string::npos);
00376 
00377       typename ObjectMap::iterator pos;
00378       typename PeerMap::iterator i, e;
00379       if (owner)
00380         *owner = 0;
00381       if (p)
00382       {
00383         ImplPeer *ip = static_cast<ImplPeer *>(p);
00384         pos = ip->objs.find(proto);
00385         if (pos == ip->objs.end())
00386           return 0;
00387         else
00388         {
00389           if (owner) *owner = ip;
00390           return const_cast<ObjType *>(&*pos);
00391         }
00392       }
00393       else
00394       {
00395         for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00396         {
00397           pos = i->second.objs.find(proto);
00398           if (pos != i->second.objs.end())
00399           {
00400             if (owner) *owner = &i->second;
00401             return const_cast<ObjType *>(&*pos);
00402           }
00403         }
00404         return 0;
00405       }
00406     }
00407 
00408   virtual Object *
00409   makeObject(Peer *p, const std::string &name)
00410     {
00411       ImplPeer *ip = static_cast<ImplPeer *>(p);
00412       size_t slash = name.rfind('/');
00413       size_t dirpos = (slash == std::string::npos ? 0 : slash);
00414       size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00415       ObjType o;
00416       o.flags = 0;
00417       o.tag = 0;
00418       o.version = 0;
00419       o.lastreq = 0;
00420       o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
00421       o.objname.append(name, namepos, std::string::npos);
00422       o.hash = dqmhash(name.c_str(), name.size());
00423       return const_cast<ObjType *>(&*ip->objs.insert(o).first);
00424     }
00425 
00426   // Mark all the objects dead.  This is intended to be used when
00427   // starting to process a complete list of objects, in order to
00428   // flag the objects that need to be killed at the end.  After
00429   // call to this method, revive all live objects by removing the
00430   // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
00431   // to remove the dead ones.  This also turns off object request
00432   // for objects we've lost interest in.
00433   virtual void
00434   markObjectsDead(Peer *p)
00435     {
00436       uint64_t minreq
00437         = (lat::Time::current()
00438           - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
00439       ImplPeer *ip = static_cast<ImplPeer *>(p);
00440       typename ObjectMap::iterator i, e;
00441       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00442       {
00443         if (i->lastreq && i->lastreq < minreq)
00444           const_cast<ObjType &>(*i).lastreq = 0;
00445         const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
00446       }
00447     }
00448 
00449   // Mark remaining zombie objects as dead.  See markObjectsDead().
00450   virtual void
00451   purgeDeadObjects(Peer *p)
00452     {
00453       ImplPeer *ip = static_cast<ImplPeer *>(p);
00454       typename ObjectMap::iterator i, e;
00455       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00456       {
00457         if (i->flags & DQM_PROP_DEAD)
00458           ip->objs.erase(i++);
00459         else
00460           ++i;
00461       }
00462     }
00463 
00464   virtual Peer *
00465   getPeer(lat::Socket *s)
00466     {
00467       typename PeerMap::iterator pos = peers_.find(s);
00468       typename PeerMap::iterator end = peers_.end();
00469       return pos == end ? 0 : &pos->second;
00470     }
00471 
00472   virtual Peer *
00473   createPeer(lat::Socket *s)
00474     {
00475       ImplPeer *ip = &peers_[s];
00476       ip->socket = 0;
00477       ip->sendq = 0;
00478       ip->sendpos = 0;
00479       ip->mask = 0;
00480       ip->source = false;
00481       ip->update = false;
00482       ip->updated = false;
00483       ip->updates = 0;
00484       ip->waiting = 0;
00485       ip->automatic = 0;
00486       return ip;
00487     }
00488 
00489   virtual void
00490   removePeer(Peer *p, lat::Socket *s)
00491     {
00492       ImplPeer *ip = static_cast<ImplPeer *>(p);
00493       bool needflush = ! ip->objs.empty();
00494 
00495       typename ObjectMap::iterator i, e;
00496       for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00497         ip->objs.erase(i++);
00498     
00499       peers_.erase(s);
00500 
00501       // If we removed a peer with objects, our list of objects
00502       // has changed and we need to update downstream peers.
00503       if (needflush)
00504         sendLocalChanges();
00505     }
00506 
00508   virtual void
00509   sendObjectListToPeer(Bucket *msg, bool all, bool clear)
00510     {
00511       typename PeerMap::iterator pi, pe;
00512       typename ObjectMap::iterator oi, oe;
00513       size_t size = 0;
00514       size_t numobjs = 0;
00515       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00516         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
00517           if (all || (oi->flags & DQM_PROP_NEW))
00518             size += 9*sizeof(uint32_t) + oi->dirname->size()
00519                     + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
00520                     + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
00521 
00522       msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
00523 
00524       uint32_t nupdates = 0;
00525       uint32_t words [4];
00526       words[0] = sizeof(words);
00527       words[1] = DQM_REPLY_LIST_BEGIN;
00528       words[2] = numobjs;
00529       words[3] = all;
00530       copydata(msg, &words[0], sizeof(words));
00531 
00532       for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00533         for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00534           if (all || (oi->flags & DQM_PROP_NEW))
00535           {
00536             sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
00537             if (clear)
00538               const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
00539             ++nupdates;
00540           }
00541 
00542       words[1] = DQM_REPLY_LIST_END;
00543       words[2] = nupdates;
00544       copydata(msg, &words[0], sizeof(words));
00545     }
00546 
00547   virtual void
00548   sendObjectListToPeers(bool all)
00549     {
00550       typename PeerMap::iterator i, e;
00551       typename ObjectMap::iterator oi, oe;
00552       for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00553       {
00554         ImplPeer &p = i->second;
00555         if (! p.update)
00556           continue;
00557 
00558         if (debug_)
00559           logme()
00560             << "DEBUG: notifying " << p.peeraddr << std::endl;
00561 
00562         Bucket msg;
00563         msg.next = 0;
00564         sendObjectListToPeer(&msg, !p.updated || all, true);
00565 
00566         if (! msg.data.empty())
00567         {
00568           Bucket **prev = &p.sendq;
00569           while (*prev)
00570             prev = &(*prev)->next;
00571 
00572           *prev = new Bucket;
00573           (*prev)->next = 0;
00574           (*prev)->data.swap(msg.data);
00575         }
00576         p.updated = true;
00577       }
00578     }
00579 
00580   virtual void
00581   updatePeerMasks(void)
00582     {
00583       typename PeerMap::iterator i, e;
00584       for (i = peers_.begin(), e = peers_.end(); i != e; )
00585         updateMask(&(i++)->second);
00586     }
00587 
00588 protected:
00589   PeerMap               peers_;
00590 };
00591   
00592 
00593 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00594 {
00595 public:
00596   DQMBasicNet(const std::string &appname = "");
00597 
00598   void                  reserveLocalSpace(uint32_t size);
00599   void                  updateLocalObject(Object &o);
00600   bool                  removeLocalExcept(const std::set<std::string> &known);
00601 
00602 private:
00603   ImplPeer              *local_;
00604 };
00605 
00606 
00607 #endif // DQMSERVICES_CORE_DQM_NET_H