00001 #ifndef VIS_FRAMEWORK_BASE_VIS_NET_H
00002 # define VIS_FRAMEWORK_BASE_VIS_NET_H
00003
00004 # include "classlib/iobase/InetServerSocket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017
00018 class VisNet
00019 {
00020 public:
00021 static const uint32_t VIS_MSG_HELLO = 0;
00022 static const uint32_t VIS_MSG_UPDATE_ME = 1;
00023 static const uint32_t VIS_MSG_LIST_OBJECTS = 2;
00024 static const uint32_t VIS_MSG_GET_OBJECT = 3;
00025
00026 static const uint32_t VIS_REPLY_LIST_BEGIN = 101;
00027 static const uint32_t VIS_REPLY_LIST_END = 102;
00028 static const uint32_t VIS_REPLY_NONE = 103;
00029 static const uint32_t VIS_REPLY_OBJECT = 104;
00030
00031 static const uint32_t VIS_FLAG_SCALAR = 0x1;
00032 static const uint32_t VIS_FLAG_RECEIVED = 0x10000000;
00033 static const uint32_t VIS_FLAG_NEW = 0x20000000;
00034 static const uint32_t VIS_FLAG_DEAD = 0x40000000;
00035 static const uint32_t VIS_FLAG_ZOMBIE = 0x80000000;
00036
00037 static const uint32_t MAX_PEER_WAITREQS = 128;
00038
00039 struct Peer;
00040 struct Object;
00041 struct WaitObject;
00042
00043 typedef std::vector<unsigned char> DataBlob;
00044 typedef std::list<WaitObject> WaitList;
00045 typedef std::map<std::string, Object> ObjectMap;
00046 typedef std::map<lat::Socket *, Peer> PeerMap;
00047
00048 struct Object
00049 {
00050 uint64_t version;
00051 std::string name;
00052 DataBlob rawdata;
00053 uint32_t flags;
00054 lat::Time lastreq;
00055 };
00056
00057 struct Bucket
00058 {
00059 Bucket *next;
00060 DataBlob data;
00061 };
00062
00063 struct WaitObject
00064 {
00065 lat::Time time;
00066 std::string name;
00067 std::string info;
00068 Peer *peer;
00069 };
00070
00071 struct AutoPeer;
00072 struct Peer
00073 {
00074 std::string peeraddr;
00075 lat::Socket *socket;
00076 DataBlob incoming;
00077 Bucket *sendq;
00078 size_t sendpos;
00079
00080 unsigned mask;
00081 bool source;
00082 bool update;
00083 bool updated;
00084 size_t updates;
00085 size_t waiting;
00086 AutoPeer *automatic;
00087
00088 ObjectMap objs;
00089 };
00090
00091 struct AutoPeer
00092 {
00093 Peer *peer;
00094 lat::Time next;
00095 std::string host;
00096 int port;
00097 bool update;
00098 bool warned;
00099 };
00100
00101 VisNet(const std::string &appname = "");
00102 virtual ~VisNet(void);
00103
00104 void debug(bool doit);
00105 void delay(int delay);
00106 void startLocalServer(int port);
00107 void updateToCollector(const std::string &host, int port);
00108 void listenToSource(const std::string &host, int port);
00109 void shutdown(void);
00110 void lock(void);
00111 void unlock(void);
00112
00113 void start(void);
00114 void run(void);
00115
00116 virtual int receive(void (*callback)(void *arg, uint32_t reason, Object &obj), void *arg);
00117 virtual void updateLocalObject(Object &o);
00118 virtual void removeLocalObject(const std::string &name);
00119 void sendLocalChanges(void);
00120
00121 protected:
00122 std::ostream & logme(void);
00123 static void copydata(Bucket *b, const void *data, size_t len);
00124 void sendObjectToPeer(Bucket *msg, Object &o, bool data);
00125
00126 virtual bool shouldStop(void);
00127 void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00128 virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00129 virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00130
00131 virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0);
00132 virtual Object * makeObject(Peer *p, const std::string &name);
00133 virtual void markObjectsZombies(Peer *p);
00134 virtual void markObjectsDead(Peer *p);
00135 virtual void purgeDeadObjects(lat::Time oldobj, lat::Time deadobj);
00136
00137 virtual Peer * getPeer(lat::Socket *s);
00138 virtual Peer * createPeer(lat::Socket *s);
00139 virtual void removePeer(Peer *p, lat::Socket *s);
00140 virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear);
00141 virtual void sendObjectListToPeers(bool all);
00142
00143 void updateMask(Peer *p);
00144 virtual void updatePeerMasks(void);
00145
00146 bool debug_;
00147
00148 private:
00149 static void discard(Bucket *&b);
00150 bool losePeer(const char *reason,
00151 Peer *peer,
00152 lat::IOSelectEvent *event,
00153 lat::Error *err = 0);
00154 void requestObject(Peer *p, const char *name, size_t len);
00155 void releaseFromWait(WaitList::iterator i, Object *o);
00156 void releaseWaiters(Object *o);
00157
00158 bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
00159 bool onPeerConnect(lat::IOSelectEvent *ev);
00160 bool onLocalNotify(lat::IOSelectEvent *ev);
00161
00162 std::string appname_;
00163 int pid_;
00164
00165 lat::IOSelector sel_;
00166 lat::InetServerSocket *server_;
00167 lat::Pipe wakeup_;
00168 lat::Time version_;
00169
00170 PeerMap peers_;
00171 AutoPeer upstream_;
00172 AutoPeer downstream_;
00173 WaitList waiting_;
00174 Peer *local_;
00175
00176 pthread_mutex_t lock_;
00177 pthread_t communicate_;
00178 sig_atomic_t shutdown_;
00179
00180 int delay_;
00181 bool flush_;
00182
00183
00184 VisNet(const VisNet &);
00185 VisNet &operator=(const VisNet &);
00186 };
00187
00188 #endif // VIS_FRAMEWORK_BASE_VIS_NET_H