00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003
00004 # include "classlib/iobase/Socket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 # include <set>
00018 # include <ext/hash_set>
00019
00020
00021
00022 class DQMNet
00023 {
00024 public:
00025 static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
00026 static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
00027 static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
00028 static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
00029 static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
00030 static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
00031 static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
00032 static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
00033 static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
00034 static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
00035 static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
00036 static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
00037 static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
00038 static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
00039 static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
00040 static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
00041 static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
00042 static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
00043
00044 static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
00045 static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
00046 static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
00047 static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
00048 static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
00049 static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR
00050 | DQM_PROP_REPORT_WARN
00051 | DQM_PROP_REPORT_OTHER);
00052
00053 static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
00054 static const uint32_t DQM_PROP_TAGGED = 0x00002000;
00055 static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
00056 static const uint32_t DQM_PROP_RESET = 0x00008000;
00057
00058 static const uint32_t DQM_PROP_NEW = 0x00010000;
00059 static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
00060 static const uint32_t DQM_PROP_LUMI = 0x00040000;
00061 static const uint32_t DQM_PROP_DEAD = 0x00080000;
00062 static const uint32_t DQM_PROP_STALE = 0x00100000;
00063
00064 static const uint32_t DQM_MSG_HELLO = 0;
00065 static const uint32_t DQM_MSG_UPDATE_ME = 1;
00066 static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
00067 static const uint32_t DQM_MSG_GET_OBJECT = 3;
00068
00069 static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
00070 static const uint32_t DQM_REPLY_LIST_END = 102;
00071 static const uint32_t DQM_REPLY_NONE = 103;
00072 static const uint32_t DQM_REPLY_OBJECT = 104;
00073
00074 static const uint32_t MAX_PEER_WAITREQS = 128;
00075
00076 struct Peer;
00077 struct QValue;
00078 struct WaitObject;
00079
00080 typedef std::vector<unsigned char> DataBlob;
00081 typedef std::vector<QValue> QReports;
00082 typedef std::vector<uint32_t> TagList;
00083 typedef std::list<WaitObject> WaitList;
00084
00085 struct QValue
00086 {
00087 int code;
00088 float qtresult;
00089 std::string message;
00090 std::string qtname;
00091 std::string algorithm;
00092 };
00093
00094 struct CoreObject
00095 {
00096 uint32_t flags;
00097 uint32_t tag;
00098 uint64_t version;
00099 const std::string *dirname;
00100 std::string objname;
00101 QReports qreports;
00102 };
00103
00104 struct Object : CoreObject
00105 {
00106 uint64_t hash;
00107 uint64_t lastreq;
00108 DataBlob rawdata;
00109 std::string scalar;
00110 std::string qdata;
00111 };
00112
00113 struct Bucket
00114 {
00115 Bucket *next;
00116 DataBlob data;
00117 };
00118
00119 struct WaitObject
00120 {
00121 lat::Time time;
00122 std::string name;
00123 std::string info;
00124 Peer *peer;
00125 };
00126
00127 struct AutoPeer;
00128 struct Peer
00129 {
00130 std::string peeraddr;
00131 lat::Socket *socket;
00132 DataBlob incoming;
00133 Bucket *sendq;
00134 size_t sendpos;
00135
00136 unsigned mask;
00137 bool source;
00138 bool update;
00139 bool updated;
00140 size_t updates;
00141 size_t waiting;
00142 AutoPeer *automatic;
00143 };
00144
00145 struct AutoPeer
00146 {
00147 Peer *peer;
00148 lat::Time next;
00149 std::string host;
00150 int port;
00151 bool update;
00152 };
00153
00154 DQMNet(const std::string &appname = "");
00155 virtual ~DQMNet(void);
00156
00157 void debug(bool doit);
00158 void delay(int delay);
00159 void startLocalServer(int port);
00160 void startLocalServer(const char *path);
00161 void staleObjectWaitLimit(lat::TimeSpan time);
00162 void updateToCollector(const std::string &host, int port);
00163 void listenToCollector(const std::string &host, int port);
00164 void shutdown(void);
00165 void lock(void);
00166 void unlock(void);
00167
00168 void start(void);
00169 void run(void);
00170
00171 void sendLocalChanges(void);
00172
00173 static bool setOrder(const CoreObject &a, const CoreObject &b)
00174 {
00175 int diff = a.dirname->compare(*b.dirname);
00176 return (diff < 0 ? true
00177 : diff == 0 ? a.objname < b.objname
00178 : false);
00179 }
00180
00181 struct HashOp
00182 {
00183 uint32_t operator()(const Object &a) const
00184 {
00185 return a.hash;
00186 }
00187 };
00188
00189 struct HashEqual
00190 {
00191 bool operator()(const Object &a, const Object &b) const
00192 {
00193 return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
00194 }
00195 };
00196
00197 static size_t
00198 dqmhash(const void *key, size_t keylen)
00199 {
00200
00201
00202 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
00203 # define dqmhashmix(a,b,c) { \
00204 a -= c; a ^= dqmhashrot(c, 4); c += b; \
00205 b -= a; b ^= dqmhashrot(a, 6); a += c; \
00206 c -= b; c ^= dqmhashrot(b, 8); b += a; \
00207 a -= c; a ^= dqmhashrot(c,16); c += b; \
00208 b -= a; b ^= dqmhashrot(a,19); a += c; \
00209 c -= b; c ^= dqmhashrot(b, 4); b += a; }
00210 # define dqmhashfinal(a,b,c) { \
00211 c ^= b; c -= dqmhashrot(b,14); \
00212 a ^= c; a -= dqmhashrot(c,11); \
00213 b ^= a; b -= dqmhashrot(a,25); \
00214 c ^= b; c -= dqmhashrot(b,16); \
00215 a ^= c; a -= dqmhashrot(c,4); \
00216 b ^= a; b -= dqmhashrot(a,14); \
00217 c ^= b; c -= dqmhashrot(b,24); }
00218
00219 uint32_t a, b, c;
00220 a = b = c = 0xdeadbeef + (uint32_t) keylen;
00221 const unsigned char *k = (const unsigned char *) key;
00222
00223
00224 while (keylen > 12)
00225 {
00226 a += k[0];
00227 a += ((uint32_t)k[1]) << 8;
00228 a += ((uint32_t)k[2]) << 16;
00229 a += ((uint32_t)k[3]) << 24;
00230 b += k[4];
00231 b += ((uint32_t)k[5]) << 8;
00232 b += ((uint32_t)k[6]) << 16;
00233 b += ((uint32_t)k[7]) << 24;
00234 c += k[8];
00235 c += ((uint32_t)k[9]) << 8;
00236 c += ((uint32_t)k[10]) << 16;
00237 c += ((uint32_t)k[11]) << 24;
00238 dqmhashmix(a,b,c);
00239 keylen -= 12;
00240 k += 12;
00241 }
00242
00243
00244 switch (keylen)
00245 {
00246 case 12: c += ((uint32_t)k[11]) << 24;
00247 case 11: c += ((uint32_t)k[10]) << 16;
00248 case 10: c += ((uint32_t)k[9]) << 8;
00249 case 9 : c += k[8];
00250 case 8 : b += ((uint32_t)k[7]) << 24;
00251 case 7 : b += ((uint32_t)k[6]) << 16;
00252 case 6 : b += ((uint32_t)k[5]) << 8;
00253 case 5 : b += k[4];
00254 case 4 : a += ((uint32_t)k[3]) << 24;
00255 case 3 : a += ((uint32_t)k[2]) << 16;
00256 case 2 : a += ((uint32_t)k[1]) << 8;
00257 case 1 : a += k[0];
00258 break;
00259 case 0 : return c;
00260 }
00261
00262 dqmhashfinal(a, b, c);
00263 return c;
00264 # undef dqmhashrot
00265 # undef dqmhashmix
00266 # undef dqmhashfinal
00267 }
00268
00269 static void packQualityData(std::string &into, const QReports &qr);
00270 static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
00271
00272 protected:
00273 std::ostream & logme(void);
00274 static void copydata(Bucket *b, const void *data, size_t len);
00275 virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
00276
00277 virtual bool shouldStop(void);
00278 void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00279 virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00280 virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00281
00282
00283
00284 virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00285 virtual Object * makeObject(Peer *p, const std::string &name) = 0;
00286 virtual void markObjectsDead(Peer *p) = 0;
00287 virtual void purgeDeadObjects(Peer *p) = 0;
00288
00289 virtual Peer * getPeer(lat::Socket *s) = 0;
00290 virtual Peer * createPeer(lat::Socket *s) = 0;
00291 virtual void removePeer(Peer *p, lat::Socket *s) = 0;
00292 virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
00293 virtual void sendObjectListToPeers(bool all) = 0;
00294
00295 void updateMask(Peer *p);
00296 virtual void updatePeerMasks(void) = 0;
00297 static void discard(Bucket *&b);
00298
00299 bool debug_;
00300 pthread_mutex_t lock_;
00301
00302 private:
00303 void losePeer(const char *reason,
00304 Peer *peer,
00305 lat::IOSelectEvent *event,
00306 lat::Error *err = 0);
00307 void requestObjectData(Peer *p, const char *name, size_t len);
00308 void releaseFromWait(WaitList::iterator i, Object *o);
00309 void releaseWaiters(const std::string &name, Object *o);
00310
00311 bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
00312 bool onPeerConnect(lat::IOSelectEvent *ev);
00313 bool onLocalNotify(lat::IOSelectEvent *ev);
00314
00315 std::string appname_;
00316 int pid_;
00317
00318 lat::IOSelector sel_;
00319 lat::Socket *server_;
00320 lat::Pipe wakeup_;
00321 lat::Time version_;
00322
00323 AutoPeer upstream_;
00324 AutoPeer downstream_;
00325 WaitList waiting_;
00326
00327 pthread_t communicate_;
00328 sig_atomic_t shutdown_;
00329
00330 int delay_;
00331 lat::TimeSpan waitStale_;
00332 lat::TimeSpan waitMax_;
00333 bool flush_;
00334
00335
00336 DQMNet(const DQMNet &);
00337 DQMNet &operator=(const DQMNet &);
00338 };
00339
00340 template <class ObjType>
00341 class DQMImplNet : public DQMNet
00342 {
00343 public:
00344 struct ImplPeer;
00345
00346 typedef std::set<std::string> DirMap;
00347 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
00348 typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00349 struct ImplPeer : Peer
00350 {
00351 ImplPeer(void) {}
00352 ObjectMap objs;
00353 DirMap dirs;
00354 };
00355
00356 DQMImplNet(const std::string &appname = "")
00357 : DQMNet(appname)
00358 {}
00359
00360 ~DQMImplNet(void)
00361 {}
00362
00363 protected:
00364 virtual Object *
00365 findObject(Peer *p, const std::string &name, Peer **owner = 0)
00366 {
00367 size_t slash = name.rfind('/');
00368 size_t dirpos = (slash == std::string::npos ? 0 : slash);
00369 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00370 std::string path(name, 0, dirpos);
00371 ObjType proto;
00372 proto.hash = dqmhash(name.c_str(), name.size());
00373 proto.dirname = &path;
00374 proto.objname.append(name, namepos, std::string::npos);
00375
00376 typename ObjectMap::iterator pos;
00377 typename PeerMap::iterator i, e;
00378 if (owner)
00379 *owner = 0;
00380 if (p)
00381 {
00382 ImplPeer *ip = static_cast<ImplPeer *>(p);
00383 pos = ip->objs.find(proto);
00384 if (pos == ip->objs.end())
00385 return 0;
00386 else
00387 {
00388 if (owner) *owner = ip;
00389 return const_cast<ObjType *>(&*pos);
00390 }
00391 }
00392 else
00393 {
00394 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00395 {
00396 pos = i->second.objs.find(proto);
00397 if (pos != i->second.objs.end())
00398 {
00399 if (owner) *owner = &i->second;
00400 return const_cast<ObjType *>(&*pos);
00401 }
00402 }
00403 return 0;
00404 }
00405 }
00406
00407 virtual Object *
00408 makeObject(Peer *p, const std::string &name)
00409 {
00410 ImplPeer *ip = static_cast<ImplPeer *>(p);
00411 size_t slash = name.rfind('/');
00412 size_t dirpos = (slash == std::string::npos ? 0 : slash);
00413 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00414 ObjType o;
00415 o.flags = 0;
00416 o.tag = 0;
00417 o.version = 0;
00418 o.lastreq = 0;
00419 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
00420 o.objname.append(name, namepos, std::string::npos);
00421 o.hash = dqmhash(name.c_str(), name.size());
00422 return const_cast<ObjType *>(&*ip->objs.insert(o).first);
00423 }
00424
00425
00426
00427
00428
00429
00430
00431
00432 virtual void
00433 markObjectsDead(Peer *p)
00434 {
00435 uint64_t minreq
00436 = (lat::Time::current()
00437 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
00438 ImplPeer *ip = static_cast<ImplPeer *>(p);
00439 typename ObjectMap::iterator i, e;
00440 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00441 {
00442 if (i->lastreq && i->lastreq < minreq)
00443 const_cast<ObjType &>(*i).lastreq = 0;
00444 const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
00445 }
00446 }
00447
00448
00449 virtual void
00450 purgeDeadObjects(Peer *p)
00451 {
00452 ImplPeer *ip = static_cast<ImplPeer *>(p);
00453 typename ObjectMap::iterator i, e;
00454 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00455 {
00456 if (i->flags & DQM_PROP_DEAD)
00457 ip->objs.erase(i++);
00458 else
00459 ++i;
00460 }
00461 }
00462
00463 virtual Peer *
00464 getPeer(lat::Socket *s)
00465 {
00466 typename PeerMap::iterator pos = peers_.find(s);
00467 typename PeerMap::iterator end = peers_.end();
00468 return pos == end ? 0 : &pos->second;
00469 }
00470
00471 virtual Peer *
00472 createPeer(lat::Socket *s)
00473 {
00474 ImplPeer *ip = &peers_[s];
00475 ip->socket = 0;
00476 ip->sendq = 0;
00477 ip->sendpos = 0;
00478 ip->mask = 0;
00479 ip->source = false;
00480 ip->update = false;
00481 ip->updated = false;
00482 ip->updates = 0;
00483 ip->waiting = 0;
00484 ip->automatic = 0;
00485 return ip;
00486 }
00487
00488 virtual void
00489 removePeer(Peer *p, lat::Socket *s)
00490 {
00491 ImplPeer *ip = static_cast<ImplPeer *>(p);
00492 bool needflush = ! ip->objs.empty();
00493
00494 typename ObjectMap::iterator i, e;
00495 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00496 ip->objs.erase(i++);
00497
00498 peers_.erase(s);
00499
00500
00501
00502 if (needflush)
00503 sendLocalChanges();
00504 }
00505
00507 virtual void
00508 sendObjectListToPeer(Bucket *msg, bool all, bool clear)
00509 {
00510 typename PeerMap::iterator pi, pe;
00511 typename ObjectMap::iterator oi, oe;
00512 size_t size = 0;
00513 size_t numobjs = 0;
00514 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00515 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
00516 if (all || (oi->flags & DQM_PROP_NEW))
00517 size += 9*sizeof(uint32_t) + oi->dirname->size()
00518 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
00519 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
00520
00521 msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
00522
00523 uint32_t nupdates = 0;
00524 uint32_t words [4];
00525 words[0] = sizeof(words);
00526 words[1] = DQM_REPLY_LIST_BEGIN;
00527 words[2] = numobjs;
00528 words[3] = all;
00529 copydata(msg, &words[0], sizeof(words));
00530
00531 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00532 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00533 if (all || (oi->flags & DQM_PROP_NEW))
00534 {
00535 sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
00536 if (clear)
00537 const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
00538 ++nupdates;
00539 }
00540
00541 words[1] = DQM_REPLY_LIST_END;
00542 words[2] = nupdates;
00543 copydata(msg, &words[0], sizeof(words));
00544 }
00545
00546 virtual void
00547 sendObjectListToPeers(bool all)
00548 {
00549 typename PeerMap::iterator i, e;
00550 typename ObjectMap::iterator oi, oe;
00551 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00552 {
00553 ImplPeer &p = i->second;
00554 if (! p.update)
00555 continue;
00556
00557 if (debug_)
00558 logme()
00559 << "DEBUG: notifying " << p.peeraddr << std::endl;
00560
00561 Bucket msg;
00562 msg.next = 0;
00563 sendObjectListToPeer(&msg, !p.updated || all, true);
00564
00565 if (! msg.data.empty())
00566 {
00567 Bucket **prev = &p.sendq;
00568 while (*prev)
00569 prev = &(*prev)->next;
00570
00571 *prev = new Bucket;
00572 (*prev)->next = 0;
00573 (*prev)->data.swap(msg.data);
00574 }
00575 p.updated = true;
00576 }
00577 }
00578
00579 virtual void
00580 updatePeerMasks(void)
00581 {
00582 typename PeerMap::iterator i, e;
00583 for (i = peers_.begin(), e = peers_.end(); i != e; )
00584 updateMask(&(i++)->second);
00585 }
00586
00587 protected:
00588 PeerMap peers_;
00589 };
00590
00591
00592 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00593 {
00594 public:
00595 DQMBasicNet(const std::string &appname = "");
00596
00597 void reserveLocalSpace(uint32_t size);
00598 void updateLocalObject(Object &o);
00599 bool removeLocalExcept(const std::set<std::string> &known);
00600
00601 private:
00602 ImplPeer *local_;
00603 };
00604
00605
00606 #endif // DQMSERVICES_CORE_DQM_NET_H