1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
18 # include <ext/hash_set>
81 typedef std::vector<unsigned char>
DataBlob;
162 void debug(
bool doit);
// Rotate helper for the hash macros below: combines (x << k) with
// (x >> (32 - k)), i.e. a left-rotation by k bits when x is an unsigned
// 32-bit value.  Both arguments are expanded more than once, so pass
// side-effect-free expressions.
// NOTE(review): assumes 0 < k < 32 and a 32-bit unsigned operand -- for
// wider or signed operands the result is not a pure rotation; confirm at
// the call sites.
219 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
// Mixing step over three hash state words.  Expands to a braced block of
// six rounds; each round subtracts one word from another, xors in a
// rotated copy (via dqmhashrot) and adds into a third word.  All three
// arguments are modified in place, so they must be modifiable lvalues,
// and the order of the rounds is significant.
// NOTE(review): the rotation constants (4, 6, 8, 16, 19, 4) look like the
// mix() step of Bob Jenkins' lookup3 hash -- confirm before relying on it.
220 # define dqmhashmix(a,b,c) { \
221 a -= c; a ^= dqmhashrot(c, 4); c += b; \
222 b -= a; b ^= dqmhashrot(a, 6); a += c; \
223 c -= b; c ^= dqmhashrot(b, 8); b += a; \
224 a -= c; a ^= dqmhashrot(c,16); c += b; \
225 b -= a; b ^= dqmhashrot(a,19); a += c; \
226 c -= b; c ^= dqmhashrot(b, 4); b += a; }
// Final scrambling step over three hash state words.  Expands to a braced
// block of seven xor / subtract-rotated steps (via dqmhashrot) cycling
// through c, a, b; the last statement updates c.  All three arguments are
// modified in place, so they must be modifiable lvalues.
// NOTE(review): the rotation constants (14, 11, 25, 16, 4, 14, 24) look
// like the final() step of Bob Jenkins' lookup3 hash -- confirm before
// relying on it.
227 # define dqmhashfinal(a,b,c) { \
228 c ^= b; c -= dqmhashrot(b,14); \
229 a ^= c; a -= dqmhashrot(c,11); \
230 b ^= a; b -= dqmhashrot(a,25); \
231 c ^= b; c -= dqmhashrot(b,16); \
232 a ^= c; a -= dqmhashrot(c,4); \
233 b ^= a; b -= dqmhashrot(a,14); \
234 c ^= b; c -= dqmhashrot(b,24); }
237 a = b = c = 0xdeadbeef + (uint32_t) keylen;
238 const unsigned char *
k = (
const unsigned char *) key;
244 a += ((uint32_t)k[1]) << 8;
245 a += ((uint32_t)k[2]) << 16;
246 a += ((uint32_t)k[3]) << 24;
248 b += ((uint32_t)k[5]) << 8;
249 b += ((uint32_t)k[6]) << 16;
250 b += ((uint32_t)k[7]) << 24;
252 c += ((uint32_t)k[9]) << 8;
253 c += ((uint32_t)k[10]) << 16;
254 c += ((uint32_t)k[11]) << 24;
263 case 12: c += ((uint32_t)k[11]) << 24;
264 case 11: c += ((uint32_t)k[10]) << 16;
265 case 10: c += ((uint32_t)k[9]) << 8;
267 case 8 : b += ((uint32_t)k[7]) << 24;
268 case 7 : b += ((uint32_t)k[6]) << 16;
269 case 6 : b += ((uint32_t)k[5]) << 8;
271 case 4 : a += ((uint32_t)k[3]) << 24;
272 case 3 : a += ((uint32_t)k[2]) << 16;
273 case 2 : a += ((uint32_t)k[1]) << 8;
290 std::ostream &
logme(
void);
291 static void copydata(Bucket *
b,
const void *
data,
size_t len);
306 virtual Peer *
getPeer(lat::Socket *
s) = 0;
322 lat::IOSelectEvent *
event,
323 lat::Error *err = 0);
357 template <
class ObjType>
363 typedef std::set<std::string>
DirMap;
364 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
365 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
384 size_t slash = name.rfind(
'/');
385 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
386 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
389 proto.hash =
dqmhash(name.c_str(), name.size());
390 proto.dirname = &
path;
391 proto.objname.append(name, namepos, std::string::npos);
393 typename ObjectMap::iterator pos;
394 typename PeerMap::iterator
i,
e;
399 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
400 pos = ip->objs.find(proto);
401 if (pos == ip->objs.end())
405 if (owner) *owner = ip;
406 return const_cast<ObjType *
>(&*pos);
413 pos = i->second.objs.find(proto);
414 if (pos != i->second.objs.end())
416 if (owner) *owner = &i->second;
417 return const_cast<ObjType *
>(&*pos);
427 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
428 size_t slash = name.rfind(
'/');
429 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
430 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
436 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
437 o.objname.append(name, namepos, std::string::npos);
438 o.hash =
dqmhash(name.c_str(), name.size());
439 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
454 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
455 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
456 typename ObjectMap::iterator
i,
e;
457 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
459 if (i->lastreq && i->lastreq < minreq)
460 const_cast<ObjType &
>(*i).lastreq = 0;
469 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
470 typename ObjectMap::iterator
i,
e;
471 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
483 typename PeerMap::iterator pos =
peers_.find(s);
484 typename PeerMap::iterator
end =
peers_.end();
485 return pos == end ? 0 : &pos->second;
508 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
509 bool needflush = ! ip->objs.empty();
511 typename ObjectMap::iterator
i,
e;
512 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
527 typename PeerMap::iterator
pi, pe;
528 typename ObjectMap::iterator oi, oe;
532 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
534 size += 9*
sizeof(uint32_t) + oi->dirname->size()
535 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
536 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
538 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
540 uint32_t nupdates = 0;
542 words[0] =
sizeof(words);
546 copydata(msg, &words[0],
sizeof(words));
549 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
560 copydata(msg, &words[0],
sizeof(words));
566 typename PeerMap::iterator
i,
e;
567 typename ObjectMap::iterator oi, oe;
570 ImplPeer &
p = i->second;
576 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
582 if (! msg.
data.empty())
586 prev = &(*prev)->
next;
599 typename PeerMap::iterator
i,
e;
623 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait for it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual Object * makeObject(Peer *p, const std::string &name)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void sendObjectListToPeers(bool all)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
virtual void markObjectsDead(Peer *p)
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
virtual void updatePeerMasks(void)
static const uint32_t DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_HAS_REFERENCE
tuple path
else: Piece not in the list, fine.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
DQMNet & operator=(const DQMNet &)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as "!*" was in the past
static const uint32_t MAX_PEER_WAITREQS
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
virtual void purgeDeadObjects(Peer *p)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
virtual Peer * createPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TPROF2D
virtual void removePeer(Peer *p, lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TH2F
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0