CMS 3D CMS Logo

IgNet.cc

Go to the documentation of this file.
00001 #include "Iguana/Framework/interface/IgNet.h"
00002 #include "Iguana/Framework/interface/IgNetError.h"
00003 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
00004 #include "classlib/iobase/Filename.h"
00005 #include "classlib/utils/TimeInfo.h"
00006 #include "classlib/utils/StringList.h"
00007 #include "classlib/utils/StringFormat.h"
00008 #include "classlib/utils/StringOps.h"
00009 #include "classlib/utils/SystemError.h"
00010 #include "classlib/utils/Regexp.h"
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <sys/wait.h>
00014 #include <stdio.h>
00015 #include <stdint.h>
00016 #include <iostream>
00017 #include <cassert>
00018 
00019 #define MESSAGE_SIZE_LIMIT      (2*1024*1024)
00020 #define SOCKET_BUF_SIZE         (8*1024*1024)
00021 #define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE/8)
00022 #define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00023 
00024 using namespace lat;
00025 
00027 // Generate log prefix.
00028 std::ostream &
00029 IgNet::logme (void)
00030 {
00031   return std::cerr
00032     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033     << " " << appname_ << "[" << pid_ << "]: ";
00034 }
00035 
00036 // Append data into a bucket.
00037 void
00038 IgNet::copydata(Bucket *b, const void *data, size_t len)
00039 {
00040   b->data.insert(b->data.end(),
00041                  (const unsigned char *)data,
00042                  (const unsigned char *)data + len);
00043 }
00044 
00045 // Discard a bucket chain.
00046 void
00047 IgNet::discard (Bucket *&b)
00048 {
00049   while (b)
00050   {
00051     Bucket *next = b->next;
00052     delete b;
00053     b = next;
00054   }
00055 }
00056 
00058 
00061 bool
00062 IgNet::losePeer(const char *reason,
00063                  Peer *peer,
00064                  IOSelectEvent *ev,
00065                  Error *err)
00066 {
00067   if (reason)
00068     logme ()
00069       << reason << peer->peeraddr
00070       << (err ? "; error was: " + err->explain() : std::string(""))
00071       << std::endl;
00072 
00073   Socket *s = peer->socket;
00074 
00075   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076     if (i->peer == peer)
00077       waiting_.erase(i++);
00078     else
00079       ++i;
00080 
00081   if (ev)
00082     ev->source = 0;
00083 
00084   discard(peer->sendq);
00085   if (peer->automatic)
00086     peer->automatic->peer = 0;
00087 
00088   sel_.detach(s);
00089   s->close();
00090   removePeer(peer, s);
00091   delete s;
00092   return true;
00093 }
00094 
00096 void
00097 IgNet::requestObject(Peer *p, const char *name, size_t len)
00098 {
00099   Bucket **msg = &p->sendq;
00100   while (*msg)
00101     msg = &(*msg)->next;
00102   *msg = new Bucket;
00103   (*msg)->next = 0;
00104 
00105   uint32_t words[3];
00106   words[0] = sizeof(words) + len;
00107   words[1] = VIS_MSG_GET_OBJECT;
00108   words[2] = len;
00109   (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110   copydata(*msg, words, sizeof(words));
00111   copydata(*msg, name, len);
00112 }
00113 
00116 void
00117 IgNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00118 {
00119   // FIXME: Should we automatically record which exact peer the waiter
00120   // is expecting to deliver data so we know to release the waiter if
00121   // the other peer vanishes?  The current implementation stands a
00122   // chance for the waiter to wait indefinitely -- although we do
00123   // force terminate the wait after a while.
00124   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125   WaitObject wo = { Time::current(), name, info, p };
00126   waiting_.push_back(wo);
00127   p->waiting++;
00128 }
00129 
00130 // Once an object has been updated, this is invoked for all waiting
00131 // peers.  Send the object back to the peer in suitable form.
00132 void
00133 IgNet::releaseFromWait(WaitList::iterator i, Object *o)
00134 {
00135   Bucket **msg = &i->peer->sendq;
00136   while (*msg)
00137     msg = &(*msg)->next;
00138   *msg = new Bucket;
00139   (*msg)->next = 0;
00140 
00141   releaseFromWait(*msg, *i, o);
00142 
00143   assert(i->peer->waiting > 0);
00144   i->peer->waiting--;
00145   waiting_.erase(i);
00146 }
00147 
00148 // Release everyone waiting for the object @a o.
00149 void
00150 IgNet::releaseWaiters(Object *o)
00151 {
00152   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153     if (i->name == o->name)
00154       releaseFromWait(i++, o);
00155     else
00156       ++i;
00157 }
00158 
00160 // Check if the network layer should stop.
00161 bool
00162 IgNet::shouldStop(void)
00163 {
00164   return shutdown_;
00165 }
00166 
00167 // Once an object has been updated, this is invoked for all waiting
00168 // peers.  Send the requested object to the waiting peer.
00169 void
00170 IgNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00171 {
00172   if (o)
00173     sendObjectToPeer (msg, *o, true);
00174   else
00175   {
00176     uint32_t words[3];
00177     words[0] = sizeof(words) + w.name.size();
00178     words[1] = VIS_REPLY_NONE;
00179     words[2] = w.name.size();
00180 
00181     msg->data.reserve(msg->data.size() + words[0]);
00182     copydata(msg, &words[0], sizeof(words));
00183     copydata(msg, &w.name[0], w.name.size());
00184   }
00185 }
00186 
00187 // Send an object to a peer.  If not @a data, only sends a summary
00188 // without the object data, except the data is always sent for scalar
00189 // objects.
00190 void
00191 IgNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00192 {
00193   uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194   DataBlob objdata;
00195 
00196   if (data || (flags & VIS_FLAG_SCALAR))
00197     objdata.insert(objdata.end(),
00198                    &o.rawdata[0],
00199                    &o.rawdata[0] + o.rawdata.size());
00200 
00201   uint32_t words[7];
00202   uint32_t namelen = o.name.size();
00203   uint32_t datalen = objdata.size();
00204 
00205   words[0] = sizeof(words) + namelen + datalen;
00206   words[1] = VIS_REPLY_OBJECT;
00207   words[2] = flags;
00208   words[3] = (o.version >> 0 ) & 0xffffffff;
00209   words[4] = (o.version >> 32) & 0xffffffff;
00210   words[5] = namelen;
00211   words[6] = datalen;
00212 
00213   msg->data.reserve(msg->data.size() + words[0]);
00214   copydata(msg, &words[0], sizeof(words));
00215   if (namelen)
00216     copydata(msg, &o.name[0], namelen);
00217   if (datalen)
00218     copydata(msg, &objdata[0], datalen);
00219 }
00220 
00222 // Handle peer messages.
00223 bool
00224 IgNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00225 {
00226   // Decode and process this message.
00227   uint32_t type;
00228   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229   switch (type)
00230   {
00231   case VIS_MSG_UPDATE_ME:
00232     {
00233       if (len != 2*sizeof(uint32_t))
00234       {
00235         logme()
00236           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237           << " from peer " << p->peeraddr << std::endl;
00238         return false;
00239       }
00240 
00241       if (debug_)
00242         logme()
00243           << "DEBUG: received message 'UPDATE ME' from peer "
00244           << p->peeraddr << std::endl;
00245 
00246       p->update = true;
00247     }
00248     return true;
00249 
00250   case VIS_MSG_LIST_OBJECTS:
00251     {
00252       if (debug_)
00253         logme()
00254           << "DEBUG: received message 'LIST OBJECTS' from peer "
00255           << p->peeraddr << std::endl;
00256 
00257       // Send over current status: list of known objects.
00258       lock();
00259       sendObjectListToPeer(msg, true, false);
00260       unlock();
00261     }
00262     return true;
00263 
00264   case VIS_MSG_GET_OBJECT:
00265     {
00266       if (debug_)
00267         logme()
00268           << "DEBUG: received message 'GET OBJECT' from peer "
00269           << p->peeraddr << std::endl;
00270 
00271       if (len < 3*sizeof(uint32_t))
00272       {
00273         logme()
00274           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275           << " from peer " << p->peeraddr << std::endl;
00276         return false;
00277       }
00278 
00279       uint32_t namelen;
00280       memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281       if (len != 3*sizeof(uint32_t) + namelen)
00282       {
00283         logme()
00284           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285           << " from peer " << p->peeraddr
00286           << ", expected length " << (3*sizeof(uint32_t))
00287           << " + " << namelen << std::endl;
00288         return false;
00289       }
00290 
00291       lock();
00292       std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293       Peer *owner = 0;
00294       Object *o = findObject(0, name, &owner);
00295       if (o)
00296       {
00297         o->lastreq = Time::current();
00298         if (o->rawdata.empty())
00299           waitForData(p, name, "", owner);
00300         else
00301           sendObjectToPeer(msg, *o, true);
00302       }
00303       else
00304       {
00305         uint32_t words[3];
00306         words[0] = sizeof(words) + name.size();
00307         words[1] = VIS_REPLY_NONE;
00308         words[2] = name.size();
00309 
00310         msg->data.reserve(msg->data.size() + words[0]);
00311         copydata(msg, &words[0], sizeof(words));
00312         copydata(msg, &name[0], name.size());
00313       }
00314       unlock();
00315     }
00316     return true;
00317 
00318   case VIS_REPLY_LIST_BEGIN:
00319     {
00320       if (len != 4*sizeof(uint32_t))
00321       {
00322         logme()
00323           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324           << " from peer " << p->peeraddr << std::endl;
00325         return false;
00326       }
00327 
00328       if (debug_)
00329         logme()
00330           << "DEBUG: received message 'LIST BEGIN' from "
00331           << p->peeraddr << std::endl;
00332 
00333       // Get the update status: whether this is a full update.
00334       uint32_t flags;
00335       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336 
00337       // If we are about to receive a full list of objects, flag all
00338       // objects dead.  Subsequent object notifications will undo this
00339       // for the live objects.  This tells us the object has been
00340       // removed, but we can keep making it available for a while if
00341       // there continues to be interest in it.
00342       if (flags)
00343       {
00344         lock();
00345         markObjectsZombies(p);
00346         unlock();
00347       }
00348     }
00349     return true;
00350 
00351   case VIS_REPLY_LIST_END:
00352     {
00353       if (len != 4*sizeof(uint32_t))
00354       {
00355         logme()
00356           << "ERROR: corrupt 'LIST END' message of length " << len
00357           << " from peer " << p->peeraddr << std::endl;
00358         return false;
00359       }
00360 
00361       // Get the update status: whether this is a full update.
00362       uint32_t flags;
00363       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364 
00365       // If we received a full list of objects, flag all zombie objects
00366       // now dead. We need to do this in two stages in case we receive
00367       // updates in many parts, and end up sending updates to others in
00368       // between; this avoids us lying live objects are dead.
00369       if (flags)
00370       {
00371         lock();
00372         markObjectsDead(p);
00373         unlock();
00374       }
00375 
00376       if (debug_)
00377         logme()
00378           << "DEBUG: received message 'LIST END' from "
00379           << p->peeraddr << std::endl;
00380 
00381       // Indicate we have received another update from this peer.
00382       // Also indicate we should flush to our clients.
00383       flush_ = true;
00384       p->updates++;
00385     }
00386     return true;
00387 
00388   case VIS_REPLY_OBJECT:
00389     {
00390       uint32_t words[7];
00391       if (len < sizeof(words))
00392       {
00393         logme()
00394           << "ERROR: corrupt 'OBJECT' message of length " << len
00395           << " from peer " << p->peeraddr << std::endl;
00396         return false;
00397       }
00398 
00399       memcpy(&words[0], data, sizeof(words));
00400       uint32_t &namelen = words[5];
00401       uint32_t &datalen = words[6];
00402 
00403       if (len != sizeof(words) + namelen + datalen)
00404       {
00405         logme()
00406           << "ERROR: corrupt 'OBJECT' message of length " << len
00407           << " from peer " << p->peeraddr
00408           << ", expected length " << sizeof(words)
00409           << " + " << namelen
00410           << " + " << datalen
00411           << std::endl;
00412         return false;
00413       }
00414 
00415       unsigned char *namedata = data + sizeof(words);
00416       unsigned char *objdata = namedata + namelen;
00417       unsigned char *enddata = objdata + datalen;
00418       std::string name((char *) namedata, namelen);
00419       assert(enddata == data + len);
00420 
00421       if (debug_)
00422         logme()
00423           << "DEBUG: received message 'OBJECT " << name
00424           << "' from " << p->peeraddr << std::endl;
00425 
00426       // Mark the peer as a known object source.
00427       p->source = true;
00428 
00429       // Initialise or update an object entry.
00430       lock();
00431       Object *o = findObject(p, name);
00432       if (! o)
00433         o = makeObject(p, name);
00434 
00435       bool hadobject = ! o->rawdata.empty();
00436       o->flags = words[2] | VIS_FLAG_RECEIVED;
00437       o->version = ((uint64_t) words[4] << 32 | words[3]);
00438       o->rawdata.clear();
00439       o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440 
00441       // If we had an object for this one already and this is a list
00442       // update without data, issue an immediate data get request.
00443       if (hadobject && ! datalen)
00444         requestObject(p, (namelen ? &name[0] : 0), namelen);
00445 
00446       // If we have the object data, release from wait.
00447       if (datalen)
00448         releaseWaiters(o);
00449       unlock();
00450     }
00451     return true;
00452 
00453   case VIS_REPLY_NONE:
00454     {
00455       uint32_t words[3];
00456       if (len < sizeof(words))
00457       {
00458         logme()
00459           << "ERROR: corrupt 'NONE' message of length " << len
00460           << " from peer " << p->peeraddr << std::endl;
00461         return false;
00462       }
00463 
00464       memcpy(&words[0], data, sizeof(words));
00465       uint32_t &namelen = words[2];
00466 
00467       if (len != sizeof(words) + namelen)
00468       {
00469         logme()
00470           << "ERROR: corrupt 'NONE' message of length " << len
00471           << " from peer " << p->peeraddr
00472           << ", expected length " << sizeof(words)
00473           << " + " << namelen << std::endl;
00474         return false;
00475       }
00476 
00477       unsigned char *namedata = data + sizeof(words);
00478       unsigned char *enddata = namedata + namelen;
00479       std::string name((char *) namedata, namelen);
00480       assert(enddata == data + len);
00481 
00482       if (debug_)
00483         logme()
00484           << "DEBUG: received message 'NONE " << name
00485           << "' from " << p->peeraddr << std::endl;
00486 
00487       // Mark the peer as a known object source.
00488       p->source = true;
00489 
00490       // If this was a known object, update its entry.
00491       lock();
00492       Object *o = findObject(p, name);
00493       if (o)
00494         o->flags |= VIS_FLAG_DEAD;
00495 
00496       // If someone was waiting for this, let them go.
00497       releaseWaiters(o);
00498       unlock();
00499     }
00500     return true;
00501 
00502   default:
00503     logme()
00504       << "ERROR: unrecognised message of length " << len
00505       << " and type " << type << " from peer " << p->peeraddr
00506       << std::endl;
00507     return false;
00508   }
00509 }
00510 
00513 bool
00514 IgNet::onPeerData(IOSelectEvent *ev, Peer *p)
00515 {
00516   assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517 
00518   // If there is a problem with the peer socket, discard the peer
00519   // and tell the selector to stop prcessing events for it.  If
00520   // this is a server connection, we will eventually recreate
00521   // everything if/when the data server comes back.
00522   if (ev->events & IOUrgent)
00523   {
00524     if (p->automatic)
00525     {
00526       logme()
00527         << "WARNING: connection to the server at " << p->peeraddr
00528         << " lost (will attempt to reconnect in 15 seconds)\n";
00529       return losePeer(0, p, ev);
00530     }
00531     else
00532       return losePeer("WARNING: lost peer connection ", p, ev);
00533   }
00534 
00535   // If we can write to the peer socket, pump whatever we can into it.
00536   if (ev->events & IOWrite)
00537   {
00538     while (Bucket *b = p->sendq)
00539     {
00540       IOSize len = b->data.size() - p->sendpos;
00541       const void *data = (len ? (const void *)&b->data[p->sendpos]
00542                           : (const void *)&data);
00543       IOSize done;
00544 
00545       try
00546       {
00547         done = (len ? ev->source->write (data, len) : 0);
00548         if (debug_ && len)
00549           logme()
00550             << "DEBUG: sent " << done << " bytes to peer "
00551             << p->peeraddr << std::endl;
00552       }
00553       catch (Error &e)
00554       {
00555         return losePeer("WARNING: unable to write to peer ",
00556                         p, ev, &e);
00557       }
00558 
00559       p->sendpos += done;
00560       if (p->sendpos == b->data.size())
00561       {
00562         Bucket *old = p->sendq;
00563         p->sendq = old->next;
00564         p->sendpos = 0;
00565         old->next = 0;
00566         discard(old);
00567       }
00568 
00569       if (! done && len)
00570         // Cannot write any more.
00571         break;
00572     }
00573   }
00574 
00575   // If there is data to be read from the peer, first receive what we
00576   // can get out the socket, the process all complete requests.
00577   if (ev->events & IORead)
00578   {
00579     // First build up the incoming buffer of data in the socket.
00580     // Remember the last size returned by the socket; we need
00581     // it to determine if the remote end closed the connection.
00582     IOSize sz;
00583     try
00584     {
00585       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586       do
00587         if ((sz = ev->source->read(&buf[0], buf.size())))
00588         {
00589           if (debug_)
00590             logme()
00591               << "DEBUG: received " << sz << " bytes from peer "
00592               << p->peeraddr << std::endl;
00593           DataBlob &data = p->incoming;
00594           if (data.capacity () < data.size () + sz)
00595             data.reserve (data.size() + SOCKET_READ_GROWTH);
00596           data.insert (data.end(), &buf[0], &buf[0] + sz);
00597         }
00598       while (sz == sizeof (buf));
00599     }
00600     catch (Error &e)
00601     {
00602       SystemError *next = dynamic_cast<SystemError *>(e.next());
00603       if (next && next->portable() == SysErr::ErrTryAgain)
00604         sz = 1; // Ignore it, and fake no end of data.
00605       else
00606         // Houston we have a problem.
00607         return losePeer("WARNING: failed to read from peer ",
00608                         p, ev, &e);
00609     }
00610 
00611     // Process fully received messages as long as we can.
00612     size_t consumed = 0;
00613     DataBlob &data = p->incoming;
00614     while (data.size()-consumed >= sizeof(uint32_t)
00615            && p->waiting < MAX_PEER_WAITREQS)
00616     {
00617       uint32_t msglen;
00618       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619 
00620       if (msglen >= MESSAGE_SIZE_LIMIT)
00621         return losePeer("WARNING: excessively large message from ", p, ev);
00622 
00623       if (data.size()-consumed >= msglen)
00624       {
00625         bool valid = true;
00626         if (msglen < 2*sizeof(uint32_t))
00627         {
00628           logme()
00629             << "ERROR: corrupt peer message of length " << msglen
00630             << " from peer " << p->peeraddr << std::endl;
00631           valid = false;
00632         }
00633         else
00634         {
00635           // Decode and process this message.
00636           Bucket msg;
00637           msg.next = 0;
00638           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639 
00640           // If we created a response, chain it to the write queue.
00641           if (! msg.data.empty())
00642           {
00643             Bucket **prev = &p->sendq;
00644             while (*prev)
00645                prev = &(*prev)->next;
00646 
00647             *prev = new Bucket;
00648             (*prev)->next = 0;
00649             (*prev)->data.swap(msg.data);
00650           }
00651         }
00652 
00653         if (! valid)
00654           return losePeer("WARNING: data stream error with ", p, ev);
00655 
00656         consumed += msglen;
00657       }
00658       else
00659         break;
00660     }
00661 
00662     data.erase(data.begin(), data.begin()+consumed);
00663 
00664     // If the client has closed the connection, shut down our end.  If
00665     // we have something to send back still, leave the write direction
00666     // open.  Otherwise close the shop for this client.
00667     if (sz == 0)
00668       sel_.setMask(p->socket, p->mask &= ~IORead);
00669   }
00670 
00671   // Yes, please keep processing events for this socket.
00672   return false;
00673 }
00674 
00679 bool
00680 IgNet::onPeerConnect(IOSelectEvent *ev)
00681 {
00682   // Recover the server socket.
00683   assert(ev->source == server_);
00684 
00685   // Accept the connection.
00686   Socket *s = server_->accept();
00687   assert(s);
00688   assert(! s->isBlocking());
00689 
00690   // Record it to our list of peers.
00691   Peer *p = createPeer(s);
00692   InetAddress peeraddr = ((InetSocket *) s)->peername();
00693   InetAddress myaddr = ((InetSocket *) s)->sockname();
00694   p->peeraddr = StringFormat("%1:%2")
00695                 .arg(peeraddr.hostname())
00696                 .arg(peeraddr.port());
00697   p->mask = IORead|IOUrgent;
00698   p->socket = s;
00699 
00700   // Report the new connection.
00701   if (debug_)
00702     logme()
00703       << "INFO: new peer " << p->peeraddr << " is now connected to "
00704       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705 
00706   // Attach it to the listener.
00707   sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
00708 
00709   // We are never done.
00710   return false;
00711 }
00712 
00719 bool
00720 IgNet::onLocalNotify(IOSelectEvent *ev)
00721 {
00722   // Discard the data in the pipe, we care only about the wakeup.
00723   try
00724   {
00725     IOSize sz;
00726     unsigned char buf [1024];
00727     while ((sz = ev->source->read(buf, sizeof(buf))))
00728       ;
00729   }
00730   catch (Error &e)
00731   {
00732     SystemError *next = dynamic_cast<SystemError *>(e.next());
00733     if (next && next->portable() == SysErr::ErrTryAgain)
00734       ; // Ignore it
00735     else
00736       logme()
00737         << "WARNING: error reading from notification pipe: "
00738         << e.explain() << std::endl;
00739   }
00740 
00741   // Tell the main event pump to send an update in a little while.
00742   flush_ = true;
00743 
00744   // We are never done, always keep going.
00745   return false;
00746 }
00747 
00750 void
00751 IgNet::updateMask(Peer *p)
00752 {
00753   if (! p->socket)
00754     return;
00755 
00756   // Listen to writes iff we have data to send.
00757   unsigned oldmask = p->mask;
00758   if (! p->sendq && (p->mask & IOWrite))
00759     sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760 
00761   if (p->sendq && ! (p->mask & IOWrite))
00762     sel_.setMask(p->socket, p->mask |= IOWrite);
00763 
00764   if (debug_ && oldmask != p->mask)
00765     logme()
00766       << "DEBUG: updating mask for " << p->peeraddr << " to "
00767       << p->mask << " from " << oldmask << std::endl;
00768 
00769   // If we have nothing more to send and are no longer listening
00770   // for reads, close up the shop for this peer.
00771   if (p->mask == IOUrgent && ! p->waiting)
00772   {
00773     assert(! p->sendq);
00774     if (debug_)
00775       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776     losePeer(0, p, 0);
00777   }
00778 }
00779 
00781 IgNet::IgNet (const std::string &appname /* = "" */)
00782   : debug_ (false),
00783     appname_ (appname.empty() ? "IgNet" : appname.c_str()),
00784     pid_ (getpid()),
00785     server_ (0),
00786     version_ (Time::current()),
00787     communicate_ ((pthread_t) -1),
00788     shutdown_ (0),
00789     delay_ (1000),
00790     flush_ (false)
00791 {
00792   // Create a pipe for the local apps to tell the communicator
00793   // thread that local app data has changed and that the peers
00794   // should be notified.
00795   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &IgNet::onLocalNotify));
00797 
00798   // Initialise the upstream and downstream to empty.
00799   upstream_.peer   = downstream_.peer   = 0;
00800   upstream_.next   = downstream_.next   = 0;
00801   upstream_.port   = downstream_.port   = 0;
00802   upstream_.update = downstream_.update = false;
00803   upstream_.warned = downstream_.warned = false;
00804 
00805   local_ = createPeer((Socket *) -1);
00806 }
00807 
/// Destructor.  Currently performs no explicit cleanup.
IgNet::~IgNet(void)
{
  // FIXME: nothing is released here.
  // NOTE(review): the constructor creates local_ with createPeer()
  // and startLocalServer() allocates server_ with new; presumably
  // these should be torn down here -- confirm ownership first.
}
00812 
00815 void
00816 IgNet::debug(bool doit)
00817 {
00818   debug_ = doit;
00819 }
00820 
00823 void
00824 IgNet::delay(int delay)
00825 {
00826   delay_ = delay;
00827 }
00828 
00832 void
00833 IgNet::startLocalServer(int port)
00834 {
00835   if (server_)
00836   {
00837     logme() << "ERROR: server was already started.\n";
00838     return;
00839   }
00840 
00841   try
00842   {
00843     server_ = new InetServerSocket(InetAddress (port), 10);
00844     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846     server_->setBlocking(false);
00847     sel_.attach(server_, IOAccept, CreateHook(this, &IgNet::onPeerConnect));
00848   }
00849   catch (Error &e)
00850   {
00851     // FIXME: Do we need to do this when we throw an exception anyway?
00852     // FIXME: Abort instead?
00853     logme()
00854       << "ERROR: Failed to start server at port " << port << ": "
00855       << e.explain() << std::endl;
00856 
00857     throw IgNetError("Failed to start server at port ", e.clone());
00858     
00859     // FIXME: Throw something simpler that removes the dependency?
00860 //     throw cms::Exception("IgNet::startLocalServer")
00861 //       << "Failed to start server at port " << port << ": "
00862 //       << e.explain();
00863   }
00864   
00865   logme() << "INFO: Shared memory server started at port " << port << std::endl;
00866 }
00867 
00871 void
00872 IgNet::updateToCollector(const std::string &host, int port)
00873 {
00874   if (! downstream_.host.empty())
00875   {
00876     logme()
00877       << "ERROR: Already updating another collector at "
00878       << downstream_.host << ":" << downstream_.port << std::endl;
00879     return;
00880   }
00881 
00882   downstream_.update = true;
00883   downstream_.host = host;
00884   downstream_.port = port;
00885 }
00886 
00890 void
00891 IgNet::listenToSource(const std::string &host, int port)
00892 {
00893   if (! upstream_.host.empty())
00894   {
00895     logme()
00896       << "ERROR: Already receiving data from another collector at "
00897       << upstream_.host << ":" << upstream_.port << std::endl;
00898     return;
00899   }
00900 
00901   upstream_.update = false;
00902   upstream_.host = host;
00903   upstream_.port = port;
00904 }
00905 
00907 void
00908 IgNet::shutdown(void)
00909 {
00910   shutdown_ = 1;
00911   if (communicate_ != (pthread_t) -1)
00912     pthread_join(communicate_, 0);
00913 }
00914 
00920 static void *communicate(void *obj)
00921 {
00922   sigset_t sigs;
00923   sigfillset(&sigs);
00924   pthread_sigmask(SIG_BLOCK, &sigs, 0);
00925   ((IgNet *)obj)->run();
00926   return 0;
00927 }
00928 
00930 void
00931 IgNet::lock(void)
00932 {
00933   if (communicate_ != (pthread_t) -1)
00934     pthread_mutex_lock(&lock_);
00935 }
00936 
00938 void
00939 IgNet::unlock(void)
00940 {
00941   if (communicate_ != (pthread_t) -1)
00942     pthread_mutex_unlock(&lock_);
00943 }
00944 
00948 void
00949 IgNet::start(void)
00950 {
00951   if (communicate_ != (pthread_t) -1)
00952   {
00953     logme()
00954       << "ERROR: Shared memory networking thread has already been started\n";
00955     return;
00956   }
00957 
00958   pthread_mutex_init(&lock_, 0);
00959   pthread_create(&communicate_, 0, &communicate, this);
00960 }
00961 
00963 void
00964 IgNet::run(void)
00965 {
00966   Time now;
00967   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00968 
00969   // Perform I/O.  Every once in a while flush updates to peers.
00970   while (! shouldStop())
00971   {
00972     for (int i = 0; i < 2; ++i)
00973     {
00974       AutoPeer *ap = automatic[i];
00975 
00976       // If we need a server connection and don't have one yet,
00977       // initiate asynchronous connection creation.  Swallow errors
00978       // in case the server won't talk to us.
00979       if (! ap->host.empty()
00980           && ! ap->peer
00981           && (now = Time::current()) > ap->next)
00982       {
00983         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
00984         InetSocket *s = 0;
00985         try
00986         {
00987           s = new InetSocket (SocketConst::TypeStream);
00988           s->setBlocking (false);
00989           s->connect(InetAddress (ap->host.c_str(), ap->port));
00990           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00991           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00992         }
00993         catch (Error &e)
00994         {
00995           SystemError *sys = dynamic_cast<SystemError *>(e.next());
00996           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00997           {
00998             // "In progress" just means the connection is in progress.
00999             // The connection is ready when the socket is writeable.
01000             // Anything else is a real problem.
01001             if (! ap->warned)
01002             {
01003               logme()
01004                 << "NOTE: server at " << ap->host << ":" << ap->port
01005                 << " is unavailable.  Connection will be established"
01006                 << " automatically on the background once the server"
01007                 << " becomes available.  Error from the attempt was: "
01008                 << e.explain() << '\n';
01009               ap->warned = true;
01010             }
01011 
01012             if (s)
01013               s->abort();
01014             delete s;
01015             s = 0;
01016           }
01017         }
01018 
01019         // Set up with the selector if we were successful.  If this is
01020         // the upstream collector, queue a request for updates.
01021         if (s)
01022         {
01023           lock();
01024           Peer *p = createPeer(s);
01025           ap->peer = p;
01026           ap->warned = false;
01027           unlock();
01028 
01029           InetAddress peeraddr = ((InetSocket *) s)->peername();
01030           InetAddress myaddr = ((InetSocket *) s)->sockname();
01031           p->peeraddr = StringFormat("%1:%2")
01032                         .arg(peeraddr.hostname())
01033                         .arg(peeraddr.port());
01034           p->mask = IORead|IOWrite|IOUrgent;
01035           p->update = ap->update;
01036           p->automatic = ap;
01037           p->socket = s;
01038           sel_.attach(s, p->mask, CreateHook(this, &IgNet::onPeerData, p));
01039           if (ap == &upstream_)
01040           {
01041             uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01042                                   2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01043             p->sendq = new Bucket;
01044             p->sendq->next = 0;
01045             copydata(p->sendq, words, sizeof(words));
01046           }
01047 
01048           // Report the new connection.
01049           if (debug_)
01050             logme()
01051               << "INFO: now connected to " << p->peeraddr << " from "
01052               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01053         }
01054       }
01055     }
01056 
01057     // Pump events for a while.
01058     sel_.dispatch(delay_);
01059     now = Time::current();
01060 
01061     // Check if flush is required.  Flush only if one is needed.
01062     // Always sends the full object list.  Compact objects no longer
01063     // in active use before sending out the update.
01064     if (flush_)
01065     {
01066       flush_ = false;
01067 
01068       lock();
01069       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01070                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01071       sendObjectListToPeers(true);
01072       unlock();
01073     }
01074 
01075     // Update the data server and peer selection masks.  If we
01076     // have no more data to send and listening for writes, remove
01077     // the write mask.  If we have something to write and aren't
01078     // listening for writes, start listening so we can send off
01079     // the data.
01080     updatePeerMasks();
01081 
01082     // Release peers that have been waiting for data for too long.
01083     lock();
01084     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01085     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01086     {
01087       // If the peer has waited for too long, send something.
01088       if (i->time < waitold)
01089         releaseFromWait(i++, findObject(0, i->name));
01090 
01091       // Keep it for now.
01092       else
01093         ++i;
01094     }
01095     unlock();
01096   }
01097 }
01098 
01099 // Tell the network cache that there have been local changes that
01100 // should be advertised to the downstream listeners.
01101 void
01102 IgNet::sendLocalChanges(void)
01103 {
01104   char byte = 0;
01105   wakeup_.sink()->write(&byte, 1);
01106 }
01107 
01111 IgNet::Object *
01112 IgNet::findObject(Peer *p, const std::string &name, Peer **owner)
01113 {
01114   ObjectMap::iterator pos;
01115   PeerMap::iterator i, e;
01116   if (owner)
01117     *owner = 0;
01118   if (p)
01119   {
01120     pos = p->objs.find(name);
01121     if (pos == p->objs.end())
01122       return 0;
01123     else
01124     {
01125       if (owner) *owner = p;
01126       return &pos->second;
01127     }
01128   }
01129   else
01130   {
01131     for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01132     {
01133       pos = i->second.objs.find(name);
01134       if (pos != i->second.objs.end())
01135       {
01136         if (owner) *owner = &i->second;
01137         return &pos->second;
01138       }
01139     }
01140     return 0;
01141   }
01142 }
01143 
01144 IgNet::Object *
01145 IgNet::makeObject(Peer *p, const std::string &name)
01146 {
01147   Object *o = &p->objs[name];
01148   o->version = 0;
01149   o->name = name;
01150   o->flags = 0;
01151   o->lastreq = 0;
01152   return o;
01153 }
01154 
01155 // Mark all the objects as zombies.  This is intended to be used
01156 // when starting to process a complete list of objects, in order
01157 // to flag the objects that need to be killed at the end.  After
01158 // call to this method, revive all live objects by removing the
01159 // VIS_FLAG_ZOMBIE flag, then call markObjectsDead() at the end
01160 // to flag dead as all remaining zombies.
01161 void
01162 IgNet::markObjectsZombies(Peer *p)
01163 {
01164   ObjectMap::iterator i, e;
01165   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01166     i->second.flags |= VIS_FLAG_ZOMBIE;
01167 }
01168 
01169 // Mark remaining zombie objects as dead.  See markObjectsZombies().
01170 void
01171 IgNet::markObjectsDead(Peer *p)
01172 {
01173   ObjectMap::iterator i, e;
01174   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01175     if (i->second.flags & VIS_FLAG_ZOMBIE)
01176       i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01177 }
01178 
01179 // Purge all old and dead objects.
01180 void
01181 IgNet::purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
01182 {
01183   PeerMap::iterator pi, pe;
01184   ObjectMap::iterator oi, oe;
01185   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01186     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01187     {
01188       Object &o = oi->second;
01189 
01190       // Compact non-scalar objects that are unused.  We send scalar
01191       // objects to the web server so we keep them around.
01192       if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01193       {
01194         if (debug_)
01195           logme()
01196             << "DEBUG: compacting idle '" << o.name
01197             << "' from " << pi->second.peeraddr << std::endl;
01198       }
01199 
01200       // Remove if dead, old and unused.
01201       if (o.lastreq < deadobj
01202           && o.version < deadobj
01203           && (o.flags & VIS_FLAG_DEAD))
01204       {
01205         if (debug_)
01206           logme()
01207             << "DEBUG: removing dead '" << o.name
01208             << "' from " << pi->second.peeraddr << std::endl;
01209 
01210         pi->second.objs.erase(oi++);
01211       }
01212       else
01213         ++oi;
01214     }
01215 }
01216 
01217 IgNet::Peer *
01218 IgNet::getPeer(lat::Socket *s)
01219 {
01220   PeerMap::iterator pos = peers_.find(s);
01221   return pos == peers_.end() ? 0 : &pos->second;
01222 }
01223 
01224 IgNet::Peer *
01225 IgNet::createPeer(lat::Socket *s)
01226 {
01227   Peer *p = &peers_[s];
01228   p->socket = 0;
01229   p->sendq = 0;
01230   p->sendpos = 0;
01231   p->mask = 0;
01232   p->source = false;
01233   p->update = false;
01234   p->updated = false;
01235   p->updates = 0;
01236   p->waiting = 0;
01237   p->automatic = 0;
01238   return p;
01239 }
01240 
01241 void
01242 IgNet::removePeer(Peer *p, lat::Socket *s)
01243 {
01244   bool needflush = ! p->objs.empty();
01245 
01246   p->objs.clear();
01247   peers_.erase(s);
01248 
01249   // If we removed a peer with objects, our list of objects
01250   // has changed and we need to update downstream peers.
01251   if (needflush)
01252     sendLocalChanges();
01253 }
01254 
01256 void
01257 IgNet::sendObjectListToPeer(Bucket *msg, bool all, bool clear)
01258 {
01259   PeerMap::iterator pi, pe;
01260   ObjectMap::iterator oi, oe;
01261   uint32_t numobjs = 0;
01262   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01263     numobjs += pi->second.objs.size();
01264 
01265   msg->data.reserve(msg->data.size() + 300*numobjs);
01266 
01267   uint32_t nupdates = 0;
01268   uint32_t words[4];
01269   words[0] = sizeof(words);
01270   words[1] = VIS_REPLY_LIST_BEGIN;
01271   words[2] = numobjs;
01272   words[3] = all;
01273   copydata(msg, &words[0], sizeof(words));
01274 
01275   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01276     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01277       if (all || (oi->second.flags & VIS_FLAG_NEW))
01278       {
01279         sendObjectToPeer(msg, oi->second, false);
01280         if (clear)
01281           oi->second.flags &= ~VIS_FLAG_NEW;
01282         ++nupdates;
01283       }
01284 
01285   words[1] = VIS_REPLY_LIST_END;
01286   words[2] = nupdates;
01287   copydata(msg, &words[0], sizeof(words));
01288 }
01289 
01290 void
01291 IgNet::sendObjectListToPeers(bool all)
01292 {
01293   PeerMap::iterator i, e;
01294   for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01295   {
01296     Peer &p = i->second;
01297     if (! p.update)
01298       continue;
01299 
01300     if (debug_)
01301       logme()
01302         << "DEBUG: notifying " << p.peeraddr << std::endl;
01303 
01304     Bucket msg;
01305     msg.next = 0;
01306     sendObjectListToPeer(&msg, !p.updated || all, true);
01307 
01308     if (! msg.data.empty())
01309     {
01310       Bucket **prev = &p.sendq;
01311       while (*prev)
01312         prev = &(*prev)->next;
01313 
01314       *prev = new Bucket;
01315       (*prev)->next = 0;
01316       (*prev)->data.swap(msg.data);
01317     }
01318     p.updated = true;
01319   }
01320 }
01321 
01322 void
01323 IgNet::updatePeerMasks(void)
01324 {
01325   PeerMap::iterator i, e;
01326   for (i = peers_.begin(), e = peers_.end(); i != e; )
01327     updateMask(&(i++)->second);
01328 }
01329 
01330 
01334 int
01335 IgNet::receive(void (*callback) (void *arg, uint32_t reason, Object &obj), void *arg)
01336 {
01337   int updates = 0;
01338 
01339   lock();
01340   PeerMap::iterator pi, pe;
01341   ObjectMap::iterator oi, oe;
01342   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01343   {
01344     Peer &p = pi->second;
01345     if (&p == local_)
01346       continue;
01347 
01348     updates += p.updates;
01349 
01350     for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01351     {
01352       Object &o = oi->second;
01353       if (o.flags & VIS_FLAG_DEAD)
01354       {
01355         callback(arg, VIS_FLAG_DEAD, o);
01356         p.objs.erase(oi++);
01357       }
01358       else if (o.flags & VIS_FLAG_RECEIVED)
01359       {
01360         callback(arg, VIS_FLAG_RECEIVED, o);
01361         o.flags &= ~VIS_FLAG_RECEIVED;
01362         ++oi;
01363       }
01364       else
01365         ++oi;
01366     }
01367   }
01368   unlock();
01369 
01370   return updates;
01371 }
01372 
01375 void
01376 IgNet::updateLocalObject(Object &o)
01377 {
01378   ObjectMap::iterator pos = local_->objs.find(o.name);
01379   if (pos == local_->objs.end())
01380     local_->objs.insert(ObjectMap::value_type(o.name, o));
01381   else
01382   {
01383     std::swap(pos->second.version,   o.version);
01384     std::swap(pos->second.flags,     o.flags);
01385     std::swap(pos->second.rawdata,   o.rawdata);
01386     pos->second.lastreq = 0;
01387   }
01388 }
01389 
01392 void
01393 IgNet::removeLocalObject(const std::string &path)
01394 {
01395   local_->objs.erase(path);
01396 }

Generated on Tue Jun 9 17:38:29 2009 for CMSSW by  doxygen 1.5.4