1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
18 # include <ext/hash_set>
// Byte-buffer type: a std::vector of unsigned char.
81 typedef std::vector<unsigned char>
DataBlob;
158 void debug(
bool doit);
177 return (diff < 0 ?
true
// Rotate x left by k bit positions, treating x as 32 bits wide (the
// complementary right shift uses 32-(k)).  Both arguments are parenthesised
// in the expansion, but each one is expanded twice, so do not pass
// expressions with side effects.
// NOTE(review): assumes x is an unsigned 32-bit value and 0 < k < 32; with
// k == 0 the right shift would be by 32 bits -- confirm callers only pass
// constants in range.
203 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
// Scramble the three accumulators a, b and c in place.  Each of the six
// steps subtracts one accumulator from another, XORs in a rotated copy of
// it (via dqmhashrot) and then adds a second accumulator into it.
// The arguments must be modifiable lvalues: every one of them is read and
// assigned several times in the expansion.
// The expansion is a bare braced block, not a do/while(0) wrapper, so
// writing "dqmhashmix(a,b,c);" directly before an "else" will not parse.
// NOTE(review): the rotation constants (4, 6, 8, 16, 19, 4) look like the
// mix() step of Bob Jenkins' lookup3 hash -- confirm before citing it.
204 # define dqmhashmix(a,b,c) { \
205 a -= c; a ^= dqmhashrot(c, 4); c += b; \
206 b -= a; b ^= dqmhashrot(a, 6); a += c; \
207 c -= b; c ^= dqmhashrot(b, 8); b += a; \
208 a -= c; a ^= dqmhashrot(c,16); c += b; \
209 b -= a; b ^= dqmhashrot(a,19); a += c; \
210 c -= b; c ^= dqmhashrot(b, 4); b += a; }
// Final scrambling of the three accumulators a, b and c, in place.  Each of
// the seven steps XORs one accumulator into another and then subtracts a
// rotated copy (via dqmhashrot) of the accumulator that was XORed in.
// The arguments must be modifiable lvalues; each is read and assigned
// several times in the expansion.
// The expansion is a bare braced block, not a do/while(0) wrapper, so
// writing "dqmhashfinal(a,b,c);" directly before an "else" will not parse.
// NOTE(review): the rotation constants (14, 11, 25, 16, 4, 14, 24) look
// like the final() step of Bob Jenkins' lookup3 hash -- confirm.
211 # define dqmhashfinal(a,b,c) { \
212 c ^= b; c -= dqmhashrot(b,14); \
213 a ^= c; a -= dqmhashrot(c,11); \
214 b ^= a; b -= dqmhashrot(a,25); \
215 c ^= b; c -= dqmhashrot(b,16); \
216 a ^= c; a -= dqmhashrot(c,4); \
217 b ^= a; b -= dqmhashrot(a,14); \
218 c ^= b; c -= dqmhashrot(b,24); }
221 a = b = c = 0xdeadbeef + (uint32_t) keylen;
222 const unsigned char *
k = (
const unsigned char *) key;
228 a += ((uint32_t)k[1]) << 8;
229 a += ((uint32_t)k[2]) << 16;
230 a += ((uint32_t)k[3]) << 24;
232 b += ((uint32_t)k[5]) << 8;
233 b += ((uint32_t)k[6]) << 16;
234 b += ((uint32_t)k[7]) << 24;
236 c += ((uint32_t)k[9]) << 8;
237 c += ((uint32_t)k[10]) << 16;
238 c += ((uint32_t)k[11]) << 24;
247 case 12: c += ((uint32_t)k[11]) << 24;
248 case 11: c += ((uint32_t)k[10]) << 16;
249 case 10: c += ((uint32_t)k[9]) << 8;
251 case 8 : b += ((uint32_t)k[7]) << 24;
252 case 7 : b += ((uint32_t)k[6]) << 16;
253 case 6 : b += ((uint32_t)k[5]) << 8;
255 case 4 : a += ((uint32_t)k[3]) << 24;
256 case 3 : a += ((uint32_t)k[2]) << 16;
257 case 2 : a += ((uint32_t)k[1]) << 8;
274 std::ostream &
logme(
void);
275 static void copydata(Bucket *
b,
const void *
data,
size_t len);
290 virtual Peer *
getPeer(lat::Socket *
s) = 0;
306 lat::IOSelectEvent *
event,
307 lat::Error *err = 0);
341 template <
class ObjType>
// Ordered set of strings.
// NOTE(review): presumably the type of a peer's "dirs" member, into which
// the directory part of an object name is inserted -- confirm against the
// ImplPeer declaration.
347 typedef std::set<std::string>
DirMap;
// Hashed set of ObjType values (GNU hash_set extension), using HashOp as
// the hash functor and HashEqual as the equality predicate.
// NOTE(review): presumably the type of a peer's "objs" member -- confirm
// against the ImplPeer declaration.
348 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
// Ordered map from a socket pointer to the ImplPeer stored for it; the
// ImplPeer is held by value inside the map.
349 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
368 size_t slash = name.rfind(
'/');
369 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
370 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
373 proto.hash =
dqmhash(name.c_str(), name.size());
374 proto.dirname = &
path;
375 proto.objname.append(name, namepos, std::string::npos);
377 typename ObjectMap::iterator
pos;
378 typename PeerMap::iterator
i,
e;
383 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
384 pos = ip->objs.find(proto);
385 if (pos == ip->objs.end())
389 if (owner) *owner = ip;
397 pos = i->second.objs.find(proto);
398 if (pos != i->second.objs.end())
400 if (owner) *owner = &i->second;
411 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
412 size_t slash = name.rfind(
'/');
413 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
414 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
420 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
421 o.objname.append(name, namepos, std::string::npos);
422 o.hash =
dqmhash(name.c_str(), name.size());
423 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
438 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
439 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
440 typename ObjectMap::iterator
i,
e;
441 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
443 if (i->lastreq && i->lastreq < minreq)
444 const_cast<ObjType &
>(*i).lastreq = 0;
453 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
454 typename ObjectMap::iterator
i,
e;
455 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
467 typename PeerMap::iterator
pos =
peers_.find(s);
468 typename PeerMap::iterator
end =
peers_.end();
469 return pos == end ? 0 : &pos->second;
492 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
493 bool needflush = ! ip->objs.empty();
495 typename ObjectMap::iterator
i,
e;
496 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
511 typename PeerMap::iterator
pi, pe;
512 typename ObjectMap::iterator oi, oe;
516 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
518 size += 9*
sizeof(uint32_t) + oi->dirname->size()
519 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
520 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
522 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
524 uint32_t nupdates = 0;
526 words[0] =
sizeof(words);
530 copydata(msg, &words[0],
sizeof(words));
533 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
544 copydata(msg, &words[0],
sizeof(words));
550 typename PeerMap::iterator
i,
e;
551 typename ObjectMap::iterator oi, oe;
554 ImplPeer &
p = i->second;
560 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
566 if (! msg.
data.empty())
570 prev = &(*prev)->
next;
583 typename PeerMap::iterator
i,
e;
607 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait for it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual Object * makeObject(Peer *p, const std::string &name)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void sendObjectListToPeers(bool all)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
virtual void markObjectsDead(Peer *p)
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
virtual void updatePeerMasks(void)
static const uint32_t DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_HAS_REFERENCE
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
DQMNet & operator=(const DQMNet &)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
static const uint32_t MAX_PEER_WAITREQS
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
virtual void purgeDeadObjects(Peer *p)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
virtual Peer * createPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TPROF2D
virtual void removePeer(Peer *p, lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TH2F
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0