00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003
00004 # include "classlib/iobase/InetServerSocket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017
00018 class TObject;
00019 class DQMStore;
00020
00021 class DQMNet
00022 {
00023 public:
00024 static const uint32_t DQM_MSG_HELLO = 0;
00025 static const uint32_t DQM_MSG_UPDATE_ME = 1;
00026 static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
00027 static const uint32_t DQM_MSG_GET_OBJECT = 3;
00028
00029 static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
00030 static const uint32_t DQM_REPLY_LIST_END = 102;
00031 static const uint32_t DQM_REPLY_NONE = 103;
00032 static const uint32_t DQM_REPLY_OBJECT = 104;
00033
00034 static const uint32_t DQM_FLAG_REPORT_ERROR = 0x1;
00035 static const uint32_t DQM_FLAG_REPORT_WARNING = 0x2;
00036 static const uint32_t DQM_FLAG_REPORT_OTHER = 0x4;
00037 static const uint32_t DQM_FLAG_SCALAR = 0x8;
00038 static const uint32_t DQM_FLAG_ZOMBIE = 0x08000000;
00039 static const uint32_t DQM_FLAG_TEXT = 0x10000000;
00040 static const uint32_t DQM_FLAG_RECEIVED = 0x20000000;
00041 static const uint32_t DQM_FLAG_NEW = 0x40000000;
00042 static const uint32_t DQM_FLAG_DEAD = 0x80000000;
00043
00044 static const uint32_t MAX_PEER_WAITREQS = 128;
00045
00046 struct Peer;
00047 struct QValue;
00048 struct WaitObject;
00049
00050 typedef std::vector<unsigned char> DataBlob;
00051 typedef std::vector<uint32_t> TagList;
00052 typedef std::vector<QValue> QReports;
00053 typedef std::list<WaitObject> WaitList;
00054
00055 struct QValue
00056 {
00057 int code;
00058 float qtresult;
00059 std::string message;
00060 std::string qtname;
00061 std::string algorithm;
00062 };
00063
00064 struct CoreObject
00065 {
00066 uint64_t version;
00067 std::string name;
00068 TagList tags;
00069 TObject *object;
00070 TObject *reference;
00071 QReports qreports;
00072 uint32_t flags;
00073 };
00074
00075 struct Object : CoreObject
00076 {
00077 DataBlob rawdata;
00078 lat::Time lastreq;
00079 };
00080
00081 struct Bucket
00082 {
00083 Bucket *next;
00084 DataBlob data;
00085 };
00086
00087 struct WaitObject
00088 {
00089 lat::Time time;
00090 std::string name;
00091 std::string info;
00092 Peer *peer;
00093 };
00094
00095 struct AutoPeer;
00096 struct Peer
00097 {
00098 std::string peeraddr;
00099 lat::Socket *socket;
00100 DataBlob incoming;
00101 Bucket *sendq;
00102 size_t sendpos;
00103
00104 unsigned mask;
00105 bool source;
00106 bool update;
00107 bool updated;
00108 bool updatefull;
00109 size_t updates;
00110 size_t waiting;
00111 AutoPeer *automatic;
00112 };
00113
00114 struct AutoPeer
00115 {
00116 Peer *peer;
00117 lat::Time next;
00118 std::string host;
00119 int port;
00120 bool update;
00121 bool warned;
00122 };
00123
00124 DQMNet(const std::string &appname = "");
00125 virtual ~DQMNet(void);
00126
00127 void debug(bool doit);
00128 void delay(int delay);
00129 void sendScalarAsText(bool doit);
00130 void requestFullUpdates(bool doit);
00131 void startLocalServer(int port);
00132 void updateToCollector(const std::string &host, int port);
00133 void listenToCollector(const std::string &host, int port);
00134 void shutdown(void);
00135 void lock(void);
00136 void unlock(void);
00137
00138 void start(void);
00139 void run(void);
00140
00141 virtual int receive(DQMStore *store);
00142 virtual void updateLocalObject(Object &o);
00143 virtual void removeLocalObject(const std::string &name);
00144 void sendLocalChanges(void);
00145
00146 protected:
00147 std::ostream & logme(void);
00148 static void copydata(Bucket *b, const void *data, size_t len);
00149 bool extractScalarData(DataBlob &objdata, Object &o);
00150 void sendObjectToPeer(Bucket *msg, Object &o, bool data, bool text);
00151
00152 virtual bool shouldStop(void);
00153 void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00154 virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00155 virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00156
00157 bool reconstructObject(Object &o);
00158 bool reinstateObject(DQMStore *store, Object &o);
00159 virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00160 virtual Object * makeObject(Peer *p, const std::string &name) = 0;
00161 virtual void markObjectsZombies(Peer *p) = 0;
00162 virtual void markObjectsDead(Peer *p) = 0;
00163 virtual void purgeDeadObjects(lat::Time oldobj, lat::Time deadobj) = 0;
00164
00165 virtual Peer * getPeer(lat::Socket *s) = 0;
00166 virtual Peer * createPeer(lat::Socket *s) = 0;
00167 virtual void removePeer(Peer *p, lat::Socket *s) = 0;
00168 virtual void sendObjectListToPeer(Bucket *msg, bool data, bool all, bool clear) = 0;
00169 virtual void sendObjectListToPeers(bool all) = 0;
00170 virtual void requestFullUpdatesFromPeers(void) = 0;
00171
00172 void updateMask(Peer *p);
00173 virtual void updatePeerMasks(void) = 0;
00174
00175 bool debug_;
00176 bool sendScalarAsText_;
00177 bool requestFullUpdates_;
00178
00179 private:
00180 static void discard(Bucket *&b);
00181 bool losePeer(const char *reason,
00182 Peer *peer,
00183 lat::IOSelectEvent *event,
00184 lat::Error *err = 0);
00185 void requestObject(Peer *p, const char *name, size_t len);
00186 void releaseFromWait(WaitList::iterator i, Object *o);
00187 void releaseWaiters(Object *o);
00188
00189 bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
00190 bool onPeerConnect(lat::IOSelectEvent *ev);
00191 bool onLocalNotify(lat::IOSelectEvent *ev);
00192
00193 std::string appname_;
00194 int pid_;
00195
00196 lat::IOSelector sel_;
00197 lat::InetServerSocket *server_;
00198 lat::Pipe wakeup_;
00199 lat::Time version_;
00200
00201 AutoPeer upstream_;
00202 AutoPeer downstream_;
00203 WaitList waiting_;
00204
00205 pthread_mutex_t lock_;
00206 pthread_t communicate_;
00207 sig_atomic_t shutdown_;
00208
00209 int delay_;
00210 bool flush_;
00211
00212
00213 DQMNet(const DQMNet &);
00214 DQMNet &operator=(const DQMNet &);
00215 };
00216
00217 template <class ObjType>
00218 class DQMImplNet : public DQMNet
00219 {
00220 public:
00221 struct ImplPeer;
00222 typedef std::map<std::string, ObjType> ObjectMap;
00223 typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00224 struct ImplPeer : Peer
00225 {
00226 ObjectMap objs;
00227 };
00228
00229 DQMImplNet(const std::string &appname = "")
00230 : DQMNet(appname)
00231 {}
00232
00233 ~DQMImplNet(void)
00234 {
00235 typename PeerMap::iterator pi, pe;
00236 typename ObjectMap::iterator oi, oe;
00237 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00238 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00239 {
00240 ObjType &o = oi->second;
00241 delete o.object;
00242 delete o.reference;
00243 o.object = 0;
00244 o.reference = 0;
00245 }
00246 }
00247
00248 protected:
00249 virtual Object *
00250 findObject(Peer *p, const std::string &name, Peer **owner = 0)
00251 {
00252 typename ObjectMap::iterator pos;
00253 typename PeerMap::iterator i, e;
00254 if (owner)
00255 *owner = 0;
00256 if (p)
00257 {
00258 ImplPeer *ip = static_cast<ImplPeer *>(p);
00259 pos = ip->objs.find(name);
00260 if (pos == ip->objs.end())
00261 return 0;
00262 else
00263 {
00264 if (owner) *owner = ip;
00265 return &pos->second;
00266 }
00267 }
00268 else
00269 {
00270 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00271 {
00272 pos = i->second.objs.find(name);
00273 if (pos != i->second.objs.end())
00274 {
00275 if (owner) *owner = &i->second;
00276 return &pos->second;
00277 }
00278 }
00279 return 0;
00280 }
00281 }
00282
00283 virtual Object *
00284 makeObject(Peer *p, const std::string &name)
00285 {
00286 ImplPeer *ip = static_cast<ImplPeer *>(p);
00287 ObjType *o = &ip->objs[name];
00288 o->version = 0;
00289 o->name = name;
00290 o->object = 0;
00291 o->reference = 0;
00292 o->flags = 0;
00293 o->lastreq = 0;
00294 return o;
00295 }
00296
00297
00298
00299
00300
00301
00302
00303 virtual void
00304 markObjectsZombies(Peer *p)
00305 {
00306 ImplPeer *ip = static_cast<ImplPeer *>(p);
00307 typename ObjectMap::iterator i, e;
00308 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00309 i->second.flags |= DQM_FLAG_ZOMBIE;
00310 }
00311
00312
00313 virtual void
00314 markObjectsDead(Peer *p)
00315 {
00316 ImplPeer *ip = static_cast<ImplPeer *>(p);
00317 typename ObjectMap::iterator i, e;
00318 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00319 if (i->second.flags & DQM_FLAG_ZOMBIE)
00320 i->second.flags = (i->second.flags & ~DQM_FLAG_ZOMBIE) | DQM_FLAG_DEAD;
00321 }
00322
00323
00324 virtual void
00325 purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
00326 {
00327 typename PeerMap::iterator pi, pe;
00328 typename ObjectMap::iterator oi, oe;
00329 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00330 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
00331 {
00332 ObjType &o = oi->second;
00333
00334
00335
00336 if (o.lastreq < oldobj && o.object && ! (o.flags & DQM_FLAG_SCALAR))
00337 {
00338 if (debug_)
00339 logme()
00340 << "DEBUG: compacting idle '" << o.name
00341 << "' from " << pi->second.peeraddr << std::endl;
00342
00343 delete o.object;
00344 delete o.reference;
00345 o.object = 0;
00346 o.reference = 0;
00347 purgeDeadObject(o);
00348 }
00349
00350
00351 if (o.lastreq < deadobj
00352 && o.version < deadobj
00353 && (o.flags & DQM_FLAG_DEAD))
00354 {
00355 if (debug_)
00356 logme()
00357 << "DEBUG: removing dead '" << o.name
00358 << "' from " << pi->second.peeraddr << std::endl;
00359
00360 pi->second.objs.erase(oi++);
00361 }
00362 else
00363 ++oi;
00364 }
00365 }
00366
00367 virtual void
00368 purgeDeadObject(ObjType &o)
00369 {}
00370
00371 virtual Peer *
00372 getPeer(lat::Socket *s)
00373 {
00374 typename PeerMap::iterator pos = peers_.find(s);
00375 return pos == peers_.end() ? 0 : &pos->second;
00376 }
00377
00378 virtual Peer *
00379 createPeer(lat::Socket *s)
00380 {
00381 ImplPeer *ip = &peers_[s];
00382 ip->socket = 0;
00383 ip->sendq = 0;
00384 ip->sendpos = 0;
00385 ip->mask = 0;
00386 ip->source = false;
00387 ip->update = false;
00388 ip->updated = false;
00389 ip->updatefull = false;
00390 ip->updates = 0;
00391 ip->waiting = 0;
00392 ip->automatic = 0;
00393 return ip;
00394 }
00395
00396 virtual void
00397 removePeer(Peer *p, lat::Socket *s)
00398 {
00399 ImplPeer *ip = static_cast<ImplPeer *>(p);
00400 bool needflush = ! ip->objs.empty();
00401
00402 typename ObjectMap::iterator i, e;
00403 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00404 {
00405 ObjType &o = i->second;
00406 delete o.object;
00407 delete o.reference;
00408 ip->objs.erase(i++);
00409 }
00410
00411 peers_.erase(s);
00412
00413
00414
00415 if (needflush)
00416 sendLocalChanges();
00417 }
00418
00420 virtual void
00421 sendObjectListToPeer(Bucket *msg, bool data, bool all, bool clear)
00422 {
00423 typename PeerMap::iterator pi, pe;
00424 typename ObjectMap::iterator oi, oe;
00425 uint32_t numobjs = 0;
00426 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00427 numobjs += pi->second.objs.size();
00428
00429 msg->data.reserve(msg->data.size() + 300*numobjs);
00430
00431 uint32_t nupdates = 0;
00432 uint32_t words [4];
00433 words[0] = sizeof(words);
00434 words[1] = DQM_REPLY_LIST_BEGIN;
00435 words[2] = numobjs;
00436 words[3] = all;
00437 copydata(msg, &words[0], sizeof(words));
00438
00439 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00440 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00441 if (all || (oi->second.flags & DQM_FLAG_NEW))
00442 {
00443 sendObjectToPeer(msg, oi->second, data, sendScalarAsText_);
00444 if (clear)
00445 oi->second.flags &= ~DQM_FLAG_NEW;
00446 ++nupdates;
00447 }
00448
00449 words[1] = DQM_REPLY_LIST_END;
00450 words[2] = nupdates;
00451 copydata(msg, &words[0], sizeof(words));
00452 }
00453
00454 virtual void
00455 sendObjectListToPeers(bool all)
00456 {
00457 typename PeerMap::iterator i, e;
00458 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00459 {
00460 ImplPeer &p = i->second;
00461 if (! p.update)
00462 continue;
00463
00464 if (debug_)
00465 logme()
00466 << "DEBUG: notifying " << p.peeraddr
00467 << ", full = " << p.updatefull << std::endl;
00468
00469 Bucket msg;
00470 msg.next = 0;
00471 sendObjectListToPeer(&msg, p.updatefull, !p.updated || all, true);
00472
00473 if (! msg.data.empty())
00474 {
00475 Bucket **prev = &p.sendq;
00476 while (*prev)
00477 prev = &(*prev)->next;
00478
00479 *prev = new Bucket;
00480 (*prev)->next = 0;
00481 (*prev)->data.swap(msg.data);
00482 }
00483 p.updated = true;
00484 }
00485 }
00486
00487 virtual void
00488 requestFullUpdatesFromPeers(void)
00489 {
00490 logme()
00491 << "ERROR: invalid request for full updates from peers.\n";
00492 }
00493
00494 virtual void
00495 updatePeerMasks(void)
00496 {
00497 typename PeerMap::iterator i, e;
00498 for (i = peers_.begin(), e = peers_.end(); i != e; )
00499 updateMask(&(i++)->second);
00500 }
00501
00502 protected:
00503 PeerMap peers_;
00504 };
00505
00506
00507 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00508 {
00509 public:
00510 DQMBasicNet(const std::string &appname = "");
00511
00512 virtual int receive(DQMStore *store);
00513
00514 protected:
00515 virtual void updateLocalObject(Object &o);
00516 virtual void removeLocalObject(const std::string &name);
00517 virtual void requestFullUpdatesFromPeers(void);
00518
00519 private:
00520 ImplPeer *local_;
00521 };
00522
00523
00524 #endif // DQMSERVICES_CORE_DQM_NET_H