1 #ifndef DQMSERVICES_CORE_DQM_NET_H 2 #define DQMSERVICES_CORE_DQM_NET_H 4 #include "classlib/iobase/Socket.h" 5 #include "classlib/iobase/IOSelector.h" 6 #include "classlib/iobase/Pipe.h" 7 #include "classlib/utils/Signal.h" 8 #include "classlib/utils/Error.h" 9 #include "classlib/utils/Time.h" 19 #include <ext/hash_set> 151 void debug(
bool doit);
168 if (
a.run ==
b.run) {
169 if (
a.lumi ==
b.lumi) {
170 if (
a.streamId ==
b.streamId) {
171 if (
a.moduleId ==
b.moduleId) {
172 if (
a.dirname ==
b.dirname) {
173 return a.objname <
b.objname;
175 return a.dirname <
b.dirname;
177 return a.moduleId <
b.moduleId;
179 return a.streamId <
b.streamId;
181 return a.lumi <
b.lumi;
183 return a.run <
b.run;
192 return a.hash ==
b.hash &&
a.dirname ==
b.dirname &&
a.objname ==
b.objname;
199 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k)))) 200 #define dqmhashmix(a, b, c) \ 203 a ^= dqmhashrot(c, 4); \ 206 b ^= dqmhashrot(a, 6); \ 209 c ^= dqmhashrot(b, 8); \ 212 a ^= dqmhashrot(c, 16); \ 215 b ^= dqmhashrot(a, 19); \ 218 c ^= dqmhashrot(b, 4); \ 221 #define dqmhashfinal(a, b, c) \ 224 c -= dqmhashrot(b, 14); \ 226 a -= dqmhashrot(c, 11); \ 228 b -= dqmhashrot(a, 25); \ 230 c -= dqmhashrot(b, 16); \ 232 a -= dqmhashrot(c, 4); \ 234 b -= dqmhashrot(a, 14); \ 236 c -= dqmhashrot(b, 24); \ 240 a =
b =
c = 0xdeadbeef + (uint32_t)keylen;
241 const auto *
k = (
const unsigned char *)
key;
244 while (keylen > 12) {
246 a += ((uint32_t)
k[1]) << 8;
247 a += ((uint32_t)
k[2]) << 16;
248 a += ((uint32_t)
k[3]) << 24;
250 b += ((uint32_t)
k[5]) << 8;
251 b += ((uint32_t)
k[6]) << 16;
252 b += ((uint32_t)
k[7]) << 24;
254 c += ((uint32_t)
k[9]) << 8;
255 c += ((uint32_t)
k[10]) << 16;
256 c += ((uint32_t)
k[11]) << 24;
265 c += ((uint32_t)
k[11]) << 24;
268 c += ((uint32_t)
k[10]) << 16;
271 c += ((uint32_t)
k[9]) << 8;
277 b += ((uint32_t)
k[7]) << 24;
280 b += ((uint32_t)
k[6]) << 16;
283 b += ((uint32_t)
k[5]) << 8;
289 a += ((uint32_t)
k[3]) << 24;
292 a += ((uint32_t)
k[2]) << 16;
295 a += ((uint32_t)
k[1]) << 8;
315 std::ostream &
logme();
316 static void copydata(Bucket *
b,
const void *
data,
size_t len);
331 virtual Peer *
getPeer(lat::Socket *
s) = 0;
380 template <
class ObjType>
386 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
387 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
401 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
402 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
406 proto.dirname =
path;
407 proto.objname.append(
name, namepos, std::string::npos);
409 typename ObjectMap::iterator
pos;
410 typename PeerMap::iterator
i,
e;
414 auto *ip =
static_cast<ImplPeer *
>(
p);
415 pos = ip->objs.find(proto);
416 if (
pos == ip->objs.end())
421 return const_cast<ObjType *
>(&*
pos);
425 pos =
i->second.objs.find(proto);
426 if (
pos !=
i->second.objs.end()) {
429 return const_cast<ObjType *
>(&*
pos);
437 auto *ip =
static_cast<ImplPeer *
>(
p);
439 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
440 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
446 o.dirname = *ip->dirs.insert(
name.substr(0, dirpos)).
first;
447 o.objname.append(
name, namepos, std::string::npos);
449 return const_cast<ObjType *
>(&*ip->objs.insert(
o).first);
460 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
461 auto *ip =
static_cast<ImplPeer *
>(
p);
462 typename ObjectMap::iterator
i,
e;
463 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e; ++
i) {
464 if (
i->lastreq &&
i->lastreq < minreq)
465 const_cast<ObjType &
>(*i).lastreq = 0;
472 auto *ip =
static_cast<ImplPeer *
>(
p);
473 typename ObjectMap::iterator
i,
e;
474 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;) {
485 return pos ==
end ? nullptr : &
pos->second;
490 ip->socket =
nullptr;
499 ip->automatic =
nullptr;
504 auto *ip =
static_cast<ImplPeer *
>(
p);
505 bool needflush = !ip->objs.empty();
507 typename ObjectMap::iterator
i,
e;
508 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;)
521 typename PeerMap::iterator
pi, pe;
522 typename ObjectMap::iterator oi, oe;
526 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
528 size += 9 *
sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
529 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
531 msg->data.reserve(
msg->data.size() +
size + 8 *
sizeof(uint32_t));
533 uint32_t nupdates = 0;
535 words[0] =
sizeof(words);
542 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi)
556 typename PeerMap::iterator
i,
e;
557 typename ObjectMap::iterator oi, oe;
559 ImplPeer &
p =
i->second;
564 logme() <<
"DEBUG: notifying " <<
p.peeraddr << std::endl;
570 if (!
msg.data.empty()) {
573 prev = &(*prev)->
next;
576 (*prev)->
next =
nullptr;
584 typename PeerMap::iterator
i,
e;
605 #endif // DQMSERVICES_CORE_DQM_NET_H
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
std::list< WaitObject > WaitList
static const uint32_t DQM_PROP_TYPE_TH1S
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
void shutdown()
Stop the network layer and wait it to finish.
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
std::set< std::string > DirMap
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_TYPE_TH1I
static const uint32_t MAX_PEER_WAITREQS
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
static const uint32_t DQM_PROP_STALE
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
void unlock()
Release the lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
std::vector< uint32_t > TagList
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
uint32_t operator()(const Object &a) const
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
static const uint32_t DQM_PROP_TYPE_TH2I
std::vector< unsigned char > DataBlob
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F