00001 #include "Iguana/Framework/interface/IgNet.h"
00002 #include "Iguana/Framework/interface/IgNetError.h"
00003 #include "classlib/sysapi/InetSocket.h"
00004 #include "classlib/iobase/Filename.h"
00005 #include "classlib/utils/TimeInfo.h"
00006 #include "classlib/utils/StringList.h"
00007 #include "classlib/utils/StringFormat.h"
00008 #include "classlib/utils/StringOps.h"
00009 #include "classlib/utils/SystemError.h"
00010 #include "classlib/utils/Regexp.h"
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <sys/wait.h>
00014 #include <stdio.h>
00015 #include <stdint.h>
00016 #include <iostream>
00017 #include <cassert>
00018
00019 #define MESSAGE_SIZE_LIMIT (2*1024*1024)
00020 #define SOCKET_BUF_SIZE (8*1024*1024)
00021 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
00022 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
00023
00024 using namespace lat;
00025
00027
00028 std::ostream &
00029 IgNet::logme (void)
00030 {
00031 return std::cerr
00032 << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033 << " " << appname_ << "[" << pid_ << "]: ";
00034 }
00035
00036
00037 void
00038 IgNet::copydata(Bucket *b, const void *data, size_t len)
00039 {
00040 b->data.insert(b->data.end(),
00041 (const unsigned char *)data,
00042 (const unsigned char *)data + len);
00043 }
00044
00045
00046 void
00047 IgNet::discard (Bucket *&b)
00048 {
00049 while (b)
00050 {
00051 Bucket *next = b->next;
00052 delete b;
00053 b = next;
00054 }
00055 }
00056
00058
00061 bool
00062 IgNet::losePeer(const char *reason,
00063 Peer *peer,
00064 IOSelectEvent *ev,
00065 Error *err)
00066 {
00067 if (reason)
00068 logme ()
00069 << reason << peer->peeraddr
00070 << (err ? "; error was: " + err->explain() : std::string(""))
00071 << std::endl;
00072
00073 Socket *s = peer->socket;
00074
00075 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076 if (i->peer == peer)
00077 waiting_.erase(i++);
00078 else
00079 ++i;
00080
00081 if (ev)
00082 ev->source = 0;
00083
00084 discard(peer->sendq);
00085 if (peer->automatic)
00086 peer->automatic->peer = 0;
00087
00088 sel_.detach(s);
00089 s->close();
00090 removePeer(peer, s);
00091 delete s;
00092 return true;
00093 }
00094
00096 void
00097 IgNet::requestObject(Peer *p, const char *name, size_t len)
00098 {
00099 Bucket **msg = &p->sendq;
00100 while (*msg)
00101 msg = &(*msg)->next;
00102 *msg = new Bucket;
00103 (*msg)->next = 0;
00104
00105 uint32_t words[3];
00106 words[0] = sizeof(words) + len;
00107 words[1] = VIS_MSG_GET_OBJECT;
00108 words[2] = len;
00109 (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110 copydata(*msg, words, sizeof(words));
00111 copydata(*msg, name, len);
00112 }
00113
00116 void
00117 IgNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00118 {
00119
00120
00121
00122
00123
00124 requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125 WaitObject wo = { Time::current(), name, info, p };
00126 waiting_.push_back(wo);
00127 p->waiting++;
00128 }
00129
00130
00131
00132 void
00133 IgNet::releaseFromWait(WaitList::iterator i, Object *o)
00134 {
00135 Bucket **msg = &i->peer->sendq;
00136 while (*msg)
00137 msg = &(*msg)->next;
00138 *msg = new Bucket;
00139 (*msg)->next = 0;
00140
00141 releaseFromWait(*msg, *i, o);
00142
00143 assert(i->peer->waiting > 0);
00144 i->peer->waiting--;
00145 waiting_.erase(i);
00146 }
00147
00148
00149 void
00150 IgNet::releaseWaiters(Object *o)
00151 {
00152 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153 if (i->name == o->name)
00154 releaseFromWait(i++, o);
00155 else
00156 ++i;
00157 }
00158
00160
00161 bool
00162 IgNet::shouldStop(void)
00163 {
00164 return shutdown_;
00165 }
00166
00167
00168
00169 void
00170 IgNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00171 {
00172 if (o)
00173 sendObjectToPeer (msg, *o, true);
00174 else
00175 {
00176 uint32_t words[3];
00177 words[0] = sizeof(words) + w.name.size();
00178 words[1] = VIS_REPLY_NONE;
00179 words[2] = w.name.size();
00180
00181 msg->data.reserve(msg->data.size() + words[0]);
00182 copydata(msg, &words[0], sizeof(words));
00183 copydata(msg, &w.name[0], w.name.size());
00184 }
00185 }
00186
00187
00188
00189
00190 void
00191 IgNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00192 {
00193 uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194 DataBlob objdata;
00195
00196 if (data || (flags & VIS_FLAG_SCALAR))
00197 objdata.insert(objdata.end(),
00198 &o.rawdata[0],
00199 &o.rawdata[0] + o.rawdata.size());
00200
00201 uint32_t words[7];
00202 uint32_t namelen = o.name.size();
00203 uint32_t datalen = objdata.size();
00204
00205 words[0] = sizeof(words) + namelen + datalen;
00206 words[1] = VIS_REPLY_OBJECT;
00207 words[2] = flags;
00208 words[3] = (o.version >> 0 ) & 0xffffffff;
00209 words[4] = (o.version >> 32) & 0xffffffff;
00210 words[5] = namelen;
00211 words[6] = datalen;
00212
00213 msg->data.reserve(msg->data.size() + words[0]);
00214 copydata(msg, &words[0], sizeof(words));
00215 if (namelen)
00216 copydata(msg, &o.name[0], namelen);
00217 if (datalen)
00218 copydata(msg, &objdata[0], datalen);
00219 }
00220
00222
00223 bool
00224 IgNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00225 {
00226
00227 uint32_t type;
00228 memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229 switch (type)
00230 {
00231 case VIS_MSG_UPDATE_ME:
00232 {
00233 if (len != 2*sizeof(uint32_t))
00234 {
00235 logme()
00236 << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237 << " from peer " << p->peeraddr << std::endl;
00238 return false;
00239 }
00240
00241 if (debug_)
00242 logme()
00243 << "DEBUG: received message 'UPDATE ME' from peer "
00244 << p->peeraddr << std::endl;
00245
00246 p->update = true;
00247 }
00248 return true;
00249
00250 case VIS_MSG_LIST_OBJECTS:
00251 {
00252 if (debug_)
00253 logme()
00254 << "DEBUG: received message 'LIST OBJECTS' from peer "
00255 << p->peeraddr << std::endl;
00256
00257
00258 lock();
00259 sendObjectListToPeer(msg, true, false);
00260 unlock();
00261 }
00262 return true;
00263
00264 case VIS_MSG_GET_OBJECT:
00265 {
00266 if (debug_)
00267 logme()
00268 << "DEBUG: received message 'GET OBJECT' from peer "
00269 << p->peeraddr << std::endl;
00270
00271 if (len < 3*sizeof(uint32_t))
00272 {
00273 logme()
00274 << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275 << " from peer " << p->peeraddr << std::endl;
00276 return false;
00277 }
00278
00279 uint32_t namelen;
00280 memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281 if (len != 3*sizeof(uint32_t) + namelen)
00282 {
00283 logme()
00284 << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285 << " from peer " << p->peeraddr
00286 << ", expected length " << (3*sizeof(uint32_t))
00287 << " + " << namelen << std::endl;
00288 return false;
00289 }
00290
00291 lock();
00292 std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293 Peer *owner = 0;
00294 Object *o = findObject(0, name, &owner);
00295 if (o)
00296 {
00297 o->lastreq = Time::current();
00298 if (o->rawdata.empty())
00299 waitForData(p, name, "", owner);
00300 else
00301 sendObjectToPeer(msg, *o, true);
00302 }
00303 else
00304 {
00305 uint32_t words[3];
00306 words[0] = sizeof(words) + name.size();
00307 words[1] = VIS_REPLY_NONE;
00308 words[2] = name.size();
00309
00310 msg->data.reserve(msg->data.size() + words[0]);
00311 copydata(msg, &words[0], sizeof(words));
00312 copydata(msg, &name[0], name.size());
00313 }
00314 unlock();
00315 }
00316 return true;
00317
00318 case VIS_REPLY_LIST_BEGIN:
00319 {
00320 if (len != 4*sizeof(uint32_t))
00321 {
00322 logme()
00323 << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324 << " from peer " << p->peeraddr << std::endl;
00325 return false;
00326 }
00327
00328 if (debug_)
00329 logme()
00330 << "DEBUG: received message 'LIST BEGIN' from "
00331 << p->peeraddr << std::endl;
00332
00333
00334 uint32_t flags;
00335 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336
00337
00338
00339
00340
00341
00342 if (flags)
00343 {
00344 lock();
00345 markObjectsZombies(p);
00346 unlock();
00347 }
00348 }
00349 return true;
00350
00351 case VIS_REPLY_LIST_END:
00352 {
00353 if (len != 4*sizeof(uint32_t))
00354 {
00355 logme()
00356 << "ERROR: corrupt 'LIST END' message of length " << len
00357 << " from peer " << p->peeraddr << std::endl;
00358 return false;
00359 }
00360
00361
00362 uint32_t flags;
00363 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364
00365
00366
00367
00368
00369 if (flags)
00370 {
00371 lock();
00372 markObjectsDead(p);
00373 unlock();
00374 }
00375
00376 if (debug_)
00377 logme()
00378 << "DEBUG: received message 'LIST END' from "
00379 << p->peeraddr << std::endl;
00380
00381
00382
00383 flush_ = true;
00384 p->updates++;
00385 }
00386 return true;
00387
00388 case VIS_REPLY_OBJECT:
00389 {
00390 uint32_t words[7];
00391 if (len < sizeof(words))
00392 {
00393 logme()
00394 << "ERROR: corrupt 'OBJECT' message of length " << len
00395 << " from peer " << p->peeraddr << std::endl;
00396 return false;
00397 }
00398
00399 memcpy(&words[0], data, sizeof(words));
00400 uint32_t &namelen = words[5];
00401 uint32_t &datalen = words[6];
00402
00403 if (len != sizeof(words) + namelen + datalen)
00404 {
00405 logme()
00406 << "ERROR: corrupt 'OBJECT' message of length " << len
00407 << " from peer " << p->peeraddr
00408 << ", expected length " << sizeof(words)
00409 << " + " << namelen
00410 << " + " << datalen
00411 << std::endl;
00412 return false;
00413 }
00414
00415 unsigned char *namedata = data + sizeof(words);
00416 unsigned char *objdata = namedata + namelen;
00417 unsigned char *enddata = objdata + datalen;
00418 std::string name((char *) namedata, namelen);
00419 assert(enddata == data + len);
00420
00421 if (debug_)
00422 logme()
00423 << "DEBUG: received message 'OBJECT " << name
00424 << "' from " << p->peeraddr << std::endl;
00425
00426
00427 p->source = true;
00428
00429
00430 lock();
00431 Object *o = findObject(p, name);
00432 if (! o)
00433 o = makeObject(p, name);
00434
00435 bool hadobject = ! o->rawdata.empty();
00436 o->flags = words[2] | VIS_FLAG_RECEIVED;
00437 o->version = ((uint64_t) words[4] << 32 | words[3]);
00438 o->rawdata.clear();
00439 o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440
00441
00442
00443 if (hadobject && ! datalen)
00444 requestObject(p, (namelen ? &name[0] : 0), namelen);
00445
00446
00447 if (datalen)
00448 releaseWaiters(o);
00449 unlock();
00450 }
00451 return true;
00452
00453 case VIS_REPLY_NONE:
00454 {
00455 uint32_t words[3];
00456 if (len < sizeof(words))
00457 {
00458 logme()
00459 << "ERROR: corrupt 'NONE' message of length " << len
00460 << " from peer " << p->peeraddr << std::endl;
00461 return false;
00462 }
00463
00464 memcpy(&words[0], data, sizeof(words));
00465 uint32_t &namelen = words[2];
00466
00467 if (len != sizeof(words) + namelen)
00468 {
00469 logme()
00470 << "ERROR: corrupt 'NONE' message of length " << len
00471 << " from peer " << p->peeraddr
00472 << ", expected length " << sizeof(words)
00473 << " + " << namelen << std::endl;
00474 return false;
00475 }
00476
00477 unsigned char *namedata = data + sizeof(words);
00478 unsigned char *enddata = namedata + namelen;
00479 std::string name((char *) namedata, namelen);
00480 assert(enddata == data + len);
00481
00482 if (debug_)
00483 logme()
00484 << "DEBUG: received message 'NONE " << name
00485 << "' from " << p->peeraddr << std::endl;
00486
00487
00488 p->source = true;
00489
00490
00491 lock();
00492 Object *o = findObject(p, name);
00493 if (o)
00494 o->flags |= VIS_FLAG_DEAD;
00495
00496
00497 releaseWaiters(o);
00498 unlock();
00499 }
00500 return true;
00501
00502 default:
00503 logme()
00504 << "ERROR: unrecognised message of length " << len
00505 << " and type " << type << " from peer " << p->peeraddr
00506 << std::endl;
00507 return false;
00508 }
00509 }
00510
00513 bool
00514 IgNet::onPeerData(IOSelectEvent *ev, Peer *p)
00515 {
00516 assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517
00518
00519
00520
00521
00522 if (ev->events & IOUrgent)
00523 {
00524 if (p->automatic)
00525 {
00526 logme()
00527 << "WARNING: connection to the server at " << p->peeraddr
00528 << " lost (will attempt to reconnect in 15 seconds)\n";
00529 return losePeer(0, p, ev);
00530 }
00531 else
00532 return losePeer("WARNING: lost peer connection ", p, ev);
00533 }
00534
00535
00536 if (ev->events & IOWrite)
00537 {
00538 while (Bucket *b = p->sendq)
00539 {
00540 IOSize len = b->data.size() - p->sendpos;
00541 const void *data = (len ? (const void *)&b->data[p->sendpos]
00542 : (const void *)&data);
00543 IOSize done;
00544
00545 try
00546 {
00547 done = (len ? ev->source->write (data, len) : 0);
00548 if (debug_ && len)
00549 logme()
00550 << "DEBUG: sent " << done << " bytes to peer "
00551 << p->peeraddr << std::endl;
00552 }
00553 catch (Error &e)
00554 {
00555 return losePeer("WARNING: unable to write to peer ",
00556 p, ev, &e);
00557 }
00558
00559 p->sendpos += done;
00560 if (p->sendpos == b->data.size())
00561 {
00562 Bucket *old = p->sendq;
00563 p->sendq = old->next;
00564 p->sendpos = 0;
00565 old->next = 0;
00566 discard(old);
00567 }
00568
00569 if (! done && len)
00570
00571 break;
00572 }
00573 }
00574
00575
00576
00577 if (ev->events & IORead)
00578 {
00579
00580
00581
00582 IOSize sz;
00583 try
00584 {
00585 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586 do
00587 if ((sz = ev->source->read(&buf[0], buf.size())))
00588 {
00589 if (debug_)
00590 logme()
00591 << "DEBUG: received " << sz << " bytes from peer "
00592 << p->peeraddr << std::endl;
00593 DataBlob &data = p->incoming;
00594 if (data.capacity () < data.size () + sz)
00595 data.reserve (data.size() + SOCKET_READ_GROWTH);
00596 data.insert (data.end(), &buf[0], &buf[0] + sz);
00597 }
00598 while (sz == sizeof (buf));
00599 }
00600 catch (Error &e)
00601 {
00602 SystemError *next = dynamic_cast<SystemError *>(e.next());
00603 if (next && next->portable() == SysErr::ErrTryAgain)
00604 sz = 1;
00605 else
00606
00607 return losePeer("WARNING: failed to read from peer ",
00608 p, ev, &e);
00609 }
00610
00611
00612 size_t consumed = 0;
00613 DataBlob &data = p->incoming;
00614 while (data.size()-consumed >= sizeof(uint32_t)
00615 && p->waiting < MAX_PEER_WAITREQS)
00616 {
00617 uint32_t msglen;
00618 memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619
00620 if (msglen >= MESSAGE_SIZE_LIMIT)
00621 return losePeer("WARNING: excessively large message from ", p, ev);
00622
00623 if (data.size()-consumed >= msglen)
00624 {
00625 bool valid = true;
00626 if (msglen < 2*sizeof(uint32_t))
00627 {
00628 logme()
00629 << "ERROR: corrupt peer message of length " << msglen
00630 << " from peer " << p->peeraddr << std::endl;
00631 valid = false;
00632 }
00633 else
00634 {
00635
00636 Bucket msg;
00637 msg.next = 0;
00638 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639
00640
00641 if (! msg.data.empty())
00642 {
00643 Bucket **prev = &p->sendq;
00644 while (*prev)
00645 prev = &(*prev)->next;
00646
00647 *prev = new Bucket;
00648 (*prev)->next = 0;
00649 (*prev)->data.swap(msg.data);
00650 }
00651 }
00652
00653 if (! valid)
00654 return losePeer("WARNING: data stream error with ", p, ev);
00655
00656 consumed += msglen;
00657 }
00658 else
00659 break;
00660 }
00661
00662 data.erase(data.begin(), data.begin()+consumed);
00663
00664
00665
00666
00667 if (sz == 0)
00668 sel_.setMask(p->socket, p->mask &= ~IORead);
00669 }
00670
00671
00672 return false;
00673 }
00674
00679 bool
00680 IgNet::onPeerConnect(IOSelectEvent *ev)
00681 {
00682
00683 assert(ev->source == server_);
00684
00685
00686 Socket *s = server_->accept();
00687 assert(s);
00688 assert(! s->isBlocking());
00689
00690
00691 Peer *p = createPeer(s);
00692 InetAddress peeraddr = ((InetSocket *) s)->peername();
00693 InetAddress myaddr = ((InetSocket *) s)->sockname();
00694 p->peeraddr = StringFormat("%1:%2")
00695 .arg(peeraddr.hostname())
00696 .arg(peeraddr.port());
00697 p->mask = IORead|IOUrgent;
00698 p->socket = s;
00699
00700
00701 if (debug_)
00702 logme()
00703 << "INFO: new peer " << p->peeraddr << " is now connected to "
00704 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705
00706
00707 sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
00708
00709
00710 return false;
00711 }
00712
00719 bool
00720 IgNet::onLocalNotify(IOSelectEvent *ev)
00721 {
00722
00723 try
00724 {
00725 IOSize sz;
00726 unsigned char buf [1024];
00727 while ((sz = ev->source->read(buf, sizeof(buf))))
00728 ;
00729 }
00730 catch (Error &e)
00731 {
00732 SystemError *next = dynamic_cast<SystemError *>(e.next());
00733 if (next && next->portable() == SysErr::ErrTryAgain)
00734 ;
00735 else
00736 logme()
00737 << "WARNING: error reading from notification pipe: "
00738 << e.explain() << std::endl;
00739 }
00740
00741
00742 flush_ = true;
00743
00744
00745 return false;
00746 }
00747
00750 void
00751 IgNet::updateMask(Peer *p)
00752 {
00753 if (! p->socket)
00754 return;
00755
00756
00757 unsigned oldmask = p->mask;
00758 if (! p->sendq && (p->mask & IOWrite))
00759 sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760
00761 if (p->sendq && ! (p->mask & IOWrite))
00762 sel_.setMask(p->socket, p->mask |= IOWrite);
00763
00764 if (debug_ && oldmask != p->mask)
00765 logme()
00766 << "DEBUG: updating mask for " << p->peeraddr << " to "
00767 << p->mask << " from " << oldmask << std::endl;
00768
00769
00770
00771 if (p->mask == IOUrgent && ! p->waiting)
00772 {
00773 assert(! p->sendq);
00774 if (debug_)
00775 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776 losePeer(0, p, 0);
00777 }
00778 }
00779
00781 IgNet::IgNet (const std::string &appname )
00782 : debug_ (false),
00783 appname_ (appname.empty() ? "IgNet" : appname.c_str()),
00784 pid_ (getpid()),
00785 server_ (0),
00786 version_ (Time::current()),
00787 communicate_ ((pthread_t) -1),
00788 shutdown_ (0),
00789 delay_ (1000),
00790 flush_ (false)
00791 {
00792
00793
00794
00795 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &IgNet::onLocalNotify));
00797
00798
00799 upstream_.peer = downstream_.peer = 0;
00800 upstream_.next = downstream_.next = 0;
00801 upstream_.port = downstream_.port = 0;
00802 upstream_.update = downstream_.update = false;
00803 upstream_.warned = downstream_.warned = false;
00804
00805 local_ = createPeer((Socket *) -1);
00806 }
00807
00808 IgNet::~IgNet(void)
00809 {
00810
00811 }
00812
00815 void
00816 IgNet::debug(bool doit)
00817 {
00818 debug_ = doit;
00819 }
00820
00823 void
00824 IgNet::delay(int delay)
00825 {
00826 delay_ = delay;
00827 }
00828
00832 void
00833 IgNet::startLocalServer(int port)
00834 {
00835 if (server_)
00836 {
00837 logme() << "ERROR: server was already started.\n";
00838 return;
00839 }
00840
00841 try
00842 {
00843 server_ = new InetServerSocket(InetAddress (port), 10);
00844 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846 server_->setBlocking(false);
00847 sel_.attach(server_, IOAccept, CreateHook(this, &IgNet::onPeerConnect));
00848 }
00849 catch (Error &e)
00850 {
00851
00852
00853 logme()
00854 << "ERROR: Failed to start server at port " << port << ": "
00855 << e.explain() << std::endl;
00856
00857 throw IgNetError("Failed to start server at port ", e.clone());
00858
00859
00860
00861
00862
00863 }
00864
00865 logme() << "INFO: Shared memory server started at port " << port << std::endl;
00866 }
00867
00871 void
00872 IgNet::updateToCollector(const std::string &host, int port)
00873 {
00874 if (! downstream_.host.empty())
00875 {
00876 logme()
00877 << "ERROR: Already updating another collector at "
00878 << downstream_.host << ":" << downstream_.port << std::endl;
00879 return;
00880 }
00881
00882 downstream_.update = true;
00883 downstream_.host = host;
00884 downstream_.port = port;
00885 }
00886
00890 void
00891 IgNet::listenToSource(const std::string &host, int port)
00892 {
00893 if (! upstream_.host.empty())
00894 {
00895 logme()
00896 << "ERROR: Already receiving data from another collector at "
00897 << upstream_.host << ":" << upstream_.port << std::endl;
00898 return;
00899 }
00900
00901 upstream_.update = false;
00902 upstream_.host = host;
00903 upstream_.port = port;
00904 }
00905
00907 void
00908 IgNet::shutdown(void)
00909 {
00910 shutdown_ = 1;
00911 if (communicate_ != (pthread_t) -1)
00912 pthread_join(communicate_, 0);
00913 }
00914
00920 static void *communicate(void *obj)
00921 {
00922 sigset_t sigs;
00923 sigfillset(&sigs);
00924 pthread_sigmask(SIG_BLOCK, &sigs, 0);
00925 ((IgNet *)obj)->run();
00926 return 0;
00927 }
00928
00930 void
00931 IgNet::lock(void)
00932 {
00933 if (communicate_ != (pthread_t) -1)
00934 pthread_mutex_lock(&lock_);
00935 }
00936
00938 void
00939 IgNet::unlock(void)
00940 {
00941 if (communicate_ != (pthread_t) -1)
00942 pthread_mutex_unlock(&lock_);
00943 }
00944
00948 void
00949 IgNet::start(void)
00950 {
00951 if (communicate_ != (pthread_t) -1)
00952 {
00953 logme()
00954 << "ERROR: Shared memory networking thread has already been started\n";
00955 return;
00956 }
00957
00958 pthread_mutex_init(&lock_, 0);
00959 pthread_create(&communicate_, 0, &communicate, this);
00960 }
00961
00963 void
00964 IgNet::run(void)
00965 {
00966 Time now;
00967 AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00968
00969
00970 while (! shouldStop())
00971 {
00972 for (int i = 0; i < 2; ++i)
00973 {
00974 AutoPeer *ap = automatic[i];
00975
00976
00977
00978
00979 if (! ap->host.empty()
00980 && ! ap->peer
00981 && (now = Time::current()) > ap->next)
00982 {
00983 ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
00984 InetSocket *s = 0;
00985 try
00986 {
00987 s = new InetSocket (SocketConst::TypeStream);
00988 s->setBlocking (false);
00989 s->connect(InetAddress (ap->host.c_str(), ap->port));
00990 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00991 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00992 }
00993 catch (Error &e)
00994 {
00995 SystemError *sys = dynamic_cast<SystemError *>(e.next());
00996 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00997 {
00998
00999
01000
01001 if (! ap->warned)
01002 {
01003 logme()
01004 << "NOTE: server at " << ap->host << ":" << ap->port
01005 << " is unavailable. Connection will be established"
01006 << " automatically on the background once the server"
01007 << " becomes available. Error from the attempt was: "
01008 << e.explain() << '\n';
01009 ap->warned = true;
01010 }
01011
01012 if (s)
01013 s->abort();
01014 delete s;
01015 s = 0;
01016 }
01017 }
01018
01019
01020
01021 if (s)
01022 {
01023 lock();
01024 Peer *p = createPeer(s);
01025 ap->peer = p;
01026 ap->warned = false;
01027 unlock();
01028
01029 InetAddress peeraddr = ((InetSocket *) s)->peername();
01030 InetAddress myaddr = ((InetSocket *) s)->sockname();
01031 p->peeraddr = StringFormat("%1:%2")
01032 .arg(peeraddr.hostname())
01033 .arg(peeraddr.port());
01034 p->mask = IORead|IOWrite|IOUrgent;
01035 p->update = ap->update;
01036 p->automatic = ap;
01037 p->socket = s;
01038 sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
01039 if (ap == &upstream_)
01040 {
01041 uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01042 2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01043 p->sendq = new Bucket;
01044 p->sendq->next = 0;
01045 copydata(p->sendq, words, sizeof(words));
01046 }
01047
01048
01049 if (debug_)
01050 logme()
01051 << "INFO: now connected to " << p->peeraddr << " from "
01052 << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01053 }
01054 }
01055 }
01056
01057
01058 sel_.dispatch(delay_);
01059 now = Time::current();
01060
01061
01062
01063
01064 if (flush_)
01065 {
01066 flush_ = false;
01067
01068 lock();
01069 purgeDeadObjects(now - TimeSpan(0, 0, 2 , 0, 0),
01070 now - TimeSpan(0, 0, 20 , 0, 0));
01071 sendObjectListToPeers(true);
01072 unlock();
01073 }
01074
01075
01076
01077
01078
01079
01080 updatePeerMasks();
01081
01082
01083 lock();
01084 Time waitold = now - TimeSpan(0, 0, 2 , 0, 0);
01085 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01086 {
01087
01088 if (i->time < waitold)
01089 releaseFromWait(i++, findObject(0, i->name));
01090
01091
01092 else
01093 ++i;
01094 }
01095 unlock();
01096 }
01097 }
01098
01099
01100
01101 void
01102 IgNet::sendLocalChanges(void)
01103 {
01104 char byte = 0;
01105 wakeup_.sink()->write(&byte, 1);
01106 }
01107
01111 IgNet::Object *
01112 IgNet::findObject(Peer *p, const std::string &name, Peer **owner)
01113 {
01114 ObjectMap::iterator pos;
01115 PeerMap::iterator i, e;
01116 if (owner)
01117 *owner = 0;
01118 if (p)
01119 {
01120 pos = p->objs.find(name);
01121 if (pos == p->objs.end())
01122 return 0;
01123 else
01124 {
01125 if (owner) *owner = p;
01126 return &pos->second;
01127 }
01128 }
01129 else
01130 {
01131 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01132 {
01133 pos = i->second.objs.find(name);
01134 if (pos != i->second.objs.end())
01135 {
01136 if (owner) *owner = &i->second;
01137 return &pos->second;
01138 }
01139 }
01140 return 0;
01141 }
01142 }
01143
01144 IgNet::Object *
01145 IgNet::makeObject(Peer *p, const std::string &name)
01146 {
01147 Object *o = &p->objs[name];
01148 o->version = 0;
01149 o->name = name;
01150 o->flags = 0;
01151 o->lastreq = 0;
01152 return o;
01153 }
01154
01155
01156
01157
01158
01159
01160
01161 void
01162 IgNet::markObjectsZombies(Peer *p)
01163 {
01164 ObjectMap::iterator i, e;
01165 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01166 i->second.flags |= VIS_FLAG_ZOMBIE;
01167 }
01168
01169
01170 void
01171 IgNet::markObjectsDead(Peer *p)
01172 {
01173 ObjectMap::iterator i, e;
01174 for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01175 if (i->second.flags & VIS_FLAG_ZOMBIE)
01176 i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01177 }
01178
01179
01180 void
01181 IgNet::purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
01182 {
01183 PeerMap::iterator pi, pe;
01184 ObjectMap::iterator oi, oe;
01185 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01186 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01187 {
01188 Object &o = oi->second;
01189
01190
01191
01192 if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01193 {
01194 if (debug_)
01195 logme()
01196 << "DEBUG: compacting idle '" << o.name
01197 << "' from " << pi->second.peeraddr << std::endl;
01198 }
01199
01200
01201 if (o.lastreq < deadobj
01202 && o.version < deadobj
01203 && (o.flags & VIS_FLAG_DEAD))
01204 {
01205 if (debug_)
01206 logme()
01207 << "DEBUG: removing dead '" << o.name
01208 << "' from " << pi->second.peeraddr << std::endl;
01209
01210 pi->second.objs.erase(oi++);
01211 }
01212 else
01213 ++oi;
01214 }
01215 }
01216
01217 IgNet::Peer *
01218 IgNet::getPeer(lat::Socket *s)
01219 {
01220 PeerMap::iterator pos = peers_.find(s);
01221 return pos == peers_.end() ? 0 : &pos->second;
01222 }
01223
01224 IgNet::Peer *
01225 IgNet::createPeer(lat::Socket *s)
01226 {
01227 Peer *p = &peers_[s];
01228 p->socket = 0;
01229 p->sendq = 0;
01230 p->sendpos = 0;
01231 p->mask = 0;
01232 p->source = false;
01233 p->update = false;
01234 p->updated = false;
01235 p->updates = 0;
01236 p->waiting = 0;
01237 p->automatic = 0;
01238 return p;
01239 }
01240
01241 void
01242 IgNet::removePeer(Peer *p, lat::Socket *s)
01243 {
01244 bool needflush = ! p->objs.empty();
01245
01246 p->objs.clear();
01247 peers_.erase(s);
01248
01249
01250
01251 if (needflush)
01252 sendLocalChanges();
01253 }
01254
01256 void
01257 IgNet::sendObjectListToPeer(Bucket *msg, bool all, bool clear)
01258 {
01259 PeerMap::iterator pi, pe;
01260 ObjectMap::iterator oi, oe;
01261 uint32_t numobjs = 0;
01262 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01263 numobjs += pi->second.objs.size();
01264
01265 msg->data.reserve(msg->data.size() + 300*numobjs);
01266
01267 uint32_t nupdates = 0;
01268 uint32_t words[4];
01269 words[0] = sizeof(words);
01270 words[1] = VIS_REPLY_LIST_BEGIN;
01271 words[2] = numobjs;
01272 words[3] = all;
01273 copydata(msg, &words[0], sizeof(words));
01274
01275 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01276 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01277 if (all || (oi->second.flags & VIS_FLAG_NEW))
01278 {
01279 sendObjectToPeer(msg, oi->second, false);
01280 if (clear)
01281 oi->second.flags &= ~VIS_FLAG_NEW;
01282 ++nupdates;
01283 }
01284
01285 words[1] = VIS_REPLY_LIST_END;
01286 words[2] = nupdates;
01287 copydata(msg, &words[0], sizeof(words));
01288 }
01289
01290 void
01291 IgNet::sendObjectListToPeers(bool all)
01292 {
01293 PeerMap::iterator i, e;
01294 for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01295 {
01296 Peer &p = i->second;
01297 if (! p.update)
01298 continue;
01299
01300 if (debug_)
01301 logme()
01302 << "DEBUG: notifying " << p.peeraddr << std::endl;
01303
01304 Bucket msg;
01305 msg.next = 0;
01306 sendObjectListToPeer(&msg, !p.updated || all, true);
01307
01308 if (! msg.data.empty())
01309 {
01310 Bucket **prev = &p.sendq;
01311 while (*prev)
01312 prev = &(*prev)->next;
01313
01314 *prev = new Bucket;
01315 (*prev)->next = 0;
01316 (*prev)->data.swap(msg.data);
01317 }
01318 p.updated = true;
01319 }
01320 }
01321
01322 void
01323 IgNet::updatePeerMasks(void)
01324 {
01325 PeerMap::iterator i, e;
01326 for (i = peers_.begin(), e = peers_.end(); i != e; )
01327 updateMask(&(i++)->second);
01328 }
01329
01330
01334 int
01335 IgNet::receive(void (*callback) (void *arg, uint32_t reason, Object &obj), void *arg)
01336 {
01337 int updates = 0;
01338
01339 lock();
01340 PeerMap::iterator pi, pe;
01341 ObjectMap::iterator oi, oe;
01342 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01343 {
01344 Peer &p = pi->second;
01345 if (&p == local_)
01346 continue;
01347
01348 updates += p.updates;
01349
01350 for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01351 {
01352 Object &o = oi->second;
01353 if (o.flags & VIS_FLAG_DEAD)
01354 {
01355 callback(arg, VIS_FLAG_DEAD, o);
01356 p.objs.erase(oi++);
01357 }
01358 else if (o.flags & VIS_FLAG_RECEIVED)
01359 {
01360 callback(arg, VIS_FLAG_RECEIVED, o);
01361 o.flags &= ~VIS_FLAG_RECEIVED;
01362 ++oi;
01363 }
01364 else
01365 ++oi;
01366 }
01367 }
01368 unlock();
01369
01370 return updates;
01371 }
01372
01375 void
01376 IgNet::updateLocalObject(Object &o)
01377 {
01378 ObjectMap::iterator pos = local_->objs.find(o.name);
01379 if (pos == local_->objs.end())
01380 local_->objs.insert(ObjectMap::value_type(o.name, o));
01381 else
01382 {
01383 std::swap(pos->second.version, o.version);
01384 std::swap(pos->second.flags, o.flags);
01385 std::swap(pos->second.rawdata, o.rawdata);
01386 pos->second.lastreq = 0;
01387 }
01388 }
01389
01392 void
01393 IgNet::removeLocalObject(const std::string &path)
01394 {
01395 local_->objs.erase(path);
01396 }