CMS 3D CMS Logo

DQMNet Class Reference

#include <DQMServices/Core/interface/DQMNet.h>

Inheritance diagram for DQMNet:

DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

List of all members.

Public Types

typedef std::vector< unsigned char > DataBlob
typedef std::vector< QValue > QReports
typedef std::vector< uint32_t > TagList
typedef std::list< WaitObject > WaitList

Public Member Functions

void debug (bool doit)
 Enable or disable verbose debugging.
void delay (int delay)
 Set the I/O dispatching delay.
 DQMNet (const std::string &appname="")
void listenToCollector (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources.
void lock (void)
 Acquire a lock on the DQM net layer.
virtual int receive (DQMStore *store)
virtual void removeLocalObject (const std::string &name)
void requestFullUpdates (bool doit)
 Enable or disable requests for full updates.
void run (void)
 Run the actual I/O processing loop.
void sendLocalChanges (void)
void sendScalarAsText (bool doit)
 Enable or disable sending scalar monitoring values as text, rather than their ROOT object values.
void shutdown (void)
 Stop the network layer and wait for it to finish.
void start (void)
 Start running the network layer in a new thread.
void startLocalServer (int port)
 Start a server socket for accessing this DQM node remotely.
void unlock (void)
 Release the lock on the DQM net layer.
virtual void updateLocalObject (Object &o)
void updateToCollector (const std::string &host, int port)
 Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes.
virtual ~DQMNet (void)

Static Public Attributes

static const uint32_t DQM_FLAG_DEAD = 0x80000000
static const uint32_t DQM_FLAG_NEW = 0x40000000
static const uint32_t DQM_FLAG_RECEIVED = 0x20000000
static const uint32_t DQM_FLAG_REPORT_ERROR = 0x1
static const uint32_t DQM_FLAG_REPORT_OTHER = 0x4
static const uint32_t DQM_FLAG_REPORT_WARNING = 0x2
static const uint32_t DQM_FLAG_SCALAR = 0x8
static const uint32_t DQM_FLAG_TEXT = 0x10000000
static const uint32_t DQM_FLAG_ZOMBIE = 0x08000000
static const uint32_t DQM_MSG_GET_OBJECT = 3
static const uint32_t DQM_MSG_HELLO = 0
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
static const uint32_t DQM_MSG_UPDATE_ME = 1
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
static const uint32_t DQM_REPLY_LIST_END = 102
static const uint32_t DQM_REPLY_NONE = 103
static const uint32_t DQM_REPLY_OBJECT = 104
static const uint32_t MAX_PEER_WAITREQS = 128

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
bool extractScalarData (DataBlob &objdata, Object &o)
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)=0
virtual Peer * getPeer (lat::Socket *s)=0
std::ostream & logme (void)
virtual Object * makeObject (Peer *p, const std::string &name)=0
virtual void markObjectsDead (Peer *p)=0
virtual void markObjectsZombies (Peer *p)=0
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
virtual void purgeDeadObjects (lat::Time oldobj, lat::Time deadobj)=0
bool reconstructObject (Object &o)
bool reinstateObject (DQMStore *store, Object &o)
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
virtual void removePeer (Peer *p, lat::Socket *s)=0
virtual void requestFullUpdatesFromPeers (void)=0
virtual void sendObjectListToPeer (Bucket *msg, bool data, bool all, bool clear)=0
virtual void sendObjectListToPeers (bool all)=0
void sendObjectToPeer (Bucket *msg, Object &o, bool data, bool text)
virtual bool shouldStop (void)
void updateMask (Peer *p)
 Update the selector mask for a peer based on data queues.
virtual void updatePeerMasks (void)=0
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)

Protected Attributes

bool debug_
bool requestFullUpdates_
bool sendScalarAsText_

Private Member Functions

 DQMNet (const DQMNet &)
bool losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
 Handle errors with a peer socket.
bool onLocalNotify (lat::IOSelectEvent *ev)
 React to notifications from the DQM thread.
bool onPeerConnect (lat::IOSelectEvent *ev)
 Respond to new connections on the server socket.
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client.
DQMNet & operator= (const DQMNet &)
void releaseFromWait (WaitList::iterator i, Object *o)
void releaseWaiters (Object *o)
void requestObject (Peer *p, const char *name, size_t len)
 Queue an object request to the data server.

Static Private Member Functions

static void discard (Bucket *&b)

Private Attributes

std::string appname_
pthread_t communicate_
int delay_
AutoPeer downstream_
bool flush_
pthread_mutex_t lock_
int pid_
lat::IOSelector sel_
lat::InetServerSocket * server_
sig_atomic_t shutdown_
AutoPeer upstream_
lat::Time version_
WaitList waiting_
lat::Pipe wakeup_

Classes

struct  AutoPeer
struct  Bucket
struct  CoreObject
struct  Object
struct  Peer
struct  QValue
struct  WaitObject


Detailed Description

Definition at line 21 of file DQMNet.h.


Member Typedef Documentation

typedef std::vector<unsigned char> DQMNet::DataBlob

Definition at line 48 of file DQMNet.h.

typedef std::vector<QValue> DQMNet::QReports

Definition at line 52 of file DQMNet.h.

typedef std::vector<uint32_t> DQMNet::TagList

Definition at line 51 of file DQMNet.h.

typedef std::list<WaitObject> DQMNet::WaitList

Definition at line 53 of file DQMNet.h.


Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = ""  ) 

Definition at line 1053 of file DQMNet.cc.

References lat::IOSelector::attach(), lat::CreateHook(), downstream_, lat::IOChannel::fd(), IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, lat::Pipe::source(), DQMNet::AutoPeer::update, upstream_, wakeup_, and DQMNet::AutoPeer::warned.

01054   : debug_ (false),
01055     sendScalarAsText_ (false),
01056     requestFullUpdates_ (false),
01057     appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
01058     pid_ (getpid()),
01059     server_ (0),
01060     version_ (Time::current()),
01061     communicate_ ((pthread_t) -1),
01062     shutdown_ (0),
01063     delay_ (1000),
01064     flush_ (false)
01065 {
01066   // Create a pipe for the local DQM to tell the communicator
01067   // thread that local DQM data has changed and that the peers
01068   // should be notified.
01069   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
01070   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
01071 
01072   // Initialise the upstream and downstream to empty.
01073   upstream_.peer   = downstream_.peer   = 0;
01074   upstream_.next   = downstream_.next   = 0;
01075   upstream_.port   = downstream_.port   = 0;
01076   upstream_.update = downstream_.update = false;
01077   upstream_.warned = downstream_.warned = false;
01078 }

DQMNet::~DQMNet ( void   )  [virtual]

Definition at line 1080 of file DQMNet.cc.

01081 {
01082   // FIXME
01083 }

DQMNet::DQMNet ( const DQMNet &  )  [private]


Member Function Documentation

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
) [static, protected]

Definition at line 69 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by onMessage(), releaseFromWait(), DQMBasicNet::requestFullUpdatesFromPeers(), requestObject(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), and sendObjectToPeer().

00070 {
00071   b->data.insert(b->data.end(),
00072                  (const unsigned char *)data,
00073                  (const unsigned char *)data + len);
00074 }

virtual Peer* DQMNet::createPeer ( lat::Socket *  s  )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onPeerConnect(), and run().

void DQMNet::debug ( bool  doit  ) 

Enable or disable verbose debugging.

Must be called before calling run() or start().

Definition at line 1088 of file DQMNet.cc.

References debug_.

Referenced by DQMService::DQMService().

01089 {
01090   debug_ = doit;
01091 }

void DQMNet::delay ( int  delay  ) 

Set the I/O dispatching delay.

Must be called before calling run() or start().

Definition at line 1096 of file DQMNet.cc.

References delay_.

01097 {
01098   delay_ = delay;
01099 }

void DQMNet::discard ( Bucket *&  b  )  [static, private]

Definition at line 78 of file DQMNet.cc.

References DQMNet::Bucket::next.

Referenced by losePeer(), and onPeerData().

00079 {
00080   while (b)
00081   {
00082     Bucket *next = b->next;
00083     delete b;
00084     b = next;
00085   }
00086 }

bool DQMNet::extractScalarData ( DataBlob &  objdata,
Object &  o 
) [protected]

Definition at line 401 of file DQMNet.cc.

References DQM_FLAG_SCALAR, extractNextObject(), DQMNet::CoreObject::flags, VarParsing::obj, DQMNet::CoreObject::object, DQMNet::Object::rawdata, and s.

Referenced by sendObjectToPeer().

00402 {
00403   if (! o.flags & DQM_FLAG_SCALAR)
00404     return false;
00405 
00406   TObject *obj = o.object;
00407   if (! obj && o.rawdata.size())
00408   {
00409     DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00410     buf.InitMap();
00411     buf.Reset();
00412     obj = extractNextObject(buf);
00413   }
00414 
00415   if (TObjString *ostr = dynamic_cast<TObjString *>(obj))
00416   {
00417     const TString &s = ostr->String();
00418     objdata.insert(objdata.end(),
00419                    (unsigned char *) s.Data(),
00420                    (unsigned char *) s.Data() + s.Length());
00421     return true;
00422   }
00423 
00424   return false;
00425 }

virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage(), and run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s  )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onPeerData().

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources.

Must be called before calling run() or start().

Definition at line 1181 of file DQMNet.cc.

References lat::endl(), DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

01182 {
01183   if (! upstream_.host.empty())
01184   {
01185     logme()
01186       << "ERROR: Already receiving data from another collector at "
01187       << upstream_.host << ":" << upstream_.port << std::endl;
01188     return;
01189   }
01190 
01191   upstream_.update = false;
01192   upstream_.host = host;
01193   upstream_.port = port;
01194 }

void DQMNet::lock ( void   ) 

Acquire a lock on the DQM net layer.

Definition at line 1221 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flush(), onMessage(), DQMBasicNet::receive(), and run().

01222 {
01223   if (communicate_ != (pthread_t) -1)
01224     pthread_mutex_lock(&lock_);
01225 }

std::ostream & DQMNet::logme ( void   )  [protected]

Definition at line 60 of file DQMNet.cc.

References appname_, TestMuL1L2Filter_cff::cerr, and pid_.

Referenced by listenToCollector(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), receive(), reconstructObject(), reinstateObject(), removeLocalObject(), DQMImplNet< DQMNet::Object >::requestFullUpdatesFromPeers(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), updateLocalObject(), updateMask(), and updateToCollector().

00061 {
00062   return std::cerr
00063     << Time::current().format(true, "%Y-%m-%d %H:%M:%S")
00064     << " " << appname_ << "[" << pid_ << "]: ";
00065 }

bool DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
) [private]

Handle errors with a peer socket.

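Logs the reason (if given) and purges any pending wait requests issued by the peer. It then clears the event source, discards the peer's send queue, and unlinks the peer from its automatic connection record. Finally it detaches the socket from the selector, closes it, removes the peer, and deletes the socket.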

Definition at line 93 of file DQMNet.cc.

References DQMNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, removePeer(), s, sel_, DQMNet::Peer::sendq, DQMNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.

Referenced by onPeerData(), and updateMask().

00097 {
00098   if (reason)
00099     logme ()
00100       << reason << peer->peeraddr
00101       << (err ? "; error was: " + err->explain() : std::string(""))
00102       << std::endl;
00103 
00104   Socket *s = peer->socket;
00105 
00106   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00107     if (i->peer == peer)
00108       waiting_.erase(i++);
00109     else
00110       ++i;
00111 
00112   if (ev)
00113     ev->source = 0;
00114 
00115   discard(peer->sendq);
00116   if (peer->automatic)
00117     peer->automatic->peer = 0;
00118 
00119   sel_.detach (s);
00120   s->close();
00121   removePeer (peer, s);
00122   delete s;
00123   return true;
00124 }

virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage().

virtual void DQMNet::markObjectsDead ( Peer *  p  )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage().

virtual void DQMNet::markObjectsZombies ( Peer *  p  )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage().

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev  )  [private]

React to notifications from the DQM thread.

This is a simple message telling this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here; we just set a flag telling the main event pump to send a notification later. This avoids sending DQM object updates unnecessarily often.

Definition at line 992 of file DQMNet.cc.

References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.

Referenced by DQMNet().

00993 {
00994   // Discard the data in the pipe, we care only about the wakeup.
00995   try
00996   {
00997     IOSize sz;
00998     unsigned char buf [1024];
00999     while ((sz = ev->source->read(buf, sizeof(buf))))
01000       ;
01001   }
01002   catch (Error &e)
01003   {
01004     SystemError *next = dynamic_cast<SystemError *>(e.next());
01005     if (next && next->portable() == SysErr::ErrTryAgain)
01006       ; // Ignore it
01007     else
01008       logme()
01009         << "WARNING: error reading from notification pipe: "
01010         << e.explain() << std::endl;
01011   }
01012 
01013   // Tell the main event pump to send an update in a little while.
01014   flush_ = true;
01015 
01016   // We are never done, always keep going.
01017   return false;
01018 }

bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
) [protected, virtual]

Definition at line 471 of file DQMNet.cc.

References copydata(), DQMNet::Bucket::data, debug_, DQM_FLAG_DEAD, DQM_FLAG_NEW, DQM_FLAG_RECEIVED, DQM_MSG_GET_OBJECT, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_REPLY_LIST_BEGIN, DQM_REPLY_LIST_END, DQM_REPLY_NONE, DQM_REPLY_OBJECT, lat::endl(), findObject(), DQMNet::CoreObject::flags, flags, flush_, full, DQMNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, DQMNet::CoreObject::object, DQMNet::Peer::peeraddr, DQMNet::Object::rawdata, DQMNet::CoreObject::reference, releaseWaiters(), requestFullUpdates_, requestFullUpdatesFromPeers(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), sendScalarAsText_, DQMNet::Peer::source, DQMNet::CoreObject::tags, unlock(), DQMNet::Peer::update, DQMNet::Peer::updatefull, DQMNet::Peer::updates, DQMNet::CoreObject::version, and waitForData().

Referenced by onPeerData().

00472 {
00473   // Decode and process this message.
00474   uint32_t type;
00475   memcpy (&type, data + sizeof(uint32_t), sizeof (type));
00476   switch (type)
00477   {
00478   case DQM_MSG_UPDATE_ME:
00479     {
00480       if (len != 3*sizeof(uint32_t))
00481       {
00482         logme()
00483           << "ERROR: corrupt 'UPDATE_ME' message of length " << len
00484           << " from peer " << p->peeraddr << std::endl;
00485         return false;
00486       }
00487 
00488       // Get the update status: whether this is a full update.
00489       uint32_t full;
00490       memcpy(&full, data + 2*sizeof(uint32_t), sizeof(uint32_t));
00491 
00492       if (debug_)
00493         logme()
00494           << "DEBUG: received message 'UPDATE ME' from peer "
00495           << p->peeraddr << ", full = " << full << std::endl;
00496 
00497       p->update = true;
00498       p->updatefull = full;
00499 
00500       if (full && ! requestFullUpdates_)
00501       {
00502         if (debug_)
00503           logme()
00504             << "WARNING: forcing full update request mode on due to "
00505             << "request from " << p->peeraddr << std::endl;
00506         requestFullUpdates_ = true;
00507         requestFullUpdatesFromPeers();
00508       }
00509     }
00510     return true;
00511 
00512   case DQM_MSG_LIST_OBJECTS:
00513     {
00514       if (debug_)
00515         logme()
00516           << "DEBUG: received message 'LIST OBJECTS' from peer "
00517           << p->peeraddr << std::endl;
00518 
00519       // Send over current status: list of known objects.
00520       lock();
00521       sendObjectListToPeer(msg, p->updatefull, true, false);
00522       unlock();
00523     }
00524     return true;
00525 
00526   case DQM_MSG_GET_OBJECT:
00527     {
00528       if (debug_)
00529         logme()
00530           << "DEBUG: received message 'GET OBJECT' from peer "
00531           << p->peeraddr << std::endl;
00532 
00533       if (len < 3*sizeof(uint32_t))
00534       {
00535         logme()
00536           << "ERROR: corrupt 'GET IMAGE' message of length " << len
00537           << " from peer " << p->peeraddr << std::endl;
00538         return false;
00539       }
00540 
00541       uint32_t namelen;
00542       memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
00543       if (len != 3*sizeof(uint32_t) + namelen)
00544       {
00545         logme()
00546           << "ERROR: corrupt 'GET OBJECT' message of length " << len
00547           << " from peer " << p->peeraddr
00548           << ", expected length " << (3*sizeof(uint32_t))
00549           << " + " << namelen << std::endl;
00550         return false;
00551       }
00552 
00553       lock();
00554       std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
00555       Peer *owner = 0;
00556       Object *o = findObject(0, name, &owner);
00557       if (o)
00558       {
00559         o->lastreq = Time::current();
00560         if (o->rawdata.empty())
00561           waitForData(p, name, "", owner);
00562         else
00563           sendObjectToPeer(msg, *o, true, sendScalarAsText_);
00564       }
00565       else
00566       {
00567         uint32_t words [3];
00568         words[0] = sizeof(words) + name.size();
00569         words[1] = DQM_REPLY_NONE;
00570         words[2] = name.size();
00571 
00572         msg->data.reserve(msg->data.size() + words[0]);
00573         copydata(msg, &words[0], sizeof(words));
00574         copydata(msg, &name[0], name.size());
00575       }
00576       unlock();
00577     }
00578     return true;
00579 
00580   case DQM_REPLY_LIST_BEGIN:
00581     {
00582       if (len != 4*sizeof(uint32_t))
00583       {
00584         logme()
00585           << "ERROR: corrupt 'LIST BEGIN' message of length " << len
00586           << " from peer " << p->peeraddr << std::endl;
00587         return false;
00588       }
00589 
00590       if (debug_)
00591         logme()
00592           << "DEBUG: received message 'LIST BEGIN' from "
00593           << p->peeraddr << std::endl;
00594 
00595       // Get the update status: whether this is a full update.
00596       uint32_t flags;
00597       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00598 
00599       // If we are about to receive a full list of objects, flag all
00600       // objects dead.  Subsequent object notifications will undo this
00601       // for the live objects.  This tells us the object has been
00602       // removed, but we can keep making it available for a while if
00603       // there continues to be interest in it.
00604       if (flags)
00605       {
00606         lock();
00607         markObjectsZombies(p);
00608         unlock();
00609       }
00610     }
00611     return true;
00612 
00613   case DQM_REPLY_LIST_END:
00614     {
00615       if (len != 4*sizeof(uint32_t))
00616       {
00617         logme()
00618           << "ERROR: corrupt 'LIST END' message of length " << len
00619           << " from peer " << p->peeraddr << std::endl;
00620         return false;
00621       }
00622 
00623       // Get the update status: whether this is a full update.
00624       uint32_t flags;
00625       memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
00626 
00627       // If we received a full list of objects, flag all zombie objects
00628       // now dead. We need to do this in two stages in case we receive
00629       // updates in many parts, and end up sending updates to others in
00630       // between; this avoids us lying live objects are dead.
00631       if (flags)
00632       {
00633         lock();
00634         markObjectsDead(p);
00635         unlock();
00636       }
00637 
00638       if (debug_)
00639         logme()
00640           << "DEBUG: received message 'LIST END' from "
00641           << p->peeraddr << std::endl;
00642 
00643       // Indicate we have received another update from this peer.
00644       // Also indicate we should flush to our clients.
00645       flush_ = true;
00646       p->updates++;
00647     }
00648     return true;
00649 
00650   case DQM_REPLY_OBJECT:
00651     {
00652       uint32_t words[8];
00653       if (len < sizeof(words))
00654       {
00655         logme()
00656           << "ERROR: corrupt 'OBJECT' message of length " << len
00657           << " from peer " << p->peeraddr << std::endl;
00658         return false;
00659       }
00660 
00661       memcpy (&words[0], data, sizeof(words));
00662       uint32_t &namelen = words[5];
00663       uint32_t &taglen = words[6];
00664       uint32_t &datalen = words[7];
00665 
00666       if (len != sizeof(words) + namelen + taglen*sizeof(uint32_t) + datalen)
00667       {
00668         logme()
00669           << "ERROR: corrupt 'OBJECT' message of length " << len
00670           << " from peer " << p->peeraddr
00671           << ", expected length " << sizeof(words)
00672           << " + " << namelen
00673           << " + " << (taglen*sizeof(uint32_t))
00674           << " + " << datalen
00675           << std::endl;
00676         return false;
00677       }
00678 
00679       unsigned char *namedata = data + sizeof(words);
00680       unsigned char *tagdata = namedata + namelen;
00681       unsigned char *objdata = tagdata + taglen*sizeof(uint32_t);
00682       unsigned char *enddata = objdata + datalen;
00683       std::string name ((char *) namedata, namelen);
00684       assert (enddata == data + len);
00685 
00686       if (debug_)
00687         logme()
00688           << "DEBUG: received message 'OBJECT " << name
00689           << "' from " << p->peeraddr << std::endl;
00690 
00691       // Mark the peer as a known object source.
00692       p->source = true;
00693 
00694       // Initialise or update an object entry.
00695       lock();
00696       Object *o = findObject(p, name);
00697       if (! o)
00698         o = makeObject(p, name);
00699 
00700       o->flags = words[2] | DQM_FLAG_NEW | DQM_FLAG_RECEIVED;
00701       o->version = ((uint64_t) words[4] << 32 | words[3]);
00702       o->tags.clear();
00703       o->tags.insert(o->tags.end(), (uint32_t *) tagdata, (uint32_t *) objdata);
00704       o->rawdata.clear();
00705       o->rawdata.insert (o->rawdata.end(), objdata, enddata);
00706 
00707       bool hadobject = (o->object != 0);
00708       delete o->object;
00709       o->object = 0;
00710       delete o->reference;
00711       o->reference = 0;
00712 
00713       // If we had an object for this one already and this is a list
00714       // update without data, issue an immediate data get request.
00715       if (hadobject && ! datalen)
00716         requestObject(p, (namelen ? &name[0] : 0), namelen);
00717 
00718       // If we have the object data, release from wait.
00719       if (datalen)
00720         releaseWaiters(o);
00721       unlock();
00722     }
00723     return true;
00724 
00725   case DQM_REPLY_NONE:
00726     {
00727       uint32_t words[3];
00728       if (len < sizeof(words))
00729       {
00730         logme()
00731           << "ERROR: corrupt 'NONE' message of length " << len
00732           << " from peer " << p->peeraddr << std::endl;
00733         return false;
00734       }
00735 
00736       memcpy (&words[0], data, sizeof(words));
00737       uint32_t &namelen = words[2];
00738 
00739       if (len != sizeof(words) + namelen)
00740       {
00741         logme()
00742           << "ERROR: corrupt 'NONE' message of length " << len
00743           << " from peer " << p->peeraddr
00744           << ", expected length " << sizeof(words)
00745           << " + " << namelen << std::endl;
00746         return false;
00747       }
00748 
00749       unsigned char *namedata = data + sizeof(words);
00750       unsigned char *enddata = namedata + namelen;
00751       std::string name ((char *) namedata, namelen);
00752       assert (enddata == data + len);
00753 
00754       if (debug_)
00755         logme()
00756           << "DEBUG: received message 'NONE " << name
00757           << "' from " << p->peeraddr << std::endl;
00758 
00759       // Mark the peer as a known object source.
00760       p->source = true;
00761 
00762       // If this was a known object, update its entry.
00763       lock();
00764       Object *o = findObject(p, name);
00765       if (o)
00766         o->flags |= DQM_FLAG_DEAD;
00767 
00768       // If someone was waiting for this, let them go.
00769       releaseWaiters(o);
00770       unlock();
00771     }
00772     return true;
00773 
00774   default:
00775     logme()
00776       << "ERROR: unrecognised message of length " << len
00777       << " and type " << type << " from peer " << p->peeraddr
00778       << std::endl;
00779     return false;
00780   }
00781 }

bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev  )  [private]

Respond to new connections on the server socket.

Accepts the connection, creates a peer record for the newly accepted socket, and sets it up for further communication. Always returns false to tell the IOSelector to keep processing events for the server socket.

Definition at line 952 of file DQMNet.cc.

References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.

Referenced by startLocalServer().

00953 {
00954   // Recover the server socket.
00955   assert (ev->source == server_);
00956 
00957   // Accept the connection.
00958   Socket *s = server_->accept();
00959   assert (s);
00960   assert (! s->isBlocking());
00961 
00962   // Record it to our list of peers.
00963   Peer *p = createPeer(s);
00964   InetAddress peeraddr = ((InetSocket *) s)->peername();
00965   InetAddress myaddr = ((InetSocket *) s)->sockname();
00966   p->peeraddr = StringFormat("%1:%2")
00967                 .arg(peeraddr.hostname())
00968                 .arg(peeraddr.port());
00969   p->mask = IORead|IOUrgent;
00970   p->socket = s;
00971 
00972   // Report the new connection.
00973   if (debug_)
00974     logme()
00975       << "INFO: new peer " << p->peeraddr << " is now connected to "
00976       << myaddr.hostname() << ":" << myaddr.port() << std::endl;
00977 
00978   // Attach it to the listener.
00979   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
00980 
00981   // We are never done.
00982   return false;
00983 }

bool DQMNet::onPeerData ( lat::IOSelectEvent * ev,
Peer * p 
) [private]

Handle communication with a particular peer.

Definition at line 786 of file DQMNet.cc.

References DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), DQMNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, msg, DQMNet::Bucket::next, lat::Error::next(), old, onMessage(), DQMNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, lat::IOSelector::setMask(), DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, valid, DQMNet::Peer::waiting, and lat::IOChannel::write().

Referenced by onPeerConnect(), and run().

00787 {
00788   assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
00789 
00790   // If there is a problem with the peer socket, discard the peer
00791   // and tell the selector to stop prcessing events for it.  If
00792   // this is a server connection, we will eventually recreate
00793   // everything if/when the data server comes back.
00794   if (ev->events & IOUrgent)
00795   {
00796     if (p->automatic)
00797     {
00798       logme()
00799         << "WARNING: connection to the DQM server at " << p->peeraddr
00800         << " lost (will attempt to reconnect in 15 seconds)\n";
00801       return losePeer(0, p, ev);
00802     }
00803     else
00804       return losePeer("WARNING: lost peer connection ", p, ev);
00805   }
00806 
00807   // If we can write to the peer socket, pump whatever we can into it.
00808   if (ev->events & IOWrite)
00809   {
00810     while (Bucket *b = p->sendq)
00811     {
00812       IOSize len = b->data.size() - p->sendpos;
00813       const void *data = (len ? (const void *)&b->data[p->sendpos]
00814                           : (const void *)&data);
00815       IOSize done;
00816 
00817       try
00818       {
00819         done = (len ? ev->source->write (data, len) : 0);
00820         if (debug_ && len)
00821           logme()
00822             << "DEBUG: sent " << done << " bytes to peer "
00823             << p->peeraddr << std::endl;
00824       }
00825       catch (Error &e)
00826       {
00827         return losePeer("WARNING: unable to write to peer ",
00828                         p, ev, &e);
00829       }
00830 
00831       p->sendpos += done;
00832       if (p->sendpos == b->data.size())
00833       {
00834         Bucket *old = p->sendq;
00835         p->sendq = old->next;
00836         p->sendpos = 0;
00837         old->next = 0;
00838         discard(old);
00839       }
00840 
00841       if (! done && len)
00842         // Cannot write any more.
00843         break;
00844     }
00845   }
00846 
00847   // If there is data to be read from the peer, first receive what we
00848   // can get out the socket, the process all complete requests.
00849   if (ev->events & IORead)
00850   {
00851     // First build up the incoming buffer of data in the socket.
00852     // Remember the last size returned by the socket; we need
00853     // it to determine if the remote end closed the connection.
00854     IOSize sz;
00855     try
00856     {
00857       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
00858       do
00859         if ((sz = ev->source->read(&buf[0], buf.size())))
00860         {
00861           if (debug_)
00862             logme()
00863               << "DEBUG: received " << sz << " bytes from peer "
00864               << p->peeraddr << std::endl;
00865           DataBlob &data = p->incoming;
00866           if (data.capacity () < data.size () + sz)
00867             data.reserve (data.size() + SOCKET_READ_GROWTH);
00868           data.insert (data.end(), &buf[0], &buf[0] + sz);
00869         }
00870       while (sz == sizeof (buf));
00871     }
00872     catch (Error &e)
00873     {
00874       SystemError *next = dynamic_cast<SystemError *>(e.next());
00875       if (next && next->portable() == SysErr::ErrTryAgain)
00876         sz = 1; // Ignore it, and fake no end of data.
00877       else
00878         // Houston we have a problem.
00879         return losePeer("WARNING: failed to read from peer ",
00880                         p, ev, &e);
00881     }
00882 
00883     // Process fully received messages as long as we can.
00884     size_t consumed = 0;
00885     DataBlob &data = p->incoming;
00886     while (data.size()-consumed >= sizeof(uint32_t)
00887            && p->waiting < MAX_PEER_WAITREQS)
00888     {
00889       uint32_t msglen;
00890       memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
00891 
00892       if (msglen >= MESSAGE_SIZE_LIMIT)
00893         return losePeer("WARNING: excessively large message from ", p, ev);
00894 
00895       if (data.size()-consumed >= msglen)
00896       {
00897         bool valid = true;
00898         if (msglen < 2*sizeof(uint32_t))
00899         {
00900           logme()
00901             << "ERROR: corrupt peer message of length " << msglen
00902             << " from peer " << p->peeraddr << std::endl;
00903           valid = false;
00904         }
00905         else
00906         {
00907           // Decode and process this message.
00908           Bucket msg;
00909           msg.next = 0;
00910           valid = onMessage(&msg, p, &data[0]+consumed, msglen);
00911 
00912           // If we created a response, chain it to the write queue.
00913           if (! msg.data.empty())
00914           {
00915             Bucket **prev = &p->sendq;
00916             while (*prev)
00917                prev = &(*prev)->next;
00918 
00919             *prev = new Bucket;
00920             (*prev)->next = 0;
00921             (*prev)->data.swap(msg.data);
00922           }
00923         }
00924 
00925         if (! valid)
00926           return losePeer("WARNING: data stream error with ", p, ev);
00927 
00928         consumed += msglen;
00929       }
00930       else
00931         break;
00932     }
00933 
00934     data.erase(data.begin(), data.begin()+consumed);
00935 
00936     // If the client has closed the connection, shut down our end.  If
00937     // we have something to send back still, leave the write direction
00938     // open.  Otherwise close the shop for this client.
00939     if (sz == 0)
00940       sel_.setMask(p->socket, p->mask &= ~IORead);
00941   }
00942 
00943   // Yes, please keep processing events for this socket.
00944   return false;
00945 }

DQMNet& DQMNet::operator= ( const DQMNet & )  [private]

virtual void DQMNet::purgeDeadObjects ( lat::Time  oldobj,
lat::Time  deadobj 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

int DQMNet::receive ( DQMStore * store )  [virtual]

Reimplemented in DQMBasicNet.

Definition at line 1394 of file DQMNet.cc.

References logme().

Referenced by DQMOldReceiver::doMonitoring().

01395 {
01396   logme() << "ERROR: receive() method is not supported.\n";
01397   return 0;
01398 }

bool DQMNet::reconstructObject ( Object & o )  [protected]

Definition at line 217 of file DQMNet.cc.

References abortReconstructObject(), DQMNet::QValue::code, extractNextObject(), label, logme(), m, lat::Regexp::match(), lat::RegexpMatch::matchString(), DQMNet::QValue::message, DQMNet::CoreObject::name, DQMNet::CoreObject::object, p, parseInt(), DQMNet::CoreObject::qreports, DQMNet::QValue::qtname, DQMNet::Object::rawdata, DQMNet::CoreObject::reference, lat::RegexpMatch::reset(), s_rxmeqr, s_rxmeval(), and value.

Referenced by reinstateObject().

00218 {
00219   DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
00220   buf.Reset();
00221 
00222   // Extract the main object.
00223   if (! (o.object = extractNextObject(buf)))
00224     return false;
00225   
00226   // Extract the reference object.
00227   o.reference = extractNextObject(buf);
00228 
00229   // Calculate quality report base name.
00230   int slash = StringOps::rfind(o.name, '/');
00231   std::string qrbase;
00232   qrbase.reserve(o.name.size()+2);
00233   qrbase = (slash == -1 ? o.name : o.name.substr(slash+1, std::string::npos));
00234   qrbase += ".";
00235 
00236   // Extract quality reports.
00237   while (TObjString *qrstr = dynamic_cast<TObjString *>(extractNextObject(buf)))
00238   {
00239     RegexpMatch m;
00240     if (! s_rxmeval.match(qrstr->GetName(), 0, 0, &m))
00241     {
00242       logme()
00243         << "ERROR: unexpected quality report string '"
00244         << qrstr->GetName() << "' for object '"
00245         << o.name << "'\n";
00246       return abortReconstructObject(o);
00247     }
00248 
00249     std::string label = m.matchString(qrstr->GetName(), 1);
00250     std::string type = m.matchString(qrstr->GetName(), 2);
00251     std::string value = m.matchString(qrstr->GetName(), 3);
00252 
00253     if (type != "qr")
00254     {
00255       logme()
00256         << "ERROR: expected a 'qr' for a quality report '"
00257         << qrstr->GetName() << "' but found '" << type
00258         << "' instead\n";
00259       return abortReconstructObject(o);
00260     }
00261 
00262     std::string qrname = label;
00263     qrname.replace(0, qrbase.size(), "");
00264     if (qrname == label)
00265     {
00266       logme()
00267         << "ERROR: quality report label in '"
00268         << qrstr->GetName()
00269         << "' does not match object name '"
00270         << o.name << "'\n";
00271       return abortReconstructObject(o);
00272     }
00273 
00274     m.reset();
00275     if (! s_rxmeqr.match(value, 0, 0, &m))
00276     {
00277       logme()
00278         << "ERROR: quality test value '"
00279         << value << "' is incorrectly formatted\n";
00280       return abortReconstructObject(o);
00281     }
00282 
00283     QValue qval;
00284     qval.code = 0;
00285     qval.qtname = qrname;
00286     qval.message = m.matchString(value, 2);
00287     std::string strcode = m.matchString(value, 1);
00288     const char *p = strcode.c_str();
00289     if (! parseInt(p, "", 0, qval.code) || *p)
00290     {
00291       logme()
00292         << "ERROR: failed to determine quality test code from '"
00293         << value << "'\n";
00294       return abortReconstructObject(o);
00295     }
00296 
00297     o.qreports.push_back(qval);
00298   }
00299 
00300   return true;
00301 }

bool DQMNet::reinstateObject ( DQMStore * store,
Object & o 
) [protected]

Definition at line 304 of file DQMNet.cc.

References DQMStore::book1D(), DQMStore::book1S(), DQMStore::book2D(), DQMStore::book2S(), DQMStore::book3D(), DQMStore::bookFloat(), DQMStore::bookInt(), DQMStore::bookProfile(), DQMStore::bookProfile2D(), DQMStore::bookString(), e, MonitorElement::Fill(), i, logme(), m, lat::Regexp::match(), lat::RegexpMatch::matchString(), name, DQMNet::CoreObject::name, DQMNet::CoreObject::object, reconstructObject(), s_rxmeval(), DQMStore::setCurrentFolder(), t, DQMStore::tag(), DQMNet::CoreObject::tags, and value.

Referenced by DQMBasicNet::receive().

00305 {
00306   if (! reconstructObject (o))
00307     return false;
00308 
00309   // Reconstruct the main object
00310   std::string folder = o.name;
00311   std::string name = o.name;
00312   folder.erase(folder.rfind('/'), std::string::npos);
00313   name.erase(0, name.rfind('/')+1);
00314   store->setCurrentFolder(folder);
00315   if (TProfile2D *t = dynamic_cast<TProfile2D *>(o.object))
00316     store->bookProfile2D(name, t);
00317   else if (TProfile *t = dynamic_cast<TProfile *>(o.object))
00318     store->bookProfile(name, t);
00319   else if (TH3F *t = dynamic_cast<TH3F *>(o.object))
00320     store->book3D(name, t);
00321   else if (TH2F *t = dynamic_cast<TH2F *>(o.object))
00322     store->book2D(name, t);
00323   else if (TH2S *t = dynamic_cast<TH2S *>(o.object))
00324     store->book2S(name, t);
00325   else if (TH1F *t = dynamic_cast<TH1F *>(o.object))
00326     store->book1D(name, t);
00327   else if (TH1S *t = dynamic_cast<TH1S *>(o.object))
00328     store->book1S(name, t);
00329   else if (TObjString *t = dynamic_cast<TObjString *>(o.object))
00330   {
00331     RegexpMatch m;
00332     if (! s_rxmeval.match(t->GetName(), 0, 0, &m))
00333     {
00334       logme()
00335         << "ERROR: unexpected monitor element string '"
00336         << t->GetName() << "' for object '"
00337         << o.name << "'\n";
00338       return false;
00339     }
00340 
00341     // std::string label = m.matchString(t->GetName(), 1);
00342     std::string type = m.matchString(t->GetName(), 2);
00343     std::string value = m.matchString(t->GetName(), 3);
00344 
00345     if (type == "i")
00346       store->bookInt(name)->Fill(atoi(value.c_str()));
00347     else if (type == "f")
00348       store->bookFloat(name)->Fill(atof(value.c_str()));
00349     else if (type == "s")
00350       store->bookString(name, value);
00351     else
00352     {
00353       logme()
00354         << "ERROR: unexpected string monitor element of type '"
00355         << type << "' (from '" << t->GetName() << "') for object '"
00356         << o.name << "'\n";
00357       return false;
00358     }
00359   }
00360 
00361   // Reconstruct tags.  (FIXME: untag old tags first?)
00362   for (size_t i = 0, e = o.tags.size(); i < e; ++i)
00363     store->tag(o.name, o.tags[i]);
00364 
00365   // FIXME: Reference and quality reports?
00366 
00367   // Inidicate success.
00368   return true;
00369 }

void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object * o 
) [private]

Definition at line 163 of file DQMNet.cc.

References msg, DQMNet::Bucket::next, releaseFromWait(), and waiting_.

00164 {
00165   Bucket **msg = &i->peer->sendq;
00166   while (*msg)
00167     msg = &(*msg)->next;
00168   *msg = new Bucket;
00169   (*msg)->next = 0;
00170 
00171   releaseFromWait(*msg, *i, o);
00172 
00173   assert(i->peer->waiting > 0);
00174   i->peer->waiting--;
00175   waiting_.erase(i);
00176 }

void DQMNet::releaseFromWait ( Bucket * msg,
WaitObject & w,
Object * o 
) [protected, virtual]

Definition at line 382 of file DQMNet.cc.

References copydata(), DQMNet::Bucket::data, DQM_REPLY_NONE, DQMNet::WaitObject::name, sendObjectToPeer(), and sendScalarAsText_.

Referenced by releaseFromWait(), releaseWaiters(), and run().

00383 {
00384   if (o)
00385     sendObjectToPeer (msg, *o, true, sendScalarAsText_);
00386   else
00387   {
00388     uint32_t words [3];
00389     words[0] = sizeof(words) + w.name.size();
00390     words[1] = DQM_REPLY_NONE;
00391     words[2] = w.name.size();
00392 
00393     msg->data.reserve(msg->data.size() + words[0]);
00394     copydata(msg, &words[0], sizeof(words));
00395     copydata(msg, &w.name[0], w.name.size());
00396   }
00397 }

void DQMNet::releaseWaiters ( Object * o )  [private]

Definition at line 180 of file DQMNet.cc.

References e, i, DQMNet::CoreObject::name, releaseFromWait(), and waiting_.

Referenced by onMessage().

00181 {
00182   for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
00183     if (i->name == o->name)
00184       releaseFromWait(i++, o);
00185     else
00186       ++i;
00187 }

void DQMNet::removeLocalObject ( const std::string &  name  )  [virtual]

Reimplemented in DQMBasicNet.

Definition at line 1407 of file DQMNet.cc.

References logme().

Referenced by DQMService::flush().

01408 {
01409   logme() << "ERROR: removeLocalObject() method is not supported.\n";
01410 }

virtual void DQMNet::removePeer ( Peer * p,
lat::Socket * s 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by losePeer().

void DQMNet::requestFullUpdates ( bool  doit  ) 

Enable or disable requests for full updates.

Set this to get the "old" DQM networking behaviour to automatically fetch all upstream content when it changes, rather than fetching it lazily as needed. You must call this method if you use receive(); any other use is strongly discouraged. Must be called before run() or start().

Definition at line 1116 of file DQMNet.cc.

References requestFullUpdates_.

Referenced by DQMOldReceiver::DQMOldReceiver().

01117 {
01118   requestFullUpdates_ = doit;
01119 }

virtual void DQMNet::requestFullUpdatesFromPeers ( void   )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, DQMBasicNet, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage().

void DQMNet::requestObject ( Peer * p,
const char *  name,
size_t  len 
) [private]

Queue an object request to the data server.

Definition at line 128 of file DQMNet.cc.

References copydata(), DQM_MSG_GET_OBJECT, msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

Referenced by onMessage(), and waitForData().

00129 {
00130   Bucket **msg = &p->sendq;
00131   while (*msg)
00132     msg = &(*msg)->next;
00133   *msg = new Bucket;
00134   (*msg)->next = 0;
00135 
00136   uint32_t words[3];
00137   words[0] = sizeof(words) + len;
00138   words[1] = DQM_MSG_GET_OBJECT;
00139   words[2] = len;
00140   copydata(*msg, words, sizeof(words));
00141   copydata(*msg, name, len);
00142 }

void DQMNet::run ( void   ) 

Run the actual I/O processing loop.

Definition at line 1254 of file DQMNet.cc.

References lat::Socket::abort(), arg, lat::IOSelector::attach(), DQMNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, DQMNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, lat::Error::next(), onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), requestFullUpdates_, s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), DQMNet::AutoPeer::update, DQMNet::Peer::update, updatePeerMasks(), upstream_, waiting_, and DQMNet::AutoPeer::warned.

01255 {
01256   Time now;
01257   Time nextFlush = 0;
01258   AutoPeer *automatic[2] = { &upstream_, &downstream_ };
01259 
01260   // Perform I/O.  Every once in a while flush updates to peers.
01261   while (! shouldStop())
01262   {
01263     for (int i = 0; i < 2; ++i)
01264     {
01265       AutoPeer *ap = automatic[i];
01266 
01267       // If we need a server connection and don't have one yet,
01268       // initiate asynchronous connection creation.  Swallow errors
01269       // in case the server won't talk to us.
01270       if (! ap->host.empty()
01271           && ! ap->peer
01272           && (now = Time::current()) > ap->next)
01273       {
01274         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01275         InetSocket *s = 0;
01276         try
01277         {
01278           s = new InetSocket (SocketConst::TypeStream);
01279           s->setBlocking (false);
01280           s->connect(InetAddress (ap->host.c_str(), ap->port));
01281           s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01282           s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01283         }
01284         catch (Error &e)
01285         {
01286           SystemError *sys = dynamic_cast<SystemError *>(e.next());
01287           if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
01288           {
01289             // "In progress" just means the connection is in progress.
01290             // The connection is ready when the socket is writeable.
01291             // Anything else is a real problem.
01292             if (! ap->warned)
01293             {
01294               logme()
01295                 << "NOTE: DQM server at " << ap->host << ":" << ap->port
01296                 << " is unavailable.  Connection will be established"
01297                 << " automatically on the background once the server"
01298                 << " becomes available.  Error from the attempt was: "
01299                 << e.explain() << '\n';
01300               ap->warned = true;
01301             }
01302 
01303             if (s)
01304               s->abort();
01305             delete s;
01306             s = 0;
01307           }
01308         }
01309 
01310         // Set up with the selector if we were successful.  If this is
01311         // the upstream collector, queue a request for updates.
01312         if (s)
01313         {
01314           lock();
01315           Peer *p = createPeer(s);
01316           ap->peer = p;
01317           ap->warned = false;
01318           unlock();
01319 
01320           InetAddress peeraddr = ((InetSocket *) s)->peername();
01321           InetAddress myaddr = ((InetSocket *) s)->sockname();
01322           p->peeraddr = StringFormat("%1:%2")
01323                         .arg(peeraddr.hostname())
01324                         .arg(peeraddr.port());
01325           p->mask = IORead|IOWrite|IOUrgent;
01326           p->update = ap->update;
01327           p->automatic = ap;
01328           p->socket = s;
01329           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
01330           if (ap == &upstream_)
01331           {
01332             uint32_t words[5] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
01333                                   3*sizeof(uint32_t), DQM_MSG_UPDATE_ME,
01334                                   requestFullUpdates_ };
01335             p->sendq = new Bucket;
01336             p->sendq->next = 0;
01337             copydata(p->sendq, words, sizeof(words));
01338           }
01339 
01340           // Report the new connection.
01341           if (debug_)
01342             logme()
01343               << "INFO: now connected to " << p->peeraddr << " from "
01344               << myaddr.hostname() << ":" << myaddr.port() << std::endl;
01345         }
01346       }
01347     }
01348 
01349     // Pump events for a while.
01350     sel_.dispatch(delay_);
01351     now = Time::current();
01352 
01353     // Check if flush is required.  Flush only if one is needed.
01354     // Always sends the full object list, but only rarely.
01355     // Compact objects no longer in active use before sending
01356     // out the update.
01357     if (flush_ && now > nextFlush)
01358     {
01359       flush_ = false;
01360       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
01361 
01362       lock();
01363       purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0),
01364                        now - TimeSpan(0, 0, 20 /* minutes */, 0, 0));
01365       sendObjectListToPeers(true);
01366       unlock();
01367     }
01368 
01369     // Update the data server and peer selection masks.  If we
01370     // have no more data to send and listening for writes, remove
01371     // the write mask.  If we have something to write and aren't
01372     // listening for writes, start listening so we can send off
01373     // the data.
01374     updatePeerMasks();
01375 
01376     // Release peers that have been waiting for data for too long.
01377     lock();
01378     Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0);
01379     for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
01380     {
01381       // If the peer has waited for too long, send something.
01382       if (i->time < waitold)
01383         releaseFromWait(i++, findObject(0, i->name));
01384 
01385       // Keep it for now.
01386       else
01387         ++i;
01388     }
01389     unlock();
01390   }
01391 }

void DQMNet::sendLocalChanges ( void   ) 

Definition at line 1415 of file DQMNet.cc.

References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().

Referenced by DQMService::flush(), and DQMImplNet< DQMNet::Object >::removePeer().

01416 {
01417   char byte = 0;
01418   wakeup_.sink()->write(&byte, 1);
01419 }

virtual void DQMNet::sendObjectListToPeer ( Bucket * msg,
bool  data,
bool  all,
bool  clear 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by onMessage().

virtual void DQMNet::sendObjectListToPeers ( bool  all  )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::sendObjectToPeer ( Bucket * msg,
Object & o,
bool  data,
bool  text 
) [protected]

Definition at line 432 of file DQMNet.cc.

References copydata(), DQMNet::Bucket::data, DQM_FLAG_SCALAR, DQM_FLAG_TEXT, DQM_FLAG_ZOMBIE, DQM_REPLY_OBJECT, extractScalarData(), DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::name, DQMNet::Object::rawdata, DQMNet::CoreObject::tags, and DQMNet::CoreObject::version.

Referenced by onMessage(), releaseFromWait(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

00433 {
00434   uint32_t flags = o.flags & ~DQM_FLAG_ZOMBIE;
00435   DataBlob objdata;
00436 
00437   if (text && extractScalarData(objdata, o))
00438     flags |= DQM_FLAG_TEXT;
00439   else if (data || (flags & DQM_FLAG_SCALAR))
00440     objdata.insert(objdata.end(),
00441                    &o.rawdata[0],
00442                    &o.rawdata[0] + o.rawdata.size());
00443 
00444   uint32_t words [8];
00445   uint32_t namelen = o.name.size();
00446   uint32_t taglen  = o.tags.size() * sizeof(uint32_t);
00447   uint32_t datalen = objdata.size();
00448 
00449   words[0] = 8*sizeof(uint32_t) + namelen + taglen + datalen;
00450   words[1] = DQM_REPLY_OBJECT;
00451   words[2] = flags;
00452   words[3] = (o.version >> 0 ) & 0xffffffff;
00453   words[4] = (o.version >> 32) & 0xffffffff;
00454   words[5] = namelen;
00455   words[6] = taglen / sizeof(uint32_t);
00456   words[7] = datalen;
00457 
00458   msg->data.reserve(msg->data.size() + words[0]);
00459   copydata(msg, &words[0], 8*sizeof(uint32_t));
00460   if (namelen)
00461     copydata(msg, &o.name[0], namelen);
00462   if (taglen)
00463     copydata(msg, &o.tags[0], taglen);
00464   if (datalen)
00465     copydata(msg, &objdata[0], datalen);
00466 }

void DQMNet::sendScalarAsText ( bool  doit  ) 

Enable or disable sending scalar monitoring values as text, rather than their ROOT object values.

Must be called before run() or start().

Definition at line 1105 of file DQMNet.cc.

References sendScalarAsText_.

01106 {
01107   sendScalarAsText_ = doit;
01108 }

bool DQMNet::shouldStop ( void   )  [protected, virtual]

Definition at line 374 of file DQMNet.cc.

References shutdown_.

Referenced by run().

00375 {
00376   return shutdown_;
00377 }

void DQMNet::shutdown ( void   ) 

Stop the network layer and wait for it to finish.

Definition at line 1198 of file DQMNet.cc.

References communicate_, and shutdown_.

Referenced by DQMService::shutdown().

01199 {
01200   shutdown_ = 1;
01201   if (communicate_ != (pthread_t) -1)
01202     pthread_join(communicate_, 0);
01203 }

void DQMNet::start ( void   ) 

Start running the network layer in a new thread.

This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1239 of file DQMNet.cc.

References communicate(), communicate_, lock_, logme(), and pthread_create.

Referenced by DQMService::DQMService().

01240 {
01241   if (communicate_ != (pthread_t) -1)
01242   {
01243     logme()
01244       << "ERROR: DQM networking thread has already been started\n";
01245     return;
01246   }
01247 
01248   pthread_mutex_init(&lock_, 0);
01249   pthread_create (&communicate_, 0, &communicate, this);
01250 }

void DQMNet::startLocalServer ( int  port  ) 

Start a server socket for accessing this DQM node remotely.

Must be called before calling run() or start(). May throw a cms::Exception if the server socket cannot be initialised.

Definition at line 1125 of file DQMNet.cc.

References lat::IOSelector::attach(), lat::CreateHook(), e, lat::endl(), Exception, lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.

01126 {
01127   if (server_)
01128   {
01129     logme() << "ERROR: DQM server was already started.\n";
01130     return;
01131   }
01132 
01133   try
01134   {
01135     server_ = new InetServerSocket(InetAddress (port), 10);
01136     server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE);
01137     server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE);
01138     server_->setBlocking(false);
01139     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
01140   }
01141   catch (Error &e)
01142   {
01143     // FIXME: Do we need to do this when we throw an exception anyway?
01144     // FIXME: Abort instead?
01145     logme()
01146       << "ERROR: Failed to start server at port " << port << ": "
01147       << e.explain() << std::endl;
01148 
01149     // FIXME: Throw something simpler that removes the dependency?
01150     throw cms::Exception("DQMNet::startLocalServer")
01151       << "Failed to start server at port " << port << ": "
01152       << e.explain();
01153   }
01154   
01155   logme() << "INFO: DQM server started at port " << port << std::endl;
01156 }

void DQMNet::unlock ( void   ) 

Release the lock on the DQM net layer.

Definition at line 1229 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flush(), onMessage(), DQMBasicNet::receive(), and run().

01230 {
01231   if (communicate_ != (pthread_t) -1)
01232     pthread_mutex_unlock(&lock_);
01233 }

void DQMNet::updateLocalObject ( Object & o )  [virtual]

Reimplemented in DQMBasicNet.

Definition at line 1401 of file DQMNet.cc.

References logme().

Referenced by DQMService::flush().

01402 {
01403   logme() << "ERROR: updateLocalObject() method is not supported.\n";
01404 }

void DQMNet::updateMask ( Peer * p )  [protected]

Update the selector mask for a peer based on data queues.

Close the connection if there is no longer any reason to keep it open.

Definition at line 1023 of file DQMNet.cc.

References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, sel_, DQMNet::Peer::sendq, lat::IOSelector::setMask(), DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

01024 {
01025   if (! p->socket)
01026     return;
01027 
01028   // Listen to writes iff we have data to send.
01029   unsigned oldmask = p->mask;
01030   if (! p->sendq && (p->mask & IOWrite))
01031     sel_.setMask(p->socket, p->mask &= ~IOWrite);
01032 
01033   if (p->sendq && ! (p->mask & IOWrite))
01034     sel_.setMask(p->socket, p->mask |= IOWrite);
01035 
01036   if (debug_ && oldmask != p->mask)
01037     logme()
01038       << "DEBUG: updating mask for " << p->peeraddr << " to "
01039       << p->mask << " from " << oldmask << std::endl;
01040 
01041   // If we have nothing more to send and are no longer listening
01042   // for reads, close up the shop for this peer.
01043   if (p->mask == IOUrgent && ! p->waiting)
01044   {
01045     assert (! p->sendq);
01046     if (debug_)
01047       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
01048     losePeer(0, p, 0);
01049   }
01050 }

virtual void DQMNet::updatePeerMasks ( void   )  [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes.

Must be called before calling run() or start().

Definition at line 1162 of file DQMNet.cc.

References downstream_, lat::endl(), DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

Referenced by DQMService::DQMService().

01163 {
01164   if (! downstream_.host.empty())
01165   {
01166     logme()
01167       << "ERROR: Already updating another collector at "
01168       << downstream_.host << ":" << downstream_.port << std::endl;
01169     return;
01170   }
01171 
01172   downstream_.update = true;
01173   downstream_.host = host;
01174   downstream_.port = port;
01175 }

void DQMNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
) [protected]

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 147 of file DQMNet.cc.

References requestObject(), DQMNet::Peer::waiting, and waiting_.

Referenced by onMessage().

00148 {
00149   // FIXME: Should we automatically record which exact peer the waiter
00150   // is expecting to deliver data so we know to release the waiter if
00151   // the other peer vanishes?  The current implementation stands a
00152   // chance for the waiter to wait indefinitely -- although we do
00153   // force terminate the wait after a while.
00154   requestObject(owner, name.size() ? &name[0] : 0, name.size());
00155   WaitObject wo = { Time::current(), name, info, p };
00156   waiting_.push_back(wo);
00157   p->waiting++;
00158 }


Member Data Documentation

std::string DQMNet::appname_ [private]

Definition at line 193 of file DQMNet.h.

Referenced by logme().

pthread_t DQMNet::communicate_ [private]

Definition at line 206 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_ [protected]

Definition at line 175 of file DQMNet.h.

Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), and updateMask().

int DQMNet::delay_ [private]

Definition at line 209 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_ [private]

Definition at line 202 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_FLAG_DEAD = 0x80000000 [static]

Definition at line 42 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::markObjectsDead(), onMessage(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), and DQMBasicNet::receive().

const uint32_t DQMNet::DQM_FLAG_NEW = 0x40000000 [static]

Definition at line 41 of file DQMNet.h.

Referenced by MonitorElement::MonitorElement(), onMessage(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), MonitorElement::update(), and MonitorElement::wasUpdated().

const uint32_t DQMNet::DQM_FLAG_RECEIVED = 0x20000000 [static]

Definition at line 40 of file DQMNet.h.

Referenced by onMessage(), and DQMBasicNet::receive().

const uint32_t DQMNet::DQM_FLAG_REPORT_ERROR = 0x1 [static]

Definition at line 34 of file DQMNet.h.

Referenced by MonitorElement::updateQReportStats().

const uint32_t DQMNet::DQM_FLAG_REPORT_OTHER = 0x4 [static]

Definition at line 36 of file DQMNet.h.

Referenced by MonitorElement::updateQReportStats().

const uint32_t DQMNet::DQM_FLAG_REPORT_WARNING = 0x2 [static]

Definition at line 35 of file DQMNet.h.

Referenced by MonitorElement::updateQReportStats().

const uint32_t DQMNet::DQM_FLAG_SCALAR = 0x8 [static]

Definition at line 37 of file DQMNet.h.

Referenced by extractScalarData(), MonitorElement::initialise(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), and sendObjectToPeer().

const uint32_t DQMNet::DQM_FLAG_TEXT = 0x10000000 [static]

Definition at line 39 of file DQMNet.h.

Referenced by sendObjectToPeer().

const uint32_t DQMNet::DQM_FLAG_ZOMBIE = 0x08000000 [static]

Definition at line 38 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::markObjectsDead(), DQMImplNet< DQMNet::Object >::markObjectsZombies(), and sendObjectToPeer().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3 [static]

Definition at line 27 of file DQMNet.h.

Referenced by onMessage(), and requestObject().

const uint32_t DQMNet::DQM_MSG_HELLO = 0 [static]

Definition at line 24 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2 [static]

Definition at line 26 of file DQMNet.h.

Referenced by onMessage(), and run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1 [static]

Definition at line 25 of file DQMNet.h.

Referenced by onMessage(), DQMBasicNet::requestFullUpdatesFromPeers(), and run().

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101 [static]

Definition at line 29 of file DQMNet.h.

Referenced by onMessage(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102 [static]

Definition at line 30 of file DQMNet.h.

Referenced by onMessage(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103 [static]

Definition at line 31 of file DQMNet.h.

Referenced by onMessage(), and releaseFromWait().

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104 [static]

Definition at line 32 of file DQMNet.h.

Referenced by onMessage(), and sendObjectToPeer().

bool DQMNet::flush_ [private]

Definition at line 210 of file DQMNet.h.

Referenced by onLocalNotify(), onMessage(), and run().

pthread_mutex_t DQMNet::lock_ [private]

Definition at line 205 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128 [static]

Definition at line 44 of file DQMNet.h.

Referenced by onPeerData().

int DQMNet::pid_ [private]

Definition at line 194 of file DQMNet.h.

Referenced by logme().

bool DQMNet::requestFullUpdates_ [protected]

Definition at line 177 of file DQMNet.h.

Referenced by onMessage(), requestFullUpdates(), and run().

lat::IOSelector DQMNet::sel_ [private]

Definition at line 196 of file DQMNet.h.

Referenced by DQMNet(), losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), and updateMask().

bool DQMNet::sendScalarAsText_ [protected]

Definition at line 176 of file DQMNet.h.

Referenced by onMessage(), releaseFromWait(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), and sendScalarAsText().

lat::InetServerSocket* DQMNet::server_ [private]

Definition at line 197 of file DQMNet.h.

Referenced by onPeerConnect(), and startLocalServer().

sig_atomic_t DQMNet::shutdown_ [private]

Definition at line 207 of file DQMNet.h.

Referenced by shouldStop(), and shutdown().

AutoPeer DQMNet::upstream_ [private]

Definition at line 201 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_ [private]

Definition at line 199 of file DQMNet.h.

WaitList DQMNet::waiting_ [private]

Definition at line 203 of file DQMNet.h.

Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().

lat::Pipe DQMNet::wakeup_ [private]

Definition at line 198 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().


The documentation for this class was generated from the following files:
- DQMNet.h
- DQMNet.cc
Generated on Tue Jun 9 18:18:35 2009 for CMSSW by  doxygen 1.5.4