#include <VisFramework/VisFrameworkBase/interface/VisNet.h>
Public Types | |
typedef std::vector< unsigned char > | DataBlob |
typedef std::map< std::string, Object > | ObjectMap |
typedef std::map< lat::Socket *, Peer > | PeerMap |
typedef std::list< WaitObject > | WaitList |
Public Member Functions | |
void | debug (bool doit) |
Enable or disable verbose debugging. | |
void | delay (int delay) |
Set the I/O dispatching delay. | |
void | listenToSource (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically receive updates from upstream source(s). | |
void | lock (void) |
Acquire a lock on the net layer. | |
virtual int | receive (void(*callback)(void *arg, uint32_t reason, Object &obj), void *arg) |
virtual void | removeLocalObject (const std::string &name) |
Delete the local object. | |
void | run (void) |
Run the actual I/O processing loop. | |
void | sendLocalChanges (void) |
void | shutdown (void) |
Stop the network layer and wait for it to finish. | |
void | start (void) |
Start running the network layer in a new thread. | |
void | startLocalServer (int port) |
Start a server socket for accessing this node remotely. | |
void | unlock (void) |
Release the lock on the net layer. | |
virtual void | updateLocalObject (Object &o) |
Update the network cache for an object. | |
void | updateToCollector (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically send updates whenever local data changes. | |
VisNet (const std::string &appname="") | |
virtual | ~VisNet (void) |
Static Public Attributes | |
static const uint32_t | MAX_PEER_WAITREQS = 128 |
static const uint32_t | VIS_FLAG_DEAD = 0x40000000 |
static const uint32_t | VIS_FLAG_NEW = 0x20000000 |
static const uint32_t | VIS_FLAG_RECEIVED = 0x10000000 |
static const uint32_t | VIS_FLAG_SCALAR = 0x1 |
static const uint32_t | VIS_FLAG_ZOMBIE = 0x80000000 |
static const uint32_t | VIS_MSG_GET_OBJECT = 3 |
static const uint32_t | VIS_MSG_HELLO = 0 |
static const uint32_t | VIS_MSG_LIST_OBJECTS = 2 |
static const uint32_t | VIS_MSG_UPDATE_ME = 1 |
static const uint32_t | VIS_REPLY_LIST_BEGIN = 101 |
static const uint32_t | VIS_REPLY_LIST_END = 102 |
static const uint32_t | VIS_REPLY_NONE = 103 |
static const uint32_t | VIS_REPLY_OBJECT = 104 |
Protected Member Functions | |
virtual Peer * | createPeer (lat::Socket *s) |
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=0) |
virtual Peer * | getPeer (lat::Socket *s) |
std::ostream & | logme (void) |
virtual Object * | makeObject (Peer *p, const std::string &name) |
virtual void | markObjectsDead (Peer *p) |
virtual void | markObjectsZombies (Peer *p) |
virtual bool | onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len) |
virtual void | purgeDeadObjects (lat::Time oldobj, lat::Time deadobj) |
virtual void | releaseFromWait (Bucket *msg, WaitObject &w, Object *o) |
virtual void | removePeer (Peer *p, lat::Socket *s) |
virtual void | sendObjectListToPeer (Bucket *msg, bool all, bool clear) |
Send all objects to a peer and optionally mark sent objects old. | |
virtual void | sendObjectListToPeers (bool all) |
void | sendObjectToPeer (Bucket *msg, Object &o, bool data) |
virtual bool | shouldStop (void) |
void | updateMask (Peer *p) |
Update the selector mask for a peer based on data queues. | |
virtual void | updatePeerMasks (void) |
void | waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner) |
Queue a request for an object and put a peer into the mode of waiting for object data to appear. | |
Static Protected Member Functions | |
static void | copydata (Bucket *b, const void *data, size_t len) |
Protected Attributes | |
bool | debug_ |
Private Member Functions | |
bool | losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0) |
Handle errors with a peer socket. | |
bool | onLocalNotify (lat::IOSelectEvent *ev) |
React to notifications from the app thread. | |
bool | onPeerConnect (lat::IOSelectEvent *ev) |
Respond to new connections on the server socket. | |
bool | onPeerData (lat::IOSelectEvent *ev, Peer *p) |
Handle communication to a particular client. | |
VisNet & | operator= (const VisNet &) |
void | releaseFromWait (WaitList::iterator i, Object *o) |
void | releaseWaiters (Object *o) |
void | requestObject (Peer *p, const char *name, size_t len) |
Queue an object request to the data server. | |
VisNet (const VisNet &) | |
Static Private Member Functions | |
static void | discard (Bucket *&b) |
Private Attributes | |
std::string | appname_ |
pthread_t | communicate_ |
int | delay_ |
AutoPeer | downstream_ |
bool | flush_ |
Peer * | local_ |
pthread_mutex_t | lock_ |
PeerMap | peers_ |
int | pid_ |
lat::IOSelector | sel_ |
lat::InetServerSocket * | server_ |
sig_atomic_t | shutdown_ |
AutoPeer | upstream_ |
lat::Time | version_ |
WaitList | waiting_ |
lat::Pipe | wakeup_ |
Classes | |
struct | AutoPeer |
struct | Bucket |
struct | Object |
struct | Peer |
struct | WaitObject |
Definition at line 18 of file VisNet.h.
typedef std::vector<unsigned char> VisNet::DataBlob |
typedef std::map<std::string, Object> VisNet::ObjectMap |
typedef std::map<lat::Socket *, Peer> VisNet::PeerMap |
typedef std::list<WaitObject> VisNet::WaitList |
VisNet::VisNet | ( | const std::string & | appname = "" |
) |
Definition at line 781 of file VisNet.cc.
References lat::IOSelector::attach(), lat::CreateHook(), createPeer(), downstream_, lat::IOChannel::fd(), IORead, local_, VisNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), VisNet::AutoPeer::peer, VisNet::AutoPeer::port, sel_, lat::Pipe::source(), VisNet::AutoPeer::update, upstream_, wakeup_, and VisNet::AutoPeer::warned.
00782 : debug_ (false), 00783 appname_ (appname.empty() ? "VisNet" : appname.c_str()), 00784 pid_ (getpid()), 00785 server_ (0), 00786 version_ (Time::current()), 00787 communicate_ ((pthread_t) -1), 00788 shutdown_ (0), 00789 delay_ (1000), 00790 flush_ (false) 00791 { 00792 // Create a pipe for the local apps to tell the communicator 00793 // thread that local app data has changed and that the peers 00794 // should be notified. 00795 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK); 00796 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &VisNet::onLocalNotify)); 00797 00798 // Initialise the upstream and downstream to empty. 00799 upstream_.peer = downstream_.peer = 0; 00800 upstream_.next = downstream_.next = 0; 00801 upstream_.port = downstream_.port = 0; 00802 upstream_.update = downstream_.update = false; 00803 upstream_.warned = downstream_.warned = false; 00804 00805 local_ = createPeer((Socket *) -1); 00806 }
VisNet::~VisNet | ( | void | ) | [virtual] |
VisNet::VisNet | ( | const VisNet & | ) | [private] |
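void VisNet::copydata | ( | Bucket * | b, | |
const void * | data, | |||
size_t | len | |||
) | [static, protected] |
Definition at line 38 of file VisNet.cc.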
References VisNet::Bucket::data.
Referenced by onMessage(), releaseFromWait(), requestObject(), run(), sendObjectListToPeer(), and sendObjectToPeer().
00039 { 00040 b->data.insert(b->data.end(), 00041 (const unsigned char *)data, 00042 (const unsigned char *)data + len); 00043 }
VisNet::Peer * VisNet::createPeer | ( | lat::Socket * | s | ) | [protected, virtual] |
Definition at line 1223 of file VisNet.cc.
References VisNet::Peer::automatic, VisNet::Peer::mask, p, peers_, VisNet::Peer::sendpos, VisNet::Peer::sendq, VisNet::Peer::socket, VisNet::Peer::source, VisNet::Peer::update, VisNet::Peer::updated, VisNet::Peer::updates, and VisNet::Peer::waiting.
Referenced by onPeerConnect(), run(), and VisNet().
01224 { 01225 Peer *p = &peers_[s]; 01226 p->socket = 0; 01227 p->sendq = 0; 01228 p->sendpos = 0; 01229 p->mask = 0; 01230 p->source = false; 01231 p->update = false; 01232 p->updated = false; 01233 p->updates = 0; 01234 p->waiting = 0; 01235 p->automatic = 0; 01236 return p; 01237 }
void VisNet::discard | ( | Bucket *& | b | ) | [static, private] |
Definition at line 47 of file VisNet.cc.
References VisNet::Bucket::next.
Referenced by losePeer(), and onPeerData().
00048 { 00049 while (b) 00050 { 00051 Bucket *next = b->next; 00052 delete b; 00053 b = next; 00054 } 00055 }
VisNet::Object * VisNet::findObject | ( | Peer * | p, | |
const std::string & | name, | |||
Peer ** | owner = 0 | |||
) | [protected, virtual] |
Definition at line 1110 of file VisNet.cc.
References e, i, VisNet::Peer::objs, and peers_.
Referenced by onMessage(), and run().
01111 { 01112 ObjectMap::iterator pos; 01113 PeerMap::iterator i, e; 01114 if (owner) 01115 *owner = 0; 01116 if (p) 01117 { 01118 pos = p->objs.find(name); 01119 if (pos == p->objs.end()) 01120 return 0; 01121 else 01122 { 01123 if (owner) *owner = p; 01124 return &pos->second; 01125 } 01126 } 01127 else 01128 { 01129 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) 01130 { 01131 pos = i->second.objs.find(name); 01132 if (pos != i->second.objs.end()) 01133 { 01134 if (owner) *owner = &i->second; 01135 return &pos->second; 01136 } 01137 } 01138 return 0; 01139 } 01140 }
VisNet::Peer * VisNet::getPeer | ( | lat::Socket * | s | ) | [protected, virtual] |
void VisNet::listenToSource | ( | const std::string & | host, | |
int | port | |||
) |
Tell the network layer to connect to host and port and automatically receive updates from upstream source(s).
Must be called before calling run() or start().
Definition at line 889 of file VisNet.cc.
References lat::endl(), VisNet::AutoPeer::host, logme(), VisNet::AutoPeer::port, VisNet::AutoPeer::update, and upstream_.
00890 { 00891 if (! upstream_.host.empty()) 00892 { 00893 logme() 00894 << "ERROR: Already receiving data from another collector at " 00895 << upstream_.host << ":" << upstream_.port << std::endl; 00896 return; 00897 } 00898 00899 upstream_.update = false; 00900 upstream_.host = host; 00901 upstream_.port = port; 00902 }
void VisNet::lock | ( | void | ) |
Acquire a lock on the net layer.
Definition at line 929 of file VisNet.cc.
References communicate_, and lock_.
Referenced by onMessage(), receive(), and run().
00930 { 00931 if (communicate_ != (pthread_t) -1) 00932 pthread_mutex_lock(&lock_); 00933 }
std::ostream & VisNet::logme | ( | void | ) | [protected] |
Definition at line 29 of file VisNet.cc.
References appname_, TestMuL1L2Filter_cff::cerr, and pid_.
Referenced by listenToSource(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), start(), startLocalServer(), updateMask(), and updateToCollector().
00030 { 00031 return std::cerr 00032 << Time::current().format(true, "%Y-%m-%d %H:%M:%S") 00033 << " " << appname_ << "[" << pid_ << "]: "; 00034 }
bool VisNet::losePeer | ( | const char * | reason, | |
Peer * | peer, | |||
lat::IOSelectEvent * | event, | |||
lat::Error * | err = 0 | |||
) | [private] |
Handle errors with a peer socket.
Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.
Definition at line 62 of file VisNet.cc.
References VisNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), VisNet::AutoPeer::peer, VisNet::Peer::peeraddr, removePeer(), s, sel_, VisNet::Peer::sendq, VisNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.
Referenced by onPeerData(), and updateMask().
00066 { 00067 if (reason) 00068 logme () 00069 << reason << peer->peeraddr 00070 << (err ? "; error was: " + err->explain() : std::string("")) 00071 << std::endl; 00072 00073 Socket *s = peer->socket; 00074 00075 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00076 if (i->peer == peer) 00077 waiting_.erase(i++); 00078 else 00079 ++i; 00080 00081 if (ev) 00082 ev->source = 0; 00083 00084 discard(peer->sendq); 00085 if (peer->automatic) 00086 peer->automatic->peer = 0; 00087 00088 sel_.detach(s); 00089 s->close(); 00090 removePeer(peer, s); 00091 delete s; 00092 return true; 00093 }
VisNet::Object * VisNet::makeObject | ( | Peer * | p, | |
const std::string & | name | |||
) | [protected, virtual] |
Definition at line 1143 of file VisNet.cc.
References VisNet::Object::flags, VisNet::Object::lastreq, VisNet::Object::name, VisNet::Peer::objs, and VisNet::Object::version.
Referenced by onMessage().
01144 { 01145 Object *o = &p->objs[name]; 01146 o->version = 0; 01147 o->name = name; 01148 o->flags = 0; 01149 o->lastreq = 0; 01150 return o; 01151 }
void VisNet::markObjectsDead | ( | Peer * | p | ) | [protected, virtual] |
Definition at line 1169 of file VisNet.cc.
References e, i, VisNet::Peer::objs, VIS_FLAG_DEAD, and VIS_FLAG_ZOMBIE.
Referenced by onMessage().
01170 { 01171 ObjectMap::iterator i, e; 01172 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i) 01173 if (i->second.flags & VIS_FLAG_ZOMBIE) 01174 i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD; 01175 }
void VisNet::markObjectsZombies | ( | Peer * | p | ) | [protected, virtual] |
Definition at line 1160 of file VisNet.cc.
References e, i, VisNet::Peer::objs, and VIS_FLAG_ZOMBIE.
Referenced by onMessage().
01161 { 01162 ObjectMap::iterator i, e; 01163 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i) 01164 i->second.flags |= VIS_FLAG_ZOMBIE; 01165 }
bool VisNet::onLocalNotify | ( | lat::IOSelectEvent * | ev | ) | [private] |
React to notifications from the app thread.
This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new app data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent object updates.
Definition at line 720 of file VisNet.cc.
References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.
Referenced by VisNet().
00721 { 00722 // Discard the data in the pipe, we care only about the wakeup. 00723 try 00724 { 00725 IOSize sz; 00726 unsigned char buf [1024]; 00727 while ((sz = ev->source->read(buf, sizeof(buf)))) 00728 ; 00729 } 00730 catch (Error &e) 00731 { 00732 SystemError *next = dynamic_cast<SystemError *>(e.next()); 00733 if (next && next->portable() == SysErr::ErrTryAgain) 00734 ; // Ignore it 00735 else 00736 logme() 00737 << "WARNING: error reading from notification pipe: " 00738 << e.explain() << std::endl; 00739 } 00740 00741 // Tell the main event pump to send an update in a little while. 00742 flush_ = true; 00743 00744 // We are never done, always keep going. 00745 return false; 00746 }
bool VisNet::onMessage | ( | Bucket * | msg, | |
Peer * | p, | |||
unsigned char * | data, | |||
size_t | len | |||
) | [protected, virtual] |
Definition at line 224 of file VisNet.cc.
References copydata(), VisNet::Bucket::data, debug_, lat::endl(), findObject(), VisNet::Object::flags, flags, flush_, VisNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, VisNet::Peer::peeraddr, VisNet::Object::rawdata, releaseWaiters(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), VisNet::Peer::source, unlock(), VisNet::Peer::update, VisNet::Peer::updates, VisNet::Object::version, VIS_FLAG_DEAD, VIS_FLAG_RECEIVED, VIS_MSG_GET_OBJECT, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, VIS_REPLY_LIST_BEGIN, VIS_REPLY_LIST_END, VIS_REPLY_NONE, VIS_REPLY_OBJECT, and waitForData().
Referenced by onPeerData().
00225 { 00226 // Decode and process this message. 00227 uint32_t type; 00228 memcpy (&type, data + sizeof(uint32_t), sizeof (type)); 00229 switch (type) 00230 { 00231 case VIS_MSG_UPDATE_ME: 00232 { 00233 if (len != 2*sizeof(uint32_t)) 00234 { 00235 logme() 00236 << "ERROR: corrupt 'UPDATE_ME' message of length " << len 00237 << " from peer " << p->peeraddr << std::endl; 00238 return false; 00239 } 00240 00241 if (debug_) 00242 logme() 00243 << "DEBUG: received message 'UPDATE ME' from peer " 00244 << p->peeraddr << std::endl; 00245 00246 p->update = true; 00247 } 00248 return true; 00249 00250 case VIS_MSG_LIST_OBJECTS: 00251 { 00252 if (debug_) 00253 logme() 00254 << "DEBUG: received message 'LIST OBJECTS' from peer " 00255 << p->peeraddr << std::endl; 00256 00257 // Send over current status: list of known objects. 00258 lock(); 00259 sendObjectListToPeer(msg, true, false); 00260 unlock(); 00261 } 00262 return true; 00263 00264 case VIS_MSG_GET_OBJECT: 00265 { 00266 if (debug_) 00267 logme() 00268 << "DEBUG: received message 'GET OBJECT' from peer " 00269 << p->peeraddr << std::endl; 00270 00271 if (len < 3*sizeof(uint32_t)) 00272 { 00273 logme() 00274 << "ERROR: corrupt 'GET IMAGE' message of length " << len 00275 << " from peer " << p->peeraddr << std::endl; 00276 return false; 00277 } 00278 00279 uint32_t namelen; 00280 memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen)); 00281 if (len != 3*sizeof(uint32_t) + namelen) 00282 { 00283 logme() 00284 << "ERROR: corrupt 'GET OBJECT' message of length " << len 00285 << " from peer " << p->peeraddr 00286 << ", expected length " << (3*sizeof(uint32_t)) 00287 << " + " << namelen << std::endl; 00288 return false; 00289 } 00290 00291 lock(); 00292 std::string name((char *) data + 3*sizeof(uint32_t), namelen); 00293 Peer *owner = 0; 00294 Object *o = findObject(0, name, &owner); 00295 if (o) 00296 { 00297 o->lastreq = Time::current(); 00298 if (o->rawdata.empty()) 00299 waitForData(p, name, "", owner); 00300 else 
00301 sendObjectToPeer(msg, *o, true); 00302 } 00303 else 00304 { 00305 uint32_t words[3]; 00306 words[0] = sizeof(words) + name.size(); 00307 words[1] = VIS_REPLY_NONE; 00308 words[2] = name.size(); 00309 00310 msg->data.reserve(msg->data.size() + words[0]); 00311 copydata(msg, &words[0], sizeof(words)); 00312 copydata(msg, &name[0], name.size()); 00313 } 00314 unlock(); 00315 } 00316 return true; 00317 00318 case VIS_REPLY_LIST_BEGIN: 00319 { 00320 if (len != 4*sizeof(uint32_t)) 00321 { 00322 logme() 00323 << "ERROR: corrupt 'LIST BEGIN' message of length " << len 00324 << " from peer " << p->peeraddr << std::endl; 00325 return false; 00326 } 00327 00328 if (debug_) 00329 logme() 00330 << "DEBUG: received message 'LIST BEGIN' from " 00331 << p->peeraddr << std::endl; 00332 00333 // Get the update status: whether this is a full update. 00334 uint32_t flags; 00335 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00336 00337 // If we are about to receive a full list of objects, flag all 00338 // objects dead. Subsequent object notifications will undo this 00339 // for the live objects. This tells us the object has been 00340 // removed, but we can keep making it available for a while if 00341 // there continues to be interest in it. 00342 if (flags) 00343 { 00344 lock(); 00345 markObjectsZombies(p); 00346 unlock(); 00347 } 00348 } 00349 return true; 00350 00351 case VIS_REPLY_LIST_END: 00352 { 00353 if (len != 4*sizeof(uint32_t)) 00354 { 00355 logme() 00356 << "ERROR: corrupt 'LIST END' message of length " << len 00357 << " from peer " << p->peeraddr << std::endl; 00358 return false; 00359 } 00360 00361 // Get the update status: whether this is a full update. 00362 uint32_t flags; 00363 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00364 00365 // If we received a full list of objects, flag all zombie objects 00366 // now dead. 
We need to do this in two stages in case we receive 00367 // updates in many parts, and end up sending updates to others in 00368 // between; this avoids us lying live objects are dead. 00369 if (flags) 00370 { 00371 lock(); 00372 markObjectsDead(p); 00373 unlock(); 00374 } 00375 00376 if (debug_) 00377 logme() 00378 << "DEBUG: received message 'LIST END' from " 00379 << p->peeraddr << std::endl; 00380 00381 // Indicate we have received another update from this peer. 00382 // Also indicate we should flush to our clients. 00383 flush_ = true; 00384 p->updates++; 00385 } 00386 return true; 00387 00388 case VIS_REPLY_OBJECT: 00389 { 00390 uint32_t words[7]; 00391 if (len < sizeof(words)) 00392 { 00393 logme() 00394 << "ERROR: corrupt 'OBJECT' message of length " << len 00395 << " from peer " << p->peeraddr << std::endl; 00396 return false; 00397 } 00398 00399 memcpy(&words[0], data, sizeof(words)); 00400 uint32_t &namelen = words[5]; 00401 uint32_t &datalen = words[6]; 00402 00403 if (len != sizeof(words) + namelen + datalen) 00404 { 00405 logme() 00406 << "ERROR: corrupt 'OBJECT' message of length " << len 00407 << " from peer " << p->peeraddr 00408 << ", expected length " << sizeof(words) 00409 << " + " << namelen 00410 << " + " << datalen 00411 << std::endl; 00412 return false; 00413 } 00414 00415 unsigned char *namedata = data + sizeof(words); 00416 unsigned char *objdata = namedata + namelen; 00417 unsigned char *enddata = objdata + datalen; 00418 std::string name((char *) namedata, namelen); 00419 assert(enddata == data + len); 00420 00421 if (debug_) 00422 logme() 00423 << "DEBUG: received message 'OBJECT " << name 00424 << "' from " << p->peeraddr << std::endl; 00425 00426 // Mark the peer as a known object source. 00427 p->source = true; 00428 00429 // Initialise or update an object entry. 00430 lock(); 00431 Object *o = findObject(p, name); 00432 if (! o) 00433 o = makeObject(p, name); 00434 00435 bool hadobject = ! 
o->rawdata.empty(); 00436 o->flags = words[2] | VIS_FLAG_RECEIVED; 00437 o->version = ((uint64_t) words[4] << 32 | words[3]); 00438 o->rawdata.clear(); 00439 o->rawdata.insert(o->rawdata.end(), objdata, enddata); 00440 00441 // If we had an object for this one already and this is a list 00442 // update without data, issue an immediate data get request. 00443 if (hadobject && ! datalen) 00444 requestObject(p, (namelen ? &name[0] : 0), namelen); 00445 00446 // If we have the object data, release from wait. 00447 if (datalen) 00448 releaseWaiters(o); 00449 unlock(); 00450 } 00451 return true; 00452 00453 case VIS_REPLY_NONE: 00454 { 00455 uint32_t words[3]; 00456 if (len < sizeof(words)) 00457 { 00458 logme() 00459 << "ERROR: corrupt 'NONE' message of length " << len 00460 << " from peer " << p->peeraddr << std::endl; 00461 return false; 00462 } 00463 00464 memcpy(&words[0], data, sizeof(words)); 00465 uint32_t &namelen = words[2]; 00466 00467 if (len != sizeof(words) + namelen) 00468 { 00469 logme() 00470 << "ERROR: corrupt 'NONE' message of length " << len 00471 << " from peer " << p->peeraddr 00472 << ", expected length " << sizeof(words) 00473 << " + " << namelen << std::endl; 00474 return false; 00475 } 00476 00477 unsigned char *namedata = data + sizeof(words); 00478 unsigned char *enddata = namedata + namelen; 00479 std::string name((char *) namedata, namelen); 00480 assert(enddata == data + len); 00481 00482 if (debug_) 00483 logme() 00484 << "DEBUG: received message 'NONE " << name 00485 << "' from " << p->peeraddr << std::endl; 00486 00487 // Mark the peer as a known object source. 00488 p->source = true; 00489 00490 // If this was a known object, update its entry. 00491 lock(); 00492 Object *o = findObject(p, name); 00493 if (o) 00494 o->flags |= VIS_FLAG_DEAD; 00495 00496 // If someone was waiting for this, let them go. 
00497 releaseWaiters(o); 00498 unlock(); 00499 } 00500 return true; 00501 00502 default: 00503 logme() 00504 << "ERROR: unrecognised message of length " << len 00505 << " and type " << type << " from peer " << p->peeraddr 00506 << std::endl; 00507 return false; 00508 } 00509 }
bool VisNet::onPeerConnect | ( | lat::IOSelectEvent * | ev | ) | [private] |
Respond to new connections on the server socket.
Accepts the connection and creates a new socket for the peer, and sets it up for further communication.
Always returns false to tell the IOSelector to keep processing events for the server socket.
Definition at line 680 of file VisNet.cc.
References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.
Referenced by startLocalServer().
00681 { 00682 // Recover the server socket. 00683 assert(ev->source == server_); 00684 00685 // Accept the connection. 00686 Socket *s = server_->accept(); 00687 assert(s); 00688 assert(! s->isBlocking()); 00689 00690 // Record it to our list of peers. 00691 Peer *p = createPeer(s); 00692 InetAddress peeraddr = ((InetSocket *) s)->peername(); 00693 InetAddress myaddr = ((InetSocket *) s)->sockname(); 00694 p->peeraddr = StringFormat("%1:%2") 00695 .arg(peeraddr.hostname()) 00696 .arg(peeraddr.port()); 00697 p->mask = IORead|IOUrgent; 00698 p->socket = s; 00699 00700 // Report the new connection. 00701 if (debug_) 00702 logme() 00703 << "INFO: new peer " << p->peeraddr << " is now connected to " 00704 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 00705 00706 // Attach it to the listener. 00707 sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p)); 00708 00709 // We are never done. 00710 return false; 00711 }
bool VisNet::onPeerData | ( | lat::IOSelectEvent * | ev, | |
Peer * | p | |||
) | [private] |
Handle communication to a particular client.
Definition at line 514 of file VisNet.cc.
References VisNet::Peer::automatic, b, data, VisNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), VisNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), VisNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, lat::Error::next(), old, onMessage(), VisNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, VisNet::Peer::sendpos, VisNet::Peer::sendq, lat::IOSelector::setMask(), VisNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, TrackValidation_HighPurity_cff::valid, VisNet::Peer::waiting, and lat::IOChannel::write().
Referenced by onPeerConnect(), and run().
00515 { 00516 assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p); 00517 00518 // If there is a problem with the peer socket, discard the peer 00519 // and tell the selector to stop prcessing events for it. If 00520 // this is a server connection, we will eventually recreate 00521 // everything if/when the data server comes back. 00522 if (ev->events & IOUrgent) 00523 { 00524 if (p->automatic) 00525 { 00526 logme() 00527 << "WARNING: connection to the server at " << p->peeraddr 00528 << " lost (will attempt to reconnect in 15 seconds)\n"; 00529 return losePeer(0, p, ev); 00530 } 00531 else 00532 return losePeer("WARNING: lost peer connection ", p, ev); 00533 } 00534 00535 // If we can write to the peer socket, pump whatever we can into it. 00536 if (ev->events & IOWrite) 00537 { 00538 while (Bucket *b = p->sendq) 00539 { 00540 IOSize len = b->data.size() - p->sendpos; 00541 const void *data = (len ? (const void *)&b->data[p->sendpos] 00542 : (const void *)&data); 00543 IOSize done; 00544 00545 try 00546 { 00547 done = (len ? ev->source->write (data, len) : 0); 00548 if (debug_ && len) 00549 logme() 00550 << "DEBUG: sent " << done << " bytes to peer " 00551 << p->peeraddr << std::endl; 00552 } 00553 catch (Error &e) 00554 { 00555 return losePeer("WARNING: unable to write to peer ", 00556 p, ev, &e); 00557 } 00558 00559 p->sendpos += done; 00560 if (p->sendpos == b->data.size()) 00561 { 00562 Bucket *old = p->sendq; 00563 p->sendq = old->next; 00564 p->sendpos = 0; 00565 old->next = 0; 00566 discard(old); 00567 } 00568 00569 if (! done && len) 00570 // Cannot write any more. 00571 break; 00572 } 00573 } 00574 00575 // If there is data to be read from the peer, first receive what we 00576 // can get out the socket, the process all complete requests. 00577 if (ev->events & IORead) 00578 { 00579 // First build up the incoming buffer of data in the socket. 
00580 // Remember the last size returned by the socket; we need 00581 // it to determine if the remote end closed the connection. 00582 IOSize sz; 00583 try 00584 { 00585 std::vector<unsigned char> buf(SOCKET_READ_SIZE); 00586 do 00587 if ((sz = ev->source->read(&buf[0], buf.size()))) 00588 { 00589 if (debug_) 00590 logme() 00591 << "DEBUG: received " << sz << " bytes from peer " 00592 << p->peeraddr << std::endl; 00593 DataBlob &data = p->incoming; 00594 if (data.capacity () < data.size () + sz) 00595 data.reserve (data.size() + SOCKET_READ_GROWTH); 00596 data.insert (data.end(), &buf[0], &buf[0] + sz); 00597 } 00598 while (sz == sizeof (buf)); 00599 } 00600 catch (Error &e) 00601 { 00602 SystemError *next = dynamic_cast<SystemError *>(e.next()); 00603 if (next && next->portable() == SysErr::ErrTryAgain) 00604 sz = 1; // Ignore it, and fake no end of data. 00605 else 00606 // Houston we have a problem. 00607 return losePeer("WARNING: failed to read from peer ", 00608 p, ev, &e); 00609 } 00610 00611 // Process fully received messages as long as we can. 00612 size_t consumed = 0; 00613 DataBlob &data = p->incoming; 00614 while (data.size()-consumed >= sizeof(uint32_t) 00615 && p->waiting < MAX_PEER_WAITREQS) 00616 { 00617 uint32_t msglen; 00618 memcpy (&msglen, &data[0]+consumed, sizeof(msglen)); 00619 00620 if (msglen >= MESSAGE_SIZE_LIMIT) 00621 return losePeer("WARNING: excessively large message from ", p, ev); 00622 00623 if (data.size()-consumed >= msglen) 00624 { 00625 bool valid = true; 00626 if (msglen < 2*sizeof(uint32_t)) 00627 { 00628 logme() 00629 << "ERROR: corrupt peer message of length " << msglen 00630 << " from peer " << p->peeraddr << std::endl; 00631 valid = false; 00632 } 00633 else 00634 { 00635 // Decode and process this message. 00636 Bucket msg; 00637 msg.next = 0; 00638 valid = onMessage(&msg, p, &data[0]+consumed, msglen); 00639 00640 // If we created a response, chain it to the write queue. 00641 if (! 
msg.data.empty()) 00642 { 00643 Bucket **prev = &p->sendq; 00644 while (*prev) 00645 prev = &(*prev)->next; 00646 00647 *prev = new Bucket; 00648 (*prev)->next = 0; 00649 (*prev)->data.swap(msg.data); 00650 } 00651 } 00652 00653 if (! valid) 00654 return losePeer("WARNING: data stream error with ", p, ev); 00655 00656 consumed += msglen; 00657 } 00658 else 00659 break; 00660 } 00661 00662 data.erase(data.begin(), data.begin()+consumed); 00663 00664 // If the client has closed the connection, shut down our end. If 00665 // we have something to send back still, leave the write direction 00666 // open. Otherwise close the shop for this client. 00667 if (sz == 0) 00668 sel_.setMask(p->socket, p->mask &= ~IORead); 00669 } 00670 00671 // Yes, please keep processing events for this socket. 00672 return false; 00673 }
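void VisNet::purgeDeadObjects | ( | lat::Time | oldobj, | |
lat::Time | deadobj | |||
) | [protected, virtual] |
Definition at line 1179 of file VisNet.cc.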
References debug_, lat::endl(), VisNet::Object::flags, VisNet::Object::lastreq, logme(), VisNet::Object::name, peers_, pi, VisNet::Object::rawdata, VisNet::Object::version, VIS_FLAG_DEAD, and VIS_FLAG_SCALAR.
Referenced by run().
01180 { 01181 PeerMap::iterator pi, pe; 01182 ObjectMap::iterator oi, oe; 01183 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01184 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ) 01185 { 01186 Object &o = oi->second; 01187 01188 // Compact non-scalar objects that are unused. We send scalar 01189 // objects to the web server so we keep them around. 01190 if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR)) 01191 { 01192 if (debug_) 01193 logme() 01194 << "DEBUG: compacting idle '" << o.name 01195 << "' from " << pi->second.peeraddr << std::endl; 01196 } 01197 01198 // Remove if dead, old and unused. 01199 if (o.lastreq < deadobj 01200 && o.version < deadobj 01201 && (o.flags & VIS_FLAG_DEAD)) 01202 { 01203 if (debug_) 01204 logme() 01205 << "DEBUG: removing dead '" << o.name 01206 << "' from " << pi->second.peeraddr << std::endl; 01207 01208 pi->second.objs.erase(oi++); 01209 } 01210 else 01211 ++oi; 01212 } 01213 }
int VisNet::receive | ( | void(*)(void *arg, uint32_t reason, Object &obj) | callback, | |
void * | arg | |||
) | [virtual] |
Definition at line 1333 of file VisNet.cc.
References VisNet::Object::flags, local_, lock(), VisNet::Peer::objs, p, peers_, pi, unlock(), VisNet::Peer::updates, VIS_FLAG_DEAD, and VIS_FLAG_RECEIVED.
01334 { 01335 int updates = 0; 01336 01337 lock(); 01338 PeerMap::iterator pi, pe; 01339 ObjectMap::iterator oi, oe; 01340 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01341 { 01342 Peer &p = pi->second; 01343 if (&p == local_) 01344 continue; 01345 01346 updates += p.updates; 01347 01348 for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; ) 01349 { 01350 Object &o = oi->second; 01351 if (o.flags & VIS_FLAG_DEAD) 01352 { 01353 callback(arg, VIS_FLAG_DEAD, o); 01354 p.objs.erase(oi++); 01355 } 01356 else if (o.flags & VIS_FLAG_RECEIVED) 01357 { 01358 callback(arg, VIS_FLAG_RECEIVED, o); 01359 o.flags &= ~VIS_FLAG_RECEIVED; 01360 ++oi; 01361 } 01362 else 01363 ++oi; 01364 } 01365 } 01366 unlock(); 01367 01368 return updates; 01369 }
Definition at line 133 of file VisNet.cc.
References alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, releaseFromWait(), and waiting_.
00134 { 00135 Bucket **msg = &i->peer->sendq; 00136 while (*msg) 00137 msg = &(*msg)->next; 00138 *msg = new Bucket; 00139 (*msg)->next = 0; 00140 00141 releaseFromWait(*msg, *i, o); 00142 00143 assert(i->peer->waiting > 0); 00144 i->peer->waiting--; 00145 waiting_.erase(i); 00146 }
void VisNet::releaseFromWait | ( | Bucket * | msg, | |
WaitObject & | w, | |||
Object * | o | |||
) | [protected, virtual] |
Definition at line 170 of file VisNet.cc.
References copydata(), VisNet::Bucket::data, VisNet::WaitObject::name, sendObjectToPeer(), and VIS_REPLY_NONE.
Referenced by releaseFromWait(), releaseWaiters(), and run().
00171 { 00172 if (o) 00173 sendObjectToPeer (msg, *o, true); 00174 else 00175 { 00176 uint32_t words[3]; 00177 words[0] = sizeof(words) + w.name.size(); 00178 words[1] = VIS_REPLY_NONE; 00179 words[2] = w.name.size(); 00180 00181 msg->data.reserve(msg->data.size() + words[0]); 00182 copydata(msg, &words[0], sizeof(words)); 00183 copydata(msg, &w.name[0], w.name.size()); 00184 } 00185 }
Definition at line 150 of file VisNet.cc.
References e, i, VisNet::Object::name, releaseFromWait(), and waiting_.
Referenced by onMessage().
00151 { 00152 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00153 if (i->name == o->name) 00154 releaseFromWait(i++, o); 00155 else 00156 ++i; 00157 }
void VisNet::removeLocalObject | ( | const std::string & | path | ) | [virtual] |
Delete the local object.
The caller must call sendLocalChanges() later to push out the changes.
Definition at line 1391 of file VisNet.cc.
References local_, and VisNet::Peer::objs.
void VisNet::removePeer | ( | Peer * | p, | |
lat::Socket * | s | |||
) | [protected, virtual] |
Definition at line 1240 of file VisNet.cc.
References VisNet::Peer::objs, peers_, and sendLocalChanges().
Referenced by losePeer().
01241 { 01242 bool needflush = ! p->objs.empty(); 01243 01244 p->objs.clear(); 01245 peers_.erase(s); 01246 01247 // If we removed a peer with objects, our list of objects 01248 // has changed and we need to update downstream peers. 01249 if (needflush) 01250 sendLocalChanges(); 01251 }
Queue an object request to the data server.
Definition at line 97 of file VisNet.cc.
References copydata(), alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, VisNet::Peer::sendq, and VIS_MSG_GET_OBJECT.
Referenced by onMessage(), and waitForData().
00098 { 00099 Bucket **msg = &p->sendq; 00100 while (*msg) 00101 msg = &(*msg)->next; 00102 *msg = new Bucket; 00103 (*msg)->next = 0; 00104 00105 uint32_t words[3]; 00106 words[0] = sizeof(words) + len; 00107 words[1] = VIS_MSG_GET_OBJECT; 00108 words[2] = len; 00109 (*msg)->data.reserve((*msg)->data.size() + words[0]); 00110 copydata(*msg, words, sizeof(words)); 00111 copydata(*msg, name, len); 00112 }
Run the actual I/O processing loop.
Definition at line 962 of file VisNet.cc.
References lat::Socket::abort(), arg, lat::IOSelector::attach(), VisNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, VisNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), VisNet::Peer::mask, VisNet::Bucket::next, lat::Error::next(), VisNet::AutoPeer::next, onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, VisNet::AutoPeer::peer, VisNet::Peer::peeraddr, VisNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), s, sel_, sendObjectListToPeers(), VisNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), VisNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), VisNet::Peer::update, VisNet::AutoPeer::update, updatePeerMasks(), upstream_, VIS_MSG_LIST_OBJECTS, VIS_MSG_UPDATE_ME, waiting_, and VisNet::AutoPeer::warned.
00963 { 00964 Time now; 00965 AutoPeer *automatic[2] = { &upstream_, &downstream_ }; 00966 00967 // Perform I/O. Every once in a while flush updates to peers. 00968 while (! shouldStop()) 00969 { 00970 for (int i = 0; i < 2; ++i) 00971 { 00972 AutoPeer *ap = automatic[i]; 00973 00974 // If we need a server connection and don't have one yet, 00975 // initiate asynchronous connection creation. Swallow errors 00976 // in case the server won't talk to us. 00977 if (! ap->host.empty() 00978 && ! ap->peer 00979 && (now = Time::current()) > ap->next) 00980 { 00981 ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0); 00982 InetSocket *s = 0; 00983 try 00984 { 00985 s = new InetSocket (SocketConst::TypeStream); 00986 s->setBlocking (false); 00987 s->connect(InetAddress (ap->host.c_str(), ap->port)); 00988 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 00989 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 00990 } 00991 catch (Error &e) 00992 { 00993 SystemError *sys = dynamic_cast<SystemError *>(e.next()); 00994 if (! sys || sys->portable() != SysErr::ErrOperationInProgress) 00995 { 00996 // "In progress" just means the connection is in progress. 00997 // The connection is ready when the socket is writeable. 00998 // Anything else is a real problem. 00999 if (! ap->warned) 01000 { 01001 logme() 01002 << "NOTE: server at " << ap->host << ":" << ap->port 01003 << " is unavailable. Connection will be established" 01004 << " automatically on the background once the server" 01005 << " becomes available. Error from the attempt was: " 01006 << e.explain() << '\n'; 01007 ap->warned = true; 01008 } 01009 01010 if (s) 01011 s->abort(); 01012 delete s; 01013 s = 0; 01014 } 01015 } 01016 01017 // Set up with the selector if we were successful. If this is 01018 // the upstream collector, queue a request for updates. 
01019 if (s) 01020 { 01021 lock(); 01022 Peer *p = createPeer(s); 01023 ap->peer = p; 01024 ap->warned = false; 01025 unlock(); 01026 01027 InetAddress peeraddr = ((InetSocket *) s)->peername(); 01028 InetAddress myaddr = ((InetSocket *) s)->sockname(); 01029 p->peeraddr = StringFormat("%1:%2") 01030 .arg(peeraddr.hostname()) 01031 .arg(peeraddr.port()); 01032 p->mask = IORead|IOWrite|IOUrgent; 01033 p->update = ap->update; 01034 p->automatic = ap; 01035 p->socket = s; 01036 sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p)); 01037 if (ap == &upstream_) 01038 { 01039 uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS, 01040 2*sizeof(uint32_t), VIS_MSG_UPDATE_ME }; 01041 p->sendq = new Bucket; 01042 p->sendq->next = 0; 01043 copydata(p->sendq, words, sizeof(words)); 01044 } 01045 01046 // Report the new connection. 01047 if (debug_) 01048 logme() 01049 << "INFO: now connected to " << p->peeraddr << " from " 01050 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 01051 } 01052 } 01053 } 01054 01055 // Pump events for a while. 01056 sel_.dispatch(delay_); 01057 now = Time::current(); 01058 01059 // Check if flush is required. Flush only if one is needed. 01060 // Always sends the full object list. Compact objects no longer 01061 // in active use before sending out the update. 01062 if (flush_) 01063 { 01064 flush_ = false; 01065 01066 lock(); 01067 purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0), 01068 now - TimeSpan(0, 0, 20 /* minutes */, 0, 0)); 01069 sendObjectListToPeers(true); 01070 unlock(); 01071 } 01072 01073 // Update the data server and peer selection masks. If we 01074 // have no more data to send and listening for writes, remove 01075 // the write mask. If we have something to write and aren't 01076 // listening for writes, start listening so we can send off 01077 // the data. 01078 updatePeerMasks(); 01079 01080 // Release peers that have been waiting for data for too long. 
01081 lock(); 01082 Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0); 01083 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 01084 { 01085 // If the peer has waited for too long, send something. 01086 if (i->time < waitold) 01087 releaseFromWait(i++, findObject(0, i->name)); 01088 01089 // Keep it for now. 01090 else 01091 ++i; 01092 } 01093 unlock(); 01094 } 01095 }
Definition at line 1100 of file VisNet.cc.
References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().
Referenced by removePeer().
Send all objects to a peer and optionally mark sent objects old.
Definition at line 1255 of file VisNet.cc.
References copydata(), VisNet::Bucket::data, peers_, pi, sendObjectToPeer(), VIS_FLAG_NEW, VIS_REPLY_LIST_BEGIN, and VIS_REPLY_LIST_END.
Referenced by onMessage(), and sendObjectListToPeers().
01256 { 01257 PeerMap::iterator pi, pe; 01258 ObjectMap::iterator oi, oe; 01259 uint32_t numobjs = 0; 01260 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01261 numobjs += pi->second.objs.size(); 01262 01263 msg->data.reserve(msg->data.size() + 300*numobjs); 01264 01265 uint32_t nupdates = 0; 01266 uint32_t words[4]; 01267 words[0] = sizeof(words); 01268 words[1] = VIS_REPLY_LIST_BEGIN; 01269 words[2] = numobjs; 01270 words[3] = all; 01271 copydata(msg, &words[0], sizeof(words)); 01272 01273 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi) 01274 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi) 01275 if (all || (oi->second.flags & VIS_FLAG_NEW)) 01276 { 01277 sendObjectToPeer(msg, oi->second, false); 01278 if (clear) 01279 oi->second.flags &= ~VIS_FLAG_NEW; 01280 ++nupdates; 01281 } 01282 01283 words[1] = VIS_REPLY_LIST_END; 01284 words[2] = nupdates; 01285 copydata(msg, &words[0], sizeof(words)); 01286 }
Definition at line 1289 of file VisNet.cc.
References VisNet::Bucket::data, debug_, e, lat::endl(), i, logme(), alivecheck_mergeAndRegister::msg, VisNet::Bucket::next, p, VisNet::Peer::peeraddr, peers_, sendObjectListToPeer(), VisNet::Peer::sendq, VisNet::Peer::update, and VisNet::Peer::updated.
Referenced by run().
01290 { 01291 PeerMap::iterator i, e; 01292 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) 01293 { 01294 Peer &p = i->second; 01295 if (! p.update) 01296 continue; 01297 01298 if (debug_) 01299 logme() 01300 << "DEBUG: notifying " << p.peeraddr << std::endl; 01301 01302 Bucket msg; 01303 msg.next = 0; 01304 sendObjectListToPeer(&msg, !p.updated || all, true); 01305 01306 if (! msg.data.empty()) 01307 { 01308 Bucket **prev = &p.sendq; 01309 while (*prev) 01310 prev = &(*prev)->next; 01311 01312 *prev = new Bucket; 01313 (*prev)->next = 0; 01314 (*prev)->data.swap(msg.data); 01315 } 01316 p.updated = true; 01317 } 01318 }
Definition at line 191 of file VisNet.cc.
References copydata(), VisNet::Bucket::data, VisNet::Object::flags, flags, VisNet::Object::name, VisNet::Object::rawdata, VisNet::Object::version, VIS_FLAG_SCALAR, VIS_FLAG_ZOMBIE, and VIS_REPLY_OBJECT.
Referenced by onMessage(), releaseFromWait(), and sendObjectListToPeer().
00192 { 00193 uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE; 00194 DataBlob objdata; 00195 00196 if (data || (flags & VIS_FLAG_SCALAR)) 00197 objdata.insert(objdata.end(), 00198 &o.rawdata[0], 00199 &o.rawdata[0] + o.rawdata.size()); 00200 00201 uint32_t words[7]; 00202 uint32_t namelen = o.name.size(); 00203 uint32_t datalen = objdata.size(); 00204 00205 words[0] = sizeof(words) + namelen + datalen; 00206 words[1] = VIS_REPLY_OBJECT; 00207 words[2] = flags; 00208 words[3] = (o.version >> 0 ) & 0xffffffff; 00209 words[4] = (o.version >> 32) & 0xffffffff; 00210 words[5] = namelen; 00211 words[6] = datalen; 00212 00213 msg->data.reserve(msg->data.size() + words[0]); 00214 copydata(msg, &words[0], sizeof(words)); 00215 if (namelen) 00216 copydata(msg, &o.name[0], namelen); 00217 if (datalen) 00218 copydata(msg, &objdata[0], datalen); 00219 }
Stop the network layer and wait for it to finish.
Definition at line 906 of file VisNet.cc.
References communicate_, and shutdown_.
00907 { 00908 shutdown_ = 1; 00909 if (communicate_ != (pthread_t) -1) 00910 pthread_join(communicate_, 0); 00911 }
Start running the network layer in a new thread.
This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.
Definition at line 947 of file VisNet.cc.
References communicate(), communicate_, lock_, logme(), and pthread_create.
00948 { 00949 if (communicate_ != (pthread_t) -1) 00950 { 00951 logme() 00952 << "ERROR: Shared memory networking thread has already been started\n"; 00953 return; 00954 } 00955 00956 pthread_mutex_init(&lock_, 0); 00957 pthread_create(&communicate_, 0, &communicate, this); 00958 }
Start a server socket for accessing this node remotely.
Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.
Definition at line 833 of file VisNet.cc.
References lat::IOSelector::attach(), lat::CreateHook(), e, lat::endl(), Exception, lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.
00834 { 00835 if (server_) 00836 { 00837 logme() << "ERROR: server was already started.\n"; 00838 return; 00839 } 00840 00841 try 00842 { 00843 server_ = new InetServerSocket(InetAddress (port), 10); 00844 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 00845 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 00846 server_->setBlocking(false); 00847 sel_.attach(server_, IOAccept, CreateHook(this, &VisNet::onPeerConnect)); 00848 } 00849 catch (Error &e) 00850 { 00851 // FIXME: Do we need to do this when we throw an exception anyway? 00852 // FIXME: Abort instead? 00853 logme() 00854 << "ERROR: Failed to start server at port " << port << ": " 00855 << e.explain() << std::endl; 00856 00857 // FIXME: Throw something simpler that removes the dependency? 00858 throw cms::Exception("VisNet::startLocalServer") 00859 << "Failed to start server at port " << port << ": " 00860 << e.explain(); 00861 } 00862 00863 logme() << "INFO: Shared memory server started at port " << port << std::endl; 00864 }
Release the lock on the net layer.
Definition at line 937 of file VisNet.cc.
References communicate_, and lock_.
Referenced by onMessage(), receive(), and run().
00938 { 00939 if (communicate_ != (pthread_t) -1) 00940 pthread_mutex_unlock(&lock_); 00941 }
Update the network cache for an object.
The caller must call sendLocalChanges() later to push out the changes.
Definition at line 1374 of file VisNet.cc.
References VisNet::Object::flags, local_, VisNet::Object::name, VisNet::Peer::objs, VisNet::Object::rawdata, std::swap(), and VisNet::Object::version.
01375 { 01376 ObjectMap::iterator pos = local_->objs.find(o.name); 01377 if (pos == local_->objs.end()) 01378 local_->objs.insert(ObjectMap::value_type(o.name, o)); 01379 else 01380 { 01381 std::swap(pos->second.version, o.version); 01382 std::swap(pos->second.flags, o.flags); 01383 std::swap(pos->second.rawdata, o.rawdata); 01384 pos->second.lastreq = 0; 01385 } 01386 }
Update the selector mask for a peer based on data queues.
Close the connection if there is no reason to keep it open.
Definition at line 751 of file VisNet.cc.
References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), VisNet::Peer::mask, VisNet::Peer::peeraddr, sel_, VisNet::Peer::sendq, lat::IOSelector::setMask(), VisNet::Peer::socket, and VisNet::Peer::waiting.
Referenced by updatePeerMasks().
00752 { 00753 if (! p->socket) 00754 return; 00755 00756 // Listen to writes iff we have data to send. 00757 unsigned oldmask = p->mask; 00758 if (! p->sendq && (p->mask & IOWrite)) 00759 sel_.setMask(p->socket, p->mask &= ~IOWrite); 00760 00761 if (p->sendq && ! (p->mask & IOWrite)) 00762 sel_.setMask(p->socket, p->mask |= IOWrite); 00763 00764 if (debug_ && oldmask != p->mask) 00765 logme() 00766 << "DEBUG: updating mask for " << p->peeraddr << " to " 00767 << p->mask << " from " << oldmask << std::endl; 00768 00769 // If we have nothing more to send and are no longer listening 00770 // for reads, close up the shop for this peer. 00771 if (p->mask == IOUrgent && ! p->waiting) 00772 { 00773 assert(! p->sendq); 00774 if (debug_) 00775 logme() << "INFO: connection closed to " << p->peeraddr << std::endl; 00776 losePeer(0, p, 0); 00777 } 00778 }
Tell the network layer to connect to host and port and automatically send updates whenever local data changes.
Must be called before calling run() or start().
Definition at line 870 of file VisNet.cc.
References downstream_, lat::endl(), VisNet::AutoPeer::host, logme(), VisNet::AutoPeer::port, and VisNet::AutoPeer::update.
00871 { 00872 if (! downstream_.host.empty()) 00873 { 00874 logme() 00875 << "ERROR: Already updating another collector at " 00876 << downstream_.host << ":" << downstream_.port << std::endl; 00877 return; 00878 } 00879 00880 downstream_.update = true; 00881 downstream_.host = host; 00882 downstream_.port = port; 00883 }
void VisNet::waitForData | ( | Peer * | p, | |
const std::string & | name, | |||
const std::string & | info, | |||
Peer * | owner | |||
) | [protected] |
Queue a request for an object and put the peer into a state of waiting for the object data to appear.
Definition at line 117 of file VisNet.cc.
References requestObject(), VisNet::Peer::waiting, and waiting_.
Referenced by onMessage().
00118 { 00119 // FIXME: Should we automatically record which exact peer the waiter 00120 // is expecting to deliver data so we know to release the waiter if 00121 // the other peer vanishes? The current implementation stands a 00122 // chance for the waiter to wait indefinitely -- although we do 00123 // force terminate the wait after a while. 00124 requestObject(owner, name.size() ? &name[0] : 0, name.size()); 00125 WaitObject wo = { Time::current(), name, info, p }; 00126 waiting_.push_back(wo); 00127 p->waiting++; 00128 }
std::string VisNet::appname_ [private] |
pthread_t VisNet::communicate_ [private] |
bool VisNet::debug_ [protected] |
Definition at line 146 of file VisNet.h.
Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), purgeDeadObjects(), run(), sendObjectListToPeers(), and updateMask().
int VisNet::delay_ [private] |
AutoPeer VisNet::downstream_ [private] |
bool VisNet::flush_ [private] |
Peer* VisNet::local_ [private] |
Definition at line 174 of file VisNet.h.
Referenced by receive(), removeLocalObject(), updateLocalObject(), and VisNet().
pthread_mutex_t VisNet::lock_ [private] |
const uint32_t VisNet::MAX_PEER_WAITREQS = 128 [static] |
PeerMap VisNet::peers_ [private] |
Definition at line 170 of file VisNet.h.
Referenced by createPeer(), findObject(), getPeer(), purgeDeadObjects(), receive(), removePeer(), sendObjectListToPeer(), sendObjectListToPeers(), and updatePeerMasks().
int VisNet::pid_ [private] |
lat::IOSelector VisNet::sel_ [private] |
Definition at line 165 of file VisNet.h.
Referenced by losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), updateMask(), and VisNet().
lat::InetServerSocket* VisNet::server_ [private] |
sig_atomic_t VisNet::shutdown_ [private] |
AutoPeer VisNet::upstream_ [private] |
lat::Time VisNet::version_ [private] |
const uint32_t VisNet::VIS_FLAG_DEAD = 0x40000000 [static] |
Definition at line 34 of file VisNet.h.
Referenced by markObjectsDead(), onMessage(), purgeDeadObjects(), and receive().
const uint32_t VisNet::VIS_FLAG_NEW = 0x20000000 [static] |
const uint32_t VisNet::VIS_FLAG_RECEIVED = 0x10000000 [static] |
const uint32_t VisNet::VIS_FLAG_SCALAR = 0x1 [static] |
const uint32_t VisNet::VIS_FLAG_ZOMBIE = 0x80000000 [static] |
Definition at line 35 of file VisNet.h.
Referenced by markObjectsDead(), markObjectsZombies(), and sendObjectToPeer().
const uint32_t VisNet::VIS_MSG_GET_OBJECT = 3 [static] |
const uint32_t VisNet::VIS_MSG_HELLO = 0 [static] |
const uint32_t VisNet::VIS_MSG_LIST_OBJECTS = 2 [static] |
const uint32_t VisNet::VIS_MSG_UPDATE_ME = 1 [static] |
const uint32_t VisNet::VIS_REPLY_LIST_BEGIN = 101 [static] |
const uint32_t VisNet::VIS_REPLY_LIST_END = 102 [static] |
const uint32_t VisNet::VIS_REPLY_NONE = 103 [static] |
const uint32_t VisNet::VIS_REPLY_OBJECT = 104 [static] |
WaitList VisNet::waiting_ [private] |
Definition at line 173 of file VisNet.h.
Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().
lat::Pipe VisNet::wakeup_ [private] |