1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Signal.h"
8 #include "classlib/utils/Error.h"
9 #include "classlib/utils/Time.h"
19 #include <ext/hash_set>
// Declaration only: takes a single boolean flag and returns nothing.
// NOTE(review): presumably switches debug logging on or off - confirm against the definition.
151 void debug(
bool doit);
// Rotate x left by k bits within a 32-bit word: the bits shifted out on the
// left are OR-ed back in on the right by the complementary (32 - k) right shift.
// Both arguments are fully parenthesized, so expressions may be passed safely,
// but each argument is evaluated twice (x) / twice (k) - avoid side effects.
// NOTE(review): this is a true rotation only when x is an unsigned 32-bit value
// and 0 < k < 32 (k == 0 would shift right by 32); every use visible in this
// file passes a constant in that range.
199 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
200 #define dqmhashmix(a, b, c) \
203 a ^= dqmhashrot(c, 4); \
206 b ^= dqmhashrot(a, 6); \
209 c ^= dqmhashrot(b, 8); \
212 a ^= dqmhashrot(c, 16); \
215 b ^= dqmhashrot(a, 19); \
218 c ^= dqmhashrot(b, 4); \
221 #define dqmhashfinal(a, b, c) \
224 c -= dqmhashrot(b, 14); \
226 a -= dqmhashrot(c, 11); \
228 b -= dqmhashrot(a, 25); \
230 c -= dqmhashrot(b, 16); \
232 a -= dqmhashrot(c, 4); \
234 b -= dqmhashrot(a, 14); \
236 c -= dqmhashrot(b, 24); \
240 a = b = c = 0xdeadbeef + (uint32_t)keylen;
241 const auto *
k = (
const unsigned char *)key;
244 while (keylen > 12) {
246 a += ((uint32_t)
k[1]) << 8;
247 a += ((uint32_t)
k[2]) << 16;
248 a += ((uint32_t)
k[3]) << 24;
250 b += ((uint32_t)
k[5]) << 8;
251 b += ((uint32_t)
k[6]) << 16;
252 b += ((uint32_t)
k[7]) << 24;
254 c += ((uint32_t)
k[9]) << 8;
255 c += ((uint32_t)
k[10]) << 16;
256 c += ((uint32_t)
k[11]) << 24;
265 c += ((uint32_t)
k[11]) << 24;
268 c += ((uint32_t)
k[10]) << 16;
271 c += ((uint32_t)
k[9]) << 8;
277 b += ((uint32_t)
k[7]) << 24;
280 b += ((uint32_t)
k[6]) << 16;
283 b += ((uint32_t)
k[5]) << 8;
289 a += ((uint32_t)
k[3]) << 24;
292 a += ((uint32_t)
k[2]) << 16;
295 a += ((uint32_t)
k[1]) << 8;
// Declaration only: returns a reference to a std::ostream. Elsewhere in this
// file it is used as `logme() << "DEBUG: ..." << std::endl`, i.e. as the
// stream that log lines are written to.
315 std::ostream &
logme();
// Declaration only: static helper taking a Bucket, a read-only source pointer
// and a byte count. Callers in this file pass `&words[0]` together with
// `sizeof(words)`, so `len` is a size in bytes.
// NOTE(review): presumably appends `len` bytes from `data` to the bucket's
// payload - confirm against the definition.
316 static void copydata(Bucket *
b,
const void *
data,
size_t len);
// Pure virtual hook: given a socket, return the associated Peer.
// NOTE(review): the override visible later in this file appears to look the
// socket up in `peers_` and return nullptr when it is not found, so callers
// should expect a null result - confirm against the full definition.
331 virtual Peer *
getPeer(lat::Socket *
s) = 0;
380 template <
class ObjType>
// Hashed set of ObjType values, using HashOp as the hash functor and
// HashEqual as the equality predicate.
// NOTE(review): __gnu_cxx::hash_set is a pre-C++11 GNU extension from
// <ext/hash_set>; std::unordered_set takes the same three template arguments
// in the same order should this ever be migrated.
386 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
// Ordered map from a socket pointer to the ImplPeer stored by value for it.
387 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
400 size_t slash = name.rfind(
'/');
401 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
402 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
405 proto.hash =
dqmhash(name.c_str(), name.size());
406 proto.dirname =
path;
407 proto.objname.append(name, namepos, std::string::npos);
409 typename ObjectMap::iterator pos;
410 typename PeerMap::iterator
i,
e;
414 auto *ip =
static_cast<ImplPeer *
>(
p);
415 pos = ip->objs.find(proto);
416 if (pos == ip->objs.end())
421 return const_cast<ObjType *
>(&*pos);
425 pos = i->second.objs.find(proto);
426 if (pos != i->second.objs.end()) {
429 return const_cast<ObjType *
>(&*pos);
437 auto *ip =
static_cast<ImplPeer *
>(
p);
438 size_t slash = name.rfind(
'/');
439 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
440 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
446 o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).
first;
447 o.objname.append(name, namepos, std::string::npos);
448 o.hash =
dqmhash(name.c_str(), name.size());
449 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
460 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
461 auto *ip =
static_cast<ImplPeer *
>(
p);
462 typename ObjectMap::iterator
i,
e;
463 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i) {
464 if (i->lastreq && i->lastreq < minreq)
465 const_cast<ObjType &
>(*i).lastreq = 0;
472 auto *ip =
static_cast<ImplPeer *
>(
p);
473 typename ObjectMap::iterator
i,
e;
474 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e;) {
483 auto pos =
peers_.find(s);
485 return pos ==
end ?
nullptr : &pos->second;
490 ip->socket =
nullptr;
499 ip->automatic =
nullptr;
504 auto *ip =
static_cast<ImplPeer *
>(
p);
505 bool needflush = !ip->objs.empty();
507 typename ObjectMap::iterator
i,
e;
508 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e;)
521 typename PeerMap::iterator
pi, pe;
522 typename ObjectMap::iterator oi, oe;
526 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
528 size += 9 *
sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
529 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
531 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
533 uint32_t nupdates = 0;
535 words[0] =
sizeof(words);
539 copydata(msg, &words[0],
sizeof(words));
542 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
552 copydata(msg, &words[0],
sizeof(words));
556 typename PeerMap::iterator
i,
e;
557 typename ObjectMap::iterator oi, oe;
559 ImplPeer &
p = i->second;
564 logme() <<
"DEBUG: notifying " << p.peeraddr << std::endl;
570 if (!msg.
data.empty()) {
573 prev = &(*prev)->
next;
576 (*prev)->
next =
nullptr;
584 typename PeerMap::iterator
i,
e;
605 #endif // DQMSERVICES_CORE_DQM_NET_H
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
std::list< WaitObject > WaitList
static const uint32_t DQM_PROP_TYPE_TH1S
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
const edm::EventSetup & c
bool operator()(const Object &a, const Object &b) const
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
void shutdown()
Stop the network layer and wait for it to finish.
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
std::set< std::string > DirMap
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
tuple key
prepare the HTCondor submission files and eventually submit them
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_TYPE_TH1I
static const uint32_t MAX_PEER_WAITREQS
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
static const uint32_t DQM_PROP_STALE
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
void unlock()
Release the lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
std::vector< uint32_t > TagList
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
static const uint32_t DQM_PROP_TYPE_TH2I
std::vector< unsigned char > DataBlob
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F