00001 #include "VisFramework/VisFrameworkBase/interface/VisNet.h"
00002 #include "FWCore/Utilities/interface/EDMException.h"
00003 #include "classlib/sysapi/InetSocket.h"
00004 #include "classlib/iobase/Filename.h"
00005 #include "classlib/utils/TimeInfo.h"
00006 #include "classlib/utils/StringList.h"
00007 #include "classlib/utils/StringFormat.h"
00008 #include "classlib/utils/StringOps.h"
00009 #include "classlib/utils/SystemError.h"
00010 #include "classlib/utils/Regexp.h"
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <sys/wait.h>
00014 #include <stdio.h>
00015 #include <stdint.h>
00016 #include <iostream>
00017 #include <cassert>
00018
// Messages whose length word is at or above this many bytes cause the
// sending peer to be dropped.
#define MESSAGE_SIZE_LIMIT      (2 * 1024 * 1024)
// Size requested for the socket send and receive buffers.
#define SOCKET_BUF_SIZE         (8 * 1024 * 1024)
// Size of the scratch buffer used for a single socket read.
#define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE / 8)
// Extra capacity reserved when a peer's incoming buffer has to grow.
#define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00023
00024 using namespace lat;
00025
00027
00028 std::ostream &
00029 VisNet::logme (void)
00030 {
00031 return std::cerr
00032 << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033 << " " << appname_ << "[" << pid_ << "]: ";
00034 }
00035
00036
00037 void
00038 VisNet::copydata(Bucket *b, const void *data, size_t len)
00039 {
00040 b->data.insert(b->data.end(),
00041 (const unsigned char *)data,
00042 (const unsigned char *)data + len);
00043 }
00044
00045
00046 void
00047 VisNet::discard (Bucket *&b)
00048 {
00049 while (b)
00050 {
00051 Bucket *next = b->next;
00052 delete b;
00053 b = next;
00054 }
00055 }
00056
00058
00061 bool
00062 VisNet::losePeer(const char *reason,
00063 Peer *peer,
00064 IOSelectEvent *ev,
00065 Error *err)
00066 {
00067 if (reason)
00068 logme ()
00069 << reason << peer->peeraddr
00070 << (err ? "; error was: " + err->explain() : std::string(""))
00071 << std::endl;
00072
00073 Socket *s = peer->socket;
00074
00075 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076 if (i->peer == peer)
00077 waiting_.erase(i++);
00078 else
00079 ++i;
00080
00081 if (ev)
00082 ev->source = 0;
00083
00084 discard(peer->sendq);
00085 if (peer->automatic)
00086 peer->automatic->peer = 0;
00087
00088 sel_.detach(s);
00089 s->close();
00090 removePeer(peer, s);
00091 delete s;
00092 return true;
00093 }
00094
00096 void
00097 VisNet::requestObject(Peer *p, const char *name, size_t len)
00098 {
00099 Bucket **msg = &p->sendq;
00100 while (*msg)
00101 msg = &(*msg)->next;
00102 *msg = new Bucket;
00103 (*msg)->next = 0;
00104
00105 uint32_t words[3];
00106 words[0] = sizeof(words) + len;
00107 words[1] = VIS_MSG_GET_OBJECT;
00108 words[2] = len;
00109 (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110 copydata(*msg, words, sizeof(words));
00111 copydata(*msg, name, len);
00112 }
00113
00116 void
00117 VisNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00118 {
00119
00120
00121
00122
00123
00124 requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125 WaitObject wo = { Time::current(), name, info, p };
00126 waiting_.push_back(wo);
00127 p->waiting++;
00128 }
00129
00130
00131
00132 void
00133 VisNet::releaseFromWait(WaitList::iterator i, Object *o)
00134 {
00135 Bucket **msg = &i->peer->sendq;
00136 while (*msg)
00137 msg = &(*msg)->next;
00138 *msg = new Bucket;
00139 (*msg)->next = 0;
00140
00141 releaseFromWait(*msg, *i, o);
00142
00143 assert(i->peer->waiting > 0);
00144 i->peer->waiting--;
00145 waiting_.erase(i);
00146 }
00147
00148
00149 void
00150 VisNet::releaseWaiters(Object *o)
00151 {
00152 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153 if (i->name == o->name)
00154 releaseFromWait(i++, o);
00155 else
00156 ++i;
00157 }
00158
00160
00161 bool
00162 VisNet::shouldStop(void)
00163 {
00164 return shutdown_;
00165 }
00166
00167
00168
00169 void
00170 VisNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00171 {
00172 if (o)
00173 sendObjectToPeer (msg, *o, true);
00174 else
00175 {
00176 uint32_t words[3];
00177 words[0] = sizeof(words) + w.name.size();
00178 words[1] = VIS_REPLY_NONE;
00179 words[2] = w.name.size();
00180
00181 msg->data.reserve(msg->data.size() + words[0]);
00182 copydata(msg, &words[0], sizeof(words));
00183 copydata(msg, &w.name[0], w.name.size());
00184 }
00185 }
00186
00187
00188
00189
00190 void
00191 VisNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00192 {
00193 uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194 DataBlob objdata;
00195
00196 if (data || (flags & VIS_FLAG_SCALAR))
00197 objdata.insert(objdata.end(),
00198 &o.rawdata[0],
00199 &o.rawdata[0] + o.rawdata.size());
00200
00201 uint32_t words[7];
00202 uint32_t namelen = o.name.size();
00203 uint32_t datalen = objdata.size();
00204
00205 words[0] = sizeof(words) + namelen + datalen;
00206 words[1] = VIS_REPLY_OBJECT;
00207 words[2] = flags;
00208 words[3] = (o.version >> 0 ) & 0xffffffff;
00209 words[4] = (o.version >> 32) & 0xffffffff;
00210 words[5] = namelen;
00211 words[6] = datalen;
00212
00213 msg->data.reserve(msg->data.size() + words[0]);
00214 copydata(msg, &words[0], sizeof(words));
00215 if (namelen)
00216 copydata(msg, &o.name[0], namelen);
00217 if (datalen)
00218 copydata(msg, &objdata[0], datalen);
00219 }
00220
00222
00223 bool
00224 VisNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00225 {
00226
00227 uint32_t type;
00228 memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229 switch (type)
00230 {
00231 case VIS_MSG_UPDATE_ME:
00232 {
00233 if (len != 2*sizeof(uint32_t))
00234 {
00235 logme()
00236 << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237 << " from peer " << p->peeraddr << std::endl;
00238 return false;
00239 }
00240
00241 if (debug_)
00242 logme()
00243 << "DEBUG: received message 'UPDATE ME' from peer "
00244 << p->peeraddr << std::endl;
00245
00246 p->update = true;
00247 }
00248 return true;
00249
00250 case VIS_MSG_LIST_OBJECTS:
00251 {
00252 if (debug_)
00253 logme()
00254 << "DEBUG: received message 'LIST OBJECTS' from peer "
00255 << p->peeraddr << std::endl;
00256
00257
00258 lock();
00259 sendObjectListToPeer(msg, true, false);
00260 unlock();
00261 }
00262 return true;
00263
00264 case VIS_MSG_GET_OBJECT:
00265 {
00266 if (debug_)
00267 logme()
00268 << "DEBUG: received message 'GET OBJECT' from peer "
00269 << p->peeraddr << std::endl;
00270
00271 if (len < 3*sizeof(uint32_t))
00272 {
00273 logme()
00274 << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275 << " from peer " << p->peeraddr << std::endl;
00276 return false;
00277 }
00278
00279 uint32_t namelen;
00280 memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281 if (len != 3*sizeof(uint32_t) + namelen)
00282 {
00283 logme()
00284 << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285 << " from peer " << p->peeraddr
00286 << ", expected length " << (3*sizeof(uint32_t))
00287 << " + " << namelen << std::endl;
00288 return false;
00289 }
00290
00291 lock();
00292 std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293 Peer *owner = 0;
00294 Object *o = findObject(0, name, &owner);
00295 if (o)
00296 {
00297 o->lastreq = Time::current();
00298 if (o->rawdata.empty())
00299 waitForData(p, name, "", owner);
00300 else
00301 sendObjectToPeer(msg, *o, true);
00302 }
00303 else
00304 {
00305 uint32_t words[3];
00306 words[0] = sizeof(words) + name.size();
00307 words[1] = VIS_REPLY_NONE;
00308 words[2] = name.size();
00309
00310 msg->data.reserve(msg->data.size() + words[0]);
00311 copydata(msg, &words[0], sizeof(words));
00312 copydata(msg, &name[0], name.size());
00313 }
00314 unlock();
00315 }
00316 return true;
00317
00318 case VIS_REPLY_LIST_BEGIN:
00319 {
00320 if (len != 4*sizeof(uint32_t))
00321 {
00322 logme()
00323 << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324 << " from peer " << p->peeraddr << std::endl;
00325 return false;
00326 }
00327
00328 if (debug_)
00329 logme()
00330 << "DEBUG: received message 'LIST BEGIN' from "
00331 << p->peeraddr << std::endl;
00332
00333
00334 uint32_t flags;
00335 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336
00337
00338
00339
00340
00341
00342 if (flags)
00343 {
00344 lock();
00345 markObjectsZombies(p);
00346 unlock();
00347 }
00348 }
00349 return true;
00350
00351 case VIS_REPLY_LIST_END:
00352 {
00353 if (len != 4*sizeof(uint32_t))
00354 {
00355 logme()
00356 << "ERROR: corrupt 'LIST END' message of length " << len
00357 << " from peer " << p->peeraddr << std::endl;
00358 return false;
00359 }
00360
00361
00362 uint32_t flags;
00363 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364
00365
00366
00367
00368
00369 if (flags)
00370 {
00371 lock();
00372 markObjectsDead(p);
00373 unlock();
00374 }
00375
00376 if (debug_)
00377 logme()
00378 << "DEBUG: received message 'LIST END' from "
00379 << p->peeraddr << std::endl;
00380
00381
00382
00383 flush_ = true;
00384 p->updates++;
00385 }
00386 return true;
00387
00388 case VIS_REPLY_OBJECT:
00389 {
00390 uint32_t words[7];
00391 if (len < sizeof(words))
00392 {
00393 logme()
00394 << "ERROR: corrupt 'OBJECT' message of length " << len
00395 << " from peer " << p->peeraddr << std::endl;
00396 return false;
00397 }
00398
00399 memcpy(&words[0], data, sizeof(words));
00400 uint32_t &namelen = words[5];
00401 uint32_t &datalen = words[6];
00402
00403 if (len != sizeof(words) + namelen + datalen)
00404 {
00405 logme()
00406 << "ERROR: corrupt 'OBJECT' message of length " << len
00407 << " from peer " << p->peeraddr
00408 << ", expected length " << sizeof(words)
00409 << " + " << namelen
00410 << " + " << datalen
00411 << std::endl;
00412 return false;
00413 }
00414
00415 unsigned char *namedata = data + sizeof(words);
00416 unsigned char *objdata = namedata + namelen;
00417 unsigned char *enddata = objdata + datalen;
00418 std::string name((char *) namedata, namelen);
00419 assert(enddata == data + len);
00420
00421 if (debug_)
00422 logme()
00423 << "DEBUG: received message 'OBJECT " << name
00424 << "' from " << p->peeraddr << std::endl;
00425
00426
00427 p->source = true;
00428
00429
00430 lock();
00431 Object *o = findObject(p, name);
00432 if (! o)
00433 o = makeObject(p, name);
00434
00435 bool hadobject = ! o->rawdata.empty();
00436 o->flags = words[2] | VIS_FLAG_RECEIVED;
00437 o->version = ((uint64_t) words[4] << 32 | words[3]);
00438 o->rawdata.clear();
00439 o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440
00441
00442
00443 if (hadobject && ! datalen)
00444 requestObject(p, (namelen ? &name[0] : 0), namelen);
00445
00446
00447 if (datalen)
00448 releaseWaiters(o);
00449 unlock();
00450 }
00451 return true;
00452
00453 case VIS_REPLY_NONE:
00454 {
00455 uint32_t words[3];
00456 if (len < sizeof(words))
00457 {
00458 logme()
00459 << "ERROR: corrupt 'NONE' message of length " << len
00460 << " from peer " << p->peeraddr << std::endl;
00461 return false;
00462 }
00463
00464 memcpy(&words[0], data, sizeof(words));
00465 uint32_t &namelen = words[2];
00466
00467 if (len != sizeof(words) + namelen)
00468 {
00469 logme()
00470 << "ERROR: corrupt 'NONE' message of length " << len
00471 << " from peer " << p->peeraddr
00472 << ", expected length " << sizeof(words)
00473 << " + " << namelen << std::endl;
00474 return false;
00475 }
00476
00477 unsigned char *namedata = data + sizeof(words);
00478 unsigned char *enddata = namedata + namelen;
00479 std::string name((char *) namedata, namelen);
00480 assert(enddata == data + len);
00481
00482 if (debug_)
00483 logme()
00484 << "DEBUG: received message 'NONE " << name
00485 << "' from " << p->peeraddr << std::endl;
00486
00487
00488 p->source = true;
00489
00490
00491 lock();
00492 Object *o = findObject(p, name);
00493 if (o)
00494 o->flags |= VIS_FLAG_DEAD;
00495
00496
00497 releaseWaiters(o);
00498 unlock();
00499 }
00500 return true;
00501
00502 default:
00503 logme()
00504 << "ERROR: unrecognised message of length " << len
00505 << " and type " << type << " from peer " << p->peeraddr
00506 << std::endl;
00507 return false;
00508 }
00509 }
00510
00513 bool
00514 VisNet::onPeerData(IOSelectEvent *ev, Peer *p)
00515 {
00516 assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517
00518
00519
00520
00521
00522 if (ev->events & IOUrgent)
00523 {
00524 if (p->automatic)
00525 {
00526 logme()
00527 << "WARNING: connection to the server at " << p->peeraddr
00528 << " lost (will attempt to reconnect in 15 seconds)\n";
00529 return losePeer(0, p, ev);
00530 }
00531 else
00532 return losePeer("WARNING: lost peer connection ", p, ev);
00533 }
00534
00535
00536 if (ev->events & IOWrite)
00537 {
00538 while (Bucket *b = p->sendq)
00539 {
00540 IOSize len = b->data.size() - p->sendpos;
00541 const void *data = (len ? (const void *)&b->data[p->sendpos]
00542 : (const void *)&data);
00543 IOSize done;
00544
00545 try
00546 {
00547 done = (len ? ev->source->write (data, len) : 0);
00548 if (debug_ && len)
00549 logme()
00550 << "DEBUG: sent " << done << " bytes to peer "
00551 << p->peeraddr << std::endl;
00552 }
00553 catch (Error &e)
00554 {
00555 return losePeer("WARNING: unable to write to peer ",
00556 p, ev, &e);
00557 }
00558
00559 p->sendpos += done;
00560 if (p->sendpos == b->data.size())
00561 {
00562 Bucket *old = p->sendq;
00563 p->sendq = old->next;
00564 p->sendpos = 0;
00565 old->next = 0;
00566 discard(old);
00567 }
00568
00569 if (! done && len)
00570
00571 break;
00572 }
00573 }
00574
00575
00576
00577 if (ev->events & IORead)
00578 {
00579
00580
00581
00582 IOSize sz;
00583 try
00584 {
00585 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586 do
00587 if ((sz = ev->source->read(&buf[0], buf.size())))
00588 {
00589 if (debug_)
00590 logme()
00591 << "DEBUG: received " << sz << " bytes from peer "
00592 << p->peeraddr << std::endl;
00593 DataBlob &data = p->incoming;
00594 if (data.capacity () < data.size () + sz)
00595 data.reserve (data.size() + SOCKET_READ_GROWTH);
00596 data.insert (data.end(), &buf[0], &buf[0] + sz);
00597 }
00598 while (sz == sizeof (buf));
00599 }
00600 catch (Error &e)
00601 {
00602 SystemError *next = dynamic_cast<SystemError *>(e.next());
00603 if (next && next->portable() == SysErr::ErrTryAgain)
00604 sz = 1;
00605 else
00606
00607 return losePeer("WARNING: failed to read from peer ",
00608 p, ev, &e);
00609 }
00610
00611
00612 size_t consumed = 0;
00613 DataBlob &data = p->incoming;
00614 while (data.size()-consumed >= sizeof(uint32_t)
00615 && p->waiting < MAX_PEER_WAITREQS)
00616 {
00617 uint32_t msglen;
00618 memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619
00620 if (msglen >= MESSAGE_SIZE_LIMIT)
00621 return losePeer("WARNING: excessively large message from ", p, ev);
00622
00623 if (data.size()-consumed >= msglen)
00624 {
00625 bool valid = true;
00626 if (msglen < 2*sizeof(uint32_t))
00627 {
00628 logme()
00629 << "ERROR: corrupt peer message of length " << msglen
00630 << " from peer " << p->peeraddr << std::endl;
00631 valid = false;
00632 }
00633 else
00634 {
00635
00636 Bucket msg;
00637 msg.next = 0;
00638 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639
00640
00641 if (! msg.data.empty())
00642 {
00643 Bucket **prev = &p->sendq;
00644 while (*prev)
00645 prev = &(*prev)->next;
00646
00647 *prev = new Bucket;
00648 (*prev)->next = 0;
00649 (*prev)->data.swap(msg.data);
00650 }
00651 }
00652
00653 if (! valid)
00654 return losePeer("WARNING: data stream error with ", p, ev);
00655
00656 consumed += msglen;
00657 }
00658 else
00659 break;
00660 }
00661
00662 data.erase(data.begin(), data.begin()+consumed);
00663
00664
00665
00666
00667 if (sz == 0)
00668 sel_.setMask(p->socket, p->mask &= ~IORead);
00669 }
00670
00671
00672 return false;
00673 }
00674
00679 bool
00680 VisNet::onPeerConnect(IOSelectEvent *ev)
00681 {
00682
00683 assert(ev->source == server_);
00684
00685
00686 Socket *s = server_->accept();
00687 assert(s);
00688 assert(! s->isBlocking());
00689
00690
00691 Peer *p = createPeer(s);
00692 InetAddress peeraddr = ((InetSocket *) s)->peername();
00693 InetAddress myaddr = ((InetSocket *) s)->sockname();
00694 p->peeraddr = StringFormat("%1:%2")
00695 .arg(peeraddr.hostname())
00696 .arg(peeraddr.port());
00697 p->mask = IORead|IOUrgent;
00698 p->socket = s;
00699
00700
00701 if (debug_)
00702 logme()
00703 << "INFO: new peer " << p->peeraddr << " is now connected to "
00704 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705
00706
00707 sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
00708
00709
00710 return false;
00711 }
00712
00719 bool
00720 VisNet::onLocalNotify(IOSelectEvent *ev)
00721 {
00722
00723 try
00724 {
00725 IOSize sz;
00726 unsigned char buf [1024];
00727 while ((sz = ev->source->read(buf, sizeof(buf))))
00728 ;
00729 }
00730 catch (Error &e)
00731 {
00732 SystemError *next = dynamic_cast<SystemError *>(e.next());
00733 if (next && next->portable() == SysErr::ErrTryAgain)
00734 ;
00735 else
00736 logme()
00737 << "WARNING: error reading from notification pipe: "
00738 << e.explain() << std::endl;
00739 }
00740
00741
00742 flush_ = true;
00743
00744
00745 return false;
00746 }
00747
00750 void
00751 VisNet::updateMask(Peer *p)
00752 {
00753 if (! p->socket)
00754 return;
00755
00756
00757 unsigned oldmask = p->mask;
00758 if (! p->sendq && (p->mask & IOWrite))
00759 sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760
00761 if (p->sendq && ! (p->mask & IOWrite))
00762 sel_.setMask(p->socket, p->mask |= IOWrite);
00763
00764 if (debug_ && oldmask != p->mask)
00765 logme()
00766 << "DEBUG: updating mask for " << p->peeraddr << " to "
00767 << p->mask << " from " << oldmask << std::endl;
00768
00769
00770
00771 if (p->mask == IOUrgent && ! p->waiting)
00772 {
00773 assert(! p->sendq);
00774 if (debug_)
00775 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776 losePeer(0, p, 0);
00777 }
00778 }
00779
00781 VisNet::VisNet (const std::string &appname )
00782 : debug_ (false),
00783 appname_ (appname.empty() ? "VisNet" : appname.c_str()),
00784 pid_ (getpid()),
00785 server_ (0),
00786 version_ (Time::current()),
00787 communicate_ ((pthread_t) -1),
00788 shutdown_ (0),
00789 delay_ (1000),
00790 flush_ (false)
00791 {
00792
00793
00794
00795 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &VisNet::onLocalNotify));
00797
00798
00799 upstream_.peer = downstream_.peer = 0;
00800 upstream_.next = downstream_.next = 0;
00801 upstream_.port = downstream_.port = 0;
00802 upstream_.update = downstream_.update = false;
00803 upstream_.warned = downstream_.warned = false;
00804
00805 local_ = createPeer((Socket *) -1);
00806 }
00807
00808 VisNet::~VisNet(void)
00809 {
00810
00811 }
00812
00815 void
00816 VisNet::debug(bool doit)
00817 {
00818 debug_ = doit;
00819 }
00820
00823 void
00824 VisNet::delay(int delay)
00825 {
00826 delay_ = delay;
00827 }
00828
00832 void
00833 VisNet::startLocalServer(int port)
00834 {
00835 if (server_)
00836 {
00837 logme() << "ERROR: server was already started.\n";
00838 return;
00839 }
00840
00841 try
00842 {
00843 server_ = new InetServerSocket(InetAddress (port), 10);
00844 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846 server_->setBlocking(false);
00847 sel_.attach(server_, IOAccept, CreateHook(this, &VisNet::onPeerConnect));
00848 }
00849 catch (Error &e)
00850 {
00851
00852
00853 logme()
00854 << "ERROR: Failed to start server at port " << port << ": "
00855 << e.explain() << std::endl;
00856
00857
00858 throw cms::Exception("VisNet::startLocalServer")
00859 << "Failed to start server at port " << port << ": "
00860 << e.explain();
00861 }
00862
00863 logme() << "INFO: Shared memory server started at port " << port << std::endl;
00864 }
00865
00869 void
00870 VisNet::updateToCollector(const std::string &host, int port)
00871 {
00872 if (! downstream_.host.empty())
00873 {
00874 logme()
00875 << "ERROR: Already updating another collector at "
00876 << downstream_.host << ":" << downstream_.port << std::endl;
00877 return;
00878 }
00879
00880 downstream_.update = true;
00881 downstream_.host = host;
00882 downstream_.port = port;
00883 }
00884
00888 void
00889 VisNet::listenToSource(const std::string &host, int port)
00890 {
00891 if (! upstream_.host.empty())
00892 {
00893 logme()
00894 << "ERROR: Already receiving data from another collector at "
00895 << upstream_.host << ":" << upstream_.port << std::endl;
00896 return;
00897 }
00898
00899 upstream_.update = false;
00900 upstream_.host = host;
00901 upstream_.port = port;
00902 }
00903
00905 void
00906 VisNet::shutdown(void)
00907 {
00908 shutdown_ = 1;
00909 if (communicate_ != (pthread_t) -1)
00910 pthread_join(communicate_, 0);
00911 }
00912
00918 static void *communicate(void *obj)
00919 {
00920 sigset_t sigs;
00921 sigfillset(&sigs);
00922 pthread_sigmask(SIG_BLOCK, &sigs, 0);
00923 ((VisNet *)obj)->run();
00924 return 0;
00925 }
00926
00928 void
00929 VisNet::lock(void)
00930 {
00931 if (communicate_ != (pthread_t) -1)
00932 pthread_mutex_lock(&lock_);
00933 }
00934
00936 void
00937 VisNet::unlock(void)
00938 {
00939 if (communicate_ != (pthread_t) -1)
00940 pthread_mutex_unlock(&lock_);
00941 }
00942
00946 void
00947 VisNet::start(void)
00948 {
00949 if (communicate_ != (pthread_t) -1)
00950 {
00951 logme()
00952 << "ERROR: Shared memory networking thread has already been started\n";
00953 return;
00954 }
00955
00956 pthread_mutex_init(&lock_, 0);
00957 pthread_create(&communicate_, 0, &communicate, this);
00958 }
00959
00961 void
00962 VisNet::run(void)
00963 {
00964 Time now;
00965 AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00966
00967
00968 while (! shouldStop())
00969 {
00970 for (int i = 0; i < 2; ++i)
00971 {
00972 AutoPeer *ap = automatic[i];
00973
00974
00975
00976
00977 if (! ap->host.empty()
00978 && ! ap->peer
00979 && (now = Time::current()) > ap->next)
00980 {
00981 ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
00982 InetSocket *s = 0;
00983 try
00984 {
00985 s = new InetSocket (SocketConst::TypeStream);
00986 s->setBlocking (false);
00987 s->connect(InetAddress (ap->host.c_str(), ap->port));
00988 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00989 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00990 }
00991 catch (Error &e)
00992 {
00993 SystemError *sys = dynamic_cast<SystemError *>(e.next());
00994 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00995 {
00996
00997
00998
00999 if (! ap->warned)
01000 {
01001 logme()
01002 << "NOTE: server at " << ap->host << ":" << ap->port
01003 << " is unavailable. Connection will be established"
01004 << " automatically on the background once the server"
01005 << " becomes available. Error from the attempt was: "
01006 << e.explain() << '\n';
01007 ap->warned = true;
01008 }
01009
01010 if (s)
01011 s->abort();
01012 delete s;
01013 s = 0;
01014 }
01015 }
01016
01017
01018
01019 if (s)
01020 {
01021 lock();
01022 Peer *p = createPeer(s);
01023 ap->peer = p;
01024 ap->warned = false;
01025 unlock();
01026
01027 InetAddress peeraddr = ((InetSocket *) s)->peername();
01028 InetAddress myaddr = ((InetSocket *) s)->sockname();
01029 p->peeraddr = StringFormat("%1:%2")
01030 .arg(peeraddr.hostname())
01031 .arg(peeraddr.port());
01032 p->mask = IORead|IOWrite|IOUrgent;
01033 p->update = ap->update;
01034 p->automatic = ap;
01035 p->socket = s;
01036 sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
01037 if (ap == &upstream_)
01038 {
01039 uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01040 2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01041 p->sendq = new Bucket;
01042 p->sendq->next = 0;
01043 copydata(p->sendq, words, sizeof(words));
01044 }
01045
01046
01047 if (debug_)
01048 logme()
01049 << "INFO: now connected to " << p->peeraddr << " from "
01050 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01051 }
01052 }
01053 }
01054
01055
01056 sel_.dispatch(delay_);
01057 now = Time::current();
01058
01059
01060
01061
01062 if (flush_)
01063 {
01064 flush_ = false;
01065
01066 lock();
01067 purgeDeadObjects(now - TimeSpan(0, 0, 2 , 0, 0),
01068 now - TimeSpan(0, 0, 20 , 0, 0));
01069 sendObjectListToPeers(true);
01070 unlock();
01071 }
01072
01073
01074
01075
01076
01077
01078 updatePeerMasks();
01079
01080
01081 lock();
01082 Time waitold = now - TimeSpan(0, 0, 2 , 0, 0);
01083 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01084 {
01085
01086 if (i->time < waitold)
01087 releaseFromWait(i++, findObject(0, i->name));
01088
01089
01090 else
01091 ++i;
01092 }
01093 unlock();
01094 }
01095 }
01096
01097
01098
01099 void
01100 VisNet::sendLocalChanges(void)
01101 {
01102 char byte = 0;
01103 wakeup_.sink()->write(&byte, 1);
01104 }
01105
01109 VisNet::Object *
01110 VisNet::findObject(Peer *p, const std::string &name, Peer **owner)
01111 {
01112 ObjectMap::iterator pos;
01113 PeerMap::iterator i, e;
01114 if (owner)
01115 *owner = 0;
01116 if (p)
01117 {
01118 pos = p->objs.find(name);
01119 if (pos == p->objs.end())
01120 return 0;
01121 else
01122 {
01123 if (owner) *owner = p;
01124 return &pos->second;
01125 }
01126 }
01127 else
01128 {
01129 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01130 {
01131 pos = i->second.objs.find(name);
01132 if (pos != i->second.objs.end())
01133 {
01134 if (owner) *owner = &i->second;
01135 return &pos->second;
01136 }
01137 }
01138 return 0;
01139 }
01140 }
01141
01142 VisNet::Object *
01143 VisNet::makeObject(Peer *p, const std::string &name)
01144 {
01145 Object *o = &p->objs[name];
01146 o->version = 0;
01147 o->name = name;
01148 o->flags = 0;
01149 o->lastreq = 0;
01150 return o;
01151 }
01152
01153
01154
01155
01156
01157
01158
01159 void
01160 VisNet::markObjectsZombies(Peer *p)
01161 {
01162 ObjectMap::iterator i, e;
01163 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01164 i->second.flags |= VIS_FLAG_ZOMBIE;
01165 }
01166
01167
01168 void
01169 VisNet::markObjectsDead(Peer *p)
01170 {
01171 ObjectMap::iterator i, e;
01172 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01173 if (i->second.flags & VIS_FLAG_ZOMBIE)
01174 i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01175 }
01176
01177
01178 void
01179 VisNet::purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
01180 {
01181 PeerMap::iterator pi, pe;
01182 ObjectMap::iterator oi, oe;
01183 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01184 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01185 {
01186 Object &o = oi->second;
01187
01188
01189
01190 if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01191 {
01192 if (debug_)
01193 logme()
01194 << "DEBUG: compacting idle '" << o.name
01195 << "' from " << pi->second.peeraddr << std::endl;
01196 }
01197
01198
01199 if (o.lastreq < deadobj
01200 && o.version < deadobj
01201 && (o.flags & VIS_FLAG_DEAD))
01202 {
01203 if (debug_)
01204 logme()
01205 << "DEBUG: removing dead '" << o.name
01206 << "' from " << pi->second.peeraddr << std::endl;
01207
01208 pi->second.objs.erase(oi++);
01209 }
01210 else
01211 ++oi;
01212 }
01213 }
01214
01215 VisNet::Peer *
01216 VisNet::getPeer(lat::Socket *s)
01217 {
01218 PeerMap::iterator pos = peers_.find(s);
01219 return pos == peers_.end() ? 0 : &pos->second;
01220 }
01221
01222 VisNet::Peer *
01223 VisNet::createPeer(lat::Socket *s)
01224 {
01225 Peer *p = &peers_[s];
01226 p->socket = 0;
01227 p->sendq = 0;
01228 p->sendpos = 0;
01229 p->mask = 0;
01230 p->source = false;
01231 p->update = false;
01232 p->updated = false;
01233 p->updates = 0;
01234 p->waiting = 0;
01235 p->automatic = 0;
01236 return p;
01237 }
01238
01239 void
01240 VisNet::removePeer(Peer *p, lat::Socket *s)
01241 {
01242 bool needflush = ! p->objs.empty();
01243
01244 p->objs.clear();
01245 peers_.erase(s);
01246
01247
01248
01249 if (needflush)
01250 sendLocalChanges();
01251 }
01252
01254 void
01255 VisNet::sendObjectListToPeer(Bucket *msg, bool all, bool clear)
01256 {
01257 PeerMap::iterator pi, pe;
01258 ObjectMap::iterator oi, oe;
01259 uint32_t numobjs = 0;
01260 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01261 numobjs += pi->second.objs.size();
01262
01263 msg->data.reserve(msg->data.size() + 300*numobjs);
01264
01265 uint32_t nupdates = 0;
01266 uint32_t words[4];
01267 words[0] = sizeof(words);
01268 words[1] = VIS_REPLY_LIST_BEGIN;
01269 words[2] = numobjs;
01270 words[3] = all;
01271 copydata(msg, &words[0], sizeof(words));
01272
01273 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01274 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01275 if (all || (oi->second.flags & VIS_FLAG_NEW))
01276 {
01277 sendObjectToPeer(msg, oi->second, false);
01278 if (clear)
01279 oi->second.flags &= ~VIS_FLAG_NEW;
01280 ++nupdates;
01281 }
01282
01283 words[1] = VIS_REPLY_LIST_END;
01284 words[2] = nupdates;
01285 copydata(msg, &words[0], sizeof(words));
01286 }
01287
01288 void
01289 VisNet::sendObjectListToPeers(bool all)
01290 {
01291 PeerMap::iterator i, e;
01292 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01293 {
01294 Peer &p = i->second;
01295 if (! p.update)
01296 continue;
01297
01298 if (debug_)
01299 logme()
01300 << "DEBUG: notifying " << p.peeraddr << std::endl;
01301
01302 Bucket msg;
01303 msg.next = 0;
01304 sendObjectListToPeer(&msg, !p.updated || all, true);
01305
01306 if (! msg.data.empty())
01307 {
01308 Bucket **prev = &p.sendq;
01309 while (*prev)
01310 prev = &(*prev)->next;
01311
01312 *prev = new Bucket;
01313 (*prev)->next = 0;
01314 (*prev)->data.swap(msg.data);
01315 }
01316 p.updated = true;
01317 }
01318 }
01319
01320 void
01321 VisNet::updatePeerMasks(void)
01322 {
01323 PeerMap::iterator i, e;
01324 for (i = peers_.begin(), e = peers_.end(); i != e; )
01325 updateMask(&(i++)->second);
01326 }
01327
01328
01332 int
01333 VisNet::receive(void (*callback) (void *arg, uint32_t reason, Object &obj), void *arg)
01334 {
01335 int updates = 0;
01336
01337 lock();
01338 PeerMap::iterator pi, pe;
01339 ObjectMap::iterator oi, oe;
01340 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01341 {
01342 Peer &p = pi->second;
01343 if (&p == local_)
01344 continue;
01345
01346 updates += p.updates;
01347
01348 for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01349 {
01350 Object &o = oi->second;
01351 if (o.flags & VIS_FLAG_DEAD)
01352 {
01353 callback(arg, VIS_FLAG_DEAD, o);
01354 p.objs.erase(oi++);
01355 }
01356 else if (o.flags & VIS_FLAG_RECEIVED)
01357 {
01358 callback(arg, VIS_FLAG_RECEIVED, o);
01359 o.flags &= ~VIS_FLAG_RECEIVED;
01360 ++oi;
01361 }
01362 else
01363 ++oi;
01364 }
01365 }
01366 unlock();
01367
01368 return updates;
01369 }
01370
01373 void
01374 VisNet::updateLocalObject(Object &o)
01375 {
01376 ObjectMap::iterator pos = local_->objs.find(o.name);
01377 if (pos == local_->objs.end())
01378 local_->objs.insert(ObjectMap::value_type(o.name, o));
01379 else
01380 {
01381 std::swap(pos->second.version, o.version);
01382 std::swap(pos->second.flags, o.flags);
01383 std::swap(pos->second.rawdata, o.rawdata);
01384 pos->second.lastreq = 0;
01385 }
01386 }
01387
01390 void
01391 VisNet::removeLocalObject(const std::string &path)
01392 {
01393 local_->objs.erase(path);
01394 }