CMS 3D CMS Logo

IgNet Class Reference

#include <Iguana/Framework/interface/IgNet.h>

Inheritance diagram for IgNet:

IguanaNetProducer

List of all members.

Public Types

typedef std::vector< unsigned char > DataBlob
typedef std::map< std::string, Object > ObjectMap
typedef std::map< lat::Socket *, Peer > PeerMap
typedef std::list< WaitObject > WaitList

Public Member Functions

void debug (bool doit)
 Enable or disable verbose debugging.
void delay (int delay)
 Set the I/O dispatching delay.
 IgNet (const std::string &appname="")
void listenToSource (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).
void lock (void)
 Acquire a lock on the net layer.
virtual int receive (void(*callback)(void *arg, uint32_t reason, Object &obj), void *arg)
virtual void removeLocalObject (const std::string &name)
 Delete the local object.
void run (void)
 Run the actual I/O processing loop.
void sendLocalChanges (void)
void shutdown (void)
 Stop the network layer and wait for it to finish.
void start (void)
 Start running the network layer in a new thread.
void startLocalServer (int port)
 Start a server socket for accessing this node remotely.
void unlock (void)
 Release the lock on the net layer.
virtual void updateLocalObject (Object &o)
 Update the network cache for an object.
void updateToCollector (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically send updates whenever local data changes.
virtual ~IgNet (void)

Static Public Attributes

static const uint32_t MAX_PEER_WAITREQS = 128
static const uint32_t VIS_FLAG_DEAD = 0x40000000
static const uint32_t VIS_FLAG_NEW = 0x20000000
static const uint32_t VIS_FLAG_RECEIVED = 0x10000000
static const uint32_t VIS_FLAG_SCALAR = 0x1
static const uint32_t VIS_FLAG_ZOMBIE = 0x80000000
static const uint32_t VIS_MSG_GET_OBJECT = 3
static const uint32_t VIS_MSG_HELLO = 0
static const uint32_t VIS_MSG_LIST_OBJECTS = 2
static const uint32_t VIS_MSG_UPDATE_ME = 1
static const uint32_t VIS_REPLY_LIST_BEGIN = 101
static const uint32_t VIS_REPLY_LIST_END = 102
static const uint32_t VIS_REPLY_NONE = 103
static const uint32_t VIS_REPLY_OBJECT = 104

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)
virtual Peer * getPeer (lat::Socket *s)
std::ostream & logme (void)
virtual Object * makeObject (Peer *p, const std::string &name)
virtual void markObjectsDead (Peer *p)
virtual void markObjectsZombies (Peer *p)
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
virtual void purgeDeadObjects (lat::Time oldobj, lat::Time deadobj)
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
virtual void removePeer (Peer *p, lat::Socket *s)
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)
 Send all objects to a peer and optionally mark sent objects old.
virtual void sendObjectListToPeers (bool all)
void sendObjectToPeer (Bucket *msg, Object &o, bool data)
virtual bool shouldStop (void)
void updateMask (Peer *p)
 Update the selector mask for a peer based on data queues.
virtual void updatePeerMasks (void)
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)

Protected Attributes

bool debug_

Private Member Functions

 IgNet (const IgNet &)
bool losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
 Handle errors with a peer socket.
bool onLocalNotify (lat::IOSelectEvent *ev)
 React to notifications from the app thread.
bool onPeerConnect (lat::IOSelectEvent *ev)
 Respond to new connections on the server socket.
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client.
IgNet & operator= (const IgNet &)
void releaseFromWait (WaitList::iterator i, Object *o)
void releaseWaiters (Object *o)
void requestObject (Peer *p, const char *name, size_t len)
 Queue an object request to the data server.

Static Private Member Functions

static void discard (Bucket *&b)

Private Attributes

std::string appname_
pthread_t communicate_
int delay_
AutoPeer downstream_
bool flush_
Peer * local_
pthread_mutex_t lock_
PeerMap peers_
int pid_
lat::IOSelector sel_
lat::InetServerSocket * server_
sig_atomic_t shutdown_
AutoPeer upstream_
lat::Time version_
WaitList waiting_
lat::Pipe wakeup_

Classes

struct  AutoPeer
struct  Bucket
struct  Object
struct  Peer
struct  WaitObject


Detailed Description

Definition at line 18 of file IgNet.h.


Member Typedef Documentation

typedef std::vector<unsigned char> IgNet::DataBlob

Definition at line 41 of file IgNet.h.

typedef std::map<std::string, Object> IgNet::ObjectMap

Definition at line 45 of file IgNet.h.

typedef std::map<lat::Socket *, Peer> IgNet::PeerMap

Definition at line 46 of file IgNet.h.

typedef std::list<WaitObject> IgNet::WaitList

Definition at line 44 of file IgNet.h.


Constructor & Destructor Documentation

IgNet::IgNet ( const std::string &  appname = ""  ) 

Definition at line 781 of file IgNet.cc.

References lat::IOSelector::attach(), lat::CreateHook(), createPeer(), downstream_, lat::IOChannel::fd(), IORead, local_, IgNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), IgNet::AutoPeer::peer, IgNet::AutoPeer::port, sel_, lat::Pipe::source(), IgNet::AutoPeer::update, upstream_, wakeup_, and IgNet::AutoPeer::warned.

00782   : debug_ (false),
00783     appname_ (appname.empty() ? "IgNet" : appname.c_str()),
00784     pid_ (getpid()),
00785     server_ (0),
00786     version_ (Time::current()),
00787     communicate_ ((pthread_t) -1),
00788     shutdown_ (0),
00789     delay_ (1000),
00790     flush_ (false)
00791 {
00792   // Create a pipe for the local apps to tell the communicator
00793   // thread that local app data has changed and that the peers
00794   // should be notified.
00795   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &IgNet::onLocalNotify));
00797 
00798   // Initialise the upstream and downstream to empty.
00799   upstream_.peer   = downstream_.peer   = 0;
00800   upstream_.next   = downstream_.next   = 0;
00801   upstream_.port   = downstream_.port   = 0;
00802   upstream_.update = downstream_.update = false;
00803   upstream_.warned = downstream_.warned = false;
00804 
00805   local_ = createPeer((Socket *) -1);
00806 }

IgNet::~IgNet ( void   )  [virtual]

Definition at line 808 of file IgNet.cc.

00809 {
00810   // FIXME
00811 }

IgNet::IgNet ( const IgNet &   )  [private]


Member Function Documentation

void IgNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
) [static, protected]

Definition at line 38 of file IgNet.cc.

References IgNet::Bucket::data.

Referenced by onMessage(), releaseFromWait(), requestObject(), run(), sendObjectListToPeer(), and sendObjectToPeer().

00039 {
00040   b->data.insert(b->data.end(),
00041                  (const unsigned char *)data,
00042                  (const unsigned char *)data + len);
00043 }

IgNet::Peer * IgNet::createPeer ( lat::Socket *  s  )  [protected, virtual]

Definition at line 1225 of file IgNet.cc.

References IgNet::Peer::automatic, IgNet::Peer::mask, p, peers_, IgNet::Peer::sendpos, IgNet::Peer::sendq, IgNet::Peer::socket, IgNet::Peer::source, IgNet::Peer::update, IgNet::Peer::updated, IgNet::Peer::updates, and IgNet::Peer::waiting.

Referenced by IgNet(), onPeerConnect(), and run().

01226 {
01227   Peer *p = &peers_[s];
01228   p->socket = 0;
01229   p->sendq = 0;
01230   p->sendpos = 0;
01231   p->mask = 0;
01232   p->source = false;
01233   p->update = false;
01234   p->updated = false;
01235   p->updates = 0;
01236   p->waiting = 0;
01237   p->automatic = 0;
01238   return p;
01239 }

void IgNet::debug ( bool  doit  ) 

Enable or disable verbose debugging.

Must be called before calling run() or start().

Definition at line 816 of file IgNet.cc.

References debug_.

Referenced by IguanaNetProducer::IguanaNetProducer().

00817 {
00818   debug_ = doit;
00819 }

void IgNet::delay ( int  delay  ) 

Set the I/O dispatching delay.

Must be called before calling run() or start().

Definition at line 824 of file IgNet.cc.

References delay_.

00825 {
00826   delay_ = delay;
00827 }

void IgNet::discard ( Bucket *&  b  )  [static, private]

Definition at line 47 of file IgNet.cc.

References IgNet::Bucket::next.

Referenced by losePeer(), and onPeerData().

00048 {
00049   while (b)
00050   {
00051     Bucket *next = b->next;
00052     delete b;
00053     b = next;
00054   }
00055 }

IgNet::Object * IgNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
) [protected, virtual]

Definition at line 1112 of file IgNet.cc.

References e, i, IgNet::Peer::objs, and peers_.

Referenced by onMessage(), and run().

01113 {
01114   ObjectMap::iterator pos;
01115   PeerMap::iterator i, e;
01116   if (owner)
01117     *owner = 0;
01118   if (p)
01119   {
01120     pos = p->objs.find(name);
01121     if (pos == p->objs.end())
01122       return 0;
01123     else
01124     {
01125       if (owner) *owner = p;
01126       return &pos->second;
01127     }
01128   }
01129   else
01130   {
01131     for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01132     {
01133       pos = i->second.objs.find(name);
01134       if (pos != i->second.objs.end())
01135       {
01136         if (owner) *owner = &i->second;
01137         return &pos->second;
01138       }
01139     }
01140     return 0;
01141   }
01142 }

IgNet::Peer * IgNet::getPeer ( lat::Socket *  s  )  [protected, virtual]

Definition at line 1218 of file IgNet.cc.

References peers_.

Referenced by onPeerData().

01219 {
01220   PeerMap::iterator pos = peers_.find(s);
01221   return pos == peers_.end() ? 0 : &pos->second;
01222 }

void IgNet::listenToSource ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).

Must be called before calling run() or start().

Definition at line 891 of file IgNet.cc.

References lat::endl(), IgNet::AutoPeer::host, logme(), IgNet::AutoPeer::port, IgNet::AutoPeer::update, and upstream_.

00892 {
00893   if (! upstream_.host.empty())
00894   {
00895     logme()
00896       << "ERROR: Already receiving data from another collector at "
00897       << upstream_.host << ":" << upstream_.port << std::endl;
00898     return;
00899   }
00900 
00901   upstream_.update = false;
00902   upstream_.host = host;
00903   upstream_.port = port;
00904 }

void IgNet::lock ( void   ) 

Acquire a lock on the net layer.

Definition at line 931 of file IgNet.cc.

References communicate_, and lock_.

Referenced by onMessage(), edm::service::IguanaService::produceEvent(), receive(), and run().

00932 {
00933   if (communicate_ != (pthread_t) -1)
00934     pthread_mutex_lock(&lock_);
00935 }

std::ostream & IgNet::logme ( void   )  [protected]

Definition at line 29 of file IgNet.cc.

References appname_, TestMuL1L2Filter_cff::cerr, and pid_.

Referenced by IguanaNetProducer::IguanaNetProducer(), listenToSource(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), start(), startLocalServer(), updateMask(), and updateToCollector().

00030 {
00031   return std::cerr
00032     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033     << " " << appname_ << "[" << pid_ << "]: ";
00034 }

bool IgNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
) [private]

Handle errors with a peer socket.

Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 62 of file IgNet.cc.

References IgNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), IgNet::AutoPeer::peer, IgNet::Peer::peeraddr, removePeer(), s, sel_, IgNet::Peer::sendq, IgNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.

Referenced by onPeerData(), and updateMask().

00066 {
00067   if (reason)
00068     logme ()
00069       << reason << peer->peeraddr
00070       << (err ? "; error was: " + err->explain() : std::string(""))
00071       << std::endl;
00072 
00073   Socket *s = peer->socket;
00074 
00075   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076     if (i->peer == peer)
00077       waiting_.erase(i++);
00078     else
00079       ++i;
00080 
00081   if (ev)
00082     ev->source = 0;
00083 
00084   discard(peer->sendq);
00085   if (peer->automatic)
00086     peer->automatic->peer = 0;
00087 
00088   sel_.detach(s);
00089   s->close();
00090   removePeer(peer, s);
00091   delete s;
00092   return true;
00093 }

IgNet::Object * IgNet::makeObject ( Peer *  p,
const std::string &  name 
) [protected, virtual]

Definition at line 1145 of file IgNet.cc.

References IgNet::Object::flags, IgNet::Object::lastreq, IgNet::Object::name, IgNet::Peer::objs, and IgNet::Object::version.

Referenced by onMessage().

01146 {
01147   Object *o = &p->objs[name];
01148   o->version = 0;
01149   o->name = name;
01150   o->flags = 0;
01151   o->lastreq = 0;
01152   return o;
01153 }

void IgNet::markObjectsDead ( Peer *  p  )  [protected, virtual]

Definition at line 1171 of file IgNet.cc.

References e, i, IgNet::Peer::objs, VIS_FLAG_DEAD, and VIS_FLAG_ZOMBIE.

Referenced by onMessage().

01172 {
01173   ObjectMap::iterator i, e;
01174   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01175     if (i->second.flags & VIS_FLAG_ZOMBIE)
01176       i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01177 }

void IgNet::markObjectsZombies ( Peer *  p  )  [protected, virtual]

Definition at line 1162 of file IgNet.cc.

References e, i, IgNet::Peer::objs, and VIS_FLAG_ZOMBIE.

Referenced by onMessage().

01163 {
01164   ObjectMap::iterator i, e;
01165   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01166     i->second.flags |= VIS_FLAG_ZOMBIE;
01167 }

bool IgNet::onLocalNotify ( lat::IOSelectEvent *  ev  )  [private]

React to notifications from the app thread.

This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new app data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent object updates.

Definition at line 720 of file IgNet.cc.

References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.

Referenced by IgNet().

00721 {
00722   // Discard the data in the pipe, we care only about the wakeup.
00723   try
00724   {
00725     IOSize sz;
00726     unsigned char buf [1024];
00727     while ((sz = ev->source->read(buf, sizeof(buf))))
00728       ;
00729   }
00730   catch (Error &e)
00731   {
00732     SystemError *next = dynamic_cast<SystemError *>(e.next());
00733     if (next && next->portable() == SysErr::ErrTryAgain)
00734       ; // Ignore it
00735     else
00736       logme()
00737         << "WARNING: error reading from notification pipe: "
00738         << e.explain() << std::endl;
00739   }
00740 
00741   // Tell the main event pump to send an update in a little while.
00742   flush_ = true;
00743 
00744   // We are never done, always keep going.
00745   return false;
00746 }

bool IgNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
) [protected, virtual]

Definition at line 224 of file IgNet.cc.

References copydata(), IgNet::Bucket::data, debug_, lat::endl(), findObject(), IgNet::Object::flags, flags, flush_, IgNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, IgNet::Peer::peeraddr, IgNet::Object::rawdata, releaseWaiters(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), IgNet::Peer::source, unlock(), IgNet::Peer::update, IgNet::Peer::updates, IgNet::Object::version, VIS_FLAG_DEAD, VIS_FLAG_RECEIVED, VIS_MSG_GET_OBJECT, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, VIS_REPLY_LIST_BEGIN, VIS_REPLY_LIST_END, VIS_REPLY_NONE, VIS_REPLY_OBJECT, and waitForData().

Referenced by onPeerData().

00225 {
00226   // Decode and process this message.
00227   uint32_t type;
00228   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229   switch (type)
00230   {
00231   case VIS_MSG_UPDATE_ME:
00232     {
00233       if (len != 2*sizeof(uint32_t))
00234       {
00235         logme()
00236           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237           << " from peer " << p->peeraddr << std::endl;
00238         return false;
00239       }
00240 
00241       if (debug_)
00242         logme()
00243           << "DEBUG: received message 'UPDATE ME' from peer "
00244           << p->peeraddr << std::endl;
00245 
00246       p->update = true;
00247     }
00248     return true;
00249 
00250   case VIS_MSG_LIST_OBJECTS:
00251     {
00252       if (debug_)
00253         logme()
00254           << "DEBUG: received message 'LIST OBJECTS' from peer "
00255           << p->peeraddr << std::endl;
00256 
00257       // Send over current status: list of known objects.
00258       lock();
00259       sendObjectListToPeer(msg, true, false);
00260       unlock();
00261     }
00262     return true;
00263 
00264   case VIS_MSG_GET_OBJECT:
00265     {
00266       if (debug_)
00267         logme()
00268           << "DEBUG: received message 'GET OBJECT' from peer "
00269           << p->peeraddr << std::endl;
00270 
00271       if (len < 3*sizeof(uint32_t))
00272       {
00273         logme()
00274           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275           << " from peer " << p->peeraddr << std::endl;
00276         return false;
00277       }
00278 
00279       uint32_t namelen;
00280       memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281       if (len != 3*sizeof(uint32_t) + namelen)
00282       {
00283         logme()
00284           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285           << " from peer " << p->peeraddr
00286           << ", expected length " << (3*sizeof(uint32_t))
00287           << " + " << namelen << std::endl;
00288         return false;
00289       }
00290 
00291       lock();
00292       std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293       Peer *owner = 0;
00294       Object *o = findObject(0, name, &owner);
00295       if (o)
00296       {
00297         o->lastreq = Time::current();
00298         if (o->rawdata.empty())
00299           waitForData(p, name, "", owner);
00300         else
00301           sendObjectToPeer(msg, *o, true);
00302       }
00303       else
00304       {
00305         uint32_t words[3];
00306         words[0] = sizeof(words) + name.size();
00307         words[1] = VIS_REPLY_NONE;
00308         words[2] = name.size();
00309 
00310         msg->data.reserve(msg->data.size() + words[0]);
00311         copydata(msg, &words[0], sizeof(words));
00312         copydata(msg, &name[0], name.size());
00313       }
00314       unlock();
00315     }
00316     return true;
00317 
00318   case VIS_REPLY_LIST_BEGIN:
00319     {
00320       if (len != 4*sizeof(uint32_t))
00321       {
00322         logme()
00323           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324           << " from peer " << p->peeraddr << std::endl;
00325         return false;
00326       }
00327 
00328       if (debug_)
00329         logme()
00330           << "DEBUG: received message 'LIST BEGIN' from "
00331           << p->peeraddr << std::endl;
00332 
00333       // Get the update status: whether this is a full update.
00334       uint32_t flags;
00335       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336 
00337       // If we are about to receive a full list of objects, flag all
00338       // objects dead.  Subsequent object notifications will undo this
00339       // for the live objects.  This tells us the object has been
00340       // removed, but we can keep making it available for a while if
00341       // there continues to be interest in it.
00342       if (flags)
00343       {
00344         lock();
00345         markObjectsZombies(p);
00346         unlock();
00347       }
00348     }
00349     return true;
00350 
00351   case VIS_REPLY_LIST_END:
00352     {
00353       if (len != 4*sizeof(uint32_t))
00354       {
00355         logme()
00356           << "ERROR: corrupt 'LIST END' message of length " << len
00357           << " from peer " << p->peeraddr << std::endl;
00358         return false;
00359       }
00360 
00361       // Get the update status: whether this is a full update.
00362       uint32_t flags;
00363       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364 
00365       // If we received a full list of objects, flag all zombie objects
00366       // now dead. We need to do this in two stages in case we receive
00367       // updates in many parts, and end up sending updates to others in
00368       // between; this avoids us lying live objects are dead.
00369       if (flags)
00370       {
00371         lock();
00372         markObjectsDead(p);
00373         unlock();
00374       }
00375 
00376       if (debug_)
00377         logme()
00378           << "DEBUG: received message 'LIST END' from "
00379           << p->peeraddr << std::endl;
00380 
00381       // Indicate we have received another update from this peer.
00382       // Also indicate we should flush to our clients.
00383       flush_ = true;
00384       p->updates++;
00385     }
00386     return true;
00387 
00388   case VIS_REPLY_OBJECT:
00389     {
00390       uint32_t words[7];
00391       if (len < sizeof(words))
00392       {
00393         logme()
00394           << "ERROR: corrupt 'OBJECT' message of length " << len
00395           << " from peer " << p->peeraddr << std::endl;
00396         return false;
00397       }
00398 
00399       memcpy(&words[0], data, sizeof(words));
00400       uint32_t &namelen = words[5];
00401       uint32_t &datalen = words[6];
00402 
00403       if (len != sizeof(words) + namelen + datalen)
00404       {
00405         logme()
00406           << "ERROR: corrupt 'OBJECT' message of length " << len
00407           << " from peer " << p->peeraddr
00408           << ", expected length " << sizeof(words)
00409           << " + " << namelen
00410           << " + " << datalen
00411           << std::endl;
00412         return false;
00413       }
00414 
00415       unsigned char *namedata = data + sizeof(words);
00416       unsigned char *objdata = namedata + namelen;
00417       unsigned char *enddata = objdata + datalen;
00418       std::string name((char *) namedata, namelen);
00419       assert(enddata == data + len);
00420 
00421       if (debug_)
00422         logme()
00423           << "DEBUG: received message 'OBJECT " << name
00424           << "' from " << p->peeraddr << std::endl;
00425 
00426       // Mark the peer as a known object source.
00427       p->source = true;
00428 
00429       // Initialise or update an object entry.
00430       lock();
00431       Object *o = findObject(p, name);
00432       if (! o)
00433         o = makeObject(p, name);
00434 
00435       bool hadobject = ! o->rawdata.empty();
00436       o->flags = words[2] | VIS_FLAG_RECEIVED;
00437       o->version = ((uint64_t) words[4] << 32 | words[3]);
00438       o->rawdata.clear();
00439       o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440 
00441       // If we had an object for this one already and this is a list
00442       // update without data, issue an immediate data get request.
00443       if (hadobject && ! datalen)
00444         requestObject(p, (namelen ? &name[0] : 0), namelen);
00445 
00446       // If we have the object data, release from wait.
00447       if (datalen)
00448         releaseWaiters(o);
00449       unlock();
00450     }
00451     return true;
00452 
00453   case VIS_REPLY_NONE:
00454     {
00455       uint32_t words[3];
00456       if (len < sizeof(words))
00457       {
00458         logme()
00459           << "ERROR: corrupt 'NONE' message of length " << len
00460           << " from peer " << p->peeraddr << std::endl;
00461         return false;
00462       }
00463 
00464       memcpy(&words[0], data, sizeof(words));
00465       uint32_t &namelen = words[2];
00466 
00467       if (len != sizeof(words) + namelen)
00468       {
00469         logme()
00470           << "ERROR: corrupt 'NONE' message of length " << len
00471           << " from peer " << p->peeraddr
00472           << ", expected length " << sizeof(words)
00473           << " + " << namelen << std::endl;
00474         return false;
00475       }
00476 
00477       unsigned char *namedata = data + sizeof(words);
00478       unsigned char *enddata = namedata + namelen;
00479       std::string name((char *) namedata, namelen);
00480       assert(enddata == data + len);
00481 
00482       if (debug_)
00483         logme()
00484           << "DEBUG: received message 'NONE " << name
00485           << "' from " << p->peeraddr << std::endl;
00486 
00487       // Mark the peer as a known object source.
00488       p->source = true;
00489 
00490       // If this was a known object, update its entry.
00491       lock();
00492       Object *o = findObject(p, name);
00493       if (o)
00494         o->flags |= VIS_FLAG_DEAD;
00495 
00496       // If someone was waiting for this, let them go.
00497       releaseWaiters(o);
00498       unlock();
00499     }
00500     return true;
00501 
00502   default:
00503     logme()
00504       << "ERROR: unrecognised message of length " << len
00505       << " and type " << type << " from peer " << p->peeraddr
00506       << std::endl;
00507     return false;
00508   }
00509 }

bool IgNet::onPeerConnect ( lat::IOSelectEvent *  ev  )  [private]

Respond to new connections on the server socket.

Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 680 of file IgNet.cc.

References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.

Referenced by startLocalServer().

00681 {
00682   // Recover the server socket.
00683   assert(ev->source == server_);
00684 
00685   // Accept the connection.
00686   Socket *s = server_->accept();
00687   assert(s);
00688   assert(! s->isBlocking());
00689 
00690   // Record it to our list of peers.
00691   Peer *p = createPeer(s);
00692   InetAddress peeraddr = ((InetSocket *) s)->peername();
00693   InetAddress myaddr = ((InetSocket *) s)->sockname();
00694   p->peeraddr = StringFormat("%1:%2")
00695                 .arg(peeraddr.hostname())
00696                 .arg(peeraddr.port());
00697   p->mask = IORead|IOUrgent;
00698   p->socket = s;
00699 
00700   // Report the new connection.
00701   if (debug_)
00702     logme()
00703       << "INFO: new peer " << p->peeraddr << " is now connected to "
00704       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705 
00706   // Attach it to the listener.
00707   sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
00708 
00709   // We are never done.
00710   return false;
00711 }

bool IgNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
) [private]

Handle communication to a particular client.

Definition at line 514 of file IgNet.cc.

References IgNet::Peer::automatic, b, data, IgNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), IgNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), IgNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, lat::Error::next(), old, onMessage(), IgNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, IgNet::Peer::sendpos, IgNet::Peer::sendq, lat::IOSelector::setMask(), IgNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, TrackValidation_HighPurity_cff::valid, IgNet::Peer::waiting, and lat::IOChannel::write().

Referenced by onPeerConnect(), and run().

00515 {
00516   assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517 
00518   // If there is a problem with the peer socket, discard the peer
00519   // and tell the selector to stop prcessing events for it.  If
00520   // this is a server connection, we will eventually recreate
00521   // everything if/when the data server comes back.
00522   if (ev->events & IOUrgent)
00523   {
00524     if (p->automatic)
00525     {
00526       logme()
00527         << "WARNING: connection to the server at " << p->peeraddr
00528         << " lost (will attempt to reconnect in 15 seconds)\n";
00529       return losePeer(0, p, ev);
00530     }
00531     else
00532       return losePeer("WARNING: lost peer connection ", p, ev);
00533   }
00534 
00535   // If we can write to the peer socket, pump whatever we can into it.
00536   if (ev->events & IOWrite)
00537   {
00538     while (Bucket *b = p->sendq)
00539     {
00540       IOSize len = b->data.size() - p->sendpos;
00541       const void *data = (len ? (const void *)&b->data[p->sendpos]
00542                           : (const void *)&data);
00543       IOSize done;
00544 
00545       try
00546       {
00547         done = (len ? ev->source->write (data, len) : 0);
00548         if (debug_ && len)
00549           logme()
00550             << "DEBUG: sent " << done << " bytes to peer "
00551             << p->peeraddr << std::endl;
00552       }
00553       catch (Error &e)
00554       {
00555         return losePeer("WARNING: unable to write to peer ",
00556                         p, ev, &e);
00557       }
00558 
00559       p->sendpos += done;
00560       if (p->sendpos == b->data.size())
00561       {
00562         Bucket *old = p->sendq;
00563         p->sendq = old->next;
00564         p->sendpos = 0;
00565         old->next = 0;
00566         discard(old);
00567       }
00568 
00569       if (! done && len)
00570         // Cannot write any more.
00571         break;
00572     }
00573   }
00574 
00575   // If there is data to be read from the peer, first receive what we
00576   // can get out the socket, the process all complete requests.
00577   if (ev->events & IORead)
00578   {
00579     // First build up the incoming buffer of data in the socket.
00580     // Remember the last size returned by the socket; we need
00581     // it to determine if the remote end closed the connection.
00582     IOSize sz;
00583     try
00584     {
00585       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586       do
00587         if ((sz = ev->source->read(&buf[0], buf.size())))
00588         {
00589           if (debug_)
00590             logme()
00591               << "DEBUG: received " << sz << " bytes from peer "
00592               << p->peeraddr << std::endl;
00593           DataBlob &data = p->incoming;
00594           if (data.capacity () < data.size () + sz)
00595             data.reserve (data.size() + SOCKET_READ_GROWTH);
00596           data.insert (data.end(), &buf[0], &buf[0] + sz);
00597         }
00598       while (sz == sizeof (buf));
00599     }
00600     catch (Error &e)
00601     {
00602       SystemError *next = dynamic_cast<SystemError *>(e.next());
00603       if (next && next->portable() == SysErr::ErrTryAgain)
00604         sz = 1; // Ignore it, and fake no end of data.
00605       else
00606         // Houston we have a problem.
00607         return losePeer("WARNING: failed to read from peer ",
00608                         p, ev, &e);
00609     }
00610 
00611     // Process fully received messages as long as we can.
00612     size_t consumed = 0;
00613     DataBlob &data = p->incoming;
00614     while (data.size()-consumed >= sizeof(uint32_t)
00615            && p->waiting < MAX_PEER_WAITREQS)
00616     {
00617       uint32_t msglen;
00618       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619 
00620       if (msglen >= MESSAGE_SIZE_LIMIT)
00621         return losePeer("WARNING: excessively large message from ", p, ev);
00622 
00623       if (data.size()-consumed >= msglen)
00624       {
00625         bool valid = true;
00626         if (msglen < 2*sizeof(uint32_t))
00627         {
00628           logme()
00629             << "ERROR: corrupt peer message of length " << msglen
00630             << " from peer " << p->peeraddr << std::endl;
00631           valid = false;
00632         }
00633         else
00634         {
00635           // Decode and process this message.
00636           Bucket msg;
00637           msg.next = 0;
00638           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639 
00640           // If we created a response, chain it to the write queue.
00641           if (! msg.data.empty())
00642           {
00643             Bucket **prev = &p->sendq;
00644             while (*prev)
00645                prev = &(*prev)->next;
00646 
00647             *prev = new Bucket;
00648             (*prev)->next = 0;
00649             (*prev)->data.swap(msg.data);
00650           }
00651         }
00652 
00653         if (! valid)
00654           return losePeer("WARNING: data stream error with ", p, ev);
00655 
00656         consumed += msglen;
00657       }
00658       else
00659         break;
00660     }
00661 
00662     data.erase(data.begin(), data.begin()+consumed);
00663 
00664     // If the client has closed the connection, shut down our end.  If
00665     // we have something to send back still, leave the write direction
00666     // open.  Otherwise close the shop for this client.
00667     if (sz == 0)
00668       sel_.setMask(p->socket, p->mask &= ~IORead);
00669   }
00670 
00671   // Yes, please keep processing events for this socket.
00672   return false;
00673 }

IgNet& IgNet::operator= ( const IgNet &   )  [private]

void IgNet::purgeDeadObjects ( lat::Time  oldobj,
lat::Time  deadobj 
) [protected, virtual]

Definition at line 1181 of file IgNet.cc.

References debug_, lat::endl(), IgNet::Object::flags, IgNet::Object::lastreq, logme(), IgNet::Object::name, peers_, pi, IgNet::Object::rawdata, IgNet::Object::version, VIS_FLAG_DEAD, and VIS_FLAG_SCALAR.

Referenced by run().

01182 {
01183   PeerMap::iterator pi, pe;
01184   ObjectMap::iterator oi, oe;
01185   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01186     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01187     {
01188       Object &o = oi->second;
01189 
01190       // Compact non-scalar objects that are unused.  We send scalar
01191       // objects to the web server so we keep them around.
01192       if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01193       {
01194         if (debug_)
01195           logme()
01196             << "DEBUG: compacting idle '" << o.name
01197             << "' from " << pi->second.peeraddr << std::endl;
01198       }
01199 
01200       // Remove if dead, old and unused.
01201       if (o.lastreq < deadobj
01202           && o.version < deadobj
01203           && (o.flags & VIS_FLAG_DEAD))
01204       {
01205         if (debug_)
01206           logme()
01207             << "DEBUG: removing dead '" << o.name
01208             << "' from " << pi->second.peeraddr << std::endl;
01209 
01210         pi->second.objs.erase(oi++);
01211       }
01212       else
01213         ++oi;
01214     }
01215 }

int IgNet::receive ( void(*)(void *arg, uint32_t reason, Object &obj)  callback,
void *  arg 
) [virtual]

Definition at line 1335 of file IgNet.cc.

References IgNet::Object::flags, local_, lock(), IgNet::Peer::objs, p, peers_, pi, unlock(), IgNet::Peer::updates, VIS_FLAG_DEAD, and VIS_FLAG_RECEIVED.

01336 {
01337   int updates = 0;
01338 
01339   lock();
01340   PeerMap::iterator pi, pe;
01341   ObjectMap::iterator oi, oe;
01342   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01343   {
01344     Peer &p = pi->second;
01345     if (&p == local_)
01346       continue;
01347 
01348     updates += p.updates;
01349 
01350     for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01351     {
01352       Object &o = oi->second;
01353       if (o.flags & VIS_FLAG_DEAD)
01354       {
01355         callback(arg, VIS_FLAG_DEAD, o);
01356         p.objs.erase(oi++);
01357       }
01358       else if (o.flags & VIS_FLAG_RECEIVED)
01359       {
01360         callback(arg, VIS_FLAG_RECEIVED, o);
01361         o.flags &= ~VIS_FLAG_RECEIVED;
01362         ++oi;
01363       }
01364       else
01365         ++oi;
01366     }
01367   }
01368   unlock();
01369 
01370   return updates;
01371 }

void IgNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
) [private]

Definition at line 133 of file IgNet.cc.

References alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, releaseFromWait(), and waiting_.

00134 {
00135   Bucket **msg = &i->peer->sendq;
00136   while (*msg)
00137     msg = &(*msg)->next;
00138   *msg = new Bucket;
00139   (*msg)->next = 0;
00140 
00141   releaseFromWait(*msg, *i, o);
00142 
00143   assert(i->peer->waiting > 0);
00144   i->peer->waiting--;
00145   waiting_.erase(i);
00146 }

void IgNet::releaseFromWait ( Bucket *  msg,
WaitObject w,
Object *  o 
) [protected, virtual]

Definition at line 170 of file IgNet.cc.

References copydata(), IgNet::Bucket::data, IgNet::WaitObject::name, sendObjectToPeer(), and VIS_REPLY_NONE.

Referenced by releaseFromWait(), releaseWaiters(), and run().

00171 {
00172   if (o)
00173     sendObjectToPeer (msg, *o, true);
00174   else
00175   {
00176     uint32_t words[3];
00177     words[0] = sizeof(words) + w.name.size();
00178     words[1] = VIS_REPLY_NONE;
00179     words[2] = w.name.size();
00180 
00181     msg->data.reserve(msg->data.size() + words[0]);
00182     copydata(msg, &words[0], sizeof(words));
00183     copydata(msg, &w.name[0], w.name.size());
00184   }
00185 }

void IgNet::releaseWaiters ( Object *  o  )  [private]

Definition at line 150 of file IgNet.cc.

References e, i, IgNet::Object::name, releaseFromWait(), and waiting_.

Referenced by onMessage().

00151 {
00152   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153     if (i->name == o->name)
00154       releaseFromWait(i++, o);
00155     else
00156       ++i;
00157 }

void IgNet::removeLocalObject ( const std::string &  path  )  [virtual]

Delete the local object.

The caller must call sendLocalChanges() later to push out the changes.

Definition at line 1393 of file IgNet.cc.

References local_, and IgNet::Peer::objs.

Referenced by edm::service::IguanaService::produceEvent().

01394 {
01395   local_->objs.erase(path);
01396 }

void IgNet::removePeer ( Peer *  p,
lat::Socket *  s 
) [protected, virtual]

Definition at line 1242 of file IgNet.cc.

References IgNet::Peer::objs, peers_, and sendLocalChanges().

Referenced by losePeer().

01243 {
01244   bool needflush = ! p->objs.empty();
01245 
01246   p->objs.clear();
01247   peers_.erase(s);
01248 
01249   // If we removed a peer with objects, our list of objects
01250   // has changed and we need to update downstream peers.
01251   if (needflush)
01252     sendLocalChanges();
01253 }

void IgNet::requestObject ( Peer *  p,
const char *  name,
size_t  len 
) [private]

Queue an object request to the data server.

Definition at line 97 of file IgNet.cc.

References copydata(), alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, IgNet::Peer::sendq, and VIS_MSG_GET_OBJECT.

Referenced by onMessage(), and waitForData().

00098 {
00099   Bucket **msg = &p->sendq;
00100   while (*msg)
00101     msg = &(*msg)->next;
00102   *msg = new Bucket;
00103   (*msg)->next = 0;
00104 
00105   uint32_t words[3];
00106   words[0] = sizeof(words) + len;
00107   words[1] = VIS_MSG_GET_OBJECT;
00108   words[2] = len;
00109   (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110   copydata(*msg, words, sizeof(words));
00111   copydata(*msg, name, len);
00112 }

void IgNet::run ( void   ) 

Run the actual I/O processing loop.

Definition at line 964 of file IgNet.cc.

References lat::Socket::abort(), arg, lat::IOSelector::attach(), IgNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, IgNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), IgNet::Peer::mask, IgNet::Bucket::next, IgNet::AutoPeer::next, lat::Error::next(), onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, IgNet::AutoPeer::peer, IgNet::Peer::peeraddr, IgNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), s, sel_, sendObjectListToPeers(), IgNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), IgNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), IgNet::Peer::update, IgNet::AutoPeer::update, updatePeerMasks(), upstream_, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, waiting_, and IgNet::AutoPeer::warned.

00965 {
00966   Time now;
00967   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00968 
00969   // Perform I/O.  Every once in a while flush updates to peers.
00970   while (! shouldStop())
00971   {
00972     for (int i = 0; i < 2; ++i)
00973     {
00974       AutoPeer *ap = automatic[i];
00975 
00976       // If we need a server connection and don't have one yet,
00977       // initiate asynchronous connection creation.  Swallow errors
00978       // in case the server won't talk to us.
00979       if (! ap->host.empty()
00980           && ! ap->peer
00981           && (now = Time::current()) > ap->next)
00982       {
00983         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
00984         InetSocket *s = 0;
00985         try
00986         {
00987           s = new InetSocket (SocketConst::TypeStream);
00988           s->setBlocking (false);
00989           s->connect(InetAddress (ap->host.c_str(), ap->port));
00990           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00991           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00992         }
00993         catch (Error &e)
00994         {
00995           SystemError *sys = dynamic_cast<SystemError *>(e.next());
00996           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00997           {
00998             // "In progress" just means the connection is in progress.
00999             // The connection is ready when the socket is writeable.
01000             // Anything else is a real problem.
01001             if (! ap->warned)
01002             {
01003               logme()
01004                 << "NOTE: server at " << ap->host << ":" << ap->port
01005                 << " is unavailable.  Connection will be established"
01006                 << " automatically on the background once the server"
01007                 << " becomes available.  Error from the attempt was: "
01008                 << e.explain() << '\n';
01009               ap->warned = true;
01010             }
01011 
01012             if (s)
01013               s->abort();
01014             delete s;
01015             s = 0;
01016           }
01017         }
01018 
01019         // Set up with the selector if we were successful.  If this is
01020         // the upstream collector, queue a request for updates.
01021         if (s)
01022         {
01023           lock();
01024           Peer *p = createPeer(s);
01025           ap->peer = p;
01026           ap->warned = false;
01027           unlock();
01028 
01029           InetAddress peeraddr = ((InetSocket *) s)->peername();
01030           InetAddress myaddr = ((InetSocket *) s)->sockname();
01031           p->peeraddr = StringFormat("%1:%2")
01032                         .arg(peeraddr.hostname())
01033                         .arg(peeraddr.port());
01034           p->mask = IORead|IOWrite|IOUrgent;
01035           p->update = ap->update;
01036           p->automatic = ap;
01037           p->socket = s;
01038           sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
01039           if (ap == &upstream_)
01040           {
01041             uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01042                                   2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01043             p->sendq = new Bucket;
01044             p->sendq->next = 0;
01045             copydata(p->sendq, words, sizeof(words));
01046           }
01047 
01048           // Report the new connection.
01049           if (debug_)
01050             logme()
01051               << "INFO: now connected to " << p->peeraddr << " from "
01052               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01053         }
01054       }
01055     }
01056 
01057     // Pump events for a while.
01058     sel_.dispatch(delay_);
01059     now = Time::current();
01060 
01061     // Check if flush is required.  Flush only if one is needed.
01062     // Always sends the full object list.  Compact objects no longer
01063     // in active use before sending out the update.
01064     if (flush_)
01065     {
01066       flush_ = false;
01067 
01068       lock();
01069       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01070                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01071       sendObjectListToPeers(true);
01072       unlock();
01073     }
01074 
01075     // Update the data server and peer selection masks.  If we
01076     // have no more data to send and listening for writes, remove
01077     // the write mask.  If we have something to write and aren't
01078     // listening for writes, start listening so we can send off
01079     // the data.
01080     updatePeerMasks();
01081 
01082     // Release peers that have been waiting for data for too long.
01083     lock();
01084     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01085     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01086     {
01087       // If the peer has waited for too long, send something.
01088       if (i->time < waitold)
01089         releaseFromWait(i++, findObject(0, i->name));
01090 
01091       // Keep it for now.
01092       else
01093         ++i;
01094     }
01095     unlock();
01096   }
01097 }

void IgNet::sendLocalChanges ( void   ) 

Definition at line 1102 of file IgNet.cc.

References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().

Referenced by edm::service::IguanaService::produceEvent(), and removePeer().

01103 {
01104   char byte = 0;
01105   wakeup_.sink()->write(&byte, 1);
01106 }

void IgNet::sendObjectListToPeer ( Bucket *  msg,
bool  all,
bool  clear 
) [protected, virtual]

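Send the object list to a peer: every object if all is set, otherwise only the objects flagged VIS_FLAG_NEW. If clear is set, VIS_FLAG_NEW is cleared on each object sent.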

Definition at line 1257 of file IgNet.cc.

References copydata(), IgNet::Bucket::data, peers_, pi, sendObjectToPeer(), VIS_FLAG_NEW, VIS_REPLY_LIST_BEGIN, and VIS_REPLY_LIST_END.

Referenced by onMessage(), and sendObjectListToPeers().

01258 {
01259   PeerMap::iterator pi, pe;
01260   ObjectMap::iterator oi, oe;
01261   uint32_t numobjs = 0;
01262   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01263     numobjs += pi->second.objs.size();
01264 
01265   msg->data.reserve(msg->data.size() + 300*numobjs);
01266 
01267   uint32_t nupdates = 0;
01268   uint32_t words[4];
01269   words[0] = sizeof(words);
01270   words[1] = VIS_REPLY_LIST_BEGIN;
01271   words[2] = numobjs;
01272   words[3] = all;
01273   copydata(msg, &words[0], sizeof(words));
01274 
01275   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01276     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01277       if (all || (oi->second.flags & VIS_FLAG_NEW))
01278       {
01279         sendObjectToPeer(msg, oi->second, false);
01280         if (clear)
01281           oi->second.flags &= ~VIS_FLAG_NEW;
01282         ++nupdates;
01283       }
01284 
01285   words[1] = VIS_REPLY_LIST_END;
01286   words[2] = nupdates;
01287   copydata(msg, &words[0], sizeof(words));
01288 }

void IgNet::sendObjectListToPeers ( bool  all  )  [protected, virtual]

Definition at line 1291 of file IgNet.cc.

References IgNet::Bucket::data, debug_, e, lat::endl(), i, logme(), alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, p, IgNet::Peer::peeraddr, peers_, sendObjectListToPeer(), IgNet::Peer::sendq, IgNet::Peer::update, and IgNet::Peer::updated.

Referenced by run().

01292 {
01293   PeerMap::iterator i, e;
01294   for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01295   {
01296     Peer &p = i->second;
01297     if (! p.update)
01298       continue;
01299 
01300     if (debug_)
01301       logme()
01302         << "DEBUG: notifying " << p.peeraddr << std::endl;
01303 
01304     Bucket msg;
01305     msg.next = 0;
01306     sendObjectListToPeer(&msg, !p.updated || all, true);
01307 
01308     if (! msg.data.empty())
01309     {
01310       Bucket **prev = &p.sendq;
01311       while (*prev)
01312         prev = &(*prev)->next;
01313 
01314       *prev = new Bucket;
01315       (*prev)->next = 0;
01316       (*prev)->data.swap(msg.data);
01317     }
01318     p.updated = true;
01319   }
01320 }

void IgNet::sendObjectToPeer ( Bucket *  msg,
Object o,
bool  data 
) [protected]

Definition at line 191 of file IgNet.cc.

References copydata(), IgNet::Bucket::data, IgNet::Object::flags, flags, IgNet::Object::name, IgNet::Object::rawdata, IgNet::Object::version, VIS_FLAG_SCALAR, VIS_FLAG_ZOMBIE, and VIS_REPLY_OBJECT.

Referenced by onMessage(), releaseFromWait(), and sendObjectListToPeer().

00192 {
00193   uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194   DataBlob objdata;
00195 
00196   if (data || (flags & VIS_FLAG_SCALAR))
00197     objdata.insert(objdata.end(),
00198                    &o.rawdata[0],
00199                    &o.rawdata[0] + o.rawdata.size());
00200 
00201   uint32_t words[7];
00202   uint32_t namelen = o.name.size();
00203   uint32_t datalen = objdata.size();
00204 
00205   words[0] = sizeof(words) + namelen + datalen;
00206   words[1] = VIS_REPLY_OBJECT;
00207   words[2] = flags;
00208   words[3] = (o.version >> 0 ) & 0xffffffff;
00209   words[4] = (o.version >> 32) & 0xffffffff;
00210   words[5] = namelen;
00211   words[6] = datalen;
00212 
00213   msg->data.reserve(msg->data.size() + words[0]);
00214   copydata(msg, &words[0], sizeof(words));
00215   if (namelen)
00216     copydata(msg, &o.name[0], namelen);
00217   if (datalen)
00218     copydata(msg, &objdata[0], datalen);
00219 }

bool IgNet::shouldStop ( void   )  [protected, virtual]

Reimplemented in IguanaNetProducer.

Definition at line 162 of file IgNet.cc.

References shutdown_.

Referenced by run().

00163 {
00164   return shutdown_;
00165 }

void IgNet::shutdown ( void   ) 

Stop the network layer and wait for it to finish.

Definition at line 908 of file IgNet.cc.

References communicate_, and shutdown_.

00909 {
00910   shutdown_ = 1;
00911   if (communicate_ != (pthread_t) -1)
00912     pthread_join(communicate_, 0);
00913 }

void IgNet::start ( void   ) 

Start running the network layer in a new thread.

This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 949 of file IgNet.cc.

References communicate(), communicate_, lock_, logme(), and pthread_create.

Referenced by edm::service::IguanaService::init().

00950 {
00951   if (communicate_ != (pthread_t) -1)
00952   {
00953     logme()
00954       << "ERROR: Shared memory networking thread has already been started\n";
00955     return;
00956   }
00957 
00958   pthread_mutex_init(&lock_, 0);
00959   pthread_create(&communicate_, 0, &communicate, this);
00960 }

void IgNet::startLocalServer ( int  port  ) 

Start a server socket for accessing this node remotely.

Must be called before calling run() or start(). Throws an IgNetError if the server socket cannot be initialised.

Definition at line 833 of file IgNet.cc.

References lat::IOSelector::attach(), lat::Error::clone(), lat::CreateHook(), e, lat::endl(), lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.

Referenced by IguanaNetProducer::IguanaNetProducer().

00834 {
00835   if (server_)
00836   {
00837     logme() << "ERROR: server was already started.\n";
00838     return;
00839   }
00840 
00841   try
00842   {
00843     server_ = new InetServerSocket(InetAddress (port), 10);
00844     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846     server_->setBlocking(false);
00847     sel_.attach(server_, IOAccept, CreateHook(this, &IgNet::onPeerConnect));
00848   }
00849   catch (Error &e)
00850   {
00851     // FIXME: Do we need to do this when we throw an exception anyway?
00852     // FIXME: Abort instead?
00853     logme()
00854       << "ERROR: Failed to start server at port " << port << ": "
00855       << e.explain() << std::endl;
00856 
00857     throw IgNetError("Failed to start server at port ", e.clone());
00858     
00859     // FIXME: Throw something simpler that removes the dependency?
00860 //     throw cms::Exception("IgNet::startLocalServer")
00861 //       << "Failed to start server at port " << port << ": "
00862 //       << e.explain();
00863   }
00864   
00865   logme() << "INFO: Shared memory server started at port " << port << std::endl;
00866 }

void IgNet::unlock ( void   ) 

Release the lock on the net layer.

Definition at line 939 of file IgNet.cc.

References communicate_, and lock_.

Referenced by onMessage(), edm::service::IguanaService::produceEvent(), receive(), and run().

00940 {
00941   if (communicate_ != (pthread_t) -1)
00942     pthread_mutex_unlock(&lock_);
00943 }

void IgNet::updateLocalObject ( Object &  o  )  [virtual]

Update the network cache for an object.

The caller must call sendLocalChanges() later to push out the changes.

Definition at line 1376 of file IgNet.cc.

References IgNet::Object::flags, local_, IgNet::Object::name, IgNet::Peer::objs, IgNet::Object::rawdata, std::swap(), and IgNet::Object::version.

Referenced by edm::service::IguanaService::produceEvent().

01377 {
01378   ObjectMap::iterator pos = local_->objs.find(o.name);
01379   if (pos == local_->objs.end())
01380     local_->objs.insert(ObjectMap::value_type(o.name, o));
01381   else
01382   {
01383     std::swap(pos->second.version,   o.version);
01384     std::swap(pos->second.flags,     o.flags);
01385     std::swap(pos->second.rawdata,   o.rawdata);
01386     pos->second.lastreq = 0;
01387   }
01388 }

void IgNet::updateMask ( Peer *  p  )  [protected]

Update the selector mask for a peer based on data queues.

Close the connection if there is no reason to keep it open.

Definition at line 751 of file IgNet.cc.

References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), IgNet::Peer::mask, IgNet::Peer::peeraddr, sel_, IgNet::Peer::sendq, lat::IOSelector::setMask(), IgNet::Peer::socket, and IgNet::Peer::waiting.

Referenced by updatePeerMasks().

00752 {
00753   if (! p->socket)
00754     return;
00755 
00756   // Listen to writes iff we have data to send.
00757   unsigned oldmask = p->mask;
00758   if (! p->sendq && (p->mask & IOWrite))
00759     sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760 
00761   if (p->sendq && ! (p->mask & IOWrite))
00762     sel_.setMask(p->socket, p->mask |= IOWrite);
00763 
00764   if (debug_ && oldmask != p->mask)
00765     logme()
00766       << "DEBUG: updating mask for " << p->peeraddr << " to "
00767       << p->mask << " from " << oldmask << std::endl;
00768 
00769   // If we have nothing more to send and are no longer listening
00770   // for reads, close up the shop for this peer.
00771   if (p->mask == IOUrgent && ! p->waiting)
00772   {
00773     assert(! p->sendq);
00774     if (debug_)
00775       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776     losePeer(0, p, 0);
00777   }
00778 }

void IgNet::updatePeerMasks ( void   )  [protected, virtual]

Definition at line 1323 of file IgNet.cc.

References e, i, peers_, and updateMask().

Referenced by run().

01324 {
01325   PeerMap::iterator i, e;
01326   for (i = peers_.begin(), e = peers_.end(); i != e; )
01327     updateMask(&(i++)->second);
01328 }

void IgNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local data changes.

Must be called before calling run() or start().

Definition at line 872 of file IgNet.cc.

References downstream_, lat::endl(), IgNet::AutoPeer::host, logme(), IgNet::AutoPeer::port, and IgNet::AutoPeer::update.

00873 {
00874   if (! downstream_.host.empty())
00875   {
00876     logme()
00877       << "ERROR: Already updating another collector at "
00878       << downstream_.host << ":" << downstream_.port << std::endl;
00879     return;
00880   }
00881 
00882   downstream_.update = true;
00883   downstream_.host = host;
00884   downstream_.port = port;
00885 }

void IgNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
) [protected]

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 117 of file IgNet.cc.

References requestObject(), IgNet::Peer::waiting, and waiting_.

Referenced by onMessage().

00118 {
00119   // FIXME: Should we automatically record which exact peer the waiter
00120   // is expecting to deliver data so we know to release the waiter if
00121   // the other peer vanishes?  The current implementation stands a
00122   // chance for the waiter to wait indefinitely -- although we do
00123   // force terminate the wait after a while.
00124   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125   WaitObject wo = { Time::current(), name, info, p };
00126   waiting_.push_back(wo);
00127   p->waiting++;
00128 }


Member Data Documentation

std::string IgNet::appname_ [private]

Definition at line 162 of file IgNet.h.

Referenced by logme().

pthread_t IgNet::communicate_ [private]

Definition at line 177 of file IgNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool IgNet::debug_ [protected]

Definition at line 146 of file IgNet.h.

Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), and updateMask().

int IgNet::delay_ [private]

Definition at line 180 of file IgNet.h.

Referenced by delay(), and run().

AutoPeer IgNet::downstream_ [private]

Definition at line 172 of file IgNet.h.

Referenced by IgNet(), run(), and updateToCollector().

bool IgNet::flush_ [private]

Definition at line 181 of file IgNet.h.

Referenced by onLocalNotify(), onMessage(), and run().

Peer* IgNet::local_ [private]

Definition at line 174 of file IgNet.h.

Referenced by IgNet(), receive(), removeLocalObject(), and updateLocalObject().

pthread_mutex_t IgNet::lock_ [private]

Definition at line 176 of file IgNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t IgNet::MAX_PEER_WAITREQS = 128 [static]

Definition at line 37 of file IgNet.h.

Referenced by onPeerData().

PeerMap IgNet::peers_ [private]

Definition at line 170 of file IgNet.h.

Referenced by createPeer(), findObject(), getPeer(), purgeDeadObjects(), receive(), removePeer(), sendObjectListToPeer(), sendObjectListToPeers(), and updatePeerMasks().

int IgNet::pid_ [private]

Definition at line 163 of file IgNet.h.

Referenced by logme().

lat::IOSelector IgNet::sel_ [private]

Definition at line 165 of file IgNet.h.

Referenced by IgNet(), losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), and updateMask().

lat::InetServerSocket* IgNet::server_ [private]

Definition at line 166 of file IgNet.h.

Referenced by onPeerConnect(), and startLocalServer().

sig_atomic_t IgNet::shutdown_ [private]

Definition at line 178 of file IgNet.h.

Referenced by shouldStop(), and shutdown().

AutoPeer IgNet::upstream_ [private]

Definition at line 171 of file IgNet.h.

Referenced by IgNet(), listenToSource(), and run().

lat::Time IgNet::version_ [private]

Definition at line 168 of file IgNet.h.

const uint32_t IgNet::VIS_FLAG_DEAD = 0x40000000 [static]

Definition at line 34 of file IgNet.h.

Referenced by markObjectsDead(), onMessage(), purgeDeadObjects(), and receive().

const uint32_t IgNet::VIS_FLAG_NEW = 0x20000000 [static]

Definition at line 33 of file IgNet.h.

Referenced by edm::service::IguanaService::produceEvent(), and sendObjectListToPeer().

const uint32_t IgNet::VIS_FLAG_RECEIVED = 0x10000000 [static]

Definition at line 32 of file IgNet.h.

Referenced by onMessage(), and receive().

const uint32_t IgNet::VIS_FLAG_SCALAR = 0x1 [static]

Definition at line 31 of file IgNet.h.

Referenced by edm::service::IguanaService::produceEvent(), purgeDeadObjects(), and sendObjectToPeer().

const uint32_t IgNet::VIS_FLAG_ZOMBIE = 0x80000000 [static]

Definition at line 35 of file IgNet.h.

Referenced by markObjectsDead(), markObjectsZombies(), and sendObjectToPeer().

const uint32_t IgNet::VIS_MSG_GET_OBJECT = 3 [static]

Definition at line 24 of file IgNet.h.

Referenced by onMessage(), and requestObject().

const uint32_t IgNet::VIS_MSG_HELLO = 0 [static]

Definition at line 21 of file IgNet.h.

const uint32_t IgNet::VIS_MSG_LIST_OBJECTS = 2 [static]

Definition at line 23 of file IgNet.h.

Referenced by onMessage(), and run().

const uint32_t IgNet::VIS_MSG_UPDATE_ME = 1 [static]

Definition at line 22 of file IgNet.h.

Referenced by onMessage(), and run().

const uint32_t IgNet::VIS_REPLY_LIST_BEGIN = 101 [static]

Definition at line 26 of file IgNet.h.

Referenced by onMessage(), and sendObjectListToPeer().

const uint32_t IgNet::VIS_REPLY_LIST_END = 102 [static]

Definition at line 27 of file IgNet.h.

Referenced by onMessage(), and sendObjectListToPeer().

const uint32_t IgNet::VIS_REPLY_NONE = 103 [static]

Definition at line 28 of file IgNet.h.

Referenced by onMessage(), and releaseFromWait().

const uint32_t IgNet::VIS_REPLY_OBJECT = 104 [static]

Definition at line 29 of file IgNet.h.

Referenced by onMessage(), and sendObjectToPeer().

WaitList IgNet::waiting_ [private]

Definition at line 173 of file IgNet.h.

Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().

lat::Pipe IgNet::wakeup_ [private]

Definition at line 167 of file IgNet.h.

Referenced by IgNet(), and sendLocalChanges().


The documentation for this class was generated from the following files:
Generated on Tue Jun 9 18:25:15 2009 for CMSSW by  doxygen 1.5.4