1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Signal.h"
8 #include "classlib/utils/Error.h"
9 #include "classlib/utils/Time.h"
19 #include <ext/hash_set>
149 void debug(
bool doit);
// Rotate x left by k bits within a 32-bit word: the high k bits shifted out
// on the left are brought back in on the right via (x) >> (32 - (k)).
// The expression assumes x is a 32-bit unsigned value and 0 < k < 32;
// k == 0 would shift right by the full width, which is undefined behaviour.
197 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
// Mixes the three hash state words a, b and c in place. Each of the steps
// below xors one word with a rotated copy of another (see dqmhashrot), so
// the arguments must be modifiable lvalues and are evaluated more than once.
// NOTE(review): the rotation amounts (4, 6, 8, 16, 19, 4) look like the
// mix() schedule of Bob Jenkins' lookup3 hash -- confirm before changing them.
198 #define dqmhashmix(a, b, c) \
201 a ^= dqmhashrot(c, 4); \
204 b ^= dqmhashrot(a, 6); \
207 c ^= dqmhashrot(b, 8); \
210 a ^= dqmhashrot(c, 16); \
213 b ^= dqmhashrot(a, 19); \
216 c ^= dqmhashrot(b, 4); \
// Final scrambling of the three hash state words a, b and c, done in place.
// Each of the steps below subtracts a rotated copy of one word (see
// dqmhashrot) from another, so the arguments must be modifiable lvalues and
// are evaluated more than once.
// NOTE(review): the rotation amounts (14, 11, 25, 16, 4, 14, 24) look like
// the final() schedule of Bob Jenkins' lookup3 hash -- confirm before
// changing them.
219 #define dqmhashfinal(a, b, c) \
222 c -= dqmhashrot(b, 14); \
224 a -= dqmhashrot(c, 11); \
226 b -= dqmhashrot(a, 25); \
228 c -= dqmhashrot(b, 16); \
230 a -= dqmhashrot(c, 4); \
232 b -= dqmhashrot(a, 14); \
234 c -= dqmhashrot(b, 24); \
238 a = b = c = 0xdeadbeef + (uint32_t)keylen;
239 const auto *
k = (
const unsigned char *)key;
242 while (keylen > 12) {
244 a += ((uint32_t)
k[1]) << 8;
245 a += ((uint32_t)
k[2]) << 16;
246 a += ((uint32_t)
k[3]) << 24;
248 b += ((uint32_t)
k[5]) << 8;
249 b += ((uint32_t)
k[6]) << 16;
250 b += ((uint32_t)
k[7]) << 24;
252 c += ((uint32_t)
k[9]) << 8;
253 c += ((uint32_t)
k[10]) << 16;
254 c += ((uint32_t)
k[11]) << 24;
263 c += ((uint32_t)
k[11]) << 24;
266 c += ((uint32_t)
k[10]) << 16;
269 c += ((uint32_t)
k[9]) << 8;
275 b += ((uint32_t)
k[7]) << 24;
278 b += ((uint32_t)
k[6]) << 16;
281 b += ((uint32_t)
k[5]) << 8;
287 a += ((uint32_t)
k[3]) << 24;
290 a += ((uint32_t)
k[2]) << 16;
293 a += ((uint32_t)
k[1]) << 8;
313 std::ostream &
logme();
314 static void copydata(Bucket *
b,
const void *
data,
size_t len);
329 virtual Peer *
getPeer(lat::Socket *
s) = 0;
378 template <
class ObjType>
384 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
385 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
398 size_t slash = name.rfind(
'/');
399 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
400 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
403 proto.hash =
dqmhash(name.c_str(), name.size());
404 proto.dirname =
path;
405 proto.objname.append(name, namepos, std::string::npos);
407 typename ObjectMap::iterator pos;
408 typename PeerMap::iterator
i,
e;
412 auto *ip =
static_cast<ImplPeer *
>(
p);
413 pos = ip->objs.find(proto);
414 if (pos == ip->objs.end())
419 return const_cast<ObjType *
>(&*pos);
423 pos = i->second.objs.find(proto);
424 if (pos != i->second.objs.end()) {
427 return const_cast<ObjType *
>(&*pos);
435 auto *ip =
static_cast<ImplPeer *
>(
p);
436 size_t slash = name.rfind(
'/');
437 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
438 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
444 o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).
first;
445 o.objname.append(name, namepos, std::string::npos);
446 o.hash =
dqmhash(name.c_str(), name.size());
447 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
458 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
459 auto *ip =
static_cast<ImplPeer *
>(
p);
460 typename ObjectMap::iterator
i,
e;
461 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i) {
462 if (i->lastreq && i->lastreq < minreq)
463 const_cast<ObjType &
>(*i).lastreq = 0;
470 auto *ip =
static_cast<ImplPeer *
>(
p);
471 typename ObjectMap::iterator
i,
e;
472 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e;) {
481 auto pos =
peers_.find(s);
483 return pos ==
end ?
nullptr : &pos->second;
488 ip->socket =
nullptr;
497 ip->automatic =
nullptr;
502 auto *ip =
static_cast<ImplPeer *
>(
p);
503 bool needflush = !ip->objs.empty();
505 typename ObjectMap::iterator
i,
e;
506 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e;)
519 typename PeerMap::iterator
pi, pe;
520 typename ObjectMap::iterator oi, oe;
524 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
526 size += 9 *
sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
527 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
529 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
531 uint32_t nupdates = 0;
533 words[0] =
sizeof(words);
537 copydata(msg, &words[0],
sizeof(words));
540 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
550 copydata(msg, &words[0],
sizeof(words));
554 typename PeerMap::iterator
i,
e;
555 typename ObjectMap::iterator oi, oe;
557 ImplPeer &
p = i->second;
562 logme() <<
"DEBUG: notifying " << p.peeraddr << std::endl;
568 if (!msg.
data.empty()) {
571 prev = &(*prev)->
next;
574 (*prev)->
next =
nullptr;
582 typename PeerMap::iterator
i,
e;
603 #endif // DQMSERVICES_CORE_DQM_NET_H
edm::ErrorSummaryEntry Error
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
std::list< WaitObject > WaitList
static const uint32_t DQM_PROP_TYPE_TH1S
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
const edm::EventSetup & c
bool operator()(const Object &a, const Object &b) const
void sendObjectListToPeers(bool all) override
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
~DQMImplNet() override=default
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
static const uint32_t DQM_PROP_TYPE_TH1F
void removePeer(Peer *p, lat::Socket *s) override
static const uint32_t DQM_PROP_MARKTODELETE
void shutdown()
Stop the network layer and wait for it to finish.
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_ACCUMULATE
Peer * getPeer(lat::Socket *s) override
static const uint32_t DQM_PROP_HAS_REFERENCE
std::set< std::string > DirMap
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
tuple key
prepare the HTCondor submission files and eventually submit them
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
void markObjectsDead(Peer *p) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
Peer * createPeer(lat::Socket *s) override
static const uint32_t MAX_PEER_WAITREQS
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
DQMNet & operator=(const DQMNet &)=delete
void updatePeerMasks() override
static const uint32_t DQM_PROP_STALE
static const uint32_t DQM_REPLY_LIST_BEGIN
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
void unlock()
Release the lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
Object * makeObject(Peer *p, const std::string &name) override
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
std::vector< uint32_t > TagList
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::vector< unsigned char > DataBlob
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
static const uint32_t DQM_PROP_TYPE_TPROF2D
void purgeDeadObjects(Peer *p) override
static const uint32_t DQM_PROP_TYPE_TH2F