1 #ifndef DQMSERVICES_CORE_DQM_NET_H 2 # define DQMSERVICES_CORE_DQM_NET_H 4 # include "classlib/iobase/Socket.h" 5 # include "classlib/iobase/IOSelector.h" 6 # include "classlib/iobase/Pipe.h" 7 # include "classlib/utils/Signal.h" 8 # include "classlib/utils/Error.h" 9 # include "classlib/utils/Time.h" 19 # include <ext/hash_set> 51 | DQM_PROP_REPORT_WARN
83 typedef std::vector<unsigned char>
DataBlob;
164 void debug(
bool doit);
221 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k)))) 222 # define dqmhashmix(a,b,c) { \ 223 a -= c; a ^= dqmhashrot(c, 4); c += b; \ 224 b -= a; b ^= dqmhashrot(a, 6); a += c; \ 225 c -= b; c ^= dqmhashrot(b, 8); b += a; \ 226 a -= c; a ^= dqmhashrot(c,16); c += b; \ 227 b -= a; b ^= dqmhashrot(a,19); a += c; \ 228 c -= b; c ^= dqmhashrot(b, 4); b += a; } 229 # define dqmhashfinal(a,b,c) { \ 230 c ^= b; c -= dqmhashrot(b,14); \ 231 a ^= c; a -= dqmhashrot(c,11); \ 232 b ^= a; b -= dqmhashrot(a,25); \ 233 c ^= b; c -= dqmhashrot(b,16); \ 234 a ^= c; a -= dqmhashrot(c,4); \ 235 b ^= a; b -= dqmhashrot(a,14); \ 236 c ^= b; c -= dqmhashrot(b,24); } 239 a = b = c = 0xdeadbeef + (uint32_t) keylen;
240 const unsigned char *
k = (
const unsigned char *) key;
246 a += ((uint32_t)k[1]) << 8;
247 a += ((uint32_t)k[2]) << 16;
248 a += ((uint32_t)k[3]) << 24;
250 b += ((uint32_t)k[5]) << 8;
251 b += ((uint32_t)k[6]) << 16;
252 b += ((uint32_t)k[7]) << 24;
254 c += ((uint32_t)k[9]) << 8;
255 c += ((uint32_t)k[10]) << 16;
256 c += ((uint32_t)k[11]) << 24;
265 case 12: c += ((uint32_t)k[11]) << 24;
266 case 11: c += ((uint32_t)k[10]) << 16;
267 case 10: c += ((uint32_t)k[9]) << 8;
269 case 8 : b += ((uint32_t)k[7]) << 24;
270 case 7 : b += ((uint32_t)k[6]) << 16;
271 case 6 : b += ((uint32_t)k[5]) << 8;
273 case 4 : a += ((uint32_t)k[3]) << 24;
274 case 3 : a += ((uint32_t)k[2]) << 16;
275 case 2 : a += ((uint32_t)k[1]) << 8;
292 std::ostream &
logme(
void);
324 lat::IOSelectEvent *
event,
325 lat::Error *err =
nullptr);
359 template <
class ObjType>
365 typedef std::set<std::string>
DirMap;
366 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
367 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
386 size_t slash = name.rfind(
'/');
387 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
388 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
391 proto.hash =
dqmhash(name.c_str(), name.size());
392 proto.dirname = &
path;
393 proto.objname.append(name, namepos, std::string::npos);
395 typename ObjectMap::iterator
pos;
396 typename PeerMap::iterator
i,
e;
401 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
402 pos = ip->objs.find(proto);
403 if (pos == ip->objs.end())
407 if (owner) *owner = ip;
408 return const_cast<ObjType *
>(&*pos);
413 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
415 pos = i->second.objs.find(proto);
416 if (pos != i->second.objs.end())
418 if (owner) *owner = &i->second;
419 return const_cast<ObjType *
>(&*pos);
429 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
430 size_t slash = name.rfind(
'/');
431 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
432 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
438 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
439 o.objname.append(name, namepos, std::string::npos);
440 o.hash =
dqmhash(name.c_str(), name.size());
441 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
455 = (lat::Time::current()
456 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
457 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
458 typename ObjectMap::iterator
i,
e;
459 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
461 if (i->lastreq && i->lastreq < minreq)
462 const_cast<ObjType &
>(*i).lastreq = 0;
471 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
472 typename ObjectMap::iterator
i,
e;
473 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
485 typename PeerMap::iterator
pos = peers_.find(s);
486 typename PeerMap::iterator
end = peers_.end();
487 return pos == end ?
nullptr : &pos->second;
493 ImplPeer *ip = &peers_[
s];
503 ip->automatic =
nullptr;
510 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
511 bool needflush = ! ip->objs.empty();
513 typename ObjectMap::iterator
i,
e;
514 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
529 typename PeerMap::iterator
pi, pe;
530 typename ObjectMap::iterator oi, oe;
533 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
534 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
536 size += 9*
sizeof(uint32_t) + oi->dirname->size()
537 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
538 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
540 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
542 uint32_t nupdates = 0;
544 words[0] =
sizeof(words);
548 copydata(msg, &words[0],
sizeof(words));
550 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
551 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
562 copydata(msg, &words[0],
sizeof(words));
568 typename PeerMap::iterator
i,
e;
569 typename ObjectMap::iterator oi, oe;
570 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
572 ImplPeer &
p = i->second;
578 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
584 if (! msg.
data.empty())
588 prev = &(*prev)->
next;
591 (*prev)->
next =
nullptr;
601 typename PeerMap::iterator
i,
e;
602 for (i = peers_.begin(), e = peers_.end(); i !=
e; )
616 void reserveLocalSpace(uint32_t
size);
617 void updateLocalObject(
Object &
o);
618 bool removeLocalExcept(
const std::set<std::string> &known);
625 #endif // DQMSERVICES_CORE_DQM_NET_H
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait for it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
bool operator()(const Object &a, const Object &b) const
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
void updatePeerMasks(void) override
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t MAX_PEER_WAITREQS
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_INVALID
~DQMImplNet(void) override
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F