1 #ifndef DQMSERVICES_CORE_DQM_NET_H 2 # define DQMSERVICES_CORE_DQM_NET_H 4 # include "classlib/iobase/Socket.h" 5 # include "classlib/iobase/IOSelector.h" 6 # include "classlib/iobase/Pipe.h" 7 # include "classlib/utils/Signal.h" 8 # include "classlib/utils/Error.h" 9 # include "classlib/utils/Time.h" 18 # include <ext/hash_set> 50 | DQM_PROP_REPORT_WARN
82 typedef std::vector<unsigned char>
DataBlob;
163 void debug(
bool doit);
220 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k)))) 221 # define dqmhashmix(a,b,c) { \ 222 a -= c; a ^= dqmhashrot(c, 4); c += b; \ 223 b -= a; b ^= dqmhashrot(a, 6); a += c; \ 224 c -= b; c ^= dqmhashrot(b, 8); b += a; \ 225 a -= c; a ^= dqmhashrot(c,16); c += b; \ 226 b -= a; b ^= dqmhashrot(a,19); a += c; \ 227 c -= b; c ^= dqmhashrot(b, 4); b += a; } 228 # define dqmhashfinal(a,b,c) { \ 229 c ^= b; c -= dqmhashrot(b,14); \ 230 a ^= c; a -= dqmhashrot(c,11); \ 231 b ^= a; b -= dqmhashrot(a,25); \ 232 c ^= b; c -= dqmhashrot(b,16); \ 233 a ^= c; a -= dqmhashrot(c,4); \ 234 b ^= a; b -= dqmhashrot(a,14); \ 235 c ^= b; c -= dqmhashrot(b,24); } 238 a = b = c = 0xdeadbeef + (uint32_t) keylen;
239 const unsigned char *
k = (
const unsigned char *) key;
245 a += ((uint32_t)k[1]) << 8;
246 a += ((uint32_t)k[2]) << 16;
247 a += ((uint32_t)k[3]) << 24;
249 b += ((uint32_t)k[5]) << 8;
250 b += ((uint32_t)k[6]) << 16;
251 b += ((uint32_t)k[7]) << 24;
253 c += ((uint32_t)k[9]) << 8;
254 c += ((uint32_t)k[10]) << 16;
255 c += ((uint32_t)k[11]) << 24;
264 case 12: c += ((uint32_t)k[11]) << 24;
265 case 11: c += ((uint32_t)k[10]) << 16;
266 case 10: c += ((uint32_t)k[9]) << 8;
268 case 8 : b += ((uint32_t)k[7]) << 24;
269 case 7 : b += ((uint32_t)k[6]) << 16;
270 case 6 : b += ((uint32_t)k[5]) << 8;
272 case 4 : a += ((uint32_t)k[3]) << 24;
273 case 3 : a += ((uint32_t)k[2]) << 16;
274 case 2 : a += ((uint32_t)k[1]) << 8;
291 std::ostream &
logme(
void);
323 lat::IOSelectEvent *
event,
324 lat::Error *err = 0);
358 template <
class ObjType>
364 typedef std::set<std::string>
DirMap;
365 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
366 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
385 size_t slash = name.rfind(
'/');
386 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
387 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
390 proto.hash =
dqmhash(name.c_str(), name.size());
391 proto.dirname = &
path;
392 proto.objname.append(name, namepos, std::string::npos);
394 typename ObjectMap::iterator
pos;
395 typename PeerMap::iterator
i,
e;
400 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
401 pos = ip->objs.find(proto);
402 if (pos == ip->objs.end())
406 if (owner) *owner = ip;
407 return const_cast<ObjType *
>(&*pos);
412 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
414 pos = i->second.objs.find(proto);
415 if (pos != i->second.objs.end())
417 if (owner) *owner = &i->second;
418 return const_cast<ObjType *
>(&*pos);
428 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
429 size_t slash = name.rfind(
'/');
430 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
431 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
437 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
438 o.objname.append(name, namepos, std::string::npos);
439 o.hash =
dqmhash(name.c_str(), name.size());
440 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
454 = (lat::Time::current()
455 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
456 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
457 typename ObjectMap::iterator
i,
e;
458 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
460 if (i->lastreq && i->lastreq < minreq)
461 const_cast<ObjType &
>(*i).lastreq = 0;
470 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
471 typename ObjectMap::iterator
i,
e;
472 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
484 typename PeerMap::iterator
pos = peers_.find(s);
485 typename PeerMap::iterator
end = peers_.end();
486 return pos == end ? 0 : &pos->second;
492 ImplPeer *ip = &peers_[
s];
509 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
510 bool needflush = ! ip->objs.empty();
512 typename ObjectMap::iterator
i,
e;
513 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
528 typename PeerMap::iterator
pi, pe;
529 typename ObjectMap::iterator oi, oe;
532 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
533 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
535 size += 9*
sizeof(uint32_t) + oi->dirname->size()
536 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
537 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
539 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
541 uint32_t nupdates = 0;
543 words[0] =
sizeof(words);
547 copydata(msg, &words[0],
sizeof(words));
549 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++
pi)
550 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
561 copydata(msg, &words[0],
sizeof(words));
567 typename PeerMap::iterator
i,
e;
568 typename ObjectMap::iterator oi, oe;
569 for (i = peers_.begin(), e = peers_.end(); i !=
e; ++
i)
571 ImplPeer &
p = i->second;
577 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
583 if (! msg.
data.empty())
587 prev = &(*prev)->
next;
600 typename PeerMap::iterator
i,
e;
601 for (i = peers_.begin(), e = peers_.end(); i !=
e; )
615 void reserveLocalSpace(uint32_t
size);
616 void updateLocalObject(
Object &
o);
617 bool removeLocalExcept(
const std::set<std::string> &known);
624 #endif // DQMSERVICES_CORE_DQM_NET_H virtual Peer * getPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual Object * makeObject(Peer *p, const std::string &name)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void sendObjectListToPeers(bool all)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
static const uint32_t DQM_PROP_MARKTODELETE
virtual void markObjectsDead(Peer *p)
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
virtual void updatePeerMasks(void)
static const uint32_t DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_HAS_REFERENCE
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
DQMNet & operator=(const DQMNet &)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
static const uint32_t MAX_PEER_WAITREQS
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
virtual void purgeDeadObjects(Peer *p)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
static const uint32_t DQM_PROP_TYPE_INVALID
virtual Peer * createPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TPROF2D
virtual void removePeer(Peer *p, lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TH2F
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0