1 #ifndef DQMSERVICES_CORE_DQM_NET_H 2 #define DQMSERVICES_CORE_DQM_NET_H 4 #include "classlib/iobase/Socket.h" 5 #include "classlib/iobase/IOSelector.h" 6 #include "classlib/iobase/Pipe.h" 7 #include "classlib/utils/Error.h" 8 #include "classlib/utils/Time.h" 18 #include <unordered_set> 150 void debug(
bool doit);
167 if (
a.run ==
b.run) {
168 if (
a.lumi ==
b.lumi) {
169 if (
a.streamId ==
b.streamId) {
170 if (
a.moduleId ==
b.moduleId) {
171 if (
a.dirname ==
b.dirname) {
172 return a.objname <
b.objname;
174 return a.dirname <
b.dirname;
176 return a.moduleId <
b.moduleId;
178 return a.streamId <
b.streamId;
180 return a.lumi <
b.lumi;
182 return a.run <
b.run;
191 return a.hash ==
b.hash &&
a.dirname ==
b.dirname &&
a.objname ==
b.objname;
198 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k)))) 199 #define dqmhashmix(a, b, c) \ 202 a ^= dqmhashrot(c, 4); \ 205 b ^= dqmhashrot(a, 6); \ 208 c ^= dqmhashrot(b, 8); \ 211 a ^= dqmhashrot(c, 16); \ 214 b ^= dqmhashrot(a, 19); \ 217 c ^= dqmhashrot(b, 4); \ 220 #define dqmhashfinal(a, b, c) \ 223 c -= dqmhashrot(b, 14); \ 225 a -= dqmhashrot(c, 11); \ 227 b -= dqmhashrot(a, 25); \ 229 c -= dqmhashrot(b, 16); \ 231 a -= dqmhashrot(c, 4); \ 233 b -= dqmhashrot(a, 14); \ 235 c -= dqmhashrot(b, 24); \ 239 a =
b =
c = 0xdeadbeef + (uint32_t)keylen;
240 const auto *
k = (
const unsigned char *)
key;
243 while (keylen > 12) {
245 a += ((uint32_t)
k[1]) << 8;
246 a += ((uint32_t)
k[2]) << 16;
247 a += ((uint32_t)
k[3]) << 24;
249 b += ((uint32_t)
k[5]) << 8;
250 b += ((uint32_t)
k[6]) << 16;
251 b += ((uint32_t)
k[7]) << 24;
253 c += ((uint32_t)
k[9]) << 8;
254 c += ((uint32_t)
k[10]) << 16;
255 c += ((uint32_t)
k[11]) << 24;
264 c += ((uint32_t)
k[11]) << 24;
267 c += ((uint32_t)
k[10]) << 16;
270 c += ((uint32_t)
k[9]) << 8;
276 b += ((uint32_t)
k[7]) << 24;
279 b += ((uint32_t)
k[6]) << 16;
282 b += ((uint32_t)
k[5]) << 8;
288 a += ((uint32_t)
k[3]) << 24;
291 a += ((uint32_t)
k[2]) << 16;
294 a += ((uint32_t)
k[1]) << 8;
314 std::ostream &
logme();
315 static void copydata(Bucket *
b,
const void *
data,
size_t len);
330 virtual Peer *
getPeer(lat::Socket *
s) = 0;
379 template <
class ObjType>
385 typedef std::unordered_set<ObjType, HashOp, HashEqual>
ObjectMap;
386 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
400 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
401 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
405 proto.dirname =
path;
406 proto.objname.append(
name, namepos, std::string::npos);
408 typename ObjectMap::iterator
pos;
409 typename PeerMap::iterator
i,
e;
413 auto *ip =
static_cast<ImplPeer *
>(
p);
414 pos = ip->objs.find(proto);
415 if (
pos == ip->objs.end())
420 return const_cast<ObjType *
>(&*
pos);
424 pos =
i->second.objs.find(proto);
425 if (
pos !=
i->second.objs.end()) {
428 return const_cast<ObjType *
>(&*
pos);
436 auto *ip =
static_cast<ImplPeer *
>(
p);
438 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
439 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
445 o.dirname = *ip->dirs.insert(
name.substr(0, dirpos)).
first;
446 o.objname.append(
name, namepos, std::string::npos);
448 return const_cast<ObjType *
>(&*ip->objs.insert(
o).first);
459 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
460 auto *ip =
static_cast<ImplPeer *
>(
p);
461 typename ObjectMap::iterator
i,
e;
462 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e; ++
i) {
463 if (
i->lastreq &&
i->lastreq < minreq)
464 const_cast<ObjType &
>(*i).lastreq = 0;
471 auto *ip =
static_cast<ImplPeer *
>(
p);
472 typename ObjectMap::iterator
i,
e;
473 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;) {
484 return pos ==
end ? nullptr : &
pos->second;
489 ip->socket =
nullptr;
498 ip->automatic =
nullptr;
503 auto *ip =
static_cast<ImplPeer *
>(
p);
504 bool needflush = !ip->objs.empty();
506 typename ObjectMap::iterator
i,
e;
507 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;)
520 typename PeerMap::iterator
pi, pe;
521 typename ObjectMap::iterator oi, oe;
525 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
527 size += 9 *
sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
528 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
530 msg->data.reserve(
msg->data.size() +
size + 8 *
sizeof(uint32_t));
532 uint32_t nupdates = 0;
534 words[0] =
sizeof(words);
541 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi)
555 typename PeerMap::iterator
i,
e;
556 typename ObjectMap::iterator oi, oe;
558 ImplPeer &
p =
i->second;
563 logme() <<
"DEBUG: notifying " <<
p.peeraddr << std::endl;
569 if (!
msg.data.empty()) {
572 prev = &(*prev)->
next;
575 (*prev)->
next =
nullptr;
583 typename PeerMap::iterator
i,
e;
604 #endif // DQMSERVICES_CORE_DQM_NET_H
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
std::list< WaitObject > WaitList
static const uint32_t DQM_PROP_TYPE_TH1S
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
std::unordered_set< ObjType, HashOp, HashEqual > ObjectMap
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
void shutdown()
Stop the network layer and wait it to finish.
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
std::set< std::string > DirMap
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
key
prepare the HTCondor submission files and eventually submit them
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_TYPE_TH1I
static const uint32_t MAX_PEER_WAITREQS
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
static const uint32_t DQM_PROP_STALE
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
void unlock()
Release the lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
std::vector< uint32_t > TagList
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
uint32_t operator()(const Object &a) const
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
static const uint32_t DQM_PROP_TYPE_TH2I
std::vector< unsigned char > DataBlob
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F