#include <Iguana/Framework/interface/IgNet.h>
Public Types | |
typedef std::vector< unsigned char > | DataBlob |
typedef std::map< std::string, Object > | ObjectMap |
typedef std::map< lat::Socket *, Peer > | PeerMap |
typedef std::list< WaitObject > | WaitList |
Public Member Functions | |
void | debug (bool doit) |
Enable or disable verbose debugging. | |
void | delay (int delay) |
Set the I/O dispatching delay. | |
IgNet (const std::string &appname="") | |
void | listenToSource (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically receive updates from upstream source(s). | |
void | lock (void) |
Acquire a lock on the net layer. | |
virtual int | receive (void(*callback)(void *arg, uint32_t reason, Object &obj), void *arg) |
virtual void | removeLocalObject (const std::string &name) |
Delete the local object. | |
void | run (void) |
Run the actual I/O processing loop. | |
void | sendLocalChanges (void) |
void | shutdown (void) |
Stop the network layer and wait it to finish. | |
void | start (void) |
Start running the network layer in a new thread. | |
void | startLocalServer (int port) |
Start a server socket for accessing this node remotely. | |
void | unlock (void) |
Release the lock on the net layer. | |
virtual void | updateLocalObject (Object &o) |
Update the network cache for an object. | |
void | updateToCollector (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically send updates whenever local data changes. | |
virtual | ~IgNet (void) |
Static Public Attributes | |
static const uint32_t | MAX_PEER_WAITREQS = 128 |
static const uint32_t | VIS_FLAG_DEAD = 0x40000000 |
static const uint32_t | VIS_FLAG_NEW = 0x20000000 |
static const uint32_t | VIS_FLAG_RECEIVED = 0x10000000 |
static const uint32_t | VIS_FLAG_SCALAR = 0x1 |
static const uint32_t | VIS_FLAG_ZOMBIE = 0x80000000 |
static const uint32_t | VIS_MSG_GET_OBJECT = 3 |
static const uint32_t | VIS_MSG_HELLO = 0 |
static const uint32_t | VIS_MSG_LIST_OBJECTS = 2 |
static const uint32_t | VIS_MSG_UPDATE_ME = 1 |
static const uint32_t | VIS_REPLY_LIST_BEGIN = 101 |
static const uint32_t | VIS_REPLY_LIST_END = 102 |
static const uint32_t | VIS_REPLY_NONE = 103 |
static const uint32_t | VIS_REPLY_OBJECT = 104 |
Protected Member Functions | |
virtual Peer * | createPeer (lat::Socket *s) |
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=0) |
virtual Peer * | getPeer (lat::Socket *s) |
std::ostream & | logme (void) |
virtual Object * | makeObject (Peer *p, const std::string &name) |
virtual void | markObjectsDead (Peer *p) |
virtual void | markObjectsZombies (Peer *p) |
virtual bool | onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len) |
virtual void | purgeDeadObjects (lat::Time oldobj, lat::Time deadobj) |
virtual void | releaseFromWait (Bucket *msg, WaitObject &w, Object *o) |
virtual void | removePeer (Peer *p, lat::Socket *s) |
virtual void | sendObjectListToPeer (Bucket *msg, bool all, bool clear) |
Send all objects to a peer and optionally mark sent objects old. | |
virtual void | sendObjectListToPeers (bool all) |
void | sendObjectToPeer (Bucket *msg, Object &o, bool data) |
virtual bool | shouldStop (void) |
void | updateMask (Peer *p) |
Update the selector mask for a peer based on data queues. | |
virtual void | updatePeerMasks (void) |
void | waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner) |
Queue a request for an object and put a peer into the mode of waiting for object data to appear. | |
Static Protected Member Functions | |
static void | copydata (Bucket *b, const void *data, size_t len) |
Protected Attributes | |
bool | debug_ |
Private Member Functions | |
IgNet (const IgNet &) | |
bool | losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0) |
Handle errors with a peer socket. | |
bool | onLocalNotify (lat::IOSelectEvent *ev) |
React to notifications from the app thread. | |
bool | onPeerConnect (lat::IOSelectEvent *ev) |
Respond to new connections on the server socket. | |
bool | onPeerData (lat::IOSelectEvent *ev, Peer *p) |
Handle communication to a particular client. | |
IgNet & | operator= (const IgNet &) |
void | releaseFromWait (WaitList::iterator i, Object *o) |
void | releaseWaiters (Object *o) |
void | requestObject (Peer *p, const char *name, size_t len) |
Queue an object request to the data server. | |
Static Private Member Functions | |
static void | discard (Bucket *&b) |
Private Attributes | |
std::string | appname_ |
pthread_t | communicate_ |
int | delay_ |
AutoPeer | downstream_ |
bool | flush_ |
Peer * | local_ |
pthread_mutex_t | lock_ |
PeerMap | peers_ |
int | pid_ |
lat::IOSelector | sel_ |
lat::InetServerSocket * | server_ |
sig_atomic_t | shutdown_ |
AutoPeer | upstream_ |
lat::Time | version_ |
WaitList | waiting_ |
lat::Pipe | wakeup_ |
Classes | |
struct | AutoPeer |
struct | Bucket |
struct | Object |
struct | Peer |
struct | WaitObject |
Definition at line 18 of file IgNet.h.
typedef std::vector<unsigned char> IgNet::DataBlob |
typedef std::map<std::string, Object> IgNet::ObjectMap |
typedef std::map<lat::Socket *, Peer> IgNet::PeerMap |
typedef std::list<WaitObject> IgNet::WaitList |
IgNet::IgNet | ( | const std::string & | appname = "" |
) |
Definition at line 781 of file IgNet.cc.
References lat::IOSelector::attach(), lat::CreateHook(), createPeer(), downstream_, lat::IOChannel::fd(), IORead, local_, IgNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), IgNet::AutoPeer::peer, IgNet::AutoPeer::port, sel_, lat::Pipe::source(), IgNet::AutoPeer::update, upstream_, wakeup_, and IgNet::AutoPeer::warned.
00782 : debug_ (false), 00783 appname_ (appname.empty() ? "IgNet" : appname.c_str()), 00784 pid_ (getpid()), 00785 server_ (0), 00786 version_ (Time::current()), 00787 communicate_ ((pthread_t) -1), 00788 shutdown_ (0), 00789 delay_ (1000), 00790 flush_ (false) 00791 { 00792 // Create a pipe for the local apps to tell the communicator 00793 // thread that local app data has changed and that the peers 00794 // should be notified. 00795 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK); 00796 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &IgNet::onLocalNotify)); 00797 00798 // Initialise the upstream and downstream to empty. 00799 upstream_.peer = downstream_.peer = 0; 00800 upstream_.next = downstream_.next = 0; 00801 upstream_.port = downstream_.port = 0; 00802 upstream_.update = downstream_.update = false; 00803 upstream_.warned = downstream_.warned = false; 00804 00805 local_ = createPeer((Socket *) -1); 00806 }
IgNet::~IgNet | ( | void | ) | [virtual] |
IgNet::IgNet | ( | const IgNet & | ) | [private] |
Definition at line 38 of file IgNet.cc.
References IgNet::Bucket::data.
Referenced by onMessage(), releaseFromWait(), requestObject(), run(), sendObjectListToPeer(), and sendObjectToPeer().
00039 { 00040 b->data.insert(b->data.end(), 00041 (const unsigned char *)data, 00042 (const unsigned char *)data + len); 00043 }
IgNet::Peer * IgNet::createPeer | ( | lat::Socket * | s | ) | [protected, virtual] |
Definition at line 1225 of file IgNet.cc.
References IgNet::Peer::automatic, IgNet::Peer::mask, p, peers_, IgNet::Peer::sendpos, IgNet::Peer::sendq, IgNet::Peer::socket, IgNet::Peer::source, IgNet::Peer::update, IgNet::Peer::updated, IgNet::Peer::updates, and IgNet::Peer::waiting.
Referenced by IgNet(), onPeerConnect(), and run().
01226 { 01227 Peer *p = &peers_[s]; 01228 p->socket = 0; 01229 p->sendq = 0; 01230 p->sendpos = 0; 01231 p->mask = 0; 01232 p->source = false; 01233 p->update = false; 01234 p->updated = false; 01235 p->updates = 0; 01236 p->waiting = 0; 01237 p->automatic = 0; 01238 return p; 01239 }
Definition at line 47 of file IgNet.cc.
References IgNet::Bucket::next.
Referenced by losePeer(), and onPeerData().
00048 { 00049 while (b) 00050 { 00051 Bucket *next = b->next; 00052 delete b; 00053 b = next; 00054 } 00055 }
IgNet::Object * IgNet::findObject | ( | Peer * | p, | |
const std::string & | name, | |||
Peer ** | owner = 0 | |||
) | [protected, virtual] |
Definition at line 1112 of file IgNet.cc.
References e, i, IgNet::Peer::objs, and peers_.
Referenced by onMessage(), and run().
01113 { 01114 ObjectMap::iterator pos; 01115 PeerMap::iterator i, e; 01116 if (owner) 01117 *owner = 0; 01118 if (p) 01119 { 01120 pos = p->objs.find(name); 01121 if (pos == p->objs.end()) 01122 return 0; 01123 else 01124 { 01125 if (owner) *owner = p; 01126 return &pos->second; 01127 } 01128 } 01129 else 01130 { 01131 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) 01132 { 01133 pos = i->second.objs.find(name); 01134 if (pos != i->second.objs.end()) 01135 { 01136 if (owner) *owner = &i->second; 01137 return &pos->second; 01138 } 01139 } 01140 return 0; 01141 } 01142 }
IgNet::Peer * IgNet::getPeer | ( | lat::Socket * | s | ) | [protected, virtual] |
Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).
Must be called before calling run() or start().
Definition at line 891 of file IgNet.cc.
References lat::endl(), IgNet::AutoPeer::host, logme(), IgNet::AutoPeer::port, IgNet::AutoPeer::update, and upstream_.
00892 { 00893 if (! upstream_.host.empty()) 00894 { 00895 logme() 00896 << "ERROR: Already receiving data from another collector at " 00897 << upstream_.host << ":" << upstream_.port << std::endl; 00898 return; 00899 } 00900 00901 upstream_.update = false; 00902 upstream_.host = host; 00903 upstream_.port = port; 00904 }
Acquire a lock on the net layer.
Definition at line 931 of file IgNet.cc.
References communicate_, and lock_.
Referenced by onMessage(), edm::service::IguanaService::produceEvent(), receive(), and run().
00932 { 00933 if (communicate_ != (pthread_t) -1) 00934 pthread_mutex_lock(&lock_); 00935 }
std::ostream & IgNet::logme | ( | void | ) | [protected] |
Definition at line 29 of file IgNet.cc.
References appname_, TestMuL1L2Filter_cff::cerr, and pid_.
Referenced by IguanaNetProducer::IguanaNetProducer(), listenToSource(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), start(), startLocalServer(), updateMask(), and updateToCollector().
00030 { 00031 return std::cerr 00032 << Time::current().format(true, "%Y-%m-%d %H:%M:%S") 00033 << " " << appname_ << "[" << pid_ << "]: "; 00034 }
bool IgNet::losePeer | ( | const char * | reason, | |
Peer * | peer, | |||
lat::IOSelectEvent * | event, | |||
lat::Error * | err = 0 | |||
) | [private] |
Handle errors with a peer socket.
Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.
Definition at line 62 of file IgNet.cc.
References IgNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), IgNet::AutoPeer::peer, IgNet::Peer::peeraddr, removePeer(), s, sel_, IgNet::Peer::sendq, IgNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.
Referenced by onPeerData(), and updateMask().
00066 { 00067 if (reason) 00068 logme () 00069 << reason << peer->peeraddr 00070 << (err ? "; error was: " + err->explain() : std::string("")) 00071 << std::endl; 00072 00073 Socket *s = peer->socket; 00074 00075 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00076 if (i->peer == peer) 00077 waiting_.erase(i++); 00078 else 00079 ++i; 00080 00081 if (ev) 00082 ev->source = 0; 00083 00084 discard(peer->sendq); 00085 if (peer->automatic) 00086 peer->automatic->peer = 0; 00087 00088 sel_.detach(s); 00089 s->close(); 00090 removePeer(peer, s); 00091 delete s; 00092 return true; 00093 }
IgNet::Object * IgNet::makeObject | ( | Peer * | p, | |
const std::string & | name | |||
) | [protected, virtual] |
Definition at line 1145 of file IgNet.cc.
References IgNet::Object::flags, IgNet::Object::lastreq, IgNet::Object::name, IgNet::Peer::objs, and IgNet::Object::version.
Referenced by onMessage().
01146 { 01147 Object *o = &p->objs[name]; 01148 o->version = 0; 01149 o->name = name; 01150 o->flags = 0; 01151 o->lastreq = 0; 01152 return o; 01153 }
Definition at line 1171 of file IgNet.cc.
References e, i, IgNet::Peer::objs, VIS_FLAG_DEAD, and VIS_FLAG_ZOMBIE.
Referenced by onMessage().
01172 { 01173 ObjectMap::iterator i, e; 01174 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i) 01175 if (i->second.flags & VIS_FLAG_ZOMBIE) 01176 i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD; 01177 }
Definition at line 1162 of file IgNet.cc.
References e, i, IgNet::Peer::objs, and VIS_FLAG_ZOMBIE.
Referenced by onMessage().
01163 { 01164 ObjectMap::iterator i, e; 01165 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i) 01166 i->second.flags |= VIS_FLAG_ZOMBIE; 01167 }
bool IgNet::onLocalNotify | ( | lat::IOSelectEvent * | ev | ) | [private] |
React to notifications from the app thread.
This is a simple message to tell this thread to wake up and send unsollicited updates to the peers when new app data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent object updates.
Definition at line 720 of file IgNet.cc.
References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.
Referenced by IgNet().
00721 { 00722 // Discard the data in the pipe, we care only about the wakeup. 00723 try 00724 { 00725 IOSize sz; 00726 unsigned char buf [1024]; 00727 while ((sz = ev->source->read(buf, sizeof(buf)))) 00728 ; 00729 } 00730 catch (Error &e) 00731 { 00732 SystemError *next = dynamic_cast<SystemError *>(e.next()); 00733 if (next && next->portable() == SysErr::ErrTryAgain) 00734 ; // Ignore it 00735 else 00736 logme() 00737 << "WARNING: error reading from notification pipe: " 00738 << e.explain() << std::endl; 00739 } 00740 00741 // Tell the main event pump to send an update in a little while. 00742 flush_ = true; 00743 00744 // We are never done, always keep going. 00745 return false; 00746 }
bool IgNet::onMessage | ( | Bucket * | msg, | |
Peer * | p, | |||
unsigned char * | data, | |||
size_t | len | |||
) | [protected, virtual] |
Definition at line 224 of file IgNet.cc.
References copydata(), IgNet::Bucket::data, debug_, lat::endl(), findObject(), IgNet::Object::flags, flags, flush_, IgNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, IgNet::Peer::peeraddr, IgNet::Object::rawdata, releaseWaiters(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), IgNet::Peer::source, unlock(), IgNet::Peer::update, IgNet::Peer::updates, IgNet::Object::version, VIS_FLAG_DEAD, VIS_FLAG_RECEIVED, VIS_MSG_GET_OBJECT, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, VIS_REPLY_LIST_BEGIN, VIS_REPLY_LIST_END, VIS_REPLY_NONE, VIS_REPLY_OBJECT, and waitForData().
Referenced by onPeerData().
00225 { 00226 // Decode and process this message. 00227 uint32_t type; 00228 memcpy (&type, data + sizeof(uint32_t), sizeof (type)); 00229 switch (type) 00230 { 00231 case VIS_MSG_UPDATE_ME: 00232 { 00233 if (len != 2*sizeof(uint32_t)) 00234 { 00235 logme() 00236 << "ERROR: corrupt 'UPDATE_ME' message of length " << len 00237 << " from peer " << p->peeraddr << std::endl; 00238 return false; 00239 } 00240 00241 if (debug_) 00242 logme() 00243 << "DEBUG: received message 'UPDATE ME' from peer " 00244 << p->peeraddr << std::endl; 00245 00246 p->update = true; 00247 } 00248 return true; 00249 00250 case VIS_MSG_LIST_OBJECTS: 00251 { 00252 if (debug_) 00253 logme() 00254 << "DEBUG: received message 'LIST OBJECTS' from peer " 00255 << p->peeraddr << std::endl; 00256 00257 // Send over current status: list of known objects. 00258 lock(); 00259 sendObjectListToPeer(msg, true, false); 00260 unlock(); 00261 } 00262 return true; 00263 00264 case VIS_MSG_GET_OBJECT: 00265 { 00266 if (debug_) 00267 logme() 00268 << "DEBUG: received message 'GET OBJECT' from peer " 00269 << p->peeraddr << std::endl; 00270 00271 if (len < 3*sizeof(uint32_t)) 00272 { 00273 logme() 00274 << "ERROR: corrupt 'GET IMAGE' message of length " << len 00275 << " from peer " << p->peeraddr << std::endl; 00276 return false; 00277 } 00278 00279 uint32_t namelen; 00280 memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen)); 00281 if (len != 3*sizeof(uint32_t) + namelen) 00282 { 00283 logme() 00284 << "ERROR: corrupt 'GET OBJECT' message of length " << len 00285 << " from peer " << p->peeraddr 00286 << ", expected length " << (3*sizeof(uint32_t)) 00287 << " + " << namelen << std::endl; 00288 return false; 00289 } 00290 00291 lock(); 00292 std::string name((char *) data + 3*sizeof(uint32_t), namelen); 00293 Peer *owner = 0; 00294 Object *o = findObject(0, name, &owner); 00295 if (o) 00296 { 00297 o->lastreq = Time::current(); 00298 if (o->rawdata.empty()) 00299 waitForData(p, name, "", owner); 00300 else 00301 sendObjectToPeer(msg, *o, true); 00302 } 00303 else 00304 { 00305 uint32_t words[3]; 00306 words[0] = sizeof(words) + name.size(); 00307 words[1] = VIS_REPLY_NONE; 00308 words[2] = name.size(); 00309 00310 msg->data.reserve(msg->data.size() + words[0]); 00311 copydata(msg, &words[0], sizeof(words)); 00312 copydata(msg, &name[0], name.size()); 00313 } 00314 unlock(); 00315 } 00316 return true; 00317 00318 case VIS_REPLY_LIST_BEGIN: 00319 { 00320 if (len != 4*sizeof(uint32_t)) 00321 { 00322 logme() 00323 << "ERROR: corrupt 'LIST BEGIN' message of length " << len 00324 << " from peer " << p->peeraddr << std::endl; 00325 return false; 00326 } 00327 00328 if (debug_) 00329 logme() 00330 << "DEBUG: received message 'LIST BEGIN' from " 00331 << p->peeraddr << std::endl; 00332 00333 // Get the update status: whether this is a full update. 00334 uint32_t flags; 00335 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00336 00337 // If we are about to receive a full list of objects, flag all 00338 // objects dead. Subsequent object notifications will undo this 00339 // for the live objects. This tells us the object has been 00340 // removed, but we can keep making it available for a while if 00341 // there continues to be interest in it. 00342 if (flags) 00343 { 00344 lock(); 00345 markObjectsZombies(p); 00346 unlock(); 00347 } 00348 } 00349 return true; 00350 00351 case VIS_REPLY_LIST_END: 00352 { 00353 if (len != 4*sizeof(uint32_t)) 00354 { 00355 logme() 00356 << "ERROR: corrupt 'LIST END' message of length " << len 00357 << " from peer " << p->peeraddr << std::endl; 00358 return false; 00359 } 00360 00361 // Get the update status: whether this is a full update. 00362 uint32_t flags; 00363 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00364 00365 // If we received a full list of objects, flag all zombie objects 00366 // now dead. We need to do this in two stages in case we receive 00367 // updates in many parts, and end up sending updates to others in 00368 // between; this avoids us lying live objects are dead. 00369 if (flags) 00370 { 00371 lock(); 00372 markObjectsDead(p); 00373 unlock(); 00374 } 00375 00376 if (debug_) 00377 logme() 00378 << "DEBUG: received message 'LIST END' from " 00379 << p->peeraddr << std::endl; 00380 00381 // Indicate we have received another update from this peer. 00382 // Also indicate we should flush to our clients. 00383 flush_ = true; 00384 p->updates++; 00385 } 00386 return true; 00387 00388 case VIS_REPLY_OBJECT: 00389 { 00390 uint32_t words[7]; 00391 if (len < sizeof(words)) 00392 { 00393 logme() 00394 << "ERROR: corrupt 'OBJECT' message of length " << len 00395 << " from peer " << p->peeraddr << std::endl; 00396 return false; 00397 } 00398 00399 memcpy(&words[0], data, sizeof(words)); 00400 uint32_t &namelen = words[5]; 00401 uint32_t &datalen = words[6]; 00402 00403 if (len != sizeof(words) + namelen + datalen) 00404 { 00405 logme() 00406 << "ERROR: corrupt 'OBJECT' message of length " << len 00407 << " from peer " << p->peeraddr 00408 << ", expected length " << sizeof(words) 00409 << " + " << namelen 00410 << " + " << datalen 00411 << std::endl; 00412 return false; 00413 } 00414 00415 unsigned char *namedata = data + sizeof(words); 00416 unsigned char *objdata = namedata + namelen; 00417 unsigned char *enddata = objdata + datalen; 00418 std::string name((char *) namedata, namelen); 00419 assert(enddata == data + len); 00420 00421 if (debug_) 00422 logme() 00423 << "DEBUG: received message 'OBJECT " << name 00424 << "' from " << p->peeraddr << std::endl; 00425 00426 // Mark the peer as a known object source. 00427 p->source = true; 00428 00429 // Initialise or update an object entry. 00430 lock(); 00431 Object *o = findObject(p, name); 00432 if (! o) 00433 o = makeObject(p, name); 00434 00435 bool hadobject = ! o->rawdata.empty(); 00436 o->flags = words[2] | VIS_FLAG_RECEIVED; 00437 o->version = ((uint64_t) words[4] << 32 | words[3]); 00438 o->rawdata.clear(); 00439 o->rawdata.insert(o->rawdata.end(), objdata, enddata); 00440 00441 // If we had an object for this one already and this is a list 00442 // update without data, issue an immediate data get request. 00443 if (hadobject && ! datalen) 00444 requestObject(p, (namelen ? &name[0] : 0), namelen); 00445 00446 // If we have the object data, release from wait. 00447 if (datalen) 00448 releaseWaiters(o); 00449 unlock(); 00450 } 00451 return true; 00452 00453 case VIS_REPLY_NONE: 00454 { 00455 uint32_t words[3]; 00456 if (len < sizeof(words)) 00457 { 00458 logme() 00459 << "ERROR: corrupt 'NONE' message of length " << len 00460 << " from peer " << p->peeraddr << std::endl; 00461 return false; 00462 } 00463 00464 memcpy(&words[0], data, sizeof(words)); 00465 uint32_t &namelen = words[2]; 00466 00467 if (len != sizeof(words) + namelen) 00468 { 00469 logme() 00470 << "ERROR: corrupt 'NONE' message of length " << len 00471 << " from peer " << p->peeraddr 00472 << ", expected length " << sizeof(words) 00473 << " + " << namelen << std::endl; 00474 return false; 00475 } 00476 00477 unsigned char *namedata = data + sizeof(words); 00478 unsigned char *enddata = namedata + namelen; 00479 std::string name((char *) namedata, namelen); 00480 assert(enddata == data + len); 00481 00482 if (debug_) 00483 logme() 00484 << "DEBUG: received message 'NONE " << name 00485 << "' from " << p->peeraddr << std::endl; 00486 00487 // Mark the peer as a known object source. 00488 p->source = true; 00489 00490 // If this was a known object, update its entry. 00491 lock(); 00492 Object *o = findObject(p, name); 00493 if (o) 00494 o->flags |= VIS_FLAG_DEAD; 00495 00496 // If someone was waiting for this, let them go. 00497 releaseWaiters(o); 00498 unlock(); 00499 } 00500 return true; 00501 00502 default: 00503 logme() 00504 << "ERROR: unrecognised message of length " << len 00505 << " and type " << type << " from peer " << p->peeraddr 00506 << std::endl; 00507 return false; 00508 } 00509 }
bool IgNet::onPeerConnect | ( | lat::IOSelectEvent * | ev | ) | [private] |
Respond to new connections on the server socket.
Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false
always to tell the IOSelector to keep processing events for the server socket.
Definition at line 680 of file IgNet.cc.
References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.
Referenced by startLocalServer().
00681 { 00682 // Recover the server socket. 00683 assert(ev->source == server_); 00684 00685 // Accept the connection. 00686 Socket *s = server_->accept(); 00687 assert(s); 00688 assert(! s->isBlocking()); 00689 00690 // Record it to our list of peers. 00691 Peer *p = createPeer(s); 00692 InetAddress peeraddr = ((InetSocket *) s)->peername(); 00693 InetAddress myaddr = ((InetSocket *) s)->sockname(); 00694 p->peeraddr = StringFormat("%1:%2") 00695 .arg(peeraddr.hostname()) 00696 .arg(peeraddr.port()); 00697 p->mask = IORead|IOUrgent; 00698 p->socket = s; 00699 00700 // Report the new connection. 00701 if (debug_) 00702 logme() 00703 << "INFO: new peer " << p->peeraddr << " is now connected to " 00704 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 00705 00706 // Attach it to the listener. 00707 sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p)); 00708 00709 // We are never done. 00710 return false; 00711 }
bool IgNet::onPeerData | ( | lat::IOSelectEvent * | ev, | |
Peer * | p | |||
) | [private] |
Handle communication to a particular client.
Definition at line 514 of file IgNet.cc.
References IgNet::Peer::automatic, b, data, IgNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), IgNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), IgNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, lat::Error::next(), old, onMessage(), IgNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, IgNet::Peer::sendpos, IgNet::Peer::sendq, lat::IOSelector::setMask(), IgNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, TrackValidation_HighPurity_cff::valid, IgNet::Peer::waiting, and lat::IOChannel::write().
Referenced by onPeerConnect(), and run().
00515 { 00516 assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p); 00517 00518 // If there is a problem with the peer socket, discard the peer 00519 // and tell the selector to stop prcessing events for it. If 00520 // this is a server connection, we will eventually recreate 00521 // everything if/when the data server comes back. 00522 if (ev->events & IOUrgent) 00523 { 00524 if (p->automatic) 00525 { 00526 logme() 00527 << "WARNING: connection to the server at " << p->peeraddr 00528 << " lost (will attempt to reconnect in 15 seconds)\n"; 00529 return losePeer(0, p, ev); 00530 } 00531 else 00532 return losePeer("WARNING: lost peer connection ", p, ev); 00533 } 00534 00535 // If we can write to the peer socket, pump whatever we can into it. 00536 if (ev->events & IOWrite) 00537 { 00538 while (Bucket *b = p->sendq) 00539 { 00540 IOSize len = b->data.size() - p->sendpos; 00541 const void *data = (len ? (const void *)&b->data[p->sendpos] 00542 : (const void *)&data); 00543 IOSize done; 00544 00545 try 00546 { 00547 done = (len ? ev->source->write (data, len) : 0); 00548 if (debug_ && len) 00549 logme() 00550 << "DEBUG: sent " << done << " bytes to peer " 00551 << p->peeraddr << std::endl; 00552 } 00553 catch (Error &e) 00554 { 00555 return losePeer("WARNING: unable to write to peer ", 00556 p, ev, &e); 00557 } 00558 00559 p->sendpos += done; 00560 if (p->sendpos == b->data.size()) 00561 { 00562 Bucket *old = p->sendq; 00563 p->sendq = old->next; 00564 p->sendpos = 0; 00565 old->next = 0; 00566 discard(old); 00567 } 00568 00569 if (! done && len) 00570 // Cannot write any more. 00571 break; 00572 } 00573 } 00574 00575 // If there is data to be read from the peer, first receive what we 00576 // can get out the socket, the process all complete requests. 00577 if (ev->events & IORead) 00578 { 00579 // First build up the incoming buffer of data in the socket. 00580 // Remember the last size returned by the socket; we need 00581 // it to determine if the remote end closed the connection. 00582 IOSize sz; 00583 try 00584 { 00585 std::vector<unsigned char> buf(SOCKET_READ_SIZE); 00586 do 00587 if ((sz = ev->source->read(&buf[0], buf.size()))) 00588 { 00589 if (debug_) 00590 logme() 00591 << "DEBUG: received " << sz << " bytes from peer " 00592 << p->peeraddr << std::endl; 00593 DataBlob &data = p->incoming; 00594 if (data.capacity () < data.size () + sz) 00595 data.reserve (data.size() + SOCKET_READ_GROWTH); 00596 data.insert (data.end(), &buf[0], &buf[0] + sz); 00597 } 00598 while (sz == sizeof (buf)); 00599 } 00600 catch (Error &e) 00601 { 00602 SystemError *next = dynamic_cast<SystemError *>(e.next()); 00603 if (next && next->portable() == SysErr::ErrTryAgain) 00604 sz = 1; // Ignore it, and fake no end of data. 00605 else 00606 // Houston we have a problem. 00607 return losePeer("WARNING: failed to read from peer ", 00608 p, ev, &e); 00609 } 00610 00611 // Process fully received messages as long as we can. 00612 size_t consumed = 0; 00613 DataBlob &data = p->incoming; 00614 while (data.size()-consumed >= sizeof(uint32_t) 00615 && p->waiting < MAX_PEER_WAITREQS) 00616 { 00617 uint32_t msglen; 00618 memcpy (&msglen, &data[0]+consumed, sizeof(msglen)); 00619 00620 if (msglen >= MESSAGE_SIZE_LIMIT) 00621 return losePeer("WARNING: excessively large message from ", p, ev); 00622 00623 if (data.size()-consumed >= msglen) 00624 { 00625 bool valid = true; 00626 if (msglen < 2*sizeof(uint32_t)) 00627 { 00628 logme() 00629 << "ERROR: corrupt peer message of length " << msglen 00630 << " from peer " << p->peeraddr << std::endl; 00631 valid = false; 00632 } 00633 else 00634 { 00635 // Decode and process this message. 00636 Bucket msg; 00637 msg.next = 0; 00638 valid = onMessage(&msg, p, &data[0]+consumed, msglen); 00639 00640 // If we created a response, chain it to the write queue. 00641 if (! msg.data.empty()) 00642 { 00643 Bucket **prev = &p->sendq; 00644 while (*prev) 00645 prev = &(*prev)->next; 00646 00647 *prev = new Bucket; 00648 (*prev)->next = 0; 00649 (*prev)->data.swap(msg.data); 00650 } 00651 } 00652 00653 if (! valid) 00654 return losePeer("WARNING: data stream error with ", p, ev); 00655 00656 consumed += msglen; 00657 } 00658 else 00659 break; 00660 } 00661 00662 data.erase(data.begin(), data.begin()+consumed); 00663 00664 // If the client has closed the connection, shut down our end. If 00665 // we have something to send back still, leave the write direction 00666 // open. Otherwise close the shop for this client. 00667 if (sz == 0) 00668 sel_.setMask(p->socket, p->mask &= ~IORead); 00669 } 00670 00671 // Yes, please keep processing events for this socket. 00672 return false; 00673 }
Definition at line 1181 of file IgNet.cc.
References debug_, lat::endl(), IgNet::Object::flags, IgNet::Object::lastreq, logme(), IgNet::Object::name, peers_, pi, IgNet::Object::rawdata, IgNet::Object::version, VIS_FLAG_DEAD, and VIS_FLAG_SCALAR.
Referenced by run().
01182 { 01183 PeerMap::iterator pi, pe; 01184 ObjectMap::iterator oi, oe; 01185 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01186 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ) 01187 { 01188 Object &o = oi->second; 01189 01190 // Compact non-scalar objects that are unused. We send scalar 01191 // objects to the web server so we keep them around. 01192 if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR)) 01193 { 01194 if (debug_) 01195 logme() 01196 << "DEBUG: compacting idle '" << o.name 01197 << "' from " << pi->second.peeraddr << std::endl; 01198 } 01199 01200 // Remove if dead, old and unused. 01201 if (o.lastreq < deadobj 01202 && o.version < deadobj 01203 && (o.flags & VIS_FLAG_DEAD)) 01204 { 01205 if (debug_) 01206 logme() 01207 << "DEBUG: removing dead '" << o.name 01208 << "' from " << pi->second.peeraddr << std::endl; 01209 01210 pi->second.objs.erase(oi++); 01211 } 01212 else 01213 ++oi; 01214 } 01215 }
int IgNet::receive | ( | void(*)(void *arg, uint32_t reason, Object &obj) | callback, | |
void * | arg | |||
) | [virtual] |
Definition at line 1335 of file IgNet.cc.
References IgNet::Object::flags, local_, lock(), IgNet::Peer::objs, p, peers_, pi, unlock(), IgNet::Peer::updates, VIS_FLAG_DEAD, and VIS_FLAG_RECEIVED.
01336 { 01337 int updates = 0; 01338 01339 lock(); 01340 PeerMap::iterator pi, pe; 01341 ObjectMap::iterator oi, oe; 01342 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01343 { 01344 Peer &p = pi->second; 01345 if (&p == local_) 01346 continue; 01347 01348 updates += p.updates; 01349 01350 for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; ) 01351 { 01352 Object &o = oi->second; 01353 if (o.flags & VIS_FLAG_DEAD) 01354 { 01355 callback(arg, VIS_FLAG_DEAD, o); 01356 p.objs.erase(oi++); 01357 } 01358 else if (o.flags & VIS_FLAG_RECEIVED) 01359 { 01360 callback(arg, VIS_FLAG_RECEIVED, o); 01361 o.flags &= ~VIS_FLAG_RECEIVED; 01362 ++oi; 01363 } 01364 else 01365 ++oi; 01366 } 01367 } 01368 unlock(); 01369 01370 return updates; 01371 }
Definition at line 133 of file IgNet.cc.
References alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, releaseFromWait(), and waiting_.
00134 { 00135 Bucket **msg = &i->peer->sendq; 00136 while (*msg) 00137 msg = &(*msg)->next; 00138 *msg = new Bucket; 00139 (*msg)->next = 0; 00140 00141 releaseFromWait(*msg, *i, o); 00142 00143 assert(i->peer->waiting > 0); 00144 i->peer->waiting--; 00145 waiting_.erase(i); 00146 }
void IgNet::releaseFromWait | ( | Bucket * | msg, | |
WaitObject & | w, | |||
Object * | o | |||
) | [protected, virtual] |
Definition at line 170 of file IgNet.cc.
References copydata(), IgNet::Bucket::data, IgNet::WaitObject::name, sendObjectToPeer(), and VIS_REPLY_NONE.
Referenced by releaseFromWait(), releaseWaiters(), and run().
00171 { 00172 if (o) 00173 sendObjectToPeer (msg, *o, true); 00174 else 00175 { 00176 uint32_t words[3]; 00177 words[0] = sizeof(words) + w.name.size(); 00178 words[1] = VIS_REPLY_NONE; 00179 words[2] = w.name.size(); 00180 00181 msg->data.reserve(msg->data.size() + words[0]); 00182 copydata(msg, &words[0], sizeof(words)); 00183 copydata(msg, &w.name[0], w.name.size()); 00184 } 00185 }
Definition at line 150 of file IgNet.cc.
References e, i, IgNet::Object::name, releaseFromWait(), and waiting_.
Referenced by onMessage().
00151 { 00152 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00153 if (i->name == o->name) 00154 releaseFromWait(i++, o); 00155 else 00156 ++i; 00157 }
void IgNet::removeLocalObject | ( | const std::string & | path | ) | [virtual] |
Delete the local object.
The caller must call sendLocalChanges() later to push out the changes.
Definition at line 1393 of file IgNet.cc.
References local_, and IgNet::Peer::objs.
Referenced by edm::service::IguanaService::produceEvent().
void IgNet::removePeer | ( | Peer * | p, | |
lat::Socket * | s | |||
) | [protected, virtual] |
Definition at line 1242 of file IgNet.cc.
References IgNet::Peer::objs, peers_, and sendLocalChanges().
Referenced by losePeer().
01243 { 01244 bool needflush = ! p->objs.empty(); 01245 01246 p->objs.clear(); 01247 peers_.erase(s); 01248 01249 // If we removed a peer with objects, our list of objects 01250 // has changed and we need to update downstream peers. 01251 if (needflush) 01252 sendLocalChanges(); 01253 }
Queue an object request to the data server.
Definition at line 97 of file IgNet.cc.
References copydata(), alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, IgNet::Peer::sendq, and VIS_MSG_GET_OBJECT.
Referenced by onMessage(), and waitForData().
00098 { 00099 Bucket **msg = &p->sendq; 00100 while (*msg) 00101 msg = &(*msg)->next; 00102 *msg = new Bucket; 00103 (*msg)->next = 0; 00104 00105 uint32_t words[3]; 00106 words[0] = sizeof(words) + len; 00107 words[1] = VIS_MSG_GET_OBJECT; 00108 words[2] = len; 00109 (*msg)->data.reserve((*msg)->data.size() + words[0]); 00110 copydata(*msg, words, sizeof(words)); 00111 copydata(*msg, name, len); 00112 }
Run the actual I/O processing loop.
Definition at line 964 of file IgNet.cc.
References lat::Socket::abort(), arg, lat::IOSelector::attach(), IgNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, IgNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), IgNet::Peer::mask, IgNet::Bucket::next, IgNet::AutoPeer::next, lat::Error::next(), onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, IgNet::AutoPeer::peer, IgNet::Peer::peeraddr, IgNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), s, sel_, sendObjectListToPeers(), IgNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), IgNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), IgNet::Peer::update, IgNet::AutoPeer::update, updatePeerMasks(), upstream_, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, waiting_, and IgNet::AutoPeer::warned.
00965 { 00966 Time now; 00967 AutoPeer *automatic[2] = { &upstream_, &downstream_ }; 00968 00969 // Perform I/O. Every once in a while flush updates to peers. 00970 while (! shouldStop()) 00971 { 00972 for (int i = 0; i < 2; ++i) 00973 { 00974 AutoPeer *ap = automatic[i]; 00975 00976 // If we need a server connection and don't have one yet, 00977 // initiate asynchronous connection creation. Swallow errors 00978 // in case the server won't talk to us. 00979 if (! ap->host.empty() 00980 && ! ap->peer 00981 && (now = Time::current()) > ap->next) 00982 { 00983 ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0); 00984 InetSocket *s = 0; 00985 try 00986 { 00987 s = new InetSocket (SocketConst::TypeStream); 00988 s->setBlocking (false); 00989 s->connect(InetAddress (ap->host.c_str(), ap->port)); 00990 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 00991 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 00992 } 00993 catch (Error &e) 00994 { 00995 SystemError *sys = dynamic_cast<SystemError *>(e.next()); 00996 if (! sys || sys->portable() != SysErr::ErrOperationInProgress) 00997 { 00998 // "In progress" just means the connection is in progress. 00999 // The connection is ready when the socket is writeable. 01000 // Anything else is a real problem. 01001 if (! ap->warned) 01002 { 01003 logme() 01004 << "NOTE: server at " << ap->host << ":" << ap->port 01005 << " is unavailable. Connection will be established" 01006 << " automatically on the background once the server" 01007 << " becomes available. Error from the attempt was: " 01008 << e.explain() << '\n'; 01009 ap->warned = true; 01010 } 01011 01012 if (s) 01013 s->abort(); 01014 delete s; 01015 s = 0; 01016 } 01017 } 01018 01019 // Set up with the selector if we were successful. If this is 01020 // the upstream collector, queue a request for updates. 01021 if (s) 01022 { 01023 lock(); 01024 Peer *p = createPeer(s); 01025 ap->peer = p; 01026 ap->warned = false; 01027 unlock(); 01028 01029 InetAddress peeraddr = ((InetSocket *) s)->peername(); 01030 InetAddress myaddr = ((InetSocket *) s)->sockname(); 01031 p->peeraddr = StringFormat("%1:%2") 01032 .arg(peeraddr.hostname()) 01033 .arg(peeraddr.port()); 01034 p->mask = IORead|IOWrite|IOUrgent; 01035 p->update = ap->update; 01036 p->automatic = ap; 01037 p->socket = s; 01038 sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p)); 01039 if (ap == &upstream_) 01040 { 01041 uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS, 01042 2*sizeof(uint32_t), VIS_MSG_UPDATE_ME }; 01043 p->sendq = new Bucket; 01044 p->sendq->next = 0; 01045 copydata(p->sendq, words, sizeof(words)); 01046 } 01047 01048 // Report the new connection. 01049 if (debug_) 01050 logme() 01051 << "INFO: now connected to " << p->peeraddr << " from " 01052 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 01053 } 01054 } 01055 } 01056 01057 // Pump events for a while. 01058 sel_.dispatch(delay_); 01059 now = Time::current(); 01060 01061 // Check if flush is required. Flush only if one is needed. 01062 // Always sends the full object list. Compact objects no longer 01063 // in active use before sending out the update. 01064 if (flush_) 01065 { 01066 flush_ = false; 01067 01068 lock(); 01069 purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0), 01070 now - TimeSpan(0, 0, 20 /* minutes */, 0, 0)); 01071 sendObjectListToPeers(true); 01072 unlock(); 01073 } 01074 01075 // Update the data server and peer selection masks. If we 01076 // have no more data to send and listening for writes, remove 01077 // the write mask. If we have something to write and aren't 01078 // listening for writes, start listening so we can send off 01079 // the data. 01080 updatePeerMasks(); 01081 01082 // Release peers that have been waiting for data for too long. 01083 lock(); 01084 Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0); 01085 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 01086 { 01087 // If the peer has waited for too long, send something. 01088 if (i->time < waitold) 01089 releaseFromWait(i++, findObject(0, i->name)); 01090 01091 // Keep it for now. 01092 else 01093 ++i; 01094 } 01095 unlock(); 01096 } 01097 }
Definition at line 1102 of file IgNet.cc.
References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().
Referenced by edm::service::IguanaService::produceEvent(), and removePeer().
Send all objects to a peer and optionally mark sent objects old.
Definition at line 1257 of file IgNet.cc.
References copydata(), IgNet::Bucket::data, peers_, pi, sendObjectToPeer(), VIS_FLAG_NEW, VIS_REPLY_LIST_BEGIN, and VIS_REPLY_LIST_END.
Referenced by onMessage(), and sendObjectListToPeers().
01258 { 01259 PeerMap::iterator pi, pe; 01260 ObjectMap::iterator oi, oe; 01261 uint32_t numobjs = 0; 01262 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01263 numobjs += pi->second.objs.size(); 01264 01265 msg->data.reserve(msg->data.size() + 300*numobjs); 01266 01267 uint32_t nupdates = 0; 01268 uint32_t words[4]; 01269 words[0] = sizeof(words); 01270 words[1] = VIS_REPLY_LIST_BEGIN; 01271 words[2] = numobjs; 01272 words[3] = all; 01273 copydata(msg, &words[0], sizeof(words)); 01274 01275 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01276 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi) 01277 if (all || (oi->second.flags & VIS_FLAG_NEW)) 01278 { 01279 sendObjectToPeer(msg, oi->second, false); 01280 if (clear) 01281 oi->second.flags &= ~VIS_FLAG_NEW; 01282 ++nupdates; 01283 } 01284 01285 words[1] = VIS_REPLY_LIST_END; 01286 words[2] = nupdates; 01287 copydata(msg, &words[0], sizeof(words)); 01288 }
Definition at line 1291 of file IgNet.cc.
References IgNet::Bucket::data, debug_, e, lat::endl(), i, logme(), alivecheck_mergeAndRegister::msg, IgNet::Bucket::next, p, IgNet::Peer::peeraddr, peers_, sendObjectListToPeer(), IgNet::Peer::sendq, IgNet::Peer::update, and IgNet::Peer::updated.
Referenced by run().
01292 { 01293 PeerMap::iterator i, e; 01294 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) 01295 { 01296 Peer &p = i->second; 01297 if (! p.update) 01298 continue; 01299 01300 if (debug_) 01301 logme() 01302 << "DEBUG: notifying " << p.peeraddr << std::endl; 01303 01304 Bucket msg; 01305 msg.next = 0; 01306 sendObjectListToPeer(&msg, !p.updated || all, true); 01307 01308 if (! msg.data.empty()) 01309 { 01310 Bucket **prev = &p.sendq; 01311 while (*prev) 01312 prev = &(*prev)->next; 01313 01314 *prev = new Bucket; 01315 (*prev)->next = 0; 01316 (*prev)->data.swap(msg.data); 01317 } 01318 p.updated = true; 01319 } 01320 }
Definition at line 191 of file IgNet.cc.
References copydata(), IgNet::Bucket::data, IgNet::Object::flags, flags, IgNet::Object::name, IgNet::Object::rawdata, IgNet::Object::version, VIS_FLAG_SCALAR, VIS_FLAG_ZOMBIE, and VIS_REPLY_OBJECT.
Referenced by onMessage(), releaseFromWait(), and sendObjectListToPeer().
00192 { 00193 uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE; 00194 DataBlob objdata; 00195 00196 if (data || (flags & VIS_FLAG_SCALAR)) 00197 objdata.insert(objdata.end(), 00198 &o.rawdata[0], 00199 &o.rawdata[0] + o.rawdata.size()); 00200 00201 uint32_t words[7]; 00202 uint32_t namelen = o.name.size(); 00203 uint32_t datalen = objdata.size(); 00204 00205 words[0] = sizeof(words) + namelen + datalen; 00206 words[1] = VIS_REPLY_OBJECT; 00207 words[2] = flags; 00208 words[3] = (o.version >> 0 ) & 0xffffffff; 00209 words[4] = (o.version >> 32) & 0xffffffff; 00210 words[5] = namelen; 00211 words[6] = datalen; 00212 00213 msg->data.reserve(msg->data.size() + words[0]); 00214 copydata(msg, &words[0], sizeof(words)); 00215 if (namelen) 00216 copydata(msg, &o.name[0], namelen); 00217 if (datalen) 00218 copydata(msg, &objdata[0], datalen); 00219 }
Stop the network layer and wait it to finish.
Definition at line 908 of file IgNet.cc.
References communicate_, and shutdown_.
00909 { 00910 shutdown_ = 1; 00911 if (communicate_ != (pthread_t) -1) 00912 pthread_join(communicate_, 0); 00913 }
Start running the network layer in a new thread.
This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.
Definition at line 949 of file IgNet.cc.
References communicate(), communicate_, lock_, logme(), and pthread_create.
Referenced by edm::service::IguanaService::init().
00950 { 00951 if (communicate_ != (pthread_t) -1) 00952 { 00953 logme() 00954 << "ERROR: Shared memory networking thread has already been started\n"; 00955 return; 00956 } 00957 00958 pthread_mutex_init(&lock_, 0); 00959 pthread_create(&communicate_, 0, &communicate, this); 00960 }
Start a server socket for accessing this node remotely.
Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.
Definition at line 833 of file IgNet.cc.
References lat::IOSelector::attach(), lat::Error::clone(), lat::CreateHook(), e, lat::endl(), lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.
Referenced by IguanaNetProducer::IguanaNetProducer().
00834 { 00835 if (server_) 00836 { 00837 logme() << "ERROR: server was already started.\n"; 00838 return; 00839 } 00840 00841 try 00842 { 00843 server_ = new InetServerSocket(InetAddress (port), 10); 00844 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 00845 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 00846 server_->setBlocking(false); 00847 sel_.attach(server_, IOAccept, CreateHook(this, &IgNet::onPeerConnect)); 00848 } 00849 catch (Error &e) 00850 { 00851 // FIXME: Do we need to do this when we throw an exception anyway? 00852 // FIXME: Abort instead? 00853 logme() 00854 << "ERROR: Failed to start server at port " << port << ": " 00855 << e.explain() << std::endl; 00856 00857 throw IgNetError("Failed to start server at port ", e.clone()); 00858 00859 // FIXME: Throw something simpler that removes the dependency? 00860 // throw cms::Exception("IgNet::startLocalServer") 00861 // << "Failed to start server at port " << port << ": " 00862 // << e.explain(); 00863 } 00864 00865 logme() << "INFO: Shared memory server started at port " << port << std::endl; 00866 }
Release the lock on the net layer.
Definition at line 939 of file IgNet.cc.
References communicate_, and lock_.
Referenced by onMessage(), edm::service::IguanaService::produceEvent(), receive(), and run().
00940 { 00941 if (communicate_ != (pthread_t) -1) 00942 pthread_mutex_unlock(&lock_); 00943 }
Update the network cache for an object.
The caller must call sendLocalChanges() later to push out the changes.
Definition at line 1376 of file IgNet.cc.
References IgNet::Object::flags, local_, IgNet::Object::name, IgNet::Peer::objs, IgNet::Object::rawdata, std::swap(), and IgNet::Object::version.
Referenced by edm::service::IguanaService::produceEvent().
01377 { 01378 ObjectMap::iterator pos = local_->objs.find(o.name); 01379 if (pos == local_->objs.end()) 01380 local_->objs.insert(ObjectMap::value_type(o.name, o)); 01381 else 01382 { 01383 std::swap(pos->second.version, o.version); 01384 std::swap(pos->second.flags, o.flags); 01385 std::swap(pos->second.rawdata, o.rawdata); 01386 pos->second.lastreq = 0; 01387 } 01388 }
Update the selector mask for a peer based on data queues.
Close the connection if there is no reason to maintain it open.
Definition at line 751 of file IgNet.cc.
References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), IgNet::Peer::mask, IgNet::Peer::peeraddr, sel_, IgNet::Peer::sendq, lat::IOSelector::setMask(), IgNet::Peer::socket, and IgNet::Peer::waiting.
Referenced by updatePeerMasks().
00752 { 00753 if (! p->socket) 00754 return; 00755 00756 // Listen to writes iff we have data to send. 00757 unsigned oldmask = p->mask; 00758 if (! p->sendq && (p->mask & IOWrite)) 00759 sel_.setMask(p->socket, p->mask &= ~IOWrite); 00760 00761 if (p->sendq && ! (p->mask & IOWrite)) 00762 sel_.setMask(p->socket, p->mask |= IOWrite); 00763 00764 if (debug_ && oldmask != p->mask) 00765 logme() 00766 << "DEBUG: updating mask for " << p->peeraddr << " to " 00767 << p->mask << " from " << oldmask << std::endl; 00768 00769 // If we have nothing more to send and are no longer listening 00770 // for reads, close up the shop for this peer. 00771 if (p->mask == IOUrgent && ! p->waiting) 00772 { 00773 assert(! p->sendq); 00774 if (debug_) 00775 logme() << "INFO: connection closed to " << p->peeraddr << std::endl; 00776 losePeer(0, p, 0); 00777 } 00778 }
Tell the network layer to connect to host and port and automatically send updates whenever local data changes.
Must be called before calling run() or start().
Definition at line 872 of file IgNet.cc.
References downstream_, lat::endl(), IgNet::AutoPeer::host, logme(), IgNet::AutoPeer::port, and IgNet::AutoPeer::update.
00873 { 00874 if (! downstream_.host.empty()) 00875 { 00876 logme() 00877 << "ERROR: Already updating another collector at " 00878 << downstream_.host << ":" << downstream_.port << std::endl; 00879 return; 00880 } 00881 00882 downstream_.update = true; 00883 downstream_.host = host; 00884 downstream_.port = port; 00885 }
void IgNet::waitForData | ( | Peer * | p, | |
const std::string & | name, | |||
const std::string & | info, | |||
Peer * | owner | |||
) | [protected] |
Queue a request for an object and put a peer into the mode of waiting for object data to appear.
Definition at line 117 of file IgNet.cc.
References requestObject(), IgNet::Peer::waiting, and waiting_.
Referenced by onMessage().
00118 { 00119 // FIXME: Should we automatically record which exact peer the waiter 00120 // is expecting to deliver data so we know to release the waiter if 00121 // the other peer vanishes? The current implementation stands a 00122 // chance for the waiter to wait indefinitely -- although we do 00123 // force terminate the wait after a while. 00124 requestObject(owner, name.size() ? &name[0] : 0, name.size()); 00125 WaitObject wo = { Time::current(), name, info, p }; 00126 waiting_.push_back(wo); 00127 p->waiting++; 00128 }
std::string IgNet::appname_ [private] |
pthread_t IgNet::communicate_ [private] |
bool IgNet::debug_ [protected] |
Definition at line 146 of file IgNet.h.
Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), and updateMask().
int IgNet::delay_ [private] |
AutoPeer IgNet::downstream_ [private] |
bool IgNet::flush_ [private] |
Peer* IgNet::local_ [private] |
Definition at line 174 of file IgNet.h.
Referenced by IgNet(), receive(), removeLocalObject(), and updateLocalObject().
pthread_mutex_t IgNet::lock_ [private] |
const uint32_t IgNet::MAX_PEER_WAITREQS = 128 [static] |
PeerMap IgNet::peers_ [private] |
Definition at line 170 of file IgNet.h.
Referenced by createPeer(), findObject(), getPeer(), purgeDeadObjects(), receive(), removePeer(), sendObjectListToPeer(), sendObjectListToPeers(), and updatePeerMasks().
int IgNet::pid_ [private] |
lat::IOSelector IgNet::sel_ [private] |
Definition at line 165 of file IgNet.h.
Referenced by IgNet(), losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), and updateMask().
lat::InetServerSocket* IgNet::server_ [private] |
sig_atomic_t IgNet::shutdown_ [private] |
AutoPeer IgNet::upstream_ [private] |
lat::Time IgNet::version_ [private] |
const uint32_t IgNet::VIS_FLAG_DEAD = 0x40000000 [static] |
Definition at line 34 of file IgNet.h.
Referenced by markObjectsDead(), onMessage(), purgeDeadObjects(), and receive().
const uint32_t IgNet::VIS_FLAG_NEW = 0x20000000 [static] |
Definition at line 33 of file IgNet.h.
Referenced by edm::service::IguanaService::produceEvent(), and sendObjectListToPeer().
const uint32_t IgNet::VIS_FLAG_RECEIVED = 0x10000000 [static] |
const uint32_t IgNet::VIS_FLAG_SCALAR = 0x1 [static] |
Definition at line 31 of file IgNet.h.
Referenced by edm::service::IguanaService::produceEvent(), purgeDeadObjects(), and sendObjectToPeer().
const uint32_t IgNet::VIS_FLAG_ZOMBIE = 0x80000000 [static] |
Definition at line 35 of file IgNet.h.
Referenced by markObjectsDead(), markObjectsZombies(), and sendObjectToPeer().
const uint32_t IgNet::VIS_MSG_GET_OBJECT = 3 [static] |
const uint32_t IgNet::VIS_MSG_HELLO = 0 [static] |
const uint32_t IgNet::VIS_MSG_LIST_OBJECTS = 2 [static] |
const uint32_t IgNet::VIS_MSG_UPDATE_ME = 1 [static] |
const uint32_t IgNet::VIS_REPLY_LIST_BEGIN = 101 [static] |
const uint32_t IgNet::VIS_REPLY_LIST_END = 102 [static] |
const uint32_t IgNet::VIS_REPLY_NONE = 103 [static] |
const uint32_t IgNet::VIS_REPLY_OBJECT = 104 [static] |
WaitList IgNet::waiting_ [private] |
Definition at line 173 of file IgNet.h.
Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().
lat::Pipe IgNet::wakeup_ [private] |