CMS 3D CMS Logo

VisNet Class Reference

#include <VisFramework/VisFrameworkBase/interface/VisNet.h>

List of all members.

Public Types

typedef std::vector< unsigned char > DataBlob
typedef std::map< std::string, Object > ObjectMap
typedef std::map< lat::Socket *, Peer > PeerMap
typedef std::list< WaitObject > WaitList

Public Member Functions

void debug (bool doit)
 Enable or disable verbose debugging.
void delay (int delay)
 Set the I/O dispatching delay.
void listenToSource (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).
void lock (void)
 Acquire a lock on the net layer.
virtual int receive (void(*callback)(void *arg, uint32_t reason, Object &obj), void *arg)
virtual void removeLocalObject (const std::string &name)
 Delete the local object.
void run (void)
 Run the actual I/O processing loop.
void sendLocalChanges (void)
void shutdown (void)
 Stop the network layer and wait for it to finish.
void start (void)
 Start running the network layer in a new thread.
void startLocalServer (int port)
 Start a server socket for accessing this node remotely.
void unlock (void)
 Release the lock on the net layer.
virtual void updateLocalObject (Object &o)
 Update the network cache for an object.
void updateToCollector (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically send updates whenever local data changes.
 VisNet (const std::string &appname="")
virtual ~VisNet (void)

Static Public Attributes

static const uint32_t MAX_PEER_WAITREQS = 128
static const uint32_t VIS_FLAG_DEAD = 0x40000000
static const uint32_t VIS_FLAG_NEW = 0x20000000
static const uint32_t VIS_FLAG_RECEIVED = 0x10000000
static const uint32_t VIS_FLAG_SCALAR = 0x1
static const uint32_t VIS_FLAG_ZOMBIE = 0x80000000
static const uint32_t VIS_MSG_GET_OBJECT = 3
static const uint32_t VIS_MSG_HELLO = 0
static const uint32_t VIS_MSG_LIST_OBJECTS = 2
static const uint32_t VIS_MSG_UPDATE_ME = 1
static const uint32_t VIS_REPLY_LIST_BEGIN = 101
static const uint32_t VIS_REPLY_LIST_END = 102
static const uint32_t VIS_REPLY_NONE = 103
static const uint32_t VIS_REPLY_OBJECT = 104

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)
virtual Peer * getPeer (lat::Socket *s)
std::ostream & logme (void)
virtual Object * makeObject (Peer *p, const std::string &name)
virtual void markObjectsDead (Peer *p)
virtual void markObjectsZombies (Peer *p)
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
virtual void purgeDeadObjects (lat::Time oldobj, lat::Time deadobj)
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
virtual void removePeer (Peer *p, lat::Socket *s)
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)
 Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeers (bool all)
void sendObjectToPeer (Bucket *msg, Object &o, bool data)
virtual bool shouldStop (void)
void updateMask (Peer *p)
 Update the selector mask for a peer based on data queues.
virtual void updatePeerMasks (void)
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)

Protected Attributes

bool debug_

Private Member Functions

bool losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
 Handle errors with a peer socket.
bool onLocalNotify (lat::IOSelectEvent *ev)
 React to notifications from the app thread.
bool onPeerConnect (lat::IOSelectEvent *ev)
 Respond to new connections on the server socket.
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client.
VisNet & operator= (const VisNet &)
void releaseFromWait (WaitList::iterator i, Object *o)
void releaseWaiters (Object *o)
void requestObject (Peer *p, const char *name, size_t len)
 Queue an object request to the data server.
 VisNet (const VisNet &)

Static Private Member Functions

static void discard (Bucket *&b)

Private Attributes

std::string appname_
pthread_t communicate_
int delay_
AutoPeer downstream_
bool flush_
Peer * local_
pthread_mutex_t lock_
PeerMap peers_
int pid_
lat::IOSelector sel_
lat::InetServerSocket * server_
sig_atomic_t shutdown_
AutoPeer upstream_
lat::Time version_
WaitList waiting_
lat::Pipe wakeup_

Classes

struct  AutoPeer
struct  Bucket
struct  Object
struct  Peer
struct  WaitObject


Detailed Description

Definition at line 18 of file VisNet.h.


Member Typedef Documentation

typedef std::vector<unsigned char> VisNet::DataBlob

Definition at line 41 of file VisNet.h.

typedef std::map<std::string, Object> VisNet::ObjectMap

Definition at line 45 of file VisNet.h.

typedef std::map<lat::Socket *, Peer> VisNet::PeerMap

Definition at line 46 of file VisNet.h.

typedef std::list<WaitObject> VisNet::WaitList

Definition at line 44 of file VisNet.h.


Constructor & Destructor Documentation

VisNet::VisNet ( const std::string &  appname = ""  ) 

Definition at line 781 of file VisNet.cc.

References lat::IOSelector::attach(), lat::CreateHook(), createPeer(), downstream_, lat::IOChannel::fd(), IORead, local_, VisNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), VisNet::AutoPeer::peer, VisNet::AutoPeer::port, sel_, lat::Pipe::source(), VisNet::AutoPeer::update, upstream_, wakeup_, and VisNet::AutoPeer::warned.

00782   : debug_ (false),
00783     appname_ (appname.empty() ? "VisNet" : appname.c_str()),
00784     pid_ (getpid()),
00785     server_ (0),
00786     version_ (Time::current()),
00787     communicate_ ((pthread_t) -1),
00788     shutdown_ (0),
00789     delay_ (1000),
00790     flush_ (false)
00791 {
00792   // Create a pipe for the local apps to tell the communicator
00793   // thread that local app data has changed and that the peers
00794   // should be notified.
00795   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &VisNet::onLocalNotify));
00797 
00798   // Initialise the upstream and downstream to empty.
00799   upstream_.peer   = downstream_.peer   = 0;
00800   upstream_.next   = downstream_.next   = 0;
00801   upstream_.port   = downstream_.port   = 0;
00802   upstream_.update = downstream_.update = false;
00803   upstream_.warned = downstream_.warned = false;
00804 
00805   local_ = createPeer((Socket *) -1);
00806 }

VisNet::~VisNet ( void   )  [virtual]

Definition at line 808 of file VisNet.cc.

00809 {
00810   // FIXME
00811 }

VisNet::VisNet ( const VisNet &   )  [private]


Member Function Documentation

void VisNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
) [static, protected]

Definition at line 38 of file VisNet.cc.

References VisNet::Bucket::data.

Referenced by onMessage(), releaseFromWait(), requestObject(), run(), sendObjectListToPeer(), and sendObjectToPeer().

00039 {
00040   b->data.insert(b->data.end(),
00041                  (const unsigned char *)data,
00042                  (const unsigned char *)data + len);
00043 }

VisNet::Peer * VisNet::createPeer ( lat::Socket *  s  )  [protected, virtual]

Definition at line 1223 of file VisNet.cc.

References VisNet::Peer::automatic, VisNet::Peer::mask, p, peers_, VisNet::Peer::sendpos, VisNet::Peer::sendq, VisNet::Peer::socket, VisNet::Peer::source, VisNet::Peer::update, VisNet::Peer::updated, VisNet::Peer::updates, and VisNet::Peer::waiting.

Referenced by onPeerConnect(), run(), and VisNet().

01224 {
01225   Peer *p = &peers_[s];
01226   p->socket = 0;
01227   p->sendq = 0;
01228   p->sendpos = 0;
01229   p->mask = 0;
01230   p->source = false;
01231   p->update = false;
01232   p->updated = false;
01233   p->updates = 0;
01234   p->waiting = 0;
01235   p->automatic = 0;
01236   return p;
01237 }

void VisNet::debug ( bool  doit  ) 

Enable or disable verbose debugging.

Must be called before calling run() or start().

Definition at line 816 of file VisNet.cc.

References debug_.

00817 {
00818   debug_ = doit;
00819 }

void VisNet::delay ( int  delay  ) 

Set the I/O dispatching delay.

Must be called before calling run() or start().

Definition at line 824 of file VisNet.cc.

References delay_.

00825 {
00826   delay_ = delay;
00827 }

void VisNet::discard ( Bucket *&  b  )  [static, private]

Definition at line 47 of file VisNet.cc.

References VisNet::Bucket::next.

Referenced by losePeer(), and onPeerData().

00048 {
00049   while (b)
00050   {
00051     Bucket *next = b->next;
00052     delete b;
00053     b = next;
00054   }
00055 }

VisNet::Object * VisNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
) [protected, virtual]

Definition at line 1110 of file VisNet.cc.

References e, i, VisNet::Peer::objs, and peers_.

Referenced by onMessage(), and run().

01111 {
01112   ObjectMap::iterator pos;
01113   PeerMap::iterator i, e;
01114   if (owner)
01115     *owner = 0;
01116   if (p)
01117   {
01118     pos = p->objs.find(name);
01119     if (pos == p->objs.end())
01120       return 0;
01121     else
01122     {
01123       if (owner) *owner = p;
01124       return &pos->second;
01125     }
01126   }
01127   else
01128   {
01129     for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01130     {
01131       pos = i->second.objs.find(name);
01132       if (pos != i->second.objs.end())
01133       {
01134         if (owner) *owner = &i->second;
01135         return &pos->second;
01136       }
01137     }
01138     return 0;
01139   }
01140 }

VisNet::Peer * VisNet::getPeer ( lat::Socket *  s  )  [protected, virtual]

Definition at line 1216 of file VisNet.cc.

References peers_.

Referenced by onPeerData().

01217 {
01218   PeerMap::iterator pos = peers_.find(s);
01219   return pos == peers_.end() ? 0 : &pos->second;
01220 }

void VisNet::listenToSource ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).

Must be called before calling run() or start().

Definition at line 889 of file VisNet.cc.

References lat::endl(), VisNet::AutoPeer::host, logme(), VisNet::AutoPeer::port, VisNet::AutoPeer::update, and upstream_.

00890 {
00891   if (! upstream_.host.empty())
00892   {
00893     logme()
00894       << "ERROR: Already receiving data from another collector at "
00895       << upstream_.host << ":" << upstream_.port << std::endl;
00896     return;
00897   }
00898 
00899   upstream_.update = false;
00900   upstream_.host = host;
00901   upstream_.port = port;
00902 }

void VisNet::lock ( void   ) 

Acquire a lock on the net layer.

Definition at line 929 of file VisNet.cc.

References communicate_, and lock_.

Referenced by onMessage(), receive(), and run().

00930 {
00931   if (communicate_ != (pthread_t) -1)
00932     pthread_mutex_lock(&lock_);
00933 }

std::ostream & VisNet::logme ( void   )  [protected]

Definition at line 29 of file VisNet.cc.

References appname_, std::cerr, and pid_.

Referenced by listenToSource(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), start(), startLocalServer(), updateMask(), and updateToCollector().

00030 {
00031   return std::cerr
00032     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033     << " " << appname_ << "[" << pid_ << "]: ";
00034 }

bool VisNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
) [private]

Handle errors with a peer socket.

Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 62 of file VisNet.cc.

References VisNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), VisNet::AutoPeer::peer, VisNet::Peer::peeraddr, removePeer(), s, sel_, VisNet::Peer::sendq, VisNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.

Referenced by onPeerData(), and updateMask().

00066 {
00067   if (reason)
00068     logme ()
00069       << reason << peer->peeraddr
00070       << (err ? "; error was: " + err->explain() : std::string(""))
00071       << std::endl;
00072 
00073   Socket *s = peer->socket;
00074 
00075   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076     if (i->peer == peer)
00077       waiting_.erase(i++);
00078     else
00079       ++i;
00080 
00081   if (ev)
00082     ev->source = 0;
00083 
00084   discard(peer->sendq);
00085   if (peer->automatic)
00086     peer->automatic->peer = 0;
00087 
00088   sel_.detach(s);
00089   s->close();
00090   removePeer(peer, s);
00091   delete s;
00092   return true;
00093 }

VisNet::Object * VisNet::makeObject ( Peer *  p,
const std::string &  name 
) [protected, virtual]

Definition at line 1143 of file VisNet.cc.

References VisNet::Object::flags, VisNet::Object::lastreq, VisNet::Object::name, VisNet::Peer::objs, and VisNet::Object::version.

Referenced by onMessage().

01144 {
01145   Object *o = &p->objs[name];
01146   o->version = 0;
01147   o->name = name;
01148   o->flags = 0;
01149   o->lastreq = 0;
01150   return o;
01151 }

void VisNet::markObjectsDead ( Peer *  p  )  [protected, virtual]

Definition at line 1169 of file VisNet.cc.

References e, i, VisNet::Peer::objs, VIS_FLAG_DEAD, and VIS_FLAG_ZOMBIE.

Referenced by onMessage().

01170 {
01171   ObjectMap::iterator i, e;
01172   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01173     if (i->second.flags & VIS_FLAG_ZOMBIE)
01174       i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01175 }

void VisNet::markObjectsZombies ( Peer *  p  )  [protected, virtual]

Definition at line 1160 of file VisNet.cc.

References e, i, VisNet::Peer::objs, and VIS_FLAG_ZOMBIE.

Referenced by onMessage().

01161 {
01162   ObjectMap::iterator i, e;
01163   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01164     i->second.flags |= VIS_FLAG_ZOMBIE;
01165 }

bool VisNet::onLocalNotify ( lat::IOSelectEvent *  ev  )  [private]

React to notifications from the app thread.

This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new app data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent object updates.

Definition at line 720 of file VisNet.cc.

References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.

Referenced by VisNet().

00721 {
00722   // Discard the data in the pipe, we care only about the wakeup.
00723   try
00724   {
00725     IOSize sz;
00726     unsigned char buf [1024];
00727     while ((sz = ev->source->read(buf, sizeof(buf))))
00728       ;
00729   }
00730   catch (Error &e)
00731   {
00732     SystemError *next = dynamic_cast<SystemError *>(e.next());
00733     if (next && next->portable() == SysErr::ErrTryAgain)
00734       ; // Ignore it
00735     else
00736       logme()
00737         << "WARNING: error reading from notification pipe: "
00738         << e.explain() << std::endl;
00739   }
00740 
00741   // Tell the main event pump to send an update in a little while.
00742   flush_ = true;
00743 
00744   // We are never done, always keep going.
00745   return false;
00746 }

bool VisNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
) [protected, virtual]

Definition at line 224 of file VisNet.cc.

References copydata(), VisNet::Bucket::data, debug_, lat::endl(), findObject(), VisNet::Object::flags, flags, flush_, VisNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, VisNet::Peer::peeraddr, VisNet::Object::rawdata, releaseWaiters(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), VisNet::Peer::source, unlock(), VisNet::Peer::update, VisNet::Peer::updates, VisNet::Object::version, VIS_FLAG_DEAD, VIS_FLAG_RECEIVED, VIS_MSG_GET_OBJECT, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, VIS_REPLY_LIST_BEGIN, VIS_REPLY_LIST_END, VIS_REPLY_NONE, VIS_REPLY_OBJECT, and waitForData().

Referenced by onPeerData().

00225 {
00226   // Decode and process this message.
00227   uint32_t type;
00228   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229   switch (type)
00230   {
00231   case VIS_MSG_UPDATE_ME:
00232     {
00233       if (len != 2*sizeof(uint32_t))
00234       {
00235         logme()
00236           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237           << " from peer " << p->peeraddr << std::endl;
00238         return false;
00239       }
00240 
00241       if (debug_)
00242         logme()
00243           << "DEBUG: received message 'UPDATE ME' from peer "
00244           << p->peeraddr << std::endl;
00245 
00246       p->update = true;
00247     }
00248     return true;
00249 
00250   case VIS_MSG_LIST_OBJECTS:
00251     {
00252       if (debug_)
00253         logme()
00254           << "DEBUG: received message 'LIST OBJECTS' from peer "
00255           << p->peeraddr << std::endl;
00256 
00257       // Send over current status: list of known objects.
00258       lock();
00259       sendObjectListToPeer(msg, true, false);
00260       unlock();
00261     }
00262     return true;
00263 
00264   case VIS_MSG_GET_OBJECT:
00265     {
00266       if (debug_)
00267         logme()
00268           << "DEBUG: received message 'GET OBJECT' from peer "
00269           << p->peeraddr << std::endl;
00270 
00271       if (len < 3*sizeof(uint32_t))
00272       {
00273         logme()
00274           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275           << " from peer " << p->peeraddr << std::endl;
00276         return false;
00277       }
00278 
00279       uint32_t namelen;
00280       memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281       if (len != 3*sizeof(uint32_t) + namelen)
00282       {
00283         logme()
00284           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285           << " from peer " << p->peeraddr
00286           << ", expected length " << (3*sizeof(uint32_t))
00287           << " + " << namelen << std::endl;
00288         return false;
00289       }
00290 
00291       lock();
00292       std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293       Peer *owner = 0;
00294       Object *o = findObject(0, name, &owner);
00295       if (o)
00296       {
00297         o->lastreq = Time::current();
00298         if (o->rawdata.empty())
00299           waitForData(p, name, "", owner);
00300         else
00301           sendObjectToPeer(msg, *o, true);
00302       }
00303       else
00304       {
00305         uint32_t words[3];
00306         words[0] = sizeof(words) + name.size();
00307         words[1] = VIS_REPLY_NONE;
00308         words[2] = name.size();
00309 
00310         msg->data.reserve(msg->data.size() + words[0]);
00311         copydata(msg, &words[0], sizeof(words));
00312         copydata(msg, &name[0], name.size());
00313       }
00314       unlock();
00315     }
00316     return true;
00317 
00318   case VIS_REPLY_LIST_BEGIN:
00319     {
00320       if (len != 4*sizeof(uint32_t))
00321       {
00322         logme()
00323           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324           << " from peer " << p->peeraddr << std::endl;
00325         return false;
00326       }
00327 
00328       if (debug_)
00329         logme()
00330           << "DEBUG: received message 'LIST BEGIN' from "
00331           << p->peeraddr << std::endl;
00332 
00333       // Get the update status: whether this is a full update.
00334       uint32_t flags;
00335       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336 
00337       // If we are about to receive a full list of objects, flag all
00338       // objects dead.  Subsequent object notifications will undo this
00339       // for the live objects.  This tells us the object has been
00340       // removed, but we can keep making it available for a while if
00341       // there continues to be interest in it.
00342       if (flags)
00343       {
00344         lock();
00345         markObjectsZombies(p);
00346         unlock();
00347       }
00348     }
00349     return true;
00350 
00351   case VIS_REPLY_LIST_END:
00352     {
00353       if (len != 4*sizeof(uint32_t))
00354       {
00355         logme()
00356           << "ERROR: corrupt 'LIST END' message of length " << len
00357           << " from peer " << p->peeraddr << std::endl;
00358         return false;
00359       }
00360 
00361       // Get the update status: whether this is a full update.
00362       uint32_t flags;
00363       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364 
00365       // If we received a full list of objects, flag all zombie objects
00366       // now dead. We need to do this in two stages in case we receive
00367       // updates in many parts, and end up sending updates to others in
00368       // between; this avoids us lying live objects are dead.
00369       if (flags)
00370       {
00371         lock();
00372         markObjectsDead(p);
00373         unlock();
00374       }
00375 
00376       if (debug_)
00377         logme()
00378           << "DEBUG: received message 'LIST END' from "
00379           << p->peeraddr << std::endl;
00380 
00381       // Indicate we have received another update from this peer.
00382       // Also indicate we should flush to our clients.
00383       flush_ = true;
00384       p->updates++;
00385     }
00386     return true;
00387 
00388   case VIS_REPLY_OBJECT:
00389     {
00390       uint32_t words[7];
00391       if (len < sizeof(words))
00392       {
00393         logme()
00394           << "ERROR: corrupt 'OBJECT' message of length " << len
00395           << " from peer " << p->peeraddr << std::endl;
00396         return false;
00397       }
00398 
00399       memcpy(&words[0], data, sizeof(words));
00400       uint32_t &namelen = words[5];
00401       uint32_t &datalen = words[6];
00402 
00403       if (len != sizeof(words) + namelen + datalen)
00404       {
00405         logme()
00406           << "ERROR: corrupt 'OBJECT' message of length " << len
00407           << " from peer " << p->peeraddr
00408           << ", expected length " << sizeof(words)
00409           << " + " << namelen
00410           << " + " << datalen
00411           << std::endl;
00412         return false;
00413       }
00414 
00415       unsigned char *namedata = data + sizeof(words);
00416       unsigned char *objdata = namedata + namelen;
00417       unsigned char *enddata = objdata + datalen;
00418       std::string name((char *) namedata, namelen);
00419       assert(enddata == data + len);
00420 
00421       if (debug_)
00422         logme()
00423           << "DEBUG: received message 'OBJECT " << name
00424           << "' from " << p->peeraddr << std::endl;
00425 
00426       // Mark the peer as a known object source.
00427       p->source = true;
00428 
00429       // Initialise or update an object entry.
00430       lock();
00431       Object *o = findObject(p, name);
00432       if (! o)
00433         o = makeObject(p, name);
00434 
00435       bool hadobject = ! o->rawdata.empty();
00436       o->flags = words[2] | VIS_FLAG_RECEIVED;
00437       o->version = ((uint64_t) words[4] << 32 | words[3]);
00438       o->rawdata.clear();
00439       o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440 
00441       // If we had an object for this one already and this is a list
00442       // update without data, issue an immediate data get request.
00443       if (hadobject && ! datalen)
00444         requestObject(p, (namelen ? &name[0] : 0), namelen);
00445 
00446       // If we have the object data, release from wait.
00447       if (datalen)
00448         releaseWaiters(o);
00449       unlock();
00450     }
00451     return true;
00452 
00453   case VIS_REPLY_NONE:
00454     {
00455       uint32_t words[3];
00456       if (len < sizeof(words))
00457       {
00458         logme()
00459           << "ERROR: corrupt 'NONE' message of length " << len
00460           << " from peer " << p->peeraddr << std::endl;
00461         return false;
00462       }
00463 
00464       memcpy(&words[0], data, sizeof(words));
00465       uint32_t &namelen = words[2];
00466 
00467       if (len != sizeof(words) + namelen)
00468       {
00469         logme()
00470           << "ERROR: corrupt 'NONE' message of length " << len
00471           << " from peer " << p->peeraddr
00472           << ", expected length " << sizeof(words)
00473           << " + " << namelen << std::endl;
00474         return false;
00475       }
00476 
00477       unsigned char *namedata = data + sizeof(words);
00478       unsigned char *enddata = namedata + namelen;
00479       std::string name((char *) namedata, namelen);
00480       assert(enddata == data + len);
00481 
00482       if (debug_)
00483         logme()
00484           << "DEBUG: received message 'NONE " << name
00485           << "' from " << p->peeraddr << std::endl;
00486 
00487       // Mark the peer as a known object source.
00488       p->source = true;
00489 
00490       // If this was a known object, update its entry.
00491       lock();
00492       Object *o = findObject(p, name);
00493       if (o)
00494         o->flags |= VIS_FLAG_DEAD;
00495 
00496       // If someone was waiting for this, let them go.
00497       releaseWaiters(o);
00498       unlock();
00499     }
00500     return true;
00501 
00502   default:
00503     logme()
00504       << "ERROR: unrecognised message of length " << len
00505       << " and type " << type << " from peer " << p->peeraddr
00506       << std::endl;
00507     return false;
00508   }
00509 }

bool VisNet::onPeerConnect ( lat::IOSelectEvent *  ev  )  [private]

Respond to new connections on the server socket.

Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 680 of file VisNet.cc.

References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.

Referenced by startLocalServer().

00681 {
00682   // Recover the server socket.
00683   assert(ev->source == server_);
00684 
00685   // Accept the connection.
00686   Socket *s = server_->accept();
00687   assert(s);
00688   assert(! s->isBlocking());
00689 
00690   // Record it to our list of peers.
00691   Peer *p = createPeer(s);
00692   InetAddress peeraddr = ((InetSocket *) s)->peername();
00693   InetAddress myaddr = ((InetSocket *) s)->sockname();
00694   p->peeraddr = StringFormat("%1:%2")
00695                 .arg(peeraddr.hostname())
00696                 .arg(peeraddr.port());
00697   p->mask = IORead|IOUrgent;
00698   p->socket = s;
00699 
00700   // Report the new connection.
00701   if (debug_)
00702     logme()
00703       << "INFO: new peer " << p->peeraddr << " is now connected to "
00704       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705 
00706   // Attach it to the listener.
00707   sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
00708 
00709   // We are never done.
00710   return false;
00711 }

bool VisNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
) [private]

Handle communication to a particular client.

Definition at line 514 of file VisNet.cc.

References VisNet::Peer::automatic, b, data, VisNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), VisNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), VisNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, lat::Error::next(), old, onMessage(), VisNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, VisNet::Peer::sendpos, VisNet::Peer::sendq, lat::IOSelector::setMask(), VisNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, TrackValidation_HighPurity_cff::valid, VisNet::Peer::waiting, and lat::IOChannel::write().

Referenced by onPeerConnect(), and run().

00515 {
00516   assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517 
00518   // If there is a problem with the peer socket, discard the peer
00519   // and tell the selector to stop processing events for it.  If
00520   // this is a server connection, we will eventually recreate
00521   // everything if/when the data server comes back.
00522   if (ev->events & IOUrgent)
00523   {
00524     if (p->automatic)
00525     {
00526       logme()
00527         << "WARNING: connection to the server at " << p->peeraddr
00528         << " lost (will attempt to reconnect in 15 seconds)\n";
00529       return losePeer(0, p, ev);
00530     }
00531     else
00532       return losePeer("WARNING: lost peer connection ", p, ev);
00533   }
00534 
00535   // If we can write to the peer socket, pump whatever we can into it.
00536   if (ev->events & IOWrite)
00537   {
00538     while (Bucket *b = p->sendq)
00539     {
00540       IOSize len = b->data.size() - p->sendpos;
00541       const void *data = (len ? (const void *)&b->data[p->sendpos]
00542                           : (const void *)&data);
00543       IOSize done;
00544 
00545       try
00546       {
00547         done = (len ? ev->source->write (data, len) : 0);
00548         if (debug_ && len)
00549           logme()
00550             << "DEBUG: sent " << done << " bytes to peer "
00551             << p->peeraddr << std::endl;
00552       }
00553       catch (Error &e)
00554       {
00555         return losePeer("WARNING: unable to write to peer ",
00556                         p, ev, &e);
00557       }
00558 
00559       p->sendpos += done;
00560       if (p->sendpos == b->data.size())
00561       {
00562         Bucket *old = p->sendq;
00563         p->sendq = old->next;
00564         p->sendpos = 0;
00565         old->next = 0;
00566         discard(old);
00567       }
00568 
00569       if (! done && len)
00570         // Cannot write any more.
00571         break;
00572     }
00573   }
00574 
00575   // If there is data to be read from the peer, first receive what we
00576   // can get out of the socket, then process all complete requests.
00577   if (ev->events & IORead)
00578   {
00579     // First build up the incoming buffer of data in the socket.
00580     // Remember the last size returned by the socket; we need
00581     // it to determine if the remote end closed the connection.
00582     IOSize sz;
00583     try
00584     {
00585       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586       do
00587         if ((sz = ev->source->read(&buf[0], buf.size())))
00588         {
00589           if (debug_)
00590             logme()
00591               << "DEBUG: received " << sz << " bytes from peer "
00592               << p->peeraddr << std::endl;
00593           DataBlob &data = p->incoming;
00594           if (data.capacity () < data.size () + sz)
00595             data.reserve (data.size() + SOCKET_READ_GROWTH);
00596           data.insert (data.end(), &buf[0], &buf[0] + sz);
00597         }
00598       while (sz == sizeof (buf));
00599     }
00600     catch (Error &e)
00601     {
00602       SystemError *next = dynamic_cast<SystemError *>(e.next());
00603       if (next && next->portable() == SysErr::ErrTryAgain)
00604         sz = 1; // Ignore it, and fake no end of data.
00605       else
00606         // Houston we have a problem.
00607         return losePeer("WARNING: failed to read from peer ",
00608                         p, ev, &e);
00609     }
00610 
00611     // Process fully received messages as long as we can.
00612     size_t consumed = 0;
00613     DataBlob &data = p->incoming;
00614     while (data.size()-consumed >= sizeof(uint32_t)
00615            && p->waiting < MAX_PEER_WAITREQS)
00616     {
00617       uint32_t msglen;
00618       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619 
00620       if (msglen >= MESSAGE_SIZE_LIMIT)
00621         return losePeer("WARNING: excessively large message from ", p, ev);
00622 
00623       if (data.size()-consumed >= msglen)
00624       {
00625         bool valid = true;
00626         if (msglen < 2*sizeof(uint32_t))
00627         {
00628           logme()
00629             << "ERROR: corrupt peer message of length " << msglen
00630             << " from peer " << p->peeraddr << std::endl;
00631           valid = false;
00632         }
00633         else
00634         {
00635           // Decode and process this message.
00636           Bucket msg;
00637           msg.next = 0;
00638           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639 
00640           // If we created a response, chain it to the write queue.
00641           if (! msg.data.empty())
00642           {
00643             Bucket **prev = &p->sendq;
00644             while (*prev)
00645                prev = &(*prev)->next;
00646 
00647             *prev = new Bucket;
00648             (*prev)->next = 0;
00649             (*prev)->data.swap(msg.data);
00650           }
00651         }
00652 
00653         if (! valid)
00654           return losePeer("WARNING: data stream error with ", p, ev);
00655 
00656         consumed += msglen;
00657       }
00658       else
00659         break;
00660     }
00661 
00662     data.erase(data.begin(), data.begin()+consumed);
00663 
00664     // If the client has closed the connection, shut down our end.  If
00665     // we have something to send back still, leave the write direction
00666     // open.  Otherwise close the shop for this client.
00667     if (sz == 0)
00668       sel_.setMask(p->socket, p->mask &= ~IORead);
00669   }
00670 
00671   // Yes, please keep processing events for this socket.
00672   return false;
00673 }

VisNet& VisNet::operator= ( const VisNet &   )  [private]

void VisNet::purgeDeadObjects ( lat::Time  oldobj,
lat::Time  deadobj 
) [protected, virtual]

Definition at line 1179 of file VisNet.cc.

References debug_, lat::endl(), VisNet::Object::flags, VisNet::Object::lastreq, logme(), VisNet::Object::name, peers_, pi, VisNet::Object::rawdata, VisNet::Object::version, VIS_FLAG_DEAD, and VIS_FLAG_SCALAR.

Referenced by run().

01180 {
01181   PeerMap::iterator pi, pe;
01182   ObjectMap::iterator oi, oe;
01183   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01184     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01185     {
01186       Object &o = oi->second;
01187 
01188       // Compact non-scalar objects that are unused.  We send scalar
01189       // objects to the web server so we keep them around.
01190       if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01191       {
01192         if (debug_)
01193           logme()
01194             << "DEBUG: compacting idle '" << o.name
01195             << "' from " << pi->second.peeraddr << std::endl;
01196       }
01197 
01198       // Remove if dead, old and unused.
01199       if (o.lastreq < deadobj
01200           && o.version < deadobj
01201           && (o.flags & VIS_FLAG_DEAD))
01202       {
01203         if (debug_)
01204           logme()
01205             << "DEBUG: removing dead '" << o.name
01206             << "' from " << pi->second.peeraddr << std::endl;
01207 
01208         pi->second.objs.erase(oi++);
01209       }
01210       else
01211         ++oi;
01212     }
01213 }

int VisNet::receive ( void(*)(void *arg, uint32_t reason, Object &obj)  callback,
void *  arg 
) [virtual]

Definition at line 1333 of file VisNet.cc.

References VisNet::Object::flags, local_, lock(), VisNet::Peer::objs, p, peers_, pi, unlock(), VisNet::Peer::updates, VIS_FLAG_DEAD, and VIS_FLAG_RECEIVED.

01334 {
01335   int updates = 0;
01336 
01337   lock();
01338   PeerMap::iterator pi, pe;
01339   ObjectMap::iterator oi, oe;
01340   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01341   {
01342     Peer &p = pi->second;
01343     if (&p == local_)
01344       continue;
01345 
01346     updates += p.updates;
01347 
01348     for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01349     {
01350       Object &o = oi->second;
01351       if (o.flags & VIS_FLAG_DEAD)
01352       {
01353         callback(arg, VIS_FLAG_DEAD, o);
01354         p.objs.erase(oi++);
01355       }
01356       else if (o.flags & VIS_FLAG_RECEIVED)
01357       {
01358         callback(arg, VIS_FLAG_RECEIVED, o);
01359         o.flags &= ~VIS_FLAG_RECEIVED;
01360         ++oi;
01361       }
01362       else
01363         ++oi;
01364     }
01365   }
01366   unlock();
01367 
01368   return updates;
01369 }

void VisNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
) [private]

Definition at line 133 of file VisNet.cc.

References alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, releaseFromWait(), and waiting_.

00134 {
00135   Bucket **msg = &i->peer->sendq;
00136   while (*msg)
00137     msg = &(*msg)->next;
00138   *msg = new Bucket;
00139   (*msg)->next = 0;
00140 
00141   releaseFromWait(*msg, *i, o);
00142 
00143   assert(i->peer->waiting > 0);
00144   i->peer->waiting--;
00145   waiting_.erase(i);
00146 }

void VisNet::releaseFromWait ( Bucket *  msg,
WaitObject w,
Object *  o 
) [protected, virtual]

Definition at line 170 of file VisNet.cc.

References copydata(), VisNet::Bucket::data, VisNet::WaitObject::name, sendObjectToPeer(), and VIS_REPLY_NONE.

Referenced by releaseFromWait(), releaseWaiters(), and run().

00171 {
00172   if (o)
00173     sendObjectToPeer (msg, *o, true);
00174   else
00175   {
00176     uint32_t words[3];
00177     words[0] = sizeof(words) + w.name.size();
00178     words[1] = VIS_REPLY_NONE;
00179     words[2] = w.name.size();
00180 
00181     msg->data.reserve(msg->data.size() + words[0]);
00182     copydata(msg, &words[0], sizeof(words));
00183     copydata(msg, &w.name[0], w.name.size());
00184   }
00185 }

void VisNet::releaseWaiters ( Object *  o  )  [private]

Definition at line 150 of file VisNet.cc.

References e, i, VisNet::Object::name, releaseFromWait(), and waiting_.

Referenced by onMessage().

00151 {
00152   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153     if (i->name == o->name)
00154       releaseFromWait(i++, o);
00155     else
00156       ++i;
00157 }

void VisNet::removeLocalObject ( const std::string &  path  )  [virtual]

Delete the local object.

The caller must call sendLocalChanges() later to push out the changes.

Definition at line 1391 of file VisNet.cc.

References local_, and VisNet::Peer::objs.

01392 {
01393   local_->objs.erase(path);
01394 }

void VisNet::removePeer ( Peer *  p,
lat::Socket *  s 
) [protected, virtual]

Definition at line 1240 of file VisNet.cc.

References VisNet::Peer::objs, peers_, and sendLocalChanges().

Referenced by losePeer().

01241 {
01242   bool needflush = ! p->objs.empty();
01243 
01244   p->objs.clear();
01245   peers_.erase(s);
01246 
01247   // If we removed a peer with objects, our list of objects
01248   // has changed and we need to update downstream peers.
01249   if (needflush)
01250     sendLocalChanges();
01251 }

void VisNet::requestObject ( Peer *  p,
const char *  name,
size_t  len 
) [private]

Queue an object request to the data server.

Definition at line 97 of file VisNet.cc.

References copydata(), alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, VisNet::Peer::sendq, and VIS_MSG_GET_OBJECT.

Referenced by onMessage(), and waitForData().

00098 {
00099   Bucket **msg = &p->sendq;
00100   while (*msg)
00101     msg = &(*msg)->next;
00102   *msg = new Bucket;
00103   (*msg)->next = 0;
00104 
00105   uint32_t words[3];
00106   words[0] = sizeof(words) + len;
00107   words[1] = VIS_MSG_GET_OBJECT;
00108   words[2] = len;
00109   (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110   copydata(*msg, words, sizeof(words));
00111   copydata(*msg, name, len);
00112 }

void VisNet::run ( void   ) 

Run the actual I/O processing loop.

Definition at line 962 of file VisNet.cc.

References lat::Socket::abort(), arg, lat::IOSelector::attach(), VisNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, VisNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), VisNet::Peer::mask, VisNet::Bucket::next, lat::Error::next(), VisNet::AutoPeer::next, onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, VisNet::AutoPeer::peer, VisNet::Peer::peeraddr, VisNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), s, sel_, sendObjectListToPeers(), VisNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), VisNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), VisNet::Peer::update, VisNet::AutoPeer::update, updatePeerMasks(), upstream_, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, waiting_, and VisNet::AutoPeer::warned.

00963 {
00964   Time now;
00965   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00966 
00967   // Perform I/O.  Every once in a while flush updates to peers.
00968   while (! shouldStop())
00969   {
00970     for (int i = 0; i < 2; ++i)
00971     {
00972       AutoPeer *ap = automatic[i];
00973 
00974       // If we need a server connection and don't have one yet,
00975       // initiate asynchronous connection creation.  Swallow errors
00976       // in case the server won't talk to us.
00977       if (! ap->host.empty()
00978           && ! ap->peer
00979           && (now = Time::current()) > ap->next)
00980       {
00981         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
00982         InetSocket *s = 0;
00983         try
00984         {
00985           s = new InetSocket (SocketConst::TypeStream);
00986           s->setBlocking (false);
00987           s->connect(InetAddress (ap->host.c_str(), ap->port));
00988           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00989           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00990         }
00991         catch (Error &e)
00992         {
00993           SystemError *sys = dynamic_cast<SystemError *>(e.next());
00994           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00995           {
00996             // "In progress" just means the connection is in progress.
00997             // The connection is ready when the socket is writeable.
00998             // Anything else is a real problem.
00999             if (! ap->warned)
01000             {
01001               logme()
01002                 << "NOTE: server at " << ap->host << ":" << ap->port
01003                 << " is unavailable.  Connection will be established"
01004                 << " automatically on the background once the server"
01005                 << " becomes available.  Error from the attempt was: "
01006                 << e.explain() << '\n';
01007               ap->warned = true;
01008             }
01009 
01010             if (s)
01011               s->abort();
01012             delete s;
01013             s = 0;
01014           }
01015         }
01016 
01017         // Set up with the selector if we were successful.  If this is
01018         // the upstream collector, queue a request for updates.
01019         if (s)
01020         {
01021           lock();
01022           Peer *p = createPeer(s);
01023           ap->peer = p;
01024           ap->warned = false;
01025           unlock();
01026 
01027           InetAddress peeraddr = ((InetSocket *) s)->peername();
01028           InetAddress myaddr = ((InetSocket *) s)->sockname();
01029           p->peeraddr = StringFormat("%1:%2")
01030                         .arg(peeraddr.hostname())
01031                         .arg(peeraddr.port());
01032           p->mask = IORead|IOWrite|IOUrgent;
01033           p->update = ap->update;
01034           p->automatic = ap;
01035           p->socket = s;
01036           sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
01037           if (ap == &upstream_)
01038           {
01039             uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01040                                   2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01041             p->sendq = new Bucket;
01042             p->sendq->next = 0;
01043             copydata(p->sendq, words, sizeof(words));
01044           }
01045 
01046           // Report the new connection.
01047           if (debug_)
01048             logme()
01049               << "INFO: now connected to " << p->peeraddr << " from "
01050               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01051         }
01052       }
01053     }
01054 
01055     // Pump events for a while.
01056     sel_.dispatch(delay_);
01057     now = Time::current();
01058 
01059     // Check if flush is required.  Flush only if one is needed.
01060     // Always sends the full object list.  Compact objects no longer
01061     // in active use before sending out the update.
01062     if (flush_)
01063     {
01064       flush_ = false;
01065 
01066       lock();
01067       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01068                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01069       sendObjectListToPeers(true);
01070       unlock();
01071     }
01072 
01073     // Update the data server and peer selection masks.  If we
01074     // have no more data to send and listening for writes, remove
01075     // the write mask.  If we have something to write and aren't
01076     // listening for writes, start listening so we can send off
01077     // the data.
01078     updatePeerMasks();
01079 
01080     // Release peers that have been waiting for data for too long.
01081     lock();
01082     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01083     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01084     {
01085       // If the peer has waited for too long, send something.
01086       if (i->time < waitold)
01087         releaseFromWait(i++, findObject(0, i->name));
01088 
01089       // Keep it for now.
01090       else
01091         ++i;
01092     }
01093     unlock();
01094   }
01095 }

void VisNet::sendLocalChanges ( void   ) 

Definition at line 1100 of file VisNet.cc.

References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().

Referenced by removePeer().

01101 {
01102   char byte = 0;
01103   wakeup_.sink()->write(&byte, 1);
01104 }

void VisNet::sendObjectListToPeer ( Bucket *  msg,
bool  all,
bool  clear 
) [protected, virtual]

Send all objects to a peer and optionally mark sent objects old.

Definition at line 1255 of file VisNet.cc.

References copydata(), VisNet::Bucket::data, peers_, pi, sendObjectToPeer(), VIS_FLAG_NEW, VIS_REPLY_LIST_BEGIN, and VIS_REPLY_LIST_END.

Referenced by onMessage(), and sendObjectListToPeers().

01256 {
01257   PeerMap::iterator pi, pe;
01258   ObjectMap::iterator oi, oe;
01259   uint32_t numobjs = 0;
01260   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01261     numobjs += pi->second.objs.size();
01262 
01263   msg->data.reserve(msg->data.size() + 300*numobjs);
01264 
01265   uint32_t nupdates = 0;
01266   uint32_t words[4];
01267   words[0] = sizeof(words);
01268   words[1] = VIS_REPLY_LIST_BEGIN;
01269   words[2] = numobjs;
01270   words[3] = all;
01271   copydata(msg, &words[0], sizeof(words));
01272 
01273   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01274     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01275       if (all || (oi->second.flags & VIS_FLAG_NEW))
01276       {
01277         sendObjectToPeer(msg, oi->second, false);
01278         if (clear)
01279           oi->second.flags &= ~VIS_FLAG_NEW;
01280         ++nupdates;
01281       }
01282 
01283   words[1] = VIS_REPLY_LIST_END;
01284   words[2] = nupdates;
01285   copydata(msg, &words[0], sizeof(words));
01286 }

void VisNet::sendObjectListToPeers ( bool  all  )  [protected, virtual]

Definition at line 1289 of file VisNet.cc.

References VisNet::Bucket::data, debug_, e, lat::endl(), i, logme(), alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, p, VisNet::Peer::peeraddr, peers_, sendObjectListToPeer(), VisNet::Peer::sendq, VisNet::Peer::update, and VisNet::Peer::updated.

Referenced by run().

01290 {
01291   PeerMap::iterator i, e;
01292   for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01293   {
01294     Peer &p = i->second;
01295     if (! p.update)
01296       continue;
01297 
01298     if (debug_)
01299       logme()
01300         << "DEBUG: notifying " << p.peeraddr << std::endl;
01301 
01302     Bucket msg;
01303     msg.next = 0;
01304     sendObjectListToPeer(&msg, !p.updated || all, true);
01305 
01306     if (! msg.data.empty())
01307     {
01308       Bucket **prev = &p.sendq;
01309       while (*prev)
01310         prev = &(*prev)->next;
01311 
01312       *prev = new Bucket;
01313       (*prev)->next = 0;
01314       (*prev)->data.swap(msg.data);
01315     }
01316     p.updated = true;
01317   }
01318 }

void VisNet::sendObjectToPeer ( Bucket *  msg,
Object o,
bool  data 
) [protected]

Definition at line 191 of file VisNet.cc.

References copydata(), VisNet::Bucket::data, VisNet::Object::flags, flags, VisNet::Object::name, VisNet::Object::rawdata, VisNet::Object::version, VIS_FLAG_SCALAR, VIS_FLAG_ZOMBIE, and VIS_REPLY_OBJECT.

Referenced by onMessage(), releaseFromWait(), and sendObjectListToPeer().

00192 {
00193   uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194   DataBlob objdata;
00195 
00196   if (data || (flags & VIS_FLAG_SCALAR))
00197     objdata.insert(objdata.end(),
00198                    &o.rawdata[0],
00199                    &o.rawdata[0] + o.rawdata.size());
00200 
00201   uint32_t words[7];
00202   uint32_t namelen = o.name.size();
00203   uint32_t datalen = objdata.size();
00204 
00205   words[0] = sizeof(words) + namelen + datalen;
00206   words[1] = VIS_REPLY_OBJECT;
00207   words[2] = flags;
00208   words[3] = (o.version >> 0 ) & 0xffffffff;
00209   words[4] = (o.version >> 32) & 0xffffffff;
00210   words[5] = namelen;
00211   words[6] = datalen;
00212 
00213   msg->data.reserve(msg->data.size() + words[0]);
00214   copydata(msg, &words[0], sizeof(words));
00215   if (namelen)
00216     copydata(msg, &o.name[0], namelen);
00217   if (datalen)
00218     copydata(msg, &objdata[0], datalen);
00219 }

bool VisNet::shouldStop ( void   )  [protected, virtual]

Definition at line 162 of file VisNet.cc.

References shutdown_.

Referenced by run().

00163 {
00164   return shutdown_;
00165 }

void VisNet::shutdown ( void   ) 

Stop the network layer and wait for it to finish.

Definition at line 906 of file VisNet.cc.

References communicate_, and shutdown_.

00907 {
00908   shutdown_ = 1;
00909   if (communicate_ != (pthread_t) -1)
00910     pthread_join(communicate_, 0);
00911 }

void VisNet::start ( void   ) 

Start running the network layer in a new thread.

This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 947 of file VisNet.cc.

References communicate(), communicate_, lock_, logme(), and pthread_create.

00948 {
00949   if (communicate_ != (pthread_t) -1)
00950   {
00951     logme()
00952       << "ERROR: Shared memory networking thread has already been started\n";
00953     return;
00954   }
00955 
00956   pthread_mutex_init(&lock_, 0);
00957   pthread_create(&communicate_, 0, &communicate, this);
00958 }

void VisNet::startLocalServer ( int  port  ) 

Start a server socket for accessing this node remotely.

Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 833 of file VisNet.cc.

References lat::IOSelector::attach(), lat::CreateHook(), e, lat::endl(), Exception, lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.

00834 {
00835   if (server_)
00836   {
00837     logme() << "ERROR: server was already started.\n";
00838     return;
00839   }
00840 
00841   try
00842   {
00843     server_ = new InetServerSocket(InetAddress (port), 10);
00844     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846     server_->setBlocking(false);
00847     sel_.attach(server_, IOAccept, CreateHook(this, &VisNet::onPeerConnect));
00848   }
00849   catch (Error &e)
00850   {
00851     // FIXME: Do we need to do this when we throw an exception anyway?
00852     // FIXME: Abort instead?
00853     logme()
00854       << "ERROR: Failed to start server at port " << port << ": "
00855       << e.explain() << std::endl;
00856 
00857     // FIXME: Throw something simpler that removes the dependency?
00858     throw cms::Exception("VisNet::startLocalServer")
00859       << "Failed to start server at port " << port << ": "
00860       << e.explain();
00861   }
00862   
00863   logme() << "INFO: Shared memory server started at port " << port << std::endl;
00864 }

void VisNet::unlock ( void   ) 

Release the lock on the net layer.

Definition at line 937 of file VisNet.cc.

References communicate_, and lock_.

Referenced by onMessage(), receive(), and run().

00938 {
00939   if (communicate_ != (pthread_t) -1)
00940     pthread_mutex_unlock(&lock_);
00941 }

void VisNet::updateLocalObject ( Object &  o  )  [virtual]

Update the network cache for an object.

The caller must call sendLocalChanges() later to push out the changes.

Definition at line 1374 of file VisNet.cc.

References VisNet::Object::flags, local_, VisNet::Object::name, VisNet::Peer::objs, VisNet::Object::rawdata, std::swap(), and VisNet::Object::version.

01375 {
01376   ObjectMap::iterator pos = local_->objs.find(o.name);
01377   if (pos == local_->objs.end())
01378     local_->objs.insert(ObjectMap::value_type(o.name, o));
01379   else
01380   {
01381     std::swap(pos->second.version,   o.version);
01382     std::swap(pos->second.flags,     o.flags);
01383     std::swap(pos->second.rawdata,   o.rawdata);
01384     pos->second.lastreq = 0;
01385   }
01386 }

void VisNet::updateMask ( Peer *  p  )  [protected]

Update the selector mask for a peer based on data queues.

Close the connection if there is no reason to maintain it open.

Definition at line 751 of file VisNet.cc.

References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), VisNet::Peer::mask, VisNet::Peer::peeraddr, sel_, VisNet::Peer::sendq, lat::IOSelector::setMask(), VisNet::Peer::socket, and VisNet::Peer::waiting.

Referenced by updatePeerMasks().

00752 {
00753   if (! p->socket)
00754     return;
00755 
00756   // Listen to writes iff we have data to send.
00757   unsigned oldmask = p->mask;
00758   if (! p->sendq && (p->mask & IOWrite))
00759     sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760 
00761   if (p->sendq && ! (p->mask & IOWrite))
00762     sel_.setMask(p->socket, p->mask |= IOWrite);
00763 
00764   if (debug_ && oldmask != p->mask)
00765     logme()
00766       << "DEBUG: updating mask for " << p->peeraddr << " to "
00767       << p->mask << " from " << oldmask << std::endl;
00768 
00769   // If we have nothing more to send and are no longer listening
00770   // for reads, close up the shop for this peer.
00771   if (p->mask == IOUrgent && ! p->waiting)
00772   {
00773     assert(! p->sendq);
00774     if (debug_)
00775       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776     losePeer(0, p, 0);
00777   }
00778 }

void VisNet::updatePeerMasks ( void   )  [protected, virtual]

Definition at line 1321 of file VisNet.cc.

References e, i, peers_, and updateMask().

Referenced by run().

01322 {
01323   PeerMap::iterator i, e;
01324   for (i = peers_.begin(), e = peers_.end(); i != e; )
01325     updateMask(&(i++)->second);
01326 }

void VisNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local data changes.

Must be called before calling run() or start().

Definition at line 870 of file VisNet.cc.

References downstream_, lat::endl(), VisNet::AutoPeer::host, logme(), VisNet::AutoPeer::port, and VisNet::AutoPeer::update.

00871 {
00872   if (! downstream_.host.empty())
00873   {
00874     logme()
00875       << "ERROR: Already updating another collector at "
00876       << downstream_.host << ":" << downstream_.port << std::endl;
00877     return;
00878   }
00879 
00880   downstream_.update = true;
00881   downstream_.host = host;
00882   downstream_.port = port;
00883 }

void VisNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
) [protected]

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 117 of file VisNet.cc.

References requestObject(), VisNet::Peer::waiting, and waiting_.

Referenced by onMessage().

00118 {
00119   // FIXME: Should we automatically record which exact peer the waiter
00120   // is expecting to deliver data so we know to release the waiter if
00121   // the other peer vanishes?  The current implementation stands a
00122   // chance for the waiter to wait indefinitely -- although we do
00123   // force terminate the wait after a while.
00124   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125   WaitObject wo = { Time::current(), name, info, p };
00126   waiting_.push_back(wo);
00127   p->waiting++;
00128 }


Member Data Documentation

std::string VisNet::appname_ [private]

Definition at line 162 of file VisNet.h.

Referenced by logme().

pthread_t VisNet::communicate_ [private]

Definition at line 177 of file VisNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool VisNet::debug_ [protected]

Definition at line 146 of file VisNet.h.

Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), and updateMask().

int VisNet::delay_ [private]

Definition at line 180 of file VisNet.h.

Referenced by delay(), and run().

AutoPeer VisNet::downstream_ [private]

Definition at line 172 of file VisNet.h.

Referenced by run(), updateToCollector(), and VisNet().

bool VisNet::flush_ [private]

Definition at line 181 of file VisNet.h.

Referenced by onLocalNotify(), onMessage(), and run().

Peer* VisNet::local_ [private]

Definition at line 174 of file VisNet.h.

Referenced by receive(), removeLocalObject(), updateLocalObject(), and VisNet().

pthread_mutex_t VisNet::lock_ [private]

Definition at line 176 of file VisNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t VisNet::MAX_PEER_WAITREQS = 128 [static]

Definition at line 37 of file VisNet.h.

Referenced by onPeerData().

PeerMap VisNet::peers_ [private]

Definition at line 170 of file VisNet.h.

Referenced by createPeer(), findObject(), getPeer(), purgeDeadObjects(), receive(), removePeer(), sendObjectListToPeer(), sendObjectListToPeers(), and updatePeerMasks().

int VisNet::pid_ [private]

Definition at line 163 of file VisNet.h.

Referenced by logme().

lat::IOSelector VisNet::sel_ [private]

Definition at line 165 of file VisNet.h.

Referenced by losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), updateMask(), and VisNet().

lat::InetServerSocket* VisNet::server_ [private]

Definition at line 166 of file VisNet.h.

Referenced by onPeerConnect(), and startLocalServer().

sig_atomic_t VisNet::shutdown_ [private]

Definition at line 178 of file VisNet.h.

Referenced by shouldStop(), and shutdown().

AutoPeer VisNet::upstream_ [private]

Definition at line 171 of file VisNet.h.

Referenced by listenToSource(), run(), and VisNet().

lat::Time VisNet::version_ [private]

Definition at line 168 of file VisNet.h.

const uint32_t VisNet::VIS_FLAG_DEAD = 0x40000000 [static]

Definition at line 34 of file VisNet.h.

Referenced by markObjectsDead(), onMessage(), purgeDeadObjects(), and receive().

const uint32_t VisNet::VIS_FLAG_NEW = 0x20000000 [static]

Definition at line 33 of file VisNet.h.

Referenced by sendObjectListToPeer().

const uint32_t VisNet::VIS_FLAG_RECEIVED = 0x10000000 [static]

Definition at line 32 of file VisNet.h.

Referenced by onMessage(), and receive().

const uint32_t VisNet::VIS_FLAG_SCALAR = 0x1 [static]

Definition at line 31 of file VisNet.h.

Referenced by purgeDeadObjects(), and sendObjectToPeer().

const uint32_t VisNet::VIS_FLAG_ZOMBIE = 0x80000000 [static]

Definition at line 35 of file VisNet.h.

Referenced by markObjectsDead(), markObjectsZombies(), and sendObjectToPeer().

const uint32_t VisNet::VIS_MSG_GET_OBJECT = 3 [static]

Definition at line 24 of file VisNet.h.

Referenced by onMessage(), and requestObject().

const uint32_t VisNet::VIS_MSG_HELLO = 0 [static]

Definition at line 21 of file VisNet.h.

const uint32_t VisNet::VIS_MSG_LIST_OBJECTS = 2 [static]

Definition at line 23 of file VisNet.h.

Referenced by onMessage(), and run().

const uint32_t VisNet::VIS_MSG_UPDATE_ME = 1 [static]

Definition at line 22 of file VisNet.h.

Referenced by onMessage(), and run().

const uint32_t VisNet::VIS_REPLY_LIST_BEGIN = 101 [static]

Definition at line 26 of file VisNet.h.

Referenced by onMessage(), and sendObjectListToPeer().

const uint32_t VisNet::VIS_REPLY_LIST_END = 102 [static]

Definition at line 27 of file VisNet.h.

Referenced by onMessage(), and sendObjectListToPeer().

const uint32_t VisNet::VIS_REPLY_NONE = 103 [static]

Definition at line 28 of file VisNet.h.

Referenced by onMessage(), and releaseFromWait().

const uint32_t VisNet::VIS_REPLY_OBJECT = 104 [static]

Definition at line 29 of file VisNet.h.

Referenced by onMessage(), and sendObjectToPeer().

WaitList VisNet::waiting_ [private]

Definition at line 173 of file VisNet.h.

Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().

lat::Pipe VisNet::wakeup_ [private]

Definition at line 167 of file VisNet.h.

Referenced by sendLocalChanges(), and VisNet().


The documentation for this class was generated from the following files:
- VisNet.h
- VisNet.cc
Generated on Tue Jun 9 18:35:33 2009 for CMSSW by  doxygen 1.5.4