1 #ifndef DQMSERVICES_CORE_DQM_NET_H
2 # define DQMSERVICES_CORE_DQM_NET_H
4 # include "classlib/iobase/Socket.h"
5 # include "classlib/iobase/IOSelector.h"
6 # include "classlib/iobase/Pipe.h"
7 # include "classlib/utils/Signal.h"
8 # include "classlib/utils/Error.h"
9 # include "classlib/utils/Time.h"
18 # include <ext/hash_set>
// Alias for a std::vector of unsigned char, i.e. a resizable buffer of raw
// bytes.
82 typedef std::vector<unsigned char>
DataBlob;
// Takes a single boolean flag `doit` and returns nothing.
// NOTE(review): presumably switches debug output on or off; the definition
// is not visible from this declaration -- confirm against the implementation.
163 void debug(
bool doit);
// Combines `x` shifted left by `k` with `x` shifted right by `32 - k`, which
// is a left-rotation by `k` bits when `x` is a 32-bit unsigned value.
// Both arguments are parenthesised in the expansion but are evaluated twice,
// so they must be free of side effects.
// NOTE(review): assumes 0 < k < 32 (a shift by 32 would be undefined for a
// 32-bit operand) and a 32-bit unsigned `x` -- confirm at the call sites.
220 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
// Scrambles the three values `a`, `b` and `c` in place. Each of the six steps
// subtracts one value from another, xors in a rotated copy (dqmhashrot) of the
// subtracted value, and then adds a third value into the one just read; the
// rotation amounts are 4, 6, 8, 16, 19 and 4 in that order.
// The macro expands to a brace-enclosed compound statement, not an expression:
// the arguments must be modifiable lvalues, each is referenced many times, and
// the expansion is not safe as the unbraced body of an `if` that is followed
// by `else`.
// NOTE(review): the step structure and rotation constants look like the
// mix() step of Bob Jenkins' lookup3 hash -- confirm provenance.
221 # define dqmhashmix(a,b,c) { \
222 a -= c; a ^= dqmhashrot(c, 4); c += b; \
223 b -= a; b ^= dqmhashrot(a, 6); a += c; \
224 c -= b; c ^= dqmhashrot(b, 8); b += a; \
225 a -= c; a ^= dqmhashrot(c,16); c += b; \
226 b -= a; b ^= dqmhashrot(a,19); a += c; \
227 c -= b; c ^= dqmhashrot(b, 4); b += a; }
// Final scrambling of the three values `a`, `b` and `c`, in place. Each of the
// seven steps xors one value into another and then subtracts a rotated copy
// (dqmhashrot) of the value just xored in; the rotation amounts are 14, 11,
// 25, 16, 4, 14 and 24 in that order. The last step writes `c`.
// Like dqmhashmix, this expands to a brace-enclosed compound statement: the
// arguments must be modifiable lvalues and are referenced several times.
// NOTE(review): the step structure and rotation constants look like the
// final() step of Bob Jenkins' lookup3 hash -- confirm provenance.
228 # define dqmhashfinal(a,b,c) { \
229 c ^= b; c -= dqmhashrot(b,14); \
230 a ^= c; a -= dqmhashrot(c,11); \
231 b ^= a; b -= dqmhashrot(a,25); \
232 c ^= b; c -= dqmhashrot(b,16); \
233 a ^= c; a -= dqmhashrot(c,4); \
234 b ^= a; b -= dqmhashrot(a,14); \
235 c ^= b; c -= dqmhashrot(b,24); }
238 a = b = c = 0xdeadbeef + (uint32_t) keylen;
239 const unsigned char *
k = (
const unsigned char *) key;
245 a += ((uint32_t)k[1]) << 8;
246 a += ((uint32_t)k[2]) << 16;
247 a += ((uint32_t)k[3]) << 24;
249 b += ((uint32_t)k[5]) << 8;
250 b += ((uint32_t)k[6]) << 16;
251 b += ((uint32_t)k[7]) << 24;
253 c += ((uint32_t)k[9]) << 8;
254 c += ((uint32_t)k[10]) << 16;
255 c += ((uint32_t)k[11]) << 24;
264 case 12: c += ((uint32_t)k[11]) << 24;
265 case 11: c += ((uint32_t)k[10]) << 16;
266 case 10: c += ((uint32_t)k[9]) << 8;
268 case 8 : b += ((uint32_t)k[7]) << 24;
269 case 7 : b += ((uint32_t)k[6]) << 16;
270 case 6 : b += ((uint32_t)k[5]) << 8;
272 case 4 : a += ((uint32_t)k[3]) << 24;
273 case 3 : a += ((uint32_t)k[2]) << 16;
274 case 2 : a += ((uint32_t)k[1]) << 8;
// Takes no arguments and returns a reference to a std::ostream.
// NOTE(review): presumably the stream that log and debug messages are written
// to; the definition is not visible here -- confirm.
291 std::ostream &
logme(
void);
// Static helper taking a destination Bucket `b`, a pointer to untyped source
// bytes `data`, and a byte count `len`. Callers in this file pass a message
// bucket together with the address and sizeof of a local word array.
// NOTE(review): presumably appends `len` bytes from `data` to the bucket's
// data buffer; the definition is not visible here -- confirm.
292 static void copydata(Bucket *
b,
const void *
data,
size_t len);
// Pure virtual: takes a socket pointer `s` and returns a Peer pointer.
// Derived classes must provide the implementation.
// NOTE(review): whether a null pointer is returned for a socket with no known
// peer is not stated by this declaration -- check the overriding definition.
307 virtual Peer *
getPeer(lat::Socket *
s) = 0;
323 lat::IOSelectEvent *
event,
324 lat::Error *err = 0);
358 template <
class ObjType>
// Ordered set of strings. NOTE(review): judging by the name and by the code
// that inserts the part of an object name before the last '/', these are
// directory names -- confirm.
364 typedef std::set<std::string>
DirMap;
// Hash set of ObjType elements, using the HashOp functor for hashing and the
// HashEqual functor for equality. __gnu_cxx::hash_set is a GNU libstdc++
// extension container, not part of standard C++.
365 typedef __gnu_cxx::hash_set<ObjType, HashOp, HashEqual>
ObjectMap;
// Ordered map from a socket pointer to the ImplPeer stored by value for it.
366 typedef std::map<lat::Socket *, ImplPeer>
PeerMap;
385 size_t slash = name.rfind(
'/');
386 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
387 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
390 proto.hash =
dqmhash(name.c_str(), name.size());
391 proto.dirname = &
path;
392 proto.objname.append(name, namepos, std::string::npos);
394 typename ObjectMap::iterator pos;
395 typename PeerMap::iterator
i,
e;
400 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
401 pos = ip->objs.find(proto);
402 if (pos == ip->objs.end())
406 if (owner) *owner = ip;
407 return const_cast<ObjType *
>(&*pos);
414 pos = i->second.objs.find(proto);
415 if (pos != i->second.objs.end())
417 if (owner) *owner = &i->second;
418 return const_cast<ObjType *
>(&*pos);
428 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
429 size_t slash = name.rfind(
'/');
430 size_t dirpos = (slash == std::string::npos ? 0 :
slash);
431 size_t namepos = (slash == std::string::npos ? 0 : slash+1);
437 o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).
first;
438 o.objname.append(name, namepos, std::string::npos);
439 o.hash =
dqmhash(name.c_str(), name.size());
440 return const_cast<ObjType *
>(&*ip->objs.insert(o).first);
455 - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
456 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
457 typename ObjectMap::iterator
i,
e;
458 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; ++
i)
460 if (i->lastreq && i->lastreq < minreq)
461 const_cast<ObjType &
>(*i).lastreq = 0;
470 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
471 typename ObjectMap::iterator
i,
e;
472 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
484 typename PeerMap::iterator pos =
peers_.find(s);
485 typename PeerMap::iterator
end =
peers_.end();
486 return pos == end ? 0 : &pos->second;
509 ImplPeer *ip =
static_cast<ImplPeer *
>(
p);
510 bool needflush = ! ip->objs.empty();
512 typename ObjectMap::iterator
i,
e;
513 for (i = ip->objs.begin(), e = ip->objs.end(); i !=
e; )
528 typename PeerMap::iterator
pi, pe;
529 typename ObjectMap::iterator oi, oe;
533 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
535 size += 9*
sizeof(uint32_t) + oi->dirname->size()
536 + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size()
537 + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
539 msg->
data.reserve(msg->
data.size() + size + 8 *
sizeof(uint32_t));
541 uint32_t nupdates = 0;
543 words[0] =
sizeof(words);
547 copydata(msg, &words[0],
sizeof(words));
550 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
561 copydata(msg, &words[0],
sizeof(words));
567 typename PeerMap::iterator
i,
e;
568 typename ObjectMap::iterator oi, oe;
571 ImplPeer &
p = i->second;
577 <<
"DEBUG: notifying " << p.peeraddr << std::endl;
583 if (! msg.
data.empty())
587 prev = &(*prev)->
next;
600 typename PeerMap::iterator
i,
e;
624 #endif // DQMSERVICES_CORE_DQM_NET_H
virtual Peer * getPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_DATABLOB
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait for it to finish.
static const uint32_t DQM_PROP_TYPE_TH1S
void sendLocalChanges(void)
std::map< lat::Socket *, ImplPeer > PeerMap
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual Object * makeObject(Peer *p, const std::string &name)
bool operator()(const Object &a, const Object &b) const
static const uint32_t DQM_PROP_TYPE_TPROF
virtual void sendObjectListToPeers(bool all)=0
static const uint32_t DQM_REPLY_LIST_END
static const uint32_t DQM_PROP_TYPE_TH2D
virtual Peer * getPeer(lat::Socket *s)=0
static const uint32_t DQM_PROP_REPORT_MASK
static void discard(Bucket *&b)
virtual void sendObjectListToPeers(bool all)
static const uint32_t DQM_PROP_TYPE_SCALAR
#define dqmhashmix(a, b, c)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
static const uint32_t DQM_PROP_TAGGED
std::vector< Variable::Flags > flags
static const uint32_t DQM_PROP_EFFICIENCY_PLOT
void staleObjectWaitLimit(lat::TimeSpan time)
static const uint32_t DQM_PROP_TYPE_TH3F
static const uint32_t DQM_PROP_RESET
static const uint32_t DQM_PROP_DEAD
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static const uint32_t DQM_PROP_TYPE_TH1F
static const uint32_t DQM_PROP_MARKTODELETE
virtual void markObjectsDead(Peer *p)
DQMImplNet(const std::string &appname="")
static const uint32_t DQM_MSG_HELLO
virtual void updatePeerMasks(void)
static const uint32_t DQM_PROP_ACCUMULATE
static const uint32_t DQM_PROP_HAS_REFERENCE
tuple path
else: Piece not in the list, fine.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
static const uint32_t DQM_PROP_TYPE_INT
DQMNet & operator=(const DQMNet &)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static size_t dqmhash(const void *key, size_t keylen)
static const uint32_t DQM_PROP_REPORT_ERROR
static const uint32_t DQM_PROP_REPORT_OTHER
static const uint32_t DQM_PROP_TYPE_TH1D
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
virtual void markObjectsDead(Peer *p)=0
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
static const uint32_t MAX_PEER_WAITREQS
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
static const uint32_t DQM_REPLY_OBJECT
virtual Object * makeObject(Peer *p, const std::string &name)=0
std::list< WaitObject > WaitList
std::vector< uint32_t > TagList
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
static const uint32_t DQM_MSG_GET_OBJECT
uint32_t operator()(const Object &a) const
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
static const uint32_t DQM_PROP_TYPE_TH3S
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)
Send all objects to a peer and optionally mark sent objects old.
static const uint32_t DQM_PROP_REPORT_ALARM
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const uint32_t DQM_REPLY_LIST_BEGIN
virtual void purgeDeadObjects(Peer *p)
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
char data[epos_bytes_allocation]
#define dqmhashfinal(a, b, c)
static bool setOrder(const CoreObject &a, const CoreObject &b)
static const uint32_t DQM_REPLY_NONE
static const uint32_t DQM_PROP_TYPE_TH2S
virtual void removePeer(Peer *p, lat::Socket *s)=0
void listenToCollector(const std::string &host, int port)
static const uint32_t DQM_MSG_LIST_OBJECTS
static const uint32_t DQM_PROP_TYPE_STRING
static const uint32_t DQM_PROP_TYPE_TH3D
__gnu_cxx::hash_set< ObjType, HashOp, HashEqual > ObjectMap
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
static const uint32_t DQM_PROP_NEW
static const uint32_t DQM_PROP_TYPE_MASK
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_REPORT_CLEAR
static const uint32_t DQM_PROP_LUMI
void updateToCollector(const std::string &host, int port)
std::set< std::string > DirMap
static const uint32_t DQM_PROP_RECEIVED
static const uint32_t DQM_PROP_TYPE_REAL
tuple size
Write out results.
static const uint32_t DQM_PROP_TYPE_INVALID
DQMBasicNet(const std::string &appname="")
virtual Peer * createPeer(lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TPROF2D
virtual void removePeer(Peer *p, lat::Socket *s)
static const uint32_t DQM_PROP_TYPE_TH2F
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0