CMS 3D CMS Logo

VisNet.cc

Go to the documentation of this file.
00001 #include "VisFramework/VisFrameworkBase/interface/VisNet.h"
00002 #include "FWCore/Utilities/interface/EDMException.h"
00003 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
00004 #include "classlib/iobase/Filename.h"
00005 #include "classlib/utils/TimeInfo.h"
00006 #include "classlib/utils/StringList.h"
00007 #include "classlib/utils/StringFormat.h"
00008 #include "classlib/utils/StringOps.h"
00009 #include "classlib/utils/SystemError.h"
00010 #include "classlib/utils/Regexp.h"
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <sys/wait.h>
00014 #include <stdio.h>
00015 #include <stdint.h>
00016 #include <iostream>
00017 #include <cassert>
00018 
// Messages whose length word is at or above this many bytes are treated
// as a protocol violation and cause the peer to be dropped.
00019 #define MESSAGE_SIZE_LIMIT      (2*1024*1024)
// Value requested for the send and receive buffer socket options.
00020 #define SOCKET_BUF_SIZE         (8*1024*1024)
// Size of the temporary buffer used for each read from a peer socket.
00021 #define SOCKET_READ_SIZE        (SOCKET_BUF_SIZE/8)
// Extra capacity reserved when a peer's incoming buffer has to grow.
00022 #define SOCKET_READ_GROWTH      (SOCKET_BUF_SIZE)
00023 
00024 using namespace lat;
00025 
00027 // Generate log prefix.
00028 std::ostream &
00029 VisNet::logme (void)
00030 {
00031   return std::cerr
00032     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00033     << " " << appname_ << "[" << pid_ << "]: ";
00034 }
00035 
00036 // Append data into a bucket.
00037 void
00038 VisNet::copydata(Bucket *b, const void *data, size_t len)
00039 {
00040   b->data.insert(b->data.end(),
00041                  (const unsigned char *)data,
00042                  (const unsigned char *)data + len);
00043 }
00044 
00045 // Discard a bucket chain.
00046 void
00047 VisNet::discard (Bucket *&b)
00048 {
00049   while (b)
00050   {
00051     Bucket *next = b->next;
00052     delete b;
00053     b = next;
00054   }
00055 }
00056 
00058 
00061 bool
00062 VisNet::losePeer(const char *reason,
00063                  Peer *peer,
00064                  IOSelectEvent *ev,
00065                  Error *err)
00066 {
00067   if (reason)
00068     logme ()
00069       << reason << peer->peeraddr
00070       << (err ? "; error was: " + err->explain() : std::string(""))
00071       << std::endl;
00072 
00073   Socket *s = peer->socket;
00074 
00075   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00076     if (i->peer == peer)
00077       waiting_.erase(i++);
00078     else
00079       ++i;
00080 
00081   if (ev)
00082     ev->source = 0;
00083 
00084   discard(peer->sendq);
00085   if (peer->automatic)
00086     peer->automatic->peer = 0;
00087 
00088   sel_.detach(s);
00089   s->close();
00090   removePeer(peer, s);
00091   delete s;
00092   return true;
00093 }
00094 
00096 void
00097 VisNet::requestObject(Peer *p, const char *name, size_t len)
00098 {
00099   Bucket **msg = &p->sendq;
00100   while (*msg)
00101     msg = &(*msg)->next;
00102   *msg = new Bucket;
00103   (*msg)->next = 0;
00104 
00105   uint32_t words[3];
00106   words[0] = sizeof(words) + len;
00107   words[1] = VIS_MSG_GET_OBJECT;
00108   words[2] = len;
00109   (*msg)->data.reserve((*msg)->data.size() + words[0]);
00110   copydata(*msg, words, sizeof(words));
00111   copydata(*msg, name, len);
00112 }
00113 
00116 void
00117 VisNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
00118 {
00119   // FIXME: Should we automatically record which exact peer the waiter
00120   // is expecting to deliver data so we know to release the waiter if
00121   // the other peer vanishes?  The current implementation stands a
00122   // chance for the waiter to wait indefinitely -- although we do
00123   // force terminate the wait after a while.
00124   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00125   WaitObject wo = { Time::current(), name, info, p };
00126   waiting_.push_back(wo);
00127   p->waiting++;
00128 }
00129 
00130 // Once an object has been updated, this is invoked for all waiting
00131 // peers.  Send the object back to the peer in suitable form.
00132 void
00133 VisNet::releaseFromWait(WaitList::iterator i, Object *o)
00134 {
00135   Bucket **msg = &i->peer->sendq;
00136   while (*msg)
00137     msg = &(*msg)->next;
00138   *msg = new Bucket;
00139   (*msg)->next = 0;
00140 
00141   releaseFromWait(*msg, *i, o);
00142 
00143   assert(i->peer->waiting > 0);
00144   i->peer->waiting--;
00145   waiting_.erase(i);
00146 }
00147 
00148 // Release everyone waiting for the object @a o.
00149 void
00150 VisNet::releaseWaiters(Object *o)
00151 {
00152   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00153     if (i->name == o->name)
00154       releaseFromWait(i++, o);
00155     else
00156       ++i;
00157 }
00158 
00160 // Check if the network layer should stop.
00161 bool
00162 VisNet::shouldStop(void)
00163 {
00164   return shutdown_;
00165 }
00166 
00167 // Once an object has been updated, this is invoked for all waiting
00168 // peers.  Send the requested object to the waiting peer.
00169 void
00170 VisNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
00171 {
00172   if (o)
00173     sendObjectToPeer (msg, *o, true);
00174   else
00175   {
00176     uint32_t words[3];
00177     words[0] = sizeof(words) + w.name.size();
00178     words[1] = VIS_REPLY_NONE;
00179     words[2] = w.name.size();
00180 
00181     msg->data.reserve(msg->data.size() + words[0]);
00182     copydata(msg, &words[0], sizeof(words));
00183     copydata(msg, &w.name[0], w.name.size());
00184   }
00185 }
00186 
00187 // Send an object to a peer.  If not @a data, only sends a summary
00188 // without the object data, except the data is always sent for scalar
00189 // objects.
00190 void
00191 VisNet::sendObjectToPeer(Bucket *msg, Object &o, bool data)
00192 {
00193   uint32_t flags = o.flags & ~VIS_FLAG_ZOMBIE;
00194   DataBlob objdata;
00195 
00196   if (data || (flags & VIS_FLAG_SCALAR))
00197     objdata.insert(objdata.end(),
00198                    &o.rawdata[0],
00199                    &o.rawdata[0] + o.rawdata.size());
00200 
00201   uint32_t words[7];
00202   uint32_t namelen = o.name.size();
00203   uint32_t datalen = objdata.size();
00204 
00205   words[0] = sizeof(words) + namelen + datalen;
00206   words[1] = VIS_REPLY_OBJECT;
00207   words[2] = flags;
00208   words[3] = (o.version >> 0 ) & 0xffffffff;
00209   words[4] = (o.version >> 32) & 0xffffffff;
00210   words[5] = namelen;
00211   words[6] = datalen;
00212 
00213   msg->data.reserve(msg->data.size() + words[0]);
00214   copydata(msg, &words[0], sizeof(words));
00215   if (namelen)
00216     copydata(msg, &o.name[0], namelen);
00217   if (datalen)
00218     copydata(msg, &objdata[0], datalen);
00219 }
00220 
00222 // Handle peer messages.
00223 bool
00224 VisNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
00225 {
00226   // Decode and process this message.
00227   uint32_t type;
00228   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00229   switch (type)
00230   {
00231   case VIS_MSG_UPDATE_ME:
00232     {
00233       if (len != 2*sizeof(uint32_t))
00234       {
00235         logme()
00236           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00237           << " from peer " << p->peeraddr << std::endl;
00238         return false;
00239       }
00240 
00241       if (debug_)
00242         logme()
00243           << "DEBUG: received message 'UPDATE ME' from peer "
00244           << p->peeraddr << std::endl;
00245 
00246       p->update = true;
00247     }
00248     return true;
00249 
00250   case VIS_MSG_LIST_OBJECTS:
00251     {
00252       if (debug_)
00253         logme()
00254           << "DEBUG: received message 'LIST OBJECTS' from peer "
00255           << p->peeraddr << std::endl;
00256 
00257       // Send over current status: list of known objects.
00258       lock();
00259       sendObjectListToPeer(msg, true, false);
00260       unlock();
00261     }
00262     return true;
00263 
00264   case VIS_MSG_GET_OBJECT:
00265     {
00266       if (debug_)
00267         logme()
00268           << "DEBUG: received message 'GET OBJECT' from peer "
00269           << p->peeraddr << std::endl;
00270 
00271       if (len < 3*sizeof(uint32_t))
00272       {
00273         logme()
00274           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00275           << " from peer " << p->peeraddr << std::endl;
00276         return false;
00277       }
00278 
00279       uint32_t namelen;
00280       memcpy(&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00281       if (len != 3*sizeof(uint32_t) + namelen)
00282       {
00283         logme()
00284           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00285           << " from peer " << p->peeraddr
00286           << ", expected length " << (3*sizeof(uint32_t))
00287           << " + " << namelen << std::endl;
00288         return false;
00289       }
00290 
00291       lock();
00292       std::string name((char *) data + 3*sizeof(uint32_t), namelen);
00293       Peer *owner = 0;
00294       Object *o = findObject(0, name, &owner);
00295       if (o)
00296       {
00297         o->lastreq = Time::current();
00298         if (o->rawdata.empty())
00299           waitForData(p, name, "", owner);
00300         else
00301           sendObjectToPeer(msg, *o, true);
00302       }
00303       else
00304       {
00305         uint32_t words[3];
00306         words[0] = sizeof(words) + name.size();
00307         words[1] = VIS_REPLY_NONE;
00308         words[2] = name.size();
00309 
00310         msg->data.reserve(msg->data.size() + words[0]);
00311         copydata(msg, &words[0], sizeof(words));
00312         copydata(msg, &name[0], name.size());
00313       }
00314       unlock();
00315     }
00316     return true;
00317 
00318   case VIS_REPLY_LIST_BEGIN:
00319     {
00320       if (len != 4*sizeof(uint32_t))
00321       {
00322         logme()
00323           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00324           << " from peer " << p->peeraddr << std::endl;
00325         return false;
00326       }
00327 
00328       if (debug_)
00329         logme()
00330           << "DEBUG: received message 'LIST BEGIN' from "
00331           << p->peeraddr << std::endl;
00332 
00333       // Get the update status: whether this is a full update.
00334       uint32_t flags;
00335       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00336 
00337       // If we are about to receive a full list of objects, flag all
00338       // objects dead.  Subsequent object notifications will undo this
00339       // for the live objects.  This tells us the object has been
00340       // removed, but we can keep making it available for a while if
00341       // there continues to be interest in it.
00342       if (flags)
00343       {
00344         lock();
00345         markObjectsZombies(p);
00346         unlock();
00347       }
00348     }
00349     return true;
00350 
00351   case VIS_REPLY_LIST_END:
00352     {
00353       if (len != 4*sizeof(uint32_t))
00354       {
00355         logme()
00356           << "ERROR: corrupt 'LIST END' message of length " << len
00357           << " from peer " << p->peeraddr << std::endl;
00358         return false;
00359       }
00360 
00361       // Get the update status: whether this is a full update.
00362       uint32_t flags;
00363       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00364 
00365       // If we received a full list of objects, flag all zombie objects
00366       // now dead. We need to do this in two stages in case we receive
00367       // updates in many parts, and end up sending updates to others in
00368       // between; this avoids us lying live objects are dead.
00369       if (flags)
00370       {
00371         lock();
00372         markObjectsDead(p);
00373         unlock();
00374       }
00375 
00376       if (debug_)
00377         logme()
00378           << "DEBUG: received message 'LIST END' from "
00379           << p->peeraddr << std::endl;
00380 
00381       // Indicate we have received another update from this peer.
00382       // Also indicate we should flush to our clients.
00383       flush_ = true;
00384       p->updates++;
00385     }
00386     return true;
00387 
00388   case VIS_REPLY_OBJECT:
00389     {
00390       uint32_t words[7];
00391       if (len < sizeof(words))
00392       {
00393         logme()
00394           << "ERROR: corrupt 'OBJECT' message of length " << len
00395           << " from peer " << p->peeraddr << std::endl;
00396         return false;
00397       }
00398 
00399       memcpy(&words[0], data, sizeof(words));
00400       uint32_t &namelen = words[5];
00401       uint32_t &datalen = words[6];
00402 
00403       if (len != sizeof(words) + namelen + datalen)
00404       {
00405         logme()
00406           << "ERROR: corrupt 'OBJECT' message of length " << len
00407           << " from peer " << p->peeraddr
00408           << ", expected length " << sizeof(words)
00409           << " + " << namelen
00410           << " + " << datalen
00411           << std::endl;
00412         return false;
00413       }
00414 
00415       unsigned char *namedata = data + sizeof(words);
00416       unsigned char *objdata = namedata + namelen;
00417       unsigned char *enddata = objdata + datalen;
00418       std::string name((char *) namedata, namelen);
00419       assert(enddata == data + len);
00420 
00421       if (debug_)
00422         logme()
00423           << "DEBUG: received message 'OBJECT " << name
00424           << "' from " << p->peeraddr << std::endl;
00425 
00426       // Mark the peer as a known object source.
00427       p->source = true;
00428 
00429       // Initialise or update an object entry.
00430       lock();
00431       Object *o = findObject(p, name);
00432       if (! o)
00433         o = makeObject(p, name);
00434 
00435       bool hadobject = ! o->rawdata.empty();
00436       o->flags = words[2] | VIS_FLAG_RECEIVED;
00437       o->version = ((uint64_t) words[4] << 32 | words[3]);
00438       o->rawdata.clear();
00439       o->rawdata.insert(o->rawdata.end(), objdata, enddata);
00440 
00441       // If we had an object for this one already and this is a list
00442       // update without data, issue an immediate data get request.
00443       if (hadobject && ! datalen)
00444         requestObject(p, (namelen ? &name[0] : 0), namelen);
00445 
00446       // If we have the object data, release from wait.
00447       if (datalen)
00448         releaseWaiters(o);
00449       unlock();
00450     }
00451     return true;
00452 
00453   case VIS_REPLY_NONE:
00454     {
00455       uint32_t words[3];
00456       if (len < sizeof(words))
00457       {
00458         logme()
00459           << "ERROR: corrupt 'NONE' message of length " << len
00460           << " from peer " << p->peeraddr << std::endl;
00461         return false;
00462       }
00463 
00464       memcpy(&words[0], data, sizeof(words));
00465       uint32_t &namelen = words[2];
00466 
00467       if (len != sizeof(words) + namelen)
00468       {
00469         logme()
00470           << "ERROR: corrupt 'NONE' message of length " << len
00471           << " from peer " << p->peeraddr
00472           << ", expected length " << sizeof(words)
00473           << " + " << namelen << std::endl;
00474         return false;
00475       }
00476 
00477       unsigned char *namedata = data + sizeof(words);
00478       unsigned char *enddata = namedata + namelen;
00479       std::string name((char *) namedata, namelen);
00480       assert(enddata == data + len);
00481 
00482       if (debug_)
00483         logme()
00484           << "DEBUG: received message 'NONE " << name
00485           << "' from " << p->peeraddr << std::endl;
00486 
00487       // Mark the peer as a known object source.
00488       p->source = true;
00489 
00490       // If this was a known object, update its entry.
00491       lock();
00492       Object *o = findObject(p, name);
00493       if (o)
00494         o->flags |= VIS_FLAG_DEAD;
00495 
00496       // If someone was waiting for this, let them go.
00497       releaseWaiters(o);
00498       unlock();
00499     }
00500     return true;
00501 
00502   default:
00503     logme()
00504       << "ERROR: unrecognised message of length " << len
00505       << " and type " << type << " from peer " << p->peeraddr
00506       << std::endl;
00507     return false;
00508   }
00509 }
00510 
00513 bool
00514 VisNet::onPeerData(IOSelectEvent *ev, Peer *p)
00515 {
00516   assert(getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00517 
00518   // If there is a problem with the peer socket, discard the peer
00519   // and tell the selector to stop prcessing events for it.  If
00520   // this is a server connection, we will eventually recreate
00521   // everything if/when the data server comes back.
00522   if (ev->events & IOUrgent)
00523   {
00524     if (p->automatic)
00525     {
00526       logme()
00527         << "WARNING: connection to the server at " << p->peeraddr
00528         << " lost (will attempt to reconnect in 15 seconds)\n";
00529       return losePeer(0, p, ev);
00530     }
00531     else
00532       return losePeer("WARNING: lost peer connection ", p, ev);
00533   }
00534 
00535   // If we can write to the peer socket, pump whatever we can into it.
00536   if (ev->events & IOWrite)
00537   {
00538     while (Bucket *b = p->sendq)
00539     {
00540       IOSize len = b->data.size() - p->sendpos;
00541       const void *data = (len ? (const void *)&b->data[p->sendpos]
00542                           : (const void *)&data);
00543       IOSize done;
00544 
00545       try
00546       {
00547         done = (len ? ev->source->write (data, len) : 0);
00548         if (debug_ && len)
00549           logme()
00550             << "DEBUG: sent " << done << " bytes to peer "
00551             << p->peeraddr << std::endl;
00552       }
00553       catch (Error &e)
00554       {
00555         return losePeer("WARNING: unable to write to peer ",
00556                         p, ev, &e);
00557       }
00558 
00559       p->sendpos += done;
00560       if (p->sendpos == b->data.size())
00561       {
00562         Bucket *old = p->sendq;
00563         p->sendq = old->next;
00564         p->sendpos = 0;
00565         old->next = 0;
00566         discard(old);
00567       }
00568 
00569       if (! done && len)
00570         // Cannot write any more.
00571         break;
00572     }
00573   }
00574 
00575   // If there is data to be read from the peer, first receive what we
00576   // can get out the socket, the process all complete requests.
00577   if (ev->events & IORead)
00578   {
00579     // First build up the incoming buffer of data in the socket.
00580     // Remember the last size returned by the socket; we need
00581     // it to determine if the remote end closed the connection.
00582     IOSize sz;
00583     try
00584     {
00585       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00586       do
00587         if ((sz = ev->source->read(&buf[0], buf.size())))
00588         {
00589           if (debug_)
00590             logme()
00591               << "DEBUG: received " << sz << " bytes from peer "
00592               << p->peeraddr << std::endl;
00593           DataBlob &data = p->incoming;
00594           if (data.capacity () < data.size () + sz)
00595             data.reserve (data.size() + SOCKET_READ_GROWTH);
00596           data.insert (data.end(), &buf[0], &buf[0] + sz);
00597         }
00598       while (sz == sizeof (buf));
00599     }
00600     catch (Error &e)
00601     {
00602       SystemError *next = dynamic_cast<SystemError *>(e.next());
00603       if (next && next->portable() == SysErr::ErrTryAgain)
00604         sz = 1; // Ignore it, and fake no end of data.
00605       else
00606         // Houston we have a problem.
00607         return losePeer("WARNING: failed to read from peer ",
00608                         p, ev, &e);
00609     }
00610 
00611     // Process fully received messages as long as we can.
00612     size_t consumed = 0;
00613     DataBlob &data = p->incoming;
00614     while (data.size()-consumed >= sizeof(uint32_t)
00615            && p->waiting < MAX_PEER_WAITREQS)
00616     {
00617       uint32_t msglen;
00618       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00619 
00620       if (msglen >= MESSAGE_SIZE_LIMIT)
00621         return losePeer("WARNING: excessively large message from ", p, ev);
00622 
00623       if (data.size()-consumed >= msglen)
00624       {
00625         bool valid = true;
00626         if (msglen < 2*sizeof(uint32_t))
00627         {
00628           logme()
00629             << "ERROR: corrupt peer message of length " << msglen
00630             << " from peer " << p->peeraddr << std::endl;
00631           valid = false;
00632         }
00633         else
00634         {
00635           // Decode and process this message.
00636           Bucket msg;
00637           msg.next = 0;
00638           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00639 
00640           // If we created a response, chain it to the write queue.
00641           if (! msg.data.empty())
00642           {
00643             Bucket **prev = &p->sendq;
00644             while (*prev)
00645                prev = &(*prev)->next;
00646 
00647             *prev = new Bucket;
00648             (*prev)->next = 0;
00649             (*prev)->data.swap(msg.data);
00650           }
00651         }
00652 
00653         if (! valid)
00654           return losePeer("WARNING: data stream error with ", p, ev);
00655 
00656         consumed += msglen;
00657       }
00658       else
00659         break;
00660     }
00661 
00662     data.erase(data.begin(), data.begin()+consumed);
00663 
00664     // If the client has closed the connection, shut down our end.  If
00665     // we have something to send back still, leave the write direction
00666     // open.  Otherwise close the shop for this client.
00667     if (sz == 0)
00668       sel_.setMask(p->socket, p->mask &= ~IORead);
00669   }
00670 
00671   // Yes, please keep processing events for this socket.
00672   return false;
00673 }
00674 
00679 bool
00680 VisNet::onPeerConnect(IOSelectEvent *ev)
00681 {
00682   // Recover the server socket.
00683   assert(ev->source == server_);
00684 
00685   // Accept the connection.
00686   Socket *s = server_->accept();
00687   assert(s);
00688   assert(! s->isBlocking());
00689 
00690   // Record it to our list of peers.
00691   Peer *p = createPeer(s);
00692   InetAddress peeraddr = ((InetSocket *) s)->peername();
00693   InetAddress myaddr = ((InetSocket *) s)->sockname();
00694   p->peeraddr = StringFormat("%1:%2")
00695                 .arg(peeraddr.hostname())
00696                 .arg(peeraddr.port());
00697   p->mask = IORead|IOUrgent;
00698   p->socket = s;
00699 
00700   // Report the new connection.
00701   if (debug_)
00702     logme()
00703       << "INFO: new peer " << p->peeraddr << " is now connected to "
00704       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00705 
00706   // Attach it to the listener.
00707   sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
00708 
00709   // We are never done.
00710   return false;
00711 }
00712 
00719 bool
00720 VisNet::onLocalNotify(IOSelectEvent *ev)
00721 {
00722   // Discard the data in the pipe, we care only about the wakeup.
00723   try
00724   {
00725     IOSize sz;
00726     unsigned char buf [1024];
00727     while ((sz = ev->source->read(buf, sizeof(buf))))
00728       ;
00729   }
00730   catch (Error &e)
00731   {
00732     SystemError *next = dynamic_cast<SystemError *>(e.next());
00733     if (next && next->portable() == SysErr::ErrTryAgain)
00734       ; // Ignore it
00735     else
00736       logme()
00737         << "WARNING: error reading from notification pipe: "
00738         << e.explain() << std::endl;
00739   }
00740 
00741   // Tell the main event pump to send an update in a little while.
00742   flush_ = true;
00743 
00744   // We are never done, always keep going.
00745   return false;
00746 }
00747 
00750 void
00751 VisNet::updateMask(Peer *p)
00752 {
00753   if (! p->socket)
00754     return;
00755 
00756   // Listen to writes iff we have data to send.
00757   unsigned oldmask = p->mask;
00758   if (! p->sendq && (p->mask & IOWrite))
00759     sel_.setMask(p->socket, p->mask &= ~IOWrite);
00760 
00761   if (p->sendq && ! (p->mask & IOWrite))
00762     sel_.setMask(p->socket, p->mask |= IOWrite);
00763 
00764   if (debug_ && oldmask != p->mask)
00765     logme()
00766       << "DEBUG: updating mask for " << p->peeraddr << " to "
00767       << p->mask << " from " << oldmask << std::endl;
00768 
00769   // If we have nothing more to send and are no longer listening
00770   // for reads, close up the shop for this peer.
00771   if (p->mask == IOUrgent && ! p->waiting)
00772   {
00773     assert(! p->sendq);
00774     if (debug_)
00775       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
00776     losePeer(0, p, 0);
00777   }
00778 }
00779 
00781 VisNet::VisNet (const std::string &appname /* = "" */)
00782   : debug_ (false),
00783     appname_ (appname.empty() ? "VisNet" : appname.c_str()),
00784     pid_ (getpid()),
00785     server_ (0),
00786     version_ (Time::current()),
00787     communicate_ ((pthread_t) -1),
00788     shutdown_ (0),
00789     delay_ (1000),
00790     flush_ (false)
00791 {
00792   // Create a pipe for the local apps to tell the communicator
00793   // thread that local app data has changed and that the peers
00794   // should be notified.
00795   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
00796   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &VisNet::onLocalNotify));
00797 
00798   // Initialise the upstream and downstream to empty.
00799   upstream_.peer   = downstream_.peer   = 0;
00800   upstream_.next   = downstream_.next   = 0;
00801   upstream_.port   = downstream_.port   = 0;
00802   upstream_.update = downstream_.update = false;
00803   upstream_.warned = downstream_.warned = false;
00804 
00805   local_ = createPeer((Socket *) -1);
00806 }
00807 
00808 VisNet::~VisNet(void)
00809 {
00810   // FIXME
00811 }
00812 
00815 void
00816 VisNet::debug(bool doit)
00817 {
00818   debug_ = doit;
00819 }
00820 
00823 void
00824 VisNet::delay(int delay)
00825 {
00826   delay_ = delay;
00827 }
00828 
00832 void
00833 VisNet::startLocalServer(int port)
00834 {
00835   if (server_)
00836   {
00837     logme() << "ERROR: server was already started.\n";
00838     return;
00839   }
00840 
00841   try
00842   {
00843     server_ = new InetServerSocket(InetAddress (port), 10);
00844     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00845     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00846     server_->setBlocking(false);
00847     sel_.attach(server_, IOAccept, CreateHook(this, &VisNet::onPeerConnect));
00848   }
00849   catch (Error &e)
00850   {
00851     // FIXME: Do we need to do this when we throw an exception anyway?
00852     // FIXME: Abort instead?
00853     logme()
00854       << "ERROR: Failed to start server at port " << port << ": "
00855       << e.explain() << std::endl;
00856 
00857     // FIXME: Throw something simpler that removes the dependency?
00858     throw cms::Exception("VisNet::startLocalServer")
00859       << "Failed to start server at port " << port << ": "
00860       << e.explain();
00861   }
00862   
00863   logme() << "INFO: Shared memory server started at port " << port << std::endl;
00864 }
00865 
00869 void
00870 VisNet::updateToCollector(const std::string &host, int port)
00871 {
00872   if (! downstream_.host.empty())
00873   {
00874     logme()
00875       << "ERROR: Already updating another collector at "
00876       << downstream_.host << ":" << downstream_.port << std::endl;
00877     return;
00878   }
00879 
00880   downstream_.update = true;
00881   downstream_.host = host;
00882   downstream_.port = port;
00883 }
00884 
00888 void
00889 VisNet::listenToSource(const std::string &host, int port)
00890 {
00891   if (! upstream_.host.empty())
00892   {
00893     logme()
00894       << "ERROR: Already receiving data from another collector at "
00895       << upstream_.host << ":" << upstream_.port << std::endl;
00896     return;
00897   }
00898 
00899   upstream_.update = false;
00900   upstream_.host = host;
00901   upstream_.port = port;
00902 }
00903 
00905 void
00906 VisNet::shutdown(void)
00907 {
00908   shutdown_ = 1;
00909   if (communicate_ != (pthread_t) -1)
00910     pthread_join(communicate_, 0);
00911 }
00912 
00918 static void *communicate(void *obj)
00919 {
00920   sigset_t sigs;
00921   sigfillset(&sigs);
00922   pthread_sigmask(SIG_BLOCK, &sigs, 0);
00923   ((VisNet *)obj)->run();
00924   return 0;
00925 }
00926 
00928 void
00929 VisNet::lock(void)
00930 {
00931   if (communicate_ != (pthread_t) -1)
00932     pthread_mutex_lock(&lock_);
00933 }
00934 
00936 void
00937 VisNet::unlock(void)
00938 {
00939   if (communicate_ != (pthread_t) -1)
00940     pthread_mutex_unlock(&lock_);
00941 }
00942 
00946 void
00947 VisNet::start(void)
00948 {
00949   if (communicate_ != (pthread_t) -1)
00950   {
00951     logme()
00952       << "ERROR: Shared memory networking thread has already been started\n";
00953     return;
00954   }
00955 
00956   pthread_mutex_init(&lock_, 0);
00957   pthread_create(&communicate_, 0, &communicate, this);
00958 }
00959 
00961 void
00962 VisNet::run(void)
00963 {
00964   Time now;
00965   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
00966 
00967   // Perform I/O.  Every once in a while flush updates to peers.
00968   while (! shouldStop())
00969   {
00970     for (int i = 0; i < 2; ++i)
00971     {
00972       AutoPeer *ap = automatic[i];
00973 
00974       // If we need a server connection and don't have one yet,
00975       // initiate asynchronous connection creation.  Swallow errors
00976       // in case the server won't talk to us.
00977       if (! ap->host.empty()
00978           && ! ap->peer
00979           && (now = Time::current()) > ap->next)
00980       {
00981         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
00982         InetSocket *s = 0;
00983         try
00984         {
00985           s = new InetSocket (SocketConst::TypeStream);
00986           s->setBlocking (false);
00987           s->connect(InetAddress (ap->host.c_str(), ap->port));
00988           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
00989           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
00990         }
00991         catch (Error &e)
00992         {
00993           SystemError *sys = dynamic_cast<SystemError *>(e.next());
00994           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
00995           {
00996             // "In progress" just means the connection is in progress.
00997             // The connection is ready when the socket is writeable.
00998             // Anything else is a real problem.
00999             if (! ap->warned)
01000             {
01001               logme()
01002                 << "NOTE: server at " << ap->host << ":" << ap->port
01003                 << " is unavailable.  Connection will be established"
01004                 << " automatically on the background once the server"
01005                 << " becomes available.  Error from the attempt was: "
01006                 << e.explain() << '\n';
01007               ap->warned = true;
01008             }
01009 
01010             if (s)
01011               s->abort();
01012             delete s;
01013             s = 0;
01014           }
01015         }
01016 
01017         // Set up with the selector if we were successful.  If this is
01018         // the upstream collector, queue a request for updates.
01019         if (s)
01020         {
01021           lock();
01022           Peer *p = createPeer(s);
01023           ap->peer = p;
01024           ap->warned = false;
01025           unlock();
01026 
01027           InetAddress peeraddr = ((InetSocket *) s)->peername();
01028           InetAddress myaddr = ((InetSocket *) s)->sockname();
01029           p->peeraddr = StringFormat("%1:%2")
01030                         .arg(peeraddr.hostname())
01031                         .arg(peeraddr.port());
01032           p->mask = IORead|IOWrite|IOUrgent;
01033           p->update = ap->update;
01034           p->automatic = ap;
01035           p->socket = s;
01036           sel_.attach(s, p->mask, CreateHook(this, &VisNet::onPeerData, p));
01037           if (ap == &upstream_)
01038           {
01039             uint32_t words[4] = { 2*sizeof(uint32_t), VIS_MSG_LIST_OBJECTS,
01040                                   2*sizeof(uint32_t), VIS_MSG_UPDATE_ME };
01041             p->sendq = new Bucket;
01042             p->sendq->next = 0;
01043             copydata(p->sendq, words, sizeof(words));
01044           }
01045 
01046           // Report the new connection.
01047           if (debug_)
01048             logme()
01049               << "INFO: now connected to " << p->peeraddr << " from "
01050               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01051         }
01052       }
01053     }
01054 
01055     // Pump events for a while.
01056     sel_.dispatch(delay_);
01057     now = Time::current();
01058 
01059     // Check if flush is required.  Flush only if one is needed.
01060     // Always sends the full object list.  Compact objects no longer
01061     // in active use before sending out the update.
01062     if (flush_)
01063     {
01064       flush_ = false;
01065 
01066       lock();
01067       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01068                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01069       sendObjectListToPeers(true);
01070       unlock();
01071     }
01072 
01073     // Update the data server and peer selection masks.  If we
01074     // have no more data to send and listening for writes, remove
01075     // the write mask.  If we have something to write and aren't
01076     // listening for writes, start listening so we can send off
01077     // the data.
01078     updatePeerMasks();
01079 
01080     // Release peers that have been waiting for data for too long.
01081     lock();
01082     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01083     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01084     {
01085       // If the peer has waited for too long, send something.
01086       if (i->time < waitold)
01087         releaseFromWait(i++, findObject(0, i->name));
01088 
01089       // Keep it for now.
01090       else
01091         ++i;
01092     }
01093     unlock();
01094   }
01095 }
01096 
01097 // Tell the network cache that there have been local changes that
01098 // should be advertised to the downstream listeners.
01099 void
01100 VisNet::sendLocalChanges(void)
01101 {
01102   char byte = 0;
01103   wakeup_.sink()->write(&byte, 1);
01104 }
01105 
01109 VisNet::Object *
01110 VisNet::findObject(Peer *p, const std::string &name, Peer **owner)
01111 {
01112   ObjectMap::iterator pos;
01113   PeerMap::iterator i, e;
01114   if (owner)
01115     *owner = 0;
01116   if (p)
01117   {
01118     pos = p->objs.find(name);
01119     if (pos == p->objs.end())
01120       return 0;
01121     else
01122     {
01123       if (owner) *owner = p;
01124       return &pos->second;
01125     }
01126   }
01127   else
01128   {
01129     for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01130     {
01131       pos = i->second.objs.find(name);
01132       if (pos != i->second.objs.end())
01133       {
01134         if (owner) *owner = &i->second;
01135         return &pos->second;
01136       }
01137     }
01138     return 0;
01139   }
01140 }
01141 
01142 VisNet::Object *
01143 VisNet::makeObject(Peer *p, const std::string &name)
01144 {
01145   Object *o = &p->objs[name];
01146   o->version = 0;
01147   o->name = name;
01148   o->flags = 0;
01149   o->lastreq = 0;
01150   return o;
01151 }
01152 
01153 // Mark all the objects as zombies.  This is intended to be used
01154 // when starting to process a complete list of objects, in order
01155 // to flag the objects that need to be killed at the end.  After
01156 // call to this method, revive all live objects by removing the
01157 // VIS_FLAG_ZOMBIE flag, then call markObjectsDead() at the end
01158 // to flag dead as all remaining zombies.
01159 void
01160 VisNet::markObjectsZombies(Peer *p)
01161 {
01162   ObjectMap::iterator i, e;
01163   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01164     i->second.flags |= VIS_FLAG_ZOMBIE;
01165 }
01166 
01167 // Mark remaining zombie objects as dead.  See markObjectsZombies().
01168 void
01169 VisNet::markObjectsDead(Peer *p)
01170 {
01171   ObjectMap::iterator i, e;
01172   for (i = p->objs.begin(), e = p->objs.end(); i != e; ++i)
01173     if (i->second.flags & VIS_FLAG_ZOMBIE)
01174       i->second.flags = (i->second.flags & ~VIS_FLAG_ZOMBIE) | VIS_FLAG_DEAD;
01175 }
01176 
01177 // Purge all old and dead objects.
01178 void
01179 VisNet::purgeDeadObjects(lat::Time oldobj, lat::Time deadobj)
01180 {
01181   PeerMap::iterator pi, pe;
01182   ObjectMap::iterator oi, oe;
01183   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01184     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; )
01185     {
01186       Object &o = oi->second;
01187 
01188       // Compact non-scalar objects that are unused.  We send scalar
01189       // objects to the web server so we keep them around.
01190       if (o.lastreq < oldobj && ! o.rawdata.empty() && ! (o.flags & VIS_FLAG_SCALAR))
01191       {
01192         if (debug_)
01193           logme()
01194             << "DEBUG: compacting idle '" << o.name
01195             << "' from " << pi->second.peeraddr << std::endl;
01196       }
01197 
01198       // Remove if dead, old and unused.
01199       if (o.lastreq < deadobj
01200           && o.version < deadobj
01201           && (o.flags & VIS_FLAG_DEAD))
01202       {
01203         if (debug_)
01204           logme()
01205             << "DEBUG: removing dead '" << o.name
01206             << "' from " << pi->second.peeraddr << std::endl;
01207 
01208         pi->second.objs.erase(oi++);
01209       }
01210       else
01211         ++oi;
01212     }
01213 }
01214 
01215 VisNet::Peer *
01216 VisNet::getPeer(lat::Socket *s)
01217 {
01218   PeerMap::iterator pos = peers_.find(s);
01219   return pos == peers_.end() ? 0 : &pos->second;
01220 }
01221 
01222 VisNet::Peer *
01223 VisNet::createPeer(lat::Socket *s)
01224 {
01225   Peer *p = &peers_[s];
01226   p->socket = 0;
01227   p->sendq = 0;
01228   p->sendpos = 0;
01229   p->mask = 0;
01230   p->source = false;
01231   p->update = false;
01232   p->updated = false;
01233   p->updates = 0;
01234   p->waiting = 0;
01235   p->automatic = 0;
01236   return p;
01237 }
01238 
01239 void
01240 VisNet::removePeer(Peer *p, lat::Socket *s)
01241 {
01242   bool needflush = ! p->objs.empty();
01243 
01244   p->objs.clear();
01245   peers_.erase(s);
01246 
01247   // If we removed a peer with objects, our list of objects
01248   // has changed and we need to update downstream peers.
01249   if (needflush)
01250     sendLocalChanges();
01251 }
01252 
01254 void
01255 VisNet::sendObjectListToPeer(Bucket *msg, bool all, bool clear)
01256 {
01257   PeerMap::iterator pi, pe;
01258   ObjectMap::iterator oi, oe;
01259   uint32_t numobjs = 0;
01260   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01261     numobjs += pi->second.objs.size();
01262 
01263   msg->data.reserve(msg->data.size() + 300*numobjs);
01264 
01265   uint32_t nupdates = 0;
01266   uint32_t words[4];
01267   words[0] = sizeof(words);
01268   words[1] = VIS_REPLY_LIST_BEGIN;
01269   words[2] = numobjs;
01270   words[3] = all;
01271   copydata(msg, &words[0], sizeof(words));
01272 
01273   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01274     for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
01275       if (all || (oi->second.flags & VIS_FLAG_NEW))
01276       {
01277         sendObjectToPeer(msg, oi->second, false);
01278         if (clear)
01279           oi->second.flags &= ~VIS_FLAG_NEW;
01280         ++nupdates;
01281       }
01282 
01283   words[1] = VIS_REPLY_LIST_END;
01284   words[2] = nupdates;
01285   copydata(msg, &words[0], sizeof(words));
01286 }
01287 
01288 void
01289 VisNet::sendObjectListToPeers(bool all)
01290 {
01291   PeerMap::iterator i, e;
01292   for (i = peers_.begin(), e = peers_.end(); i != e; ++i)
01293   {
01294     Peer &p = i->second;
01295     if (! p.update)
01296       continue;
01297 
01298     if (debug_)
01299       logme()
01300         << "DEBUG: notifying " << p.peeraddr << std::endl;
01301 
01302     Bucket msg;
01303     msg.next = 0;
01304     sendObjectListToPeer(&msg, !p.updated || all, true);
01305 
01306     if (! msg.data.empty())
01307     {
01308       Bucket **prev = &p.sendq;
01309       while (*prev)
01310         prev = &(*prev)->next;
01311 
01312       *prev = new Bucket;
01313       (*prev)->next = 0;
01314       (*prev)->data.swap(msg.data);
01315     }
01316     p.updated = true;
01317   }
01318 }
01319 
01320 void
01321 VisNet::updatePeerMasks(void)
01322 {
01323   PeerMap::iterator i, e;
01324   for (i = peers_.begin(), e = peers_.end(); i != e; )
01325     updateMask(&(i++)->second);
01326 }
01327 
01328 
01332 int
01333 VisNet::receive(void (*callback) (void *arg, uint32_t reason, Object &obj), void *arg)
01334 {
01335   int updates = 0;
01336 
01337   lock();
01338   PeerMap::iterator pi, pe;
01339   ObjectMap::iterator oi, oe;
01340   for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
01341   {
01342     Peer &p = pi->second;
01343     if (&p == local_)
01344       continue;
01345 
01346     updates += p.updates;
01347 
01348     for (oi = p.objs.begin(), oe = p.objs.end(); oi != oe; )
01349     {
01350       Object &o = oi->second;
01351       if (o.flags & VIS_FLAG_DEAD)
01352       {
01353         callback(arg, VIS_FLAG_DEAD, o);
01354         p.objs.erase(oi++);
01355       }
01356       else if (o.flags & VIS_FLAG_RECEIVED)
01357       {
01358         callback(arg, VIS_FLAG_RECEIVED, o);
01359         o.flags &= ~VIS_FLAG_RECEIVED;
01360         ++oi;
01361       }
01362       else
01363         ++oi;
01364     }
01365   }
01366   unlock();
01367 
01368   return updates;
01369 }
01370 
01373 void
01374 VisNet::updateLocalObject(Object &o)
01375 {
01376   ObjectMap::iterator pos = local_->objs.find(o.name);
01377   if (pos == local_->objs.end())
01378     local_->objs.insert(ObjectMap::value_type(o.name, o));
01379   else
01380   {
01381     std::swap(pos->second.version,   o.version);
01382     std::swap(pos->second.flags,     o.flags);
01383     std::swap(pos->second.rawdata,   o.rawdata);
01384     pos->second.lastreq = 0;
01385   }
01386 }
01387 
01390 void
01391 VisNet::removeLocalObject(const std::string &path)
01392 {
01393   local_->objs.erase(path);
01394 }

Generated on Tue Jun 9 17:50:04 2009 for CMSSW by  doxygen 1.5.4