1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
18 # include <ext/hash_set>
80 typedef std::vector<unsigned char>
DataBlob;
154 DQMNet(
const std::string &appname =
"");
157 void debug(
bool doit);
176 return (diff < 0 ?
true
202 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
203 # define dqmhashmix(a,b,c) { \
204 a -= c; a ^= dqmhashrot(c, 4); c += b; \
205 b -= a; b ^= dqmhashrot(a, 6); a += c; \
206 c -= b; c ^= dqmhashrot(b, 8); b += a; \
207 a -= c; a ^= dqmhashrot(c,16); c += b; \
208 b -= a; b ^= dqmhashrot(a,19); a += c; \
209 c -= b; c ^= dqmhashrot(b, 4); b += a; }
210 # define dqmhashfinal(a,b,c) { \
211 c ^= b; c -= dqmhashrot(b,14); \
212 a ^= c; a -= dqmhashrot(c,11); \
213 b ^= a; b -= dqmhashrot(a,25); \
214 c ^= b; c -= dqmhashrot(b,16); \
215 a ^= c; a -= dqmhashrot(c,4); \
216 b ^= a; b -= dqmhashrot(a,14); \
217 c ^= b; c -= dqmhashrot(b,24); }
220 a = b = c = 0xdeadbeef + (uint32_t) keylen;
221 const unsigned char *
k = (
const unsigned char *) key;
227 a += ((uint32_t)k[1]) << 8;
228 a += ((uint32_t)k[2]) << 16;
229 a += ((uint32_t)k[3]) << 24;
231 b += ((uint32_t)k[5]) << 8;
232 b += ((uint32_t)k[6]) << 16;
233 b += ((uint32_t)k[7]) << 24;
235 c += ((uint32_t)k[9]) << 8;
236 c += ((uint32_t)k[10]) << 16;
237 c += ((uint32_t)k[11]) << 24;
246 case 12: c += ((uint32_t)k[11]) << 24;
247 case 11: c += ((uint32_t)k[10]) << 16;
248 case 10: c += ((uint32_t)k[9]) << 8;
250 case 8 : b += ((uint32_t)k[7]) << 24;
251 case 7 : b += ((uint32_t)k[6]) << 16;
252 case 6 : b += ((uint32_t)k[5]) << 8;
254 case 4 : a += ((uint32_t)k[3]) << 24;
255 case 3 : a += ((uint32_t)k[2]) << 16;
256 case 2 : a += ((uint32_t)k[1]) << 8;
273 std::ostream &
logme(
void);
274 static void copydata(Bucket *
b,
const void *
data,
size_t len);
284 virtual Object *
findObject(Peer *
p,
const std::string &
name, Peer **owner = 0) = 0;
289 virtual Peer *
getPeer(lat::Socket *
s) = 0;
305 lat::IOSelectEvent *
event,
306 lat::Error *err = 0);
340 template <
class ObjType>
346 typedef std::set<std::string>
DirMap;
347 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
348 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
367 size_t slash = name.rfind(
'/');
368 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
369 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
370 std::string
path(name, 0, dirpos);
372 proto.hash =
dqmhash(name.c_str(), name.size());
373 proto.dirname = &
path;
374 proto.objname.append(name, namepos, std::string::npos);
376 typename ObjectMap::iterator
pos;
377 typename PeerMap::iterator
i, e;
382 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
383 pos = ip->objs.find(proto);
384 if (pos == ip->objs.end())
388 if (owner) *owner = ip;
396 pos = i->second.objs.find(proto);
397 if (pos != i->second.objs.end())
399 if (owner) *owner = &i->second;
410 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
411 size_t slash = name.rfind(
'/');
412 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
413 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
419 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
420 o.objname.append(name, namepos, std::string::npos);
421 o.hash =
dqmhash(name.c_str(), name.size());
422 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
437 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
438 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
439 typename ObjectMap::iterator
i, e;
440 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++
i)
442 if (i->lastreq && i->lastreq < minreq)
443 const_cast<ObjType &
>(*i).lastreq = 0;
452 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
453 typename ObjectMap::iterator
i, e;
454 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
466 typename PeerMap::iterator
pos =
peers_.find(s);
467 typename PeerMap::iterator
end =
peers_.end();
468 return pos == end ? 0 : &pos->second;
491 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
492 bool needflush = ! ip->objs.empty();
494 typename ObjectMap::iterator
i, e;
495 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; )
510 typename PeerMap::iterator
pi, pe;
511 typename ObjectMap::iterator oi, oe;
515 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
517 size += 9*
sizeof(uint32_t) + oi->dirname->size()
518 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
519 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
521 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
523 uint32_t nupdates = 0;
525 words[0] =
sizeof(words);
529 copydata(msg, &words[0],
sizeof(words));
532 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
543 copydata(msg, &words[0],
sizeof(words));
549 typename PeerMap::iterator
i, e;
550 typename ObjectMap::iterator oi, oe;
553 ImplPeer &
p = i->second;
559 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
565 if (! msg.
data.empty())
569 prev = &(*prev)->
next;
582 typename PeerMap::iterator
i, e;
606 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual Object * makeObject(Peer *p, const std::string &name)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void sendObjectListToPeers(bool all)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
virtual void markObjectsDead(Peer *p)
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
virtual void updatePeerMasks(void)
static const uint32_t DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_HAS_REFERENCE
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
DQMNet & operator=(const DQMNet &)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
static const uint32_t MAX_PEER_WAITREQS
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
static std::string from(" from ")
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
virtual void purgeDeadObjects(Peer *p)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
virtual Peer * createPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TPROF2D
virtual void removePeer(Peer *p, lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TH2F
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0