00001 #ifndef DQMSERVICES_CORE_DQM_NET_H
00002 # define DQMSERVICES_CORE_DQM_NET_H
00003
00004 # include "classlib/iobase/Socket.h"
00005 # include "classlib/iobase/IOSelector.h"
00006 # include "classlib/iobase/Pipe.h"
00007 # include "classlib/utils/Signal.h"
00008 # include "classlib/utils/Error.h"
00009 # include "classlib/utils/Time.h"
00010 # include <pthread.h>
00011 # include <stdint.h>
00012 # include <iostream>
00013 # include <vector>
00014 # include <string>
00015 # include <list>
00016 # include <map>
00017 # include <set>
00018 # include <ext/hash_set>
00019
00020
00021
00022 class DQMNet
00023 {
00024 public:
00025 static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
00026 static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
00027 static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
00028 static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
00029 static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
00030 static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
00031 static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
00032 static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
00033 static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
00034 static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
00035 static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
00036 static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
00037 static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
00038 static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
00039 static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
00040 static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
00041 static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
00042 static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
00043
00044 static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
00045 static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
00046 static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
00047 static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
00048 static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
00049 static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR
00050 | DQM_PROP_REPORT_WARN
00051 | DQM_PROP_REPORT_OTHER);
00052
00053 static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
00054 static const uint32_t DQM_PROP_TAGGED = 0x00002000;
00055 static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
00056 static const uint32_t DQM_PROP_RESET = 0x00008000;
00057
00058 static const uint32_t DQM_PROP_NEW = 0x00010000;
00059 static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
00060 static const uint32_t DQM_PROP_LUMI = 0x00040000;
00061 static const uint32_t DQM_PROP_DEAD = 0x00080000;
00062 static const uint32_t DQM_PROP_STALE = 0x00100000;
00063 static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
00064
00065 static const uint32_t DQM_MSG_HELLO = 0;
00066 static const uint32_t DQM_MSG_UPDATE_ME = 1;
00067 static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
00068 static const uint32_t DQM_MSG_GET_OBJECT = 3;
00069
00070 static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
00071 static const uint32_t DQM_REPLY_LIST_END = 102;
00072 static const uint32_t DQM_REPLY_NONE = 103;
00073 static const uint32_t DQM_REPLY_OBJECT = 104;
00074
00075 static const uint32_t MAX_PEER_WAITREQS = 128;
00076
00077 struct Peer;
00078 struct QValue;
00079 struct WaitObject;
00080
00081 typedef std::vector<unsigned char> DataBlob;
00082 typedef std::vector<QValue> QReports;
00083 typedef std::vector<uint32_t> TagList;
00084 typedef std::list<WaitObject> WaitList;
00085
00086 struct QValue
00087 {
00088 int code;
00089 float qtresult;
00090 std::string message;
00091 std::string qtname;
00092 std::string algorithm;
00093 };
00094
00095 struct CoreObject
00096 {
00097 uint32_t flags;
00098 uint32_t tag;
00099 uint64_t version;
00100 const std::string *dirname;
00101 std::string objname;
00102 QReports qreports;
00103 };
00104
00105 struct Object : CoreObject
00106 {
00107 uint64_t hash;
00108 uint64_t lastreq;
00109 DataBlob rawdata;
00110 std::string scalar;
00111 std::string qdata;
00112 };
00113
00114 struct Bucket
00115 {
00116 Bucket *next;
00117 DataBlob data;
00118 };
00119
00120 struct WaitObject
00121 {
00122 lat::Time time;
00123 std::string name;
00124 std::string info;
00125 Peer *peer;
00126 };
00127
00128 struct AutoPeer;
00129 struct Peer
00130 {
00131 std::string peeraddr;
00132 lat::Socket *socket;
00133 DataBlob incoming;
00134 Bucket *sendq;
00135 size_t sendpos;
00136
00137 unsigned mask;
00138 bool source;
00139 bool update;
00140 bool updated;
00141 size_t updates;
00142 size_t waiting;
00143 AutoPeer *automatic;
00144 };
00145
00146 struct AutoPeer
00147 {
00148 Peer *peer;
00149 lat::Time next;
00150 std::string host;
00151 int port;
00152 bool update;
00153 };
00154
00155 DQMNet(const std::string &appname = "");
00156 virtual ~DQMNet(void);
00157
00158 void debug(bool doit);
00159 void delay(int delay);
00160 void startLocalServer(int port);
00161 void startLocalServer(const char *path);
00162 void staleObjectWaitLimit(lat::TimeSpan time);
00163 void updateToCollector(const std::string &host, int port);
00164 void listenToCollector(const std::string &host, int port);
00165 void shutdown(void);
00166 void lock(void);
00167 void unlock(void);
00168
00169 void start(void);
00170 void run(void);
00171
00172 void sendLocalChanges(void);
00173
00174 static bool setOrder(const CoreObject &a, const CoreObject &b)
00175 {
00176 int diff = a.dirname->compare(*b.dirname);
00177 return (diff < 0 ? true
00178 : diff == 0 ? a.objname < b.objname
00179 : false);
00180 }
00181
00182 struct HashOp
00183 {
00184 uint32_t operator()(const Object &a) const
00185 {
00186 return a.hash;
00187 }
00188 };
00189
00190 struct HashEqual
00191 {
00192 bool operator()(const Object &a, const Object &b) const
00193 {
00194 return a.hash == b.hash && *a.dirname == *b.dirname && a.objname == b.objname;
00195 }
00196 };
00197
00198 static size_t
00199 dqmhash(const void *key, size_t keylen)
00200 {
00201
00202
00203 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
00204 # define dqmhashmix(a,b,c) { \
00205 a -= c; a ^= dqmhashrot(c, 4); c += b; \
00206 b -= a; b ^= dqmhashrot(a, 6); a += c; \
00207 c -= b; c ^= dqmhashrot(b, 8); b += a; \
00208 a -= c; a ^= dqmhashrot(c,16); c += b; \
00209 b -= a; b ^= dqmhashrot(a,19); a += c; \
00210 c -= b; c ^= dqmhashrot(b, 4); b += a; }
00211 # define dqmhashfinal(a,b,c) { \
00212 c ^= b; c -= dqmhashrot(b,14); \
00213 a ^= c; a -= dqmhashrot(c,11); \
00214 b ^= a; b -= dqmhashrot(a,25); \
00215 c ^= b; c -= dqmhashrot(b,16); \
00216 a ^= c; a -= dqmhashrot(c,4); \
00217 b ^= a; b -= dqmhashrot(a,14); \
00218 c ^= b; c -= dqmhashrot(b,24); }
00219
00220 uint32_t a, b, c;
00221 a = b = c = 0xdeadbeef + (uint32_t) keylen;
00222 const unsigned char *k = (const unsigned char *) key;
00223
00224
00225 while (keylen > 12)
00226 {
00227 a += k[0];
00228 a += ((uint32_t)k[1]) << 8;
00229 a += ((uint32_t)k[2]) << 16;
00230 a += ((uint32_t)k[3]) << 24;
00231 b += k[4];
00232 b += ((uint32_t)k[5]) << 8;
00233 b += ((uint32_t)k[6]) << 16;
00234 b += ((uint32_t)k[7]) << 24;
00235 c += k[8];
00236 c += ((uint32_t)k[9]) << 8;
00237 c += ((uint32_t)k[10]) << 16;
00238 c += ((uint32_t)k[11]) << 24;
00239 dqmhashmix(a,b,c);
00240 keylen -= 12;
00241 k += 12;
00242 }
00243
00244
00245 switch (keylen)
00246 {
00247 case 12: c += ((uint32_t)k[11]) << 24;
00248 case 11: c += ((uint32_t)k[10]) << 16;
00249 case 10: c += ((uint32_t)k[9]) << 8;
00250 case 9 : c += k[8];
00251 case 8 : b += ((uint32_t)k[7]) << 24;
00252 case 7 : b += ((uint32_t)k[6]) << 16;
00253 case 6 : b += ((uint32_t)k[5]) << 8;
00254 case 5 : b += k[4];
00255 case 4 : a += ((uint32_t)k[3]) << 24;
00256 case 3 : a += ((uint32_t)k[2]) << 16;
00257 case 2 : a += ((uint32_t)k[1]) << 8;
00258 case 1 : a += k[0];
00259 break;
00260 case 0 : return c;
00261 }
00262
00263 dqmhashfinal(a, b, c);
00264 return c;
00265 # undef dqmhashrot
00266 # undef dqmhashmix
00267 # undef dqmhashfinal
00268 }
00269
00270 static void packQualityData(std::string &into, const QReports &qr);
00271 static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
00272
00273 protected:
00274 std::ostream & logme(void);
00275 static void copydata(Bucket *b, const void *data, size_t len);
00276 virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
00277
00278 virtual bool shouldStop(void);
00279 void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
00280 virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
00281 virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
00282
00283
00284
00285 virtual Object * findObject(Peer *p, const std::string &name, Peer **owner = 0) = 0;
00286 virtual Object * makeObject(Peer *p, const std::string &name) = 0;
00287 virtual void markObjectsDead(Peer *p) = 0;
00288 virtual void purgeDeadObjects(Peer *p) = 0;
00289
00290 virtual Peer * getPeer(lat::Socket *s) = 0;
00291 virtual Peer * createPeer(lat::Socket *s) = 0;
00292 virtual void removePeer(Peer *p, lat::Socket *s) = 0;
00293 virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
00294 virtual void sendObjectListToPeers(bool all) = 0;
00295
00296 void updateMask(Peer *p);
00297 virtual void updatePeerMasks(void) = 0;
00298 static void discard(Bucket *&b);
00299
00300 bool debug_;
00301 pthread_mutex_t lock_;
00302
00303 private:
00304 void losePeer(const char *reason,
00305 Peer *peer,
00306 lat::IOSelectEvent *event,
00307 lat::Error *err = 0);
00308 void requestObjectData(Peer *p, const char *name, size_t len);
00309 void releaseFromWait(WaitList::iterator i, Object *o);
00310 void releaseWaiters(const std::string &name, Object *o);
00311
00312 bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
00313 bool onPeerConnect(lat::IOSelectEvent *ev);
00314 bool onLocalNotify(lat::IOSelectEvent *ev);
00315
00316 std::string appname_;
00317 int pid_;
00318
00319 lat::IOSelector sel_;
00320 lat::Socket *server_;
00321 lat::Pipe wakeup_;
00322 lat::Time version_;
00323
00324 AutoPeer upstream_;
00325 AutoPeer downstream_;
00326 WaitList waiting_;
00327
00328 pthread_t communicate_;
00329 sig_atomic_t shutdown_;
00330
00331 int delay_;
00332 lat::TimeSpan waitStale_;
00333 lat::TimeSpan waitMax_;
00334 bool flush_;
00335
00336
00337 DQMNet(const DQMNet &);
00338 DQMNet &operator=(const DQMNet &);
00339 };
00340
00341 template <class ObjType>
00342 class DQMImplNet : public DQMNet
00343 {
00344 public:
00345 struct ImplPeer;
00346
00347 typedef std::set<std::string> DirMap;
00348 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual> ObjectMap;
00349 typedef std::map<lat::Socket *, ImplPeer> PeerMap;
00350 struct ImplPeer : Peer
00351 {
00352 ImplPeer(void) {}
00353 ObjectMap objs;
00354 DirMap dirs;
00355 };
00356
00357 DQMImplNet(const std::string &appname = "")
00358 : DQMNet(appname)
00359 {}
00360
00361 ~DQMImplNet(void)
00362 {}
00363
00364 protected:
00365 virtual Object *
00366 findObject(Peer *p, const std::string &name, Peer **owner = 0)
00367 {
00368 size_t slash = name.rfind('/');
00369 size_t dirpos = (slash == std::string::npos ? 0 : slash);
00370 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00371 std::string path(name, 0, dirpos);
00372 ObjType proto;
00373 proto.hash = dqmhash(name.c_str(), name.size());
00374 proto.dirname = &path;
00375 proto.objname.append(name, namepos, std::string::npos);
00376
00377 typename ObjectMap::iterator pos;
00378 typename PeerMap::iterator i, e;
00379 if (owner)
00380 *owner = 0;
00381 if (p)
00382 {
00383 ImplPeer *ip = static_cast<ImplPeer *>(p);
00384 pos = ip->objs.find(proto);
00385 if (pos == ip->objs.end())
00386 return 0;
00387 else
00388 {
00389 if (owner) *owner = ip;
00390 return const_cast<ObjType *>(&*pos);
00391 }
00392 }
00393 else
00394 {
00395 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00396 {
00397 pos = i->second.objs.find(proto);
00398 if (pos != i->second.objs.end())
00399 {
00400 if (owner) *owner = &i->second;
00401 return const_cast<ObjType *>(&*pos);
00402 }
00403 }
00404 return 0;
00405 }
00406 }
00407
00408 virtual Object *
00409 makeObject(Peer *p, const std::string &name)
00410 {
00411 ImplPeer *ip = static_cast<ImplPeer *>(p);
00412 size_t slash = name.rfind('/');
00413 size_t dirpos = (slash == std::string::npos ? 0 : slash);
00414 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
00415 ObjType o;
00416 o.flags = 0;
00417 o.tag = 0;
00418 o.version = 0;
00419 o.lastreq = 0;
00420 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first;
00421 o.objname.append(name, namepos, std::string::npos);
00422 o.hash = dqmhash(name.c_str(), name.size());
00423 return const_cast<ObjType *>(&*ip->objs.insert(o).first);
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433 virtual void
00434 markObjectsDead(Peer *p)
00435 {
00436 uint64_t minreq
00437 = (lat::Time::current()
00438 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
00439 ImplPeer *ip = static_cast<ImplPeer *>(p);
00440 typename ObjectMap::iterator i, e;
00441 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i)
00442 {
00443 if (i->lastreq && i->lastreq < minreq)
00444 const_cast<ObjType &>(*i).lastreq = 0;
00445 const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
00446 }
00447 }
00448
00449
00450 virtual void
00451 purgeDeadObjects(Peer *p)
00452 {
00453 ImplPeer *ip = static_cast<ImplPeer *>(p);
00454 typename ObjectMap::iterator i, e;
00455 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00456 {
00457 if (i->flags & DQM_PROP_DEAD)
00458 ip->objs.erase(i++);
00459 else
00460 ++i;
00461 }
00462 }
00463
00464 virtual Peer *
00465 getPeer(lat::Socket *s)
00466 {
00467 typename PeerMap::iterator pos = peers_.find(s);
00468 typename PeerMap::iterator end = peers_.end();
00469 return pos == end ? 0 : &pos->second;
00470 }
00471
00472 virtual Peer *
00473 createPeer(lat::Socket *s)
00474 {
00475 ImplPeer *ip = &peers_[s];
00476 ip->socket = 0;
00477 ip->sendq = 0;
00478 ip->sendpos = 0;
00479 ip->mask = 0;
00480 ip->source = false;
00481 ip->update = false;
00482 ip->updated = false;
00483 ip->updates = 0;
00484 ip->waiting = 0;
00485 ip->automatic = 0;
00486 return ip;
00487 }
00488
00489 virtual void
00490 removePeer(Peer *p, lat::Socket *s)
00491 {
00492 ImplPeer *ip = static_cast<ImplPeer *>(p);
00493 bool needflush = ! ip->objs.empty();
00494
00495 typename ObjectMap::iterator i, e;
00496 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
00497 ip->objs.erase(i++);
00498
00499 peers_.erase(s);
00500
00501
00502
00503 if (needflush)
00504 sendLocalChanges();
00505 }
00506
00508 virtual void
00509 sendObjectListToPeer(Bucket *msg, bool all, bool clear)
00510 {
00511 typename PeerMap::iterator pi, pe;
00512 typename ObjectMap::iterator oi, oe;
00513 size_t size = 0;
00514 size_t numobjs = 0;
00515 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00516 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
00517 if (all || (oi->flags & DQM_PROP_NEW))
00518 size += 9*sizeof(uint32_t) + oi->dirname->size()
00519 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
00520 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
00521
00522 msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
00523
00524 uint32_t nupdates = 0;
00525 uint32_t words [4];
00526 words[0] = sizeof(words);
00527 words[1] = DQM_REPLY_LIST_BEGIN;
00528 words[2] = numobjs;
00529 words[3] = all;
00530 copydata(msg, &words[0], sizeof(words));
00531
00532 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
00533 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
00534 if (all || (oi->flags & DQM_PROP_NEW))
00535 {
00536 sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
00537 if (clear)
00538 const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
00539 ++nupdates;
00540 }
00541
00542 words[1] = DQM_REPLY_LIST_END;
00543 words[2] = nupdates;
00544 copydata(msg, &words[0], sizeof(words));
00545 }
00546
00547 virtual void
00548 sendObjectListToPeers(bool all)
00549 {
00550 typename PeerMap::iterator i, e;
00551 typename ObjectMap::iterator oi, oe;
00552 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
00553 {
00554 ImplPeer &p = i->second;
00555 if (! p.update)
00556 continue;
00557
00558 if (debug_)
00559 logme()
00560 << "DEBUG: notifying " << p.peeraddr << std::endl;
00561
00562 Bucket msg;
00563 msg.next = 0;
00564 sendObjectListToPeer(&msg, !p.updated || all, true);
00565
00566 if (! msg.data.empty())
00567 {
00568 Bucket **prev = &p.sendq;
00569 while (*prev)
00570 prev = &(*prev)->next;
00571
00572 *prev = new Bucket;
00573 (*prev)->next = 0;
00574 (*prev)->data.swap(msg.data);
00575 }
00576 p.updated = true;
00577 }
00578 }
00579
00580 virtual void
00581 updatePeerMasks(void)
00582 {
00583 typename PeerMap::iterator i, e;
00584 for (i = peers_.begin(), e = peers_.end(); i != e; )
00585 updateMask(&(i++)->second);
00586 }
00587
00588 protected:
00589 PeerMap peers_;
00590 };
00591
00592
00593 class DQMBasicNet : public DQMImplNet<DQMNet::Object>
00594 {
00595 public:
00596 DQMBasicNet(const std::string &appname = "");
00597
00598 void reserveLocalSpace(uint32_t size);
00599 void updateLocalObject(Object &o);
00600 bool removeLocalExcept(const std::set<std::string> &known);
00601
00602 private:
00603 ImplPeer *local_;
00604 };
00605
00606
00607 #endif // DQMSERVICES_CORE_DQM_NET_H