|
|
Go to the documentation of this file. 1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 #define DQMSERVICES_CORE_DQM_NET_H
4 #include "classlib/iobase/Socket.h"
5 #include "classlib/iobase/IOSelector.h"
6 #include "classlib/iobase/Pipe.h"
7 #include "classlib/utils/Signal.h"
8 #include "classlib/utils/Error.h"
9 #include "classlib/utils/Time.h"
19 #include <ext/hash_set>
149 void debug(
bool doit);
166 if (
a.run ==
b.run) {
167 if (
a.lumi ==
b.lumi) {
168 if (
a.streamId ==
b.streamId) {
169 if (
a.moduleId ==
b.moduleId) {
170 if (
a.dirname ==
b.dirname) {
171 return a.objname <
b.objname;
173 return a.dirname <
b.dirname;
175 return a.moduleId <
b.moduleId;
177 return a.streamId <
b.streamId;
179 return a.lumi <
b.lumi;
181 return a.run <
b.run;
190 return a.hash ==
b.hash &&
a.dirname ==
b.dirname &&
a.objname ==
b.objname;
197 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
198 #define dqmhashmix(a, b, c) \
201 a ^= dqmhashrot(c, 4); \
204 b ^= dqmhashrot(a, 6); \
207 c ^= dqmhashrot(b, 8); \
210 a ^= dqmhashrot(c, 16); \
213 b ^= dqmhashrot(a, 19); \
216 c ^= dqmhashrot(b, 4); \
219 #define dqmhashfinal(a, b, c) \
222 c -= dqmhashrot(b, 14); \
224 a -= dqmhashrot(c, 11); \
226 b -= dqmhashrot(a, 25); \
228 c -= dqmhashrot(b, 16); \
230 a -= dqmhashrot(c, 4); \
232 b -= dqmhashrot(a, 14); \
234 c -= dqmhashrot(b, 24); \
238 a =
b =
c = 0xdeadbeef + (uint32_t)keylen;
239 const auto *
k = (
const unsigned char *)
key;
242 while (keylen > 12) {
244 a += ((uint32_t)
k[1]) << 8;
245 a += ((uint32_t)
k[2]) << 16;
246 a += ((uint32_t)
k[3]) << 24;
248 b += ((uint32_t)
k[5]) << 8;
249 b += ((uint32_t)
k[6]) << 16;
250 b += ((uint32_t)
k[7]) << 24;
252 c += ((uint32_t)
k[9]) << 8;
253 c += ((uint32_t)
k[10]) << 16;
254 c += ((uint32_t)
k[11]) << 24;
263 c += ((uint32_t)
k[11]) << 24;
266 c += ((uint32_t)
k[10]) << 16;
269 c += ((uint32_t)
k[9]) << 8;
275 b += ((uint32_t)
k[7]) << 24;
278 b += ((uint32_t)
k[6]) << 16;
281 b += ((uint32_t)
k[5]) << 8;
287 a += ((uint32_t)
k[3]) << 24;
290 a += ((uint32_t)
k[2]) << 16;
293 a += ((uint32_t)
k[1]) << 8;
313 std::ostream &
logme();
314 static void copydata(Bucket *
b,
const void *
data,
size_t len);
329 virtual Peer *
getPeer(lat::Socket *
s) = 0;
378 template <
class ObjType>
384 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
385 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
399 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
400 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
404 proto.dirname =
path;
405 proto.objname.append(
name, namepos, std::string::npos);
407 typename ObjectMap::iterator
pos;
408 typename PeerMap::iterator
i,
e;
412 auto *ip = static_cast<ImplPeer *>(
p);
413 pos = ip->objs.find(proto);
414 if (
pos == ip->objs.end())
419 return const_cast<ObjType *>(&*
pos);
423 pos =
i->second.objs.find(proto);
424 if (
pos !=
i->second.objs.end()) {
427 return const_cast<ObjType *>(&*
pos);
435 auto *ip = static_cast<ImplPeer *>(
p);
437 size_t dirpos = (
slash == std::string::npos ? 0 :
slash);
438 size_t namepos = (
slash == std::string::npos ? 0 :
slash + 1);
444 o.dirname = *ip->dirs.insert(
name.substr(0, dirpos)).
first;
445 o.objname.append(
name, namepos, std::string::npos);
447 return const_cast<ObjType *>(&*ip->objs.insert(
o).first);
458 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
459 auto *ip = static_cast<ImplPeer *>(
p);
460 typename ObjectMap::iterator
i,
e;
461 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e; ++
i) {
462 if (
i->lastreq &&
i->lastreq < minreq)
463 const_cast<ObjType &>(*i).lastreq = 0;
470 auto *ip = static_cast<ImplPeer *>(
p);
471 typename ObjectMap::iterator
i,
e;
472 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;) {
483 return pos ==
end ? nullptr : &
pos->second;
488 ip->socket =
nullptr;
497 ip->automatic =
nullptr;
502 auto *ip = static_cast<ImplPeer *>(
p);
503 bool needflush = !ip->objs.empty();
505 typename ObjectMap::iterator
i,
e;
506 for (
i = ip->objs.begin(),
e = ip->objs.end();
i !=
e;)
519 typename PeerMap::iterator
pi, pe;
520 typename ObjectMap::iterator oi, oe;
524 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
526 size += 9 *
sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
527 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
529 msg->data.reserve(
msg->data.size() +
size + 8 *
sizeof(uint32_t));
531 uint32_t nupdates = 0;
533 words[0] =
sizeof(words);
540 for (oi =
pi->second.objs.begin(), oe =
pi->second.objs.end(); oi != oe; ++oi)
554 typename PeerMap::iterator
i,
e;
555 typename ObjectMap::iterator oi, oe;
557 ImplPeer &
p =
i->second;
562 logme() <<
"DEBUG: notifying " <<
p.peeraddr << std::endl;
568 if (!
msg.data.empty()) {
571 prev = &(*prev)->
next;
574 (*prev)->
next =
nullptr;
582 typename PeerMap::iterator
i,
e;
603 #endif // DQMSERVICES_CORE_DQM_NET_H
void markObjectsDead(Peer *p) override
virtual void updatePeerMasks()=0
void staleObjectWaitLimit(lat::TimeSpan time)
virtual Peer * getPeer(lat::Socket *s)=0
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
static const uint32_t DQM_PROP_RESET
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
#define dqmhashmix(a, b, c)
static const uint32_t DQM_PROP_STALE
static bool setOrder(const CoreObject &a, const CoreObject &b)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
static const uint32_t DQM_REPLY_NONE
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_TYPE_TH2S
DQMNet & operator=(const DQMNet &)=delete
static const uint32_t DQM_REPLY_OBJECT
void updatePeerMasks() override
static const uint32_t DQM_REPLY_LIST_END
std::vector< QValue > QReports
void removePeer(Peer *p, lat::Socket *s) override
void shutdown()
Stop the network layer and wait for it to finish.
Peer * getPeer(lat::Socket *s) override
void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override
Send all objects to a peer and optionally mark sent objects old.
Peer * createPeer(lat::Socket *s) override
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ALARM
static const uint32_t DQM_MSG_UPDATE_ME
static const uint32_t DQM_PROP_RECEIVED
void releaseWaiters(const std::string &name, Object *o)
std::list< WaitObject > WaitList
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
static const uint32_t DQM_PROP_TYPE_TH2F
DQMBasicNet(const std::string &appname="")
void unlock()
Release the lock on the DQM net layer.
virtual Object * makeObject(Peer *p, const std::string &name)=0
static const uint32_t DQM_PROP_TAGGED
static const uint32_t DQM_PROP_TYPE_TH3S
static const uint32_t DQM_PROP_DEAD
static const uint32_t DQM_PROP_TYPE_INVALID
virtual bool shouldStop()
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static const uint32_t DQM_PROP_TYPE_TH1D
static void packQualityData(std::string &into, const QReports &qr)
Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr) override
static const uint32_t DQM_PROP_TYPE_SCALAR
virtual void sendObjectListToPeers(bool all)=0
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_REPORT_CLEAR
DQMNet(const std::string &appname="")
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_PROP_TYPE_TH3F
std::map< lat::Socket *, ImplPeer > PeerMap
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
static const uint32_t DQM_PROP_TYPE_TH1F
static const uint32_t DQM_PROP_TYPE_REAL
static void discard(Bucket *&b)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_REPLY_LIST_BEGIN
bool removeLocalExcept(const std::set< std::string > &known)
std::vector< unsigned char > DataBlob
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
#define dqmhashfinal(a, b, c)
void clear(HadCaloObj &c)
static const uint32_t DQM_PROP_REPORT_ERROR
Object * makeObject(Peer *p, const std::string &name) override
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_DATABLOB
virtual void markObjectsDead(Peer *p)=0
virtual void removePeer(Peer *p, lat::Socket *s)=0
edm::ErrorSummaryEntry Error
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
void purgeDeadObjects(Peer *p) override
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_INT
void listenToCollector(const std::string &host, int port)
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
std::set< std::string > DirMap
static const uint32_t DQM_PROP_TYPE_TH1S
static const uint32_t DQM_PROP_MARKTODELETE
static const uint32_t DQM_PROP_TYPE_TPROF
static const uint32_t DQM_PROP_HAS_REFERENCE
DQMImplNet(const std::string &appname="")
void lock()
Acquire a lock on the DQM net layer.
virtual Peer * createPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static const uint32_t DQM_MSG_LIST_OBJECTS
void sendObjectListToPeers(bool all) override
char data[epos_bytes_allocation]
static const uint32_t DQM_PROP_TYPE_TH2D
unsigned long long uint64_t
static const uint32_t DQM_MSG_GET_OBJECT
static const uint32_t DQM_MSG_HELLO
static const uint32_t DQM_PROP_LUMI
std::vector< uint32_t > TagList
~DQMImplNet() override=default
bool onLocalNotify(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_ACCUMULATE
void updateToCollector(const std::string &host, int port)
static void copydata(Bucket *b, const void *data, size_t len)
uint32_t operator()(const Object &a) const
static const uint32_t MAX_PEER_WAITREQS
void startLocalServer(int port)
static const uint32_t DQM_PROP_TYPE_TH3D
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_TYPE_TPROF2D