#include <DQMNet.h>
Classes | |
struct | ImplPeer |
Public Types | |
typedef std::set< std::string > | DirMap |
typedef __gnu_cxx::hash_set < ObjType, HashOp, HashEqual > | ObjectMap |
typedef std::map< lat::Socket *, ImplPeer > | PeerMap |
Public Member Functions | |
DQMImplNet (const std::string &appname="") | |
~DQMImplNet (void) | |
Protected Member Functions | |
virtual Peer * | createPeer (lat::Socket *s) |
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=0) |
virtual Peer * | getPeer (lat::Socket *s) |
virtual Object * | makeObject (Peer *p, const std::string &name) |
virtual void | markObjectsDead (Peer *p) |
virtual void | purgeDeadObjects (Peer *p) |
virtual void | removePeer (Peer *p, lat::Socket *s) |
virtual void | sendObjectListToPeer (Bucket *msg, bool all, bool clear) |
Send all objects to a peer and optionally mark sent objects old. | |
virtual void | sendObjectListToPeers (bool all) |
virtual void | updatePeerMasks (void) |
Protected Attributes | |
PeerMap | peers_ |
typedef std::set<std::string> DQMImplNet< ObjType >::DirMap |
typedef std::map<lat::Socket *, ImplPeer> DQMImplNet< ObjType >::PeerMap |
DQMImplNet< ObjType >::DQMImplNet | ( | const std::string & | appname = "" | ) | [inline] |
DQMImplNet< ObjType >::~DQMImplNet | ( | void | ) | [inline] |
virtual Peer* DQMImplNet< ObjType >::createPeer | ( | lat::Socket * | s | ) | [inline, protected, virtual] |
virtual Object* DQMImplNet< ObjType >::findObject | ( | Peer * | p, |
const std::string & | name, | ||
Peer ** | owner = 0 |
||
) | [inline, protected, virtual] |
Implements DQMNet.
Definition at line 365 of file DQMNet.h.
{ size_t slash = name.rfind('/'); size_t dirpos = (slash == std::string::npos ? 0 : slash); size_t namepos = (slash == std::string::npos ? 0 : slash+1); std::string path(name, 0, dirpos); ObjType proto; proto.hash = dqmhash(name.c_str(), name.size()); proto.dirname = &path; proto.objname.append(name, namepos, std::string::npos); typename ObjectMap::iterator pos; typename PeerMap::iterator i, e; if (owner) *owner = 0; if (p) { ImplPeer *ip = static_cast<ImplPeer *>(p); pos = ip->objs.find(proto); if (pos == ip->objs.end()) return 0; else { if (owner) *owner = ip; return const_cast<ObjType *>(&*pos); } } else { for (i = peers_.begin(), e = peers_.end(); i != e; ++i) { pos = i->second.objs.find(proto); if (pos != i->second.objs.end()) { if (owner) *owner = &i->second; return const_cast<ObjType *>(&*pos); } } return 0; } }
virtual Peer* DQMImplNet< ObjType >::getPeer | ( | lat::Socket * | s | ) | [inline, protected, virtual] |
virtual Object* DQMImplNet< ObjType >::makeObject | ( | Peer * | p, |
const std::string & | name | ||
) | [inline, protected, virtual] |
Implements DQMNet.
Definition at line 408 of file DQMNet.h.
{ ImplPeer *ip = static_cast<ImplPeer *>(p); size_t slash = name.rfind('/'); size_t dirpos = (slash == std::string::npos ? 0 : slash); size_t namepos = (slash == std::string::npos ? 0 : slash+1); ObjType o; o.flags = 0; o.tag = 0; o.version = 0; o.lastreq = 0; o.dirname = &*ip->dirs.insert(name.substr(0, dirpos)).first; o.objname.append(name, namepos, std::string::npos); o.hash = dqmhash(name.c_str(), name.size()); return const_cast<ObjType *>(&*ip->objs.insert(o).first); }
virtual void DQMImplNet< ObjType >::markObjectsDead | ( | Peer * | p | ) | [inline, protected, virtual] |
Implements DQMNet.
Definition at line 433 of file DQMNet.h.
{ uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns(); ImplPeer *ip = static_cast<ImplPeer *>(p); typename ObjectMap::iterator i, e; for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) { if (i->lastreq && i->lastreq < minreq) const_cast<ObjType &>(*i).lastreq = 0; const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD; } }
virtual void DQMImplNet< ObjType >::purgeDeadObjects | ( | Peer * | p | ) | [inline, protected, virtual] |
virtual void DQMImplNet< ObjType >::removePeer | ( | Peer * | p, |
lat::Socket * | s | ||
) | [inline, protected, virtual] |
Implements DQMNet.
Definition at line 489 of file DQMNet.h.
{ ImplPeer *ip = static_cast<ImplPeer *>(p); bool needflush = ! ip->objs.empty(); typename ObjectMap::iterator i, e; for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ) ip->objs.erase(i++); peers_.erase(s); // If we removed a peer with objects, our list of objects // has changed and we need to update downstream peers. if (needflush) sendLocalChanges(); }
virtual void DQMImplNet< ObjType >::sendObjectListToPeer | ( | Bucket * | msg, |
bool | all, | ||
bool | clear | ||
) | [inline, protected, virtual] |
Send all objects to a peer and optionally mark sent objects old.
Implements DQMNet.
Definition at line 508 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeers().
{ typename PeerMap::iterator pi, pe; typename ObjectMap::iterator oi, oe; size_t size = 0; size_t numobjs = 0; for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs) if (all || (oi->flags & DQM_PROP_NEW)) size += 9*sizeof(uint32_t) + oi->dirname->size() + oi->objname.size() + 1 + oi->scalar.size() + oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0); msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t)); uint32_t nupdates = 0; uint32_t words [4]; words[0] = sizeof(words); words[1] = DQM_REPLY_LIST_BEGIN; words[2] = numobjs; words[3] = all; copydata(msg, &words[0], sizeof(words)); for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi) if (all || (oi->flags & DQM_PROP_NEW)) { sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0); if (clear) const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW; ++nupdates; } words[1] = DQM_REPLY_LIST_END; words[2] = nupdates; copydata(msg, &words[0], sizeof(words)); }
virtual void DQMImplNet< ObjType >::sendObjectListToPeers | ( | bool | all | ) | [inline, protected, virtual] |
Implements DQMNet.
Definition at line 547 of file DQMNet.h.
{ typename PeerMap::iterator i, e; typename ObjectMap::iterator oi, oe; for (i = peers_.begin(), e = peers_.end(); i != e; ++i) { ImplPeer &p = i->second; if (! p.update) continue; if (debug_) logme() << "DEBUG: notifying " << p.peeraddr << std::endl; Bucket msg; msg.next = 0; sendObjectListToPeer(&msg, !p.updated || all, true); if (! msg.data.empty()) { Bucket **prev = &p.sendq; while (*prev) prev = &(*prev)->next; *prev = new Bucket; (*prev)->next = 0; (*prev)->data.swap(msg.data); } p.updated = true; } }
virtual void DQMImplNet< ObjType >::updatePeerMasks | ( | void | ) | [inline, protected, virtual] |
PeerMap DQMImplNet< ObjType >::peers_ [protected] |
Definition at line 588 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::createPeer(), DQMImplNet< DQMNet::Object >::findObject(), DQMImplNet< DQMNet::Object >::getPeer(), DQMImplNet< DQMNet::Object >::removePeer(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), and DQMImplNet< DQMNet::Object >::updatePeerMasks().