CMS 3D CMS Logo

VisNet.h

Go to the documentation of this file.
00001 #ifndef VIS_FRAMEWORK_BASE_VIS_NET_H
00002 # define VIS_FRAMEWORK_BASE_VIS_NET_H
00003 
00004 # include "classlib/iobase/InetServerSocket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 
// VisNet: class declaration only -- no member function bodies appear in this
// header.  It declares protocol constants (VIS_MSG_*, VIS_REPLY_*, VIS_FLAG_*),
// the record types Object, Bucket, WaitObject, Peer and AutoPeer, and an API
// built around sockets, an I/O selector, a pipe and pthread primitives.
// NOTE(review): the names suggest a service exchanging named objects between
// socket peers; every behavioural remark below is inferred from names and
// types only -- confirm against the implementation file.
00018 class VisNet
00019 {
00020 public:
        // Message codes, numbered 0-3.
        // NOTE(review): presumably request types sent between peers -- confirm.
00021   static const uint32_t VIS_MSG_HELLO           = 0;
00022   static const uint32_t VIS_MSG_UPDATE_ME       = 1;
00023   static const uint32_t VIS_MSG_LIST_OBJECTS    = 2;
00024   static const uint32_t VIS_MSG_GET_OBJECT      = 3;
00025 
        // Reply codes, numbered 101-104 (disjoint from the VIS_MSG_* values).
00026   static const uint32_t VIS_REPLY_LIST_BEGIN    = 101;
00027   static const uint32_t VIS_REPLY_LIST_END      = 102;
00028   static const uint32_t VIS_REPLY_NONE          = 103;
00029   static const uint32_t VIS_REPLY_OBJECT        = 104;
00030 
        // Bit flags: SCALAR is the lowest bit; RECEIVED, NEW, DEAD and ZOMBIE
        // occupy the top four bits of a 32-bit word.
        // NOTE(review): presumably stored in Object::flags -- confirm.
00031   static const uint32_t VIS_FLAG_SCALAR         = 0x1;
00032   static const uint32_t VIS_FLAG_RECEIVED       = 0x10000000;
00033   static const uint32_t VIS_FLAG_NEW            = 0x20000000;
00034   static const uint32_t VIS_FLAG_DEAD           = 0x40000000;
00035   static const uint32_t VIS_FLAG_ZOMBIE         = 0x80000000;
00036 
        // NOTE(review): looks like a per-peer cap on pending wait requests
        // (compare Peer::waiting) -- confirm where it is enforced.
00037   static const uint32_t MAX_PEER_WAITREQS       = 128;
00038 
        // Forward declarations: the typedefs below name these types before
        // their definitions appear.
00039   struct Peer;
00040   struct Object;
00041   struct WaitObject;
00042 
        // Container aliases: raw byte buffer, list of wait records, objects
        // keyed by string, and peers keyed by socket pointer.
00043   typedef std::vector<unsigned char>    DataBlob;
00044   typedef std::list<WaitObject>         WaitList;
00045   typedef std::map<std::string, Object> ObjectMap;
00046   typedef std::map<lat::Socket *, Peer> PeerMap;
00047 
        // A named object with a 64-bit version, a raw byte payload, a 32-bit
        // flag word and a timestamp.
        // NOTE(review): `lastreq` presumably records the last request time and
        // `flags` presumably holds VIS_FLAG_* bits -- confirm.
00048   struct Object
00049   {
00050     uint64_t            version;
00051     std::string         name;
00052     DataBlob            rawdata;
00053     uint32_t            flags;
00054     lat::Time           lastreq;
00055   };
00056   
        // Node of a singly linked chain (`next`) carrying one byte buffer.
        // Used as the message type in the send/reply prototypes below.
00057   struct Bucket
00058   {
00059     Bucket              *next;
00060     DataBlob            data;
00061   };
00062 
        // Record kept in WaitList: a timestamp, two strings and a Peer pointer.
        // NOTE(review): presumably a peer's pending request for object `name`;
        // ownership of `peer` is not visible here -- confirm.
00063   struct WaitObject
00064   {
00065     lat::Time           time;
00066     std::string         name;
00067     std::string         info;
00068     Peer                *peer;
00069   };
00070 
        // Forward declaration: Peer holds an AutoPeer pointer and AutoPeer
        // holds a Peer pointer.
00071   struct AutoPeer;
        // Per-connection record; stored by value in PeerMap, keyed by socket.
        // NOTE(review): `sendq`/`sendpos` look like an outgoing Bucket chain
        // plus a byte offset into it, and `incoming` like a receive buffer;
        // none of the pointer members' ownership is visible here -- confirm.
00072   struct Peer
00073   {
00074     std::string         peeraddr;
00075     lat::Socket         *socket;
00076     DataBlob            incoming;
00077     Bucket              *sendq;
00078     size_t              sendpos;
00079 
00080     unsigned            mask;
00081     bool                source;
00082     bool                update;
00083     bool                updated;
00084     size_t              updates;
00085     size_t              waiting;
00086     AutoPeer            *automatic;
00087 
          // Objects associated with this peer, keyed by name string.
00088     ObjectMap           objs;
00089   };
00090 
        // Host/port pair with a Peer pointer, a timestamp and two flags.
        // NOTE(review): presumably describes a connection this class opens
        // itself, with `next` as the next attempt time -- confirm.
00091   struct AutoPeer
00092   {
00093     Peer                *peer;
00094     lat::Time           next;
00095     std::string         host;
00096     int                 port;
00097     bool                update;
00098     bool                warned;
00099   };
00100 
        // `appname` defaults to the empty string.  The constructor is not
        // marked explicit, so a std::string converts implicitly to VisNet.
00101   VisNet(const std::string &appname = "");
        // Virtual destructor: the class declares virtual methods and is
        // evidently meant to be derived from.
00102   virtual ~VisNet(void);
00103 
        // Configuration and lifecycle entry points.
        // NOTE(review): `debug`/`delay` presumably set `debug_`/`delay_`, and
        // `lock`/`unlock` presumably operate on `lock_` -- confirm.
00104   void                  debug(bool doit);
00105   void                  delay(int delay);
00106   void                  startLocalServer(int port);
00107   void                  updateToCollector(const std::string &host, int port);
00108   void                  listenToSource(const std::string &host, int port);
00109   void                  shutdown(void);
00110   void                  lock(void);
00111   void                  unlock(void);
00112 
        // NOTE(review): `communicate_` is a pthread_t, so `start` presumably
        // launches a thread and `run` is the loop it executes -- confirm.
00113   void                  start(void);
00114   void                  run(void);
00115 
        // `receive` takes a plain function pointer plus an opaque `arg`; the
        // callback is handed that `arg`, a 32-bit reason code and an Object
        // reference.  The meaning of the int result is not visible here.
00116   virtual int           receive(void (*callback)(void *arg, uint32_t reason, Object &obj), void *arg);
00117   virtual void          updateLocalObject(Object &o);
00118   virtual void          removeLocalObject(const std::string &name);
00119   void                  sendLocalChanges(void);
00120 
00121 protected:
        // Helpers for this class and subclasses; the virtual ones are
        // override points.
00122   std::ostream &        logme(void);
00123   static void           copydata(Bucket *b, const void *data, size_t len);
00124   void                  sendObjectToPeer(Bucket *msg, Object &o, bool data);
00125 
00126   virtual bool          shouldStop(void);
00127   void                  waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
        // Virtual overload taking a WaitObject reference; a private
        // non-virtual overload taking a WaitList iterator is declared below.
00128   virtual void          releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00129   virtual bool          onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00130 
        // `owner` is an optional out-parameter; it defaults to a null pointer.
00131   virtual Object *      findObject(Peer *p, const std::string &name, Peer **owner = 0);
00132   virtual Object *      makeObject(Peer *p, const std::string &name);
00133   virtual void          markObjectsZombies(Peer *p);
00134   virtual void          markObjectsDead(Peer *p);
00135   virtual void          purgeDeadObjects(lat::Time oldobj, lat::Time deadobj);
00136 
00137   virtual Peer *        getPeer(lat::Socket *s);
00138   virtual Peer *        createPeer(lat::Socket *s);
00139   virtual void          removePeer(Peer *p, lat::Socket *s);
00140   virtual void          sendObjectListToPeer(Bucket *msg, bool all, bool clear);
00141   virtual void          sendObjectListToPeers(bool all);
00142 
00143   void                  updateMask(Peer *p);
00144   virtual void          updatePeerMasks(void);
00145 
        // The only data member that is not private.
00146   bool                  debug_;
00147 
00148 private:
        // Takes the Bucket pointer by reference, so it can modify the
        // caller's pointer.
00149   static void           discard(Bucket *&b);
        // `err` is optional and defaults to a null pointer.
00150   bool                  losePeer(const char *reason,
00151                                  Peer *peer,
00152                                  lat::IOSelectEvent *event,
00153                                  lat::Error *err = 0);
        // `name` is passed as pointer plus length.
        // NOTE(review): so it may not be NUL-terminated -- confirm.
00154   void                  requestObject(Peer *p, const char *name, size_t len);
00155   void                  releaseFromWait(WaitList::iterator i, Object *o);
00156   void                  releaseWaiters(Object *o);
00157 
        // Handlers taking an lat::IOSelectEvent.
        // NOTE(review): presumably registered with `sel_` -- confirm.
00158   bool                  onPeerData(lat::IOSelectEvent *ev, Peer *p);
00159   bool                  onPeerConnect(lat::IOSelectEvent *ev);
00160   bool                  onLocalNotify(lat::IOSelectEvent *ev);
00161 
00162   std::string           appname_;
00163   int                   pid_;
00164 
        // I/O state built on classlib (`lat`) types.
00165   lat::IOSelector       sel_;
00166   lat::InetServerSocket *server_;
00167   lat::Pipe             wakeup_;
00168   lat::Time             version_;
00169 
        // Peers keyed by socket pointer, two AutoPeer records held by value,
        // the list of wait records and a pointer to one Peer.
        // NOTE(review): `local_` presumably points into `peers_` -- confirm.
00170   PeerMap               peers_;
00171   AutoPeer              upstream_;
00172   AutoPeer              downstream_;
00173   WaitList              waiting_;
00174   Peer                  *local_;
00175 
        // Threading state: a pthread mutex, a thread handle and a
        // sig_atomic_t flag.
00176   pthread_mutex_t       lock_;
00177   pthread_t             communicate_;
00178   sig_atomic_t          shutdown_;
00179 
00180   int                   delay_;
00181   bool                  flush_;
00182 
00183   // copying is not available: the copy constructor and copy assignment
        // operator are declared private so outside code cannot copy a VisNet.
00184   VisNet(const VisNet &);
00185   VisNet &operator=(const VisNet &);
00186 };
00187 
00188 #endif // VIS_FRAMEWORK_BASE_VIS_NET_H

Generated on Tue Jun 9 17:49:55 2009 for CMSSW by doxygen 1.5.4