1 #ifndef DQMSERVICES_CORE_DQM_NET_H 2 # define DQMSERVICES_CORE_DQM_NET_H 4 # include "classlib/iobase/Socket.h" 5 # include "classlib/iobase/IOSelector.h" 6 # include "classlib/iobase/Pipe.h" 7 # include "classlib/utils/Signal.h" 8 # include "classlib/utils/Error.h" 9 # include "classlib/utils/Time.h" 19 # include <ext/hash_set> 51 | DQM_PROP_REPORT_WARN
164 void debug(
bool doit);
221 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k)))) 222 # define dqmhashmix(a,b,c) { \ 223 a -= c; a ^= dqmhashrot(c, 4); c += b; \ 224 b -= a; b ^= dqmhashrot(a, 6); a += c; \ 225 c -= b; c ^= dqmhashrot(b, 8); b += a; \ 226 a -= c; a ^= dqmhashrot(c,16); c += b; \ 227 b -= a; b ^= dqmhashrot(a,19); a += c; \ 228 c -= b; c ^= dqmhashrot(b, 4); b += a; } 229 # define dqmhashfinal(a,b,c) { \ 230 c ^= b; c -= dqmhashrot(b,14); \ 231 a ^= c; a -= dqmhashrot(c,11); \ 232 b ^= a; b -= dqmhashrot(a,25); \ 233 c ^= b; c -= dqmhashrot(b,16); \ 234 a ^= c; a -= dqmhashrot(c,4); \ 235 b ^= a; b -= dqmhashrot(a,14); \ 236 c ^= b; c -= dqmhashrot(b,24); } 239 a = b = c = 0xdeadbeef + (uint32_t) keylen;
240 const auto *
k = (
const unsigned char *) key;
246 a += ((uint32_t)
k[1]) << 8;
247 a += ((uint32_t)
k[2]) << 16;
248 a += ((uint32_t)
k[3]) << 24;
250 b += ((uint32_t)
k[5]) << 8;
251 b += ((uint32_t)
k[6]) << 16;
252 b += ((uint32_t)
k[7]) << 24;
254 c += ((uint32_t)
k[9]) << 8;
255 c += ((uint32_t)
k[10]) << 16;
256 c += ((uint32_t)
k[11]) << 24;
265 case 12: c += ((uint32_t)
k[11]) << 24;
266 case 11: c += ((uint32_t)
k[10]) << 16;
267 case 10: c += ((uint32_t)
k[9]) << 8;
269 case 8 : b += ((uint32_t)
k[7]) << 24;
270 case 7 : b += ((uint32_t)
k[6]) << 16;
271 case 6 : b += ((uint32_t)
k[5]) << 8;
273 case 4 : a += ((uint32_t)
k[3]) << 24;
274 case 3 : a += ((uint32_t)
k[2]) << 16;
275 case 2 : a += ((uint32_t)
k[1]) << 8;
292 std::ostream &
logme();
324 lat::IOSelectEvent *
event,
325 lat::Error *err =
nullptr);
360 template <
class ObjType>
367 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
368 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
387 size_t slash = name.rfind(
'/');
388 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
389 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
392 proto.hash =
dqmhash(name.c_str(), name.size());
393 proto.dirname = &
path;
394 proto.objname.append(name, namepos, std::string::npos);
396 typename ObjectMap::iterator
pos;
397 typename PeerMap::iterator
i,
e;
402 auto *ip =
static_cast<ImplPeer *
>(
p);
403 pos = ip->objs.find(proto);
404 if (pos == ip->objs.end())
408 if (owner) *owner = ip;
409 return const_cast<ObjType *
>(&*pos);
414 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
416 pos = i->second.objs.find(proto);
417 if (pos != i->second.objs.end())
419 if (owner) *owner = &i->second;
420 return const_cast<ObjType *
>(&*pos);
430 auto *ip =
static_cast<ImplPeer *
>(
p);
431 size_t slash = name.rfind(
'/');
432 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
433 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
439 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
440 o.objname.append(name, namepos, std::string::npos);
441 o.hash =
dqmhash(name.c_str(), name.size());
442 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
456 = (lat::Time::current()
457 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
458 auto *ip =
static_cast<ImplPeer *
>(
p);
459 typename ObjectMap::iterator
i,
e;
460 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
462 if (i->lastreq && i->lastreq < minreq)
463 const_cast<ObjType &
>(*i).lastreq = 0;
472 auto *ip =
static_cast<ImplPeer *
>(
p);
473 typename ObjectMap::iterator
i,
e;
474 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
486 auto pos = peers_.find(s);
487 auto end = peers_.end();
488 return pos ==
end ?
nullptr : &
pos->second;
494 ImplPeer *ip = &peers_[
s];
504 ip->automatic =
nullptr;
511 auto *ip =
static_cast<ImplPeer *
>(
p);
512 bool needflush = ! ip->objs.empty();
514 typename ObjectMap::iterator
i,
e;
515 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
530 typename PeerMap::iterator
pi, pe;
531 typename ObjectMap::iterator oi, oe;
534 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
535 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
537 size += 9*
sizeof(uint32_t) + oi->dirname->size()
538 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
539 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
541 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
543 uint32_t nupdates = 0;
545 words[0] =
sizeof(words);
549 copydata(msg, &words[0],
sizeof(words));
551 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
552 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
563 copydata(msg, &words[0],
sizeof(words));
569 typename PeerMap::iterator
i,
e;
570 typename ObjectMap::iterator oi, oe;
571 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
573 ImplPeer &
p = i->second;
579 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
585 if (! msg.
data.empty())
589 prev = &(*prev)->
next;
592 (*prev)->
next =
nullptr;
602 typename PeerMap::iterator
i,
e;
603 for (i = peers_.begin(), e = peers_.end(); i !=
e; )
617 void reserveLocalSpace(uint32_t
size);
618 void updateLocalObject(
Object &
o);
619 bool removeLocalExcept(
const std::set<std::string> &known);
626 #endif // DQMSERVICES_CORE_DQM_NET_H
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
std::list< WaitObject > WaitList
static const uint32_t DQM_PROP_TYPE_TH1S
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
bool operator()(const Object &a, const Object &b) const
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
void shutdown()
Stop the network layer and wait for it to finish.
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
std::set< std::string > DirMap
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t MAX_PEER_WAITREQS
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
static const uint32_t DQM_PROP_STALE
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
void unlock()
Release the lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
std::vector< uint32_t > TagList
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::vector< unsigned char > DataBlob
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_INVALID
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F