#include <DQMServices/Core/interface/DQMNet.h>
Public Types | |
typedef std::vector< unsigned char > | DataBlob |
typedef std::vector< QValue > | QReports |
typedef std::vector< uint32_t > | TagList |
typedef std::list< WaitObject > | WaitList |
Public Member Functions | |
void | debug (bool doit) |
Enable or disable verbose debugging. | |
void | delay (int delay) |
Set the I/O dispatching delay. | |
DQMNet (const std::string &appname="") | |
void | listenToCollector (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. | |
void | lock (void) |
Acquire a lock on the DQM net layer. | |
virtual int | receive (DQMStore *store) |
virtual void | removeLocalObject (const std::string &name) |
void | requestFullUpdates (bool doit) |
Enable or disable requests for full updates. | |
void | run (void) |
Run the actual I/O processing loop. | |
void | sendLocalChanges (void) |
void | sendScalarAsText (bool doit) |
Enable or disable sending scalar monitoring values as text, rather than their ROOT object values. | |
void | shutdown (void) |
Stop the network layer and wait for it to finish. | |
void | start (void) |
Start running the network layer in a new thread. | |
void | startLocalServer (int port) |
Start a server socket for accessing this DQM node remotely. | |
void | unlock (void) |
Release the lock on the DQM net layer. | |
virtual void | updateLocalObject (Object &o) |
void | updateToCollector (const std::string &host, int port) |
Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. | |
virtual | ~DQMNet (void) |
Static Public Attributes | |
static const uint32_t | DQM_FLAG_DEAD = 0x80000000 |
static const uint32_t | DQM_FLAG_NEW = 0x40000000 |
static const uint32_t | DQM_FLAG_RECEIVED = 0x20000000 |
static const uint32_t | DQM_FLAG_REPORT_ERROR = 0x1 |
static const uint32_t | DQM_FLAG_REPORT_OTHER = 0x4 |
static const uint32_t | DQM_FLAG_REPORT_WARNING = 0x2 |
static const uint32_t | DQM_FLAG_SCALAR = 0x8 |
static const uint32_t | DQM_FLAG_TEXT = 0x10000000 |
static const uint32_t | DQM_FLAG_ZOMBIE = 0x08000000 |
static const uint32_t | DQM_MSG_GET_OBJECT = 3 |
static const uint32_t | DQM_MSG_HELLO = 0 |
static const uint32_t | DQM_MSG_LIST_OBJECTS = 2 |
static const uint32_t | DQM_MSG_UPDATE_ME = 1 |
static const uint32_t | DQM_REPLY_LIST_BEGIN = 101 |
static const uint32_t | DQM_REPLY_LIST_END = 102 |
static const uint32_t | DQM_REPLY_NONE = 103 |
static const uint32_t | DQM_REPLY_OBJECT = 104 |
static const uint32_t | MAX_PEER_WAITREQS = 128 |
Protected Member Functions | |
virtual Peer * | createPeer (lat::Socket *s)=0 |
bool | extractScalarData (DataBlob &objdata, Object &o) |
virtual Object * | findObject (Peer *p, const std::string &name, Peer **owner=0)=0 |
virtual Peer * | getPeer (lat::Socket *s)=0 |
std::ostream & | logme (void) |
virtual Object * | makeObject (Peer *p, const std::string &name)=0 |
virtual void | markObjectsDead (Peer *p)=0 |
virtual void | markObjectsZombies (Peer *p)=0 |
virtual bool | onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len) |
virtual void | purgeDeadObjects (lat::Time oldobj, lat::Time deadobj)=0 |
bool | reconstructObject (Object &o) |
bool | reinstateObject (DQMStore *store, Object &o) |
virtual void | releaseFromWait (Bucket *msg, WaitObject &w, Object *o) |
virtual void | removePeer (Peer *p, lat::Socket *s)=0 |
virtual void | requestFullUpdatesFromPeers (void)=0 |
virtual void | sendObjectListToPeer (Bucket *msg, bool data, bool all, bool clear)=0 |
virtual void | sendObjectListToPeers (bool all)=0 |
void | sendObjectToPeer (Bucket *msg, Object &o, bool data, bool text) |
virtual bool | shouldStop (void) |
void | updateMask (Peer *p) |
Update the selector mask for a peer based on data queues. | |
virtual void | updatePeerMasks (void)=0 |
void | waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner) |
Queue a request for an object and put a peer into the mode of waiting for object data to appear. | |
Static Protected Member Functions | |
static void | copydata (Bucket *b, const void *data, size_t len) |
Protected Attributes | |
bool | debug_ |
bool | requestFullUpdates_ |
bool | sendScalarAsText_ |
Private Member Functions | |
DQMNet (const DQMNet &) | |
bool | losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0) |
Handle errors with a peer socket. | |
bool | onLocalNotify (lat::IOSelectEvent *ev) |
React to notifications from the DQM thread. | |
bool | onPeerConnect (lat::IOSelectEvent *ev) |
Respond to new connections on the server socket. | |
bool | onPeerData (lat::IOSelectEvent *ev, Peer *p) |
Handle communication to a particular client. | |
DQMNet & | operator= (const DQMNet &) |
void | releaseFromWait (WaitList::iterator i, Object *o) |
void | releaseWaiters (Object *o) |
void | requestObject (Peer *p, const char *name, size_t len) |
Queue an object request to the data server. | |
Static Private Member Functions | |
static void | discard (Bucket *&b) |
Private Attributes | |
std::string | appname_ |
pthread_t | communicate_ |
int | delay_ |
AutoPeer | downstream_ |
bool | flush_ |
pthread_mutex_t | lock_ |
int | pid_ |
lat::IOSelector | sel_ |
lat::InetServerSocket * | server_ |
sig_atomic_t | shutdown_ |
AutoPeer | upstream_ |
lat::Time | version_ |
WaitList | waiting_ |
lat::Pipe | wakeup_ |
Classes | |
struct | AutoPeer |
struct | Bucket |
struct | CoreObject |
struct | Object |
struct | Peer |
struct | QValue |
struct | WaitObject |
Definition at line 21 of file DQMNet.h.
typedef std::vector<unsigned char> DQMNet::DataBlob |
typedef std::vector<QValue> DQMNet::QReports |
typedef std::vector<uint32_t> DQMNet::TagList |
typedef std::list<WaitObject> DQMNet::WaitList |
DQMNet::DQMNet | ( | const std::string & | appname = "" |
) |
Definition at line 1053 of file DQMNet.cc.
References lat::IOSelector::attach(), lat::CreateHook(), downstream_, lat::IOChannel::fd(), IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, lat::Pipe::source(), DQMNet::AutoPeer::update, upstream_, wakeup_, and DQMNet::AutoPeer::warned.
01054 : debug_ (false), 01055 sendScalarAsText_ (false), 01056 requestFullUpdates_ (false), 01057 appname_ (appname.empty() ? "DQMNet" : appname.c_str()), 01058 pid_ (getpid()), 01059 server_ (0), 01060 version_ (Time::current()), 01061 communicate_ ((pthread_t) -1), 01062 shutdown_ (0), 01063 delay_ (1000), 01064 flush_ (false) 01065 { 01066 // Create a pipe for the local DQM to tell the communicator 01067 // thread that local DQM data has changed and that the peers 01068 // should be notified. 01069 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK); 01070 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify)); 01071 01072 // Initialise the upstream and downstream to empty. 01073 upstream_.peer = downstream_.peer = 0; 01074 upstream_.next = downstream_.next = 0; 01075 upstream_.port = downstream_.port = 0; 01076 upstream_.update = downstream_.update = false; 01077 upstream_.warned = downstream_.warned = false; 01078 }
DQMNet::~DQMNet | ( | void | ) | [virtual] |
DQMNet::DQMNet | ( | const DQMNet & | ) | [private] |
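void DQMNet::copydata | ( | Bucket * | b, | |
const void * | data, | |||
size_t | len | |||
) | [static, protected] |
Definition at line 69 of file DQMNet.cc.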
References DQMNet::Bucket::data.
Referenced by onMessage(), releaseFromWait(), DQMBasicNet::requestFullUpdatesFromPeers(), requestObject(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), and sendObjectToPeer().
00070 { 00071 b->data.insert(b->data.end(), 00072 (const unsigned char *)data, 00073 (const unsigned char *)data + len); 00074 }
virtual Peer* DQMNet::createPeer | ( | lat::Socket * | s | ) | [protected, pure virtual] |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by onPeerConnect(), and run().
void DQMNet::discard | ( | Bucket *& | b | ) | [static, private] |
Definition at line 78 of file DQMNet.cc.
References DQMNet::Bucket::next.
Referenced by losePeer(), and onPeerData().
00079 { 00080 while (b) 00081 { 00082 Bucket *next = b->next; 00083 delete b; 00084 b = next; 00085 } 00086 }
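bool DQMNet::extractScalarData | ( | DataBlob & | objdata, | |
Object & | o | |||
) | [protected] |
Definition at line 401 of file DQMNet.cc.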
References DQM_FLAG_SCALAR, extractNextObject(), DQMNet::CoreObject::flags, VarParsing::obj, DQMNet::CoreObject::object, DQMNet::Object::rawdata, and s.
Referenced by sendObjectToPeer().
00402 { 00403 if (! o.flags & DQM_FLAG_SCALAR) 00404 return false; 00405 00406 TObject *obj = o.object; 00407 if (! obj && o.rawdata.size()) 00408 { 00409 DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE); 00410 buf.InitMap(); 00411 buf.Reset(); 00412 obj = extractNextObject(buf); 00413 } 00414 00415 if (TObjString *ostr = dynamic_cast<TObjString *>(obj)) 00416 { 00417 const TString &s = ostr->String(); 00418 objdata.insert(objdata.end(), 00419 (unsigned char *) s.Data(), 00420 (unsigned char *) s.Data() + s.Length()); 00421 return true; 00422 } 00423 00424 return false; 00425 }
virtual Object* DQMNet::findObject | ( | Peer * | p, | |
const std::string & | name, | |||
Peer ** | owner = 0 | |||
) | [protected, pure virtual] |
Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.
Referenced by onMessage(), and run().
virtual Peer* DQMNet::getPeer | ( | lat::Socket * | s | ) | [protected, pure virtual] |
void DQMNet::listenToCollector | ( | const std::string & | host, | |
int | port | |||
) |
Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources.
Must be called before calling run() or start().
Definition at line 1181 of file DQMNet.cc.
References lat::endl(), DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.
01182 { 01183 if (! upstream_.host.empty()) 01184 { 01185 logme() 01186 << "ERROR: Already receiving data from another collector at " 01187 << upstream_.host << ":" << upstream_.port << std::endl; 01188 return; 01189 } 01190 01191 upstream_.update = false; 01192 upstream_.host = host; 01193 upstream_.port = port; 01194 }
void DQMNet::lock | ( | void | ) |
Acquire a lock on the DQM net layer.
Definition at line 1221 of file DQMNet.cc.
References communicate_, and lock_.
Referenced by DQMService::flush(), onMessage(), DQMBasicNet::receive(), and run().
01222 { 01223 if (communicate_ != (pthread_t) -1) 01224 pthread_mutex_lock(&lock_); 01225 }
std::ostream & DQMNet::logme | ( | void | ) | [protected] |
Definition at line 60 of file DQMNet.cc.
References appname_, std::cerr, and pid_.
Referenced by listenToCollector(), losePeer(), onLocalNotify(), onMessage(), onPeerConnect(), onPeerData(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), receive(), reconstructObject(), reinstateObject(), removeLocalObject(), DQMImplNet< DQMNet::Object >::requestFullUpdatesFromPeers(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), updateLocalObject(), updateMask(), and updateToCollector().
00061 { 00062 return std::cerr 00063 << Time::current().format(true, "%Y-%m-%d %H:%M:%S") 00064 << " " << appname_ << "[" << pid_ << "]: "; 00065 }
bool DQMNet::losePeer | ( | const char * | reason, | |
Peer * | peer, | |||
lat::IOSelectEvent * | event, | |||
lat::Error * | err = 0 | |||
) | [private] |
Handle errors with a peer socket.
Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.
Definition at line 93 of file DQMNet.cc.
References DQMNet::Peer::automatic, lat::Socket::close(), lat::IOSelector::detach(), discard(), e, lat::endl(), lat::Error::explain(), i, logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, removePeer(), s, sel_, DQMNet::Peer::sendq, DQMNet::Peer::socket, lat::IOSelectEvent::source, and waiting_.
Referenced by onPeerData(), and updateMask().
00097 { 00098 if (reason) 00099 logme () 00100 << reason << peer->peeraddr 00101 << (err ? "; error was: " + err->explain() : std::string("")) 00102 << std::endl; 00103 00104 Socket *s = peer->socket; 00105 00106 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00107 if (i->peer == peer) 00108 waiting_.erase(i++); 00109 else 00110 ++i; 00111 00112 if (ev) 00113 ev->source = 0; 00114 00115 discard(peer->sendq); 00116 if (peer->automatic) 00117 peer->automatic->peer = 0; 00118 00119 sel_.detach (s); 00120 s->close(); 00121 removePeer (peer, s); 00122 delete s; 00123 return true; 00124 }
bool DQMNet::onLocalNotify | ( | lat::IOSelectEvent * | ev | ) | [private] |
React to notifications from the DQM thread.
This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.
Definition at line 992 of file DQMNet.cc.
References e, lat::endl(), lat::SysErr::ErrTryAgain, lat::Error::explain(), flush_, logme(), lat::Error::next(), lat::SystemError::portable(), lat::IOChannel::read(), and lat::IOSelectEvent::source.
Referenced by DQMNet().
00993 { 00994 // Discard the data in the pipe, we care only about the wakeup. 00995 try 00996 { 00997 IOSize sz; 00998 unsigned char buf [1024]; 00999 while ((sz = ev->source->read(buf, sizeof(buf)))) 01000 ; 01001 } 01002 catch (Error &e) 01003 { 01004 SystemError *next = dynamic_cast<SystemError *>(e.next()); 01005 if (next && next->portable() == SysErr::ErrTryAgain) 01006 ; // Ignore it 01007 else 01008 logme() 01009 << "WARNING: error reading from notification pipe: " 01010 << e.explain() << std::endl; 01011 } 01012 01013 // Tell the main event pump to send an update in a little while. 01014 flush_ = true; 01015 01016 // We are never done, always keep going. 01017 return false; 01018 }
bool DQMNet::onMessage | ( | Bucket * | msg, | |
Peer * | p, | |||
unsigned char * | data, | |||
size_t | len | |||
) | [protected, virtual] |
Definition at line 471 of file DQMNet.cc.
References copydata(), DQMNet::Bucket::data, debug_, DQM_FLAG_DEAD, DQM_FLAG_NEW, DQM_FLAG_RECEIVED, DQM_MSG_GET_OBJECT, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_REPLY_LIST_BEGIN, DQM_REPLY_LIST_END, DQM_REPLY_NONE, DQM_REPLY_OBJECT, lat::endl(), findObject(), DQMNet::CoreObject::flags, flags, flush_, full, DQMNet::Object::lastreq, lock(), logme(), makeObject(), markObjectsDead(), markObjectsZombies(), name, DQMNet::CoreObject::object, DQMNet::Peer::peeraddr, DQMNet::Object::rawdata, DQMNet::CoreObject::reference, releaseWaiters(), requestFullUpdates_, requestFullUpdatesFromPeers(), requestObject(), sendObjectListToPeer(), sendObjectToPeer(), sendScalarAsText_, DQMNet::Peer::source, DQMNet::CoreObject::tags, unlock(), DQMNet::Peer::update, DQMNet::Peer::updatefull, DQMNet::Peer::updates, DQMNet::CoreObject::version, and waitForData().
Referenced by onPeerData().
00472 { 00473 // Decode and process this message. 00474 uint32_t type; 00475 memcpy (&type, data + sizeof(uint32_t), sizeof (type)); 00476 switch (type) 00477 { 00478 case DQM_MSG_UPDATE_ME: 00479 { 00480 if (len != 3*sizeof(uint32_t)) 00481 { 00482 logme() 00483 << "ERROR: corrupt 'UPDATE_ME' message of length " << len 00484 << " from peer " << p->peeraddr << std::endl; 00485 return false; 00486 } 00487 00488 // Get the update status: whether this is a full update. 00489 uint32_t full; 00490 memcpy(&full, data + 2*sizeof(uint32_t), sizeof(uint32_t)); 00491 00492 if (debug_) 00493 logme() 00494 << "DEBUG: received message 'UPDATE ME' from peer " 00495 << p->peeraddr << ", full = " << full << std::endl; 00496 00497 p->update = true; 00498 p->updatefull = full; 00499 00500 if (full && ! requestFullUpdates_) 00501 { 00502 if (debug_) 00503 logme() 00504 << "WARNING: forcing full update request mode on due to " 00505 << "request from " << p->peeraddr << std::endl; 00506 requestFullUpdates_ = true; 00507 requestFullUpdatesFromPeers(); 00508 } 00509 } 00510 return true; 00511 00512 case DQM_MSG_LIST_OBJECTS: 00513 { 00514 if (debug_) 00515 logme() 00516 << "DEBUG: received message 'LIST OBJECTS' from peer " 00517 << p->peeraddr << std::endl; 00518 00519 // Send over current status: list of known objects. 
00520 lock(); 00521 sendObjectListToPeer(msg, p->updatefull, true, false); 00522 unlock(); 00523 } 00524 return true; 00525 00526 case DQM_MSG_GET_OBJECT: 00527 { 00528 if (debug_) 00529 logme() 00530 << "DEBUG: received message 'GET OBJECT' from peer " 00531 << p->peeraddr << std::endl; 00532 00533 if (len < 3*sizeof(uint32_t)) 00534 { 00535 logme() 00536 << "ERROR: corrupt 'GET IMAGE' message of length " << len 00537 << " from peer " << p->peeraddr << std::endl; 00538 return false; 00539 } 00540 00541 uint32_t namelen; 00542 memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen)); 00543 if (len != 3*sizeof(uint32_t) + namelen) 00544 { 00545 logme() 00546 << "ERROR: corrupt 'GET OBJECT' message of length " << len 00547 << " from peer " << p->peeraddr 00548 << ", expected length " << (3*sizeof(uint32_t)) 00549 << " + " << namelen << std::endl; 00550 return false; 00551 } 00552 00553 lock(); 00554 std::string name ((char *) data + 3*sizeof(uint32_t), namelen); 00555 Peer *owner = 0; 00556 Object *o = findObject(0, name, &owner); 00557 if (o) 00558 { 00559 o->lastreq = Time::current(); 00560 if (o->rawdata.empty()) 00561 waitForData(p, name, "", owner); 00562 else 00563 sendObjectToPeer(msg, *o, true, sendScalarAsText_); 00564 } 00565 else 00566 { 00567 uint32_t words [3]; 00568 words[0] = sizeof(words) + name.size(); 00569 words[1] = DQM_REPLY_NONE; 00570 words[2] = name.size(); 00571 00572 msg->data.reserve(msg->data.size() + words[0]); 00573 copydata(msg, &words[0], sizeof(words)); 00574 copydata(msg, &name[0], name.size()); 00575 } 00576 unlock(); 00577 } 00578 return true; 00579 00580 case DQM_REPLY_LIST_BEGIN: 00581 { 00582 if (len != 4*sizeof(uint32_t)) 00583 { 00584 logme() 00585 << "ERROR: corrupt 'LIST BEGIN' message of length " << len 00586 << " from peer " << p->peeraddr << std::endl; 00587 return false; 00588 } 00589 00590 if (debug_) 00591 logme() 00592 << "DEBUG: received message 'LIST BEGIN' from " 00593 << p->peeraddr << std::endl; 00594 00595 
// Get the update status: whether this is a full update. 00596 uint32_t flags; 00597 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00598 00599 // If we are about to receive a full list of objects, flag all 00600 // objects dead. Subsequent object notifications will undo this 00601 // for the live objects. This tells us the object has been 00602 // removed, but we can keep making it available for a while if 00603 // there continues to be interest in it. 00604 if (flags) 00605 { 00606 lock(); 00607 markObjectsZombies(p); 00608 unlock(); 00609 } 00610 } 00611 return true; 00612 00613 case DQM_REPLY_LIST_END: 00614 { 00615 if (len != 4*sizeof(uint32_t)) 00616 { 00617 logme() 00618 << "ERROR: corrupt 'LIST END' message of length " << len 00619 << " from peer " << p->peeraddr << std::endl; 00620 return false; 00621 } 00622 00623 // Get the update status: whether this is a full update. 00624 uint32_t flags; 00625 memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t)); 00626 00627 // If we received a full list of objects, flag all zombie objects 00628 // now dead. We need to do this in two stages in case we receive 00629 // updates in many parts, and end up sending updates to others in 00630 // between; this avoids us lying live objects are dead. 00631 if (flags) 00632 { 00633 lock(); 00634 markObjectsDead(p); 00635 unlock(); 00636 } 00637 00638 if (debug_) 00639 logme() 00640 << "DEBUG: received message 'LIST END' from " 00641 << p->peeraddr << std::endl; 00642 00643 // Indicate we have received another update from this peer. 00644 // Also indicate we should flush to our clients. 
00645 flush_ = true; 00646 p->updates++; 00647 } 00648 return true; 00649 00650 case DQM_REPLY_OBJECT: 00651 { 00652 uint32_t words[8]; 00653 if (len < sizeof(words)) 00654 { 00655 logme() 00656 << "ERROR: corrupt 'OBJECT' message of length " << len 00657 << " from peer " << p->peeraddr << std::endl; 00658 return false; 00659 } 00660 00661 memcpy (&words[0], data, sizeof(words)); 00662 uint32_t &namelen = words[5]; 00663 uint32_t &taglen = words[6]; 00664 uint32_t &datalen = words[7]; 00665 00666 if (len != sizeof(words) + namelen + taglen*sizeof(uint32_t) + datalen) 00667 { 00668 logme() 00669 << "ERROR: corrupt 'OBJECT' message of length " << len 00670 << " from peer " << p->peeraddr 00671 << ", expected length " << sizeof(words) 00672 << " + " << namelen 00673 << " + " << (taglen*sizeof(uint32_t)) 00674 << " + " << datalen 00675 << std::endl; 00676 return false; 00677 } 00678 00679 unsigned char *namedata = data + sizeof(words); 00680 unsigned char *tagdata = namedata + namelen; 00681 unsigned char *objdata = tagdata + taglen*sizeof(uint32_t); 00682 unsigned char *enddata = objdata + datalen; 00683 std::string name ((char *) namedata, namelen); 00684 assert (enddata == data + len); 00685 00686 if (debug_) 00687 logme() 00688 << "DEBUG: received message 'OBJECT " << name 00689 << "' from " << p->peeraddr << std::endl; 00690 00691 // Mark the peer as a known object source. 00692 p->source = true; 00693 00694 // Initialise or update an object entry. 00695 lock(); 00696 Object *o = findObject(p, name); 00697 if (! 
o) 00698 o = makeObject(p, name); 00699 00700 o->flags = words[2] | DQM_FLAG_NEW | DQM_FLAG_RECEIVED; 00701 o->version = ((uint64_t) words[4] << 32 | words[3]); 00702 o->tags.clear(); 00703 o->tags.insert(o->tags.end(), (uint32_t *) tagdata, (uint32_t *) objdata); 00704 o->rawdata.clear(); 00705 o->rawdata.insert (o->rawdata.end(), objdata, enddata); 00706 00707 bool hadobject = (o->object != 0); 00708 delete o->object; 00709 o->object = 0; 00710 delete o->reference; 00711 o->reference = 0; 00712 00713 // If we had an object for this one already and this is a list 00714 // update without data, issue an immediate data get request. 00715 if (hadobject && ! datalen) 00716 requestObject(p, (namelen ? &name[0] : 0), namelen); 00717 00718 // If we have the object data, release from wait. 00719 if (datalen) 00720 releaseWaiters(o); 00721 unlock(); 00722 } 00723 return true; 00724 00725 case DQM_REPLY_NONE: 00726 { 00727 uint32_t words[3]; 00728 if (len < sizeof(words)) 00729 { 00730 logme() 00731 << "ERROR: corrupt 'NONE' message of length " << len 00732 << " from peer " << p->peeraddr << std::endl; 00733 return false; 00734 } 00735 00736 memcpy (&words[0], data, sizeof(words)); 00737 uint32_t &namelen = words[2]; 00738 00739 if (len != sizeof(words) + namelen) 00740 { 00741 logme() 00742 << "ERROR: corrupt 'NONE' message of length " << len 00743 << " from peer " << p->peeraddr 00744 << ", expected length " << sizeof(words) 00745 << " + " << namelen << std::endl; 00746 return false; 00747 } 00748 00749 unsigned char *namedata = data + sizeof(words); 00750 unsigned char *enddata = namedata + namelen; 00751 std::string name ((char *) namedata, namelen); 00752 assert (enddata == data + len); 00753 00754 if (debug_) 00755 logme() 00756 << "DEBUG: received message 'NONE " << name 00757 << "' from " << p->peeraddr << std::endl; 00758 00759 // Mark the peer as a known object source. 00760 p->source = true; 00761 00762 // If this was a known object, update its entry. 
00763 lock(); 00764 Object *o = findObject(p, name); 00765 if (o) 00766 o->flags |= DQM_FLAG_DEAD; 00767 00768 // If someone was waiting for this, let them go. 00769 releaseWaiters(o); 00770 unlock(); 00771 } 00772 return true; 00773 00774 default: 00775 logme() 00776 << "ERROR: unrecognised message of length " << len 00777 << " and type " << type << " from peer " << p->peeraddr 00778 << std::endl; 00779 return false; 00780 } 00781 }
bool DQMNet::onPeerConnect | ( | lat::IOSelectEvent * | ev | ) | [private] |
Respond to new connections on the server socket.
Accepts the connection, creates a new socket for the peer, and sets it up for further communication. Always returns false to tell the IOSelector to keep processing events for the server socket.
Definition at line 952 of file DQMNet.cc.
References lat::Socket::accept(), arg, lat::IOSelector::attach(), lat::CreateHook(), createPeer(), debug_, lat::endl(), lat::InetAddress::hostname(), IORead, IOUrgent, lat::IOChannel::isBlocking(), logme(), onPeerData(), p, lat::InetAddress::port(), s, sel_, server_, and lat::IOSelectEvent::source.
Referenced by startLocalServer().
00953 { 00954 // Recover the server socket. 00955 assert (ev->source == server_); 00956 00957 // Accept the connection. 00958 Socket *s = server_->accept(); 00959 assert (s); 00960 assert (! s->isBlocking()); 00961 00962 // Record it to our list of peers. 00963 Peer *p = createPeer(s); 00964 InetAddress peeraddr = ((InetSocket *) s)->peername(); 00965 InetAddress myaddr = ((InetSocket *) s)->sockname(); 00966 p->peeraddr = StringFormat("%1:%2") 00967 .arg(peeraddr.hostname()) 00968 .arg(peeraddr.port()); 00969 p->mask = IORead|IOUrgent; 00970 p->socket = s; 00971 00972 // Report the new connection. 00973 if (debug_) 00974 logme() 00975 << "INFO: new peer " << p->peeraddr << " is now connected to " 00976 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 00977 00978 // Attach it to the listener. 00979 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p)); 00980 00981 // We are never done. 00982 return false; 00983 }
bool DQMNet::onPeerData | ( | lat::IOSelectEvent * | ev, | |
Peer * | p | |||
) | [private] |
Handle communication to a particular client.
Definition at line 786 of file DQMNet.cc.
References DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, debug_, discard(), e, lat::endl(), lat::SysErr::ErrTryAgain, lat::IOSelectEvent::events, getPeer(), DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, len, logme(), losePeer(), DQMNet::Peer::mask, MAX_PEER_WAITREQS, MESSAGE_SIZE_LIMIT, alivecheck_mergeAndRegister::msg, DQMNet::Bucket::next, lat::Error::next(), old, onMessage(), DQMNet::Peer::peeraddr, lat::SystemError::portable(), lat::IOChannel::read(), sel_, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, lat::IOSelector::setMask(), DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, lat::IOSelectEvent::source, TrackValidation_HighPurity_cff::valid, DQMNet::Peer::waiting, and lat::IOChannel::write().
Referenced by onPeerConnect(), and run().
00787 { 00788 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p); 00789 00790 // If there is a problem with the peer socket, discard the peer 00791 // and tell the selector to stop prcessing events for it. If 00792 // this is a server connection, we will eventually recreate 00793 // everything if/when the data server comes back. 00794 if (ev->events & IOUrgent) 00795 { 00796 if (p->automatic) 00797 { 00798 logme() 00799 << "WARNING: connection to the DQM server at " << p->peeraddr 00800 << " lost (will attempt to reconnect in 15 seconds)\n"; 00801 return losePeer(0, p, ev); 00802 } 00803 else 00804 return losePeer("WARNING: lost peer connection ", p, ev); 00805 } 00806 00807 // If we can write to the peer socket, pump whatever we can into it. 00808 if (ev->events & IOWrite) 00809 { 00810 while (Bucket *b = p->sendq) 00811 { 00812 IOSize len = b->data.size() - p->sendpos; 00813 const void *data = (len ? (const void *)&b->data[p->sendpos] 00814 : (const void *)&data); 00815 IOSize done; 00816 00817 try 00818 { 00819 done = (len ? ev->source->write (data, len) : 0); 00820 if (debug_ && len) 00821 logme() 00822 << "DEBUG: sent " << done << " bytes to peer " 00823 << p->peeraddr << std::endl; 00824 } 00825 catch (Error &e) 00826 { 00827 return losePeer("WARNING: unable to write to peer ", 00828 p, ev, &e); 00829 } 00830 00831 p->sendpos += done; 00832 if (p->sendpos == b->data.size()) 00833 { 00834 Bucket *old = p->sendq; 00835 p->sendq = old->next; 00836 p->sendpos = 0; 00837 old->next = 0; 00838 discard(old); 00839 } 00840 00841 if (! done && len) 00842 // Cannot write any more. 00843 break; 00844 } 00845 } 00846 00847 // If there is data to be read from the peer, first receive what we 00848 // can get out the socket, the process all complete requests. 00849 if (ev->events & IORead) 00850 { 00851 // First build up the incoming buffer of data in the socket. 
00852 // Remember the last size returned by the socket; we need 00853 // it to determine if the remote end closed the connection. 00854 IOSize sz; 00855 try 00856 { 00857 std::vector<unsigned char> buf(SOCKET_READ_SIZE); 00858 do 00859 if ((sz = ev->source->read(&buf[0], buf.size()))) 00860 { 00861 if (debug_) 00862 logme() 00863 << "DEBUG: received " << sz << " bytes from peer " 00864 << p->peeraddr << std::endl; 00865 DataBlob &data = p->incoming; 00866 if (data.capacity () < data.size () + sz) 00867 data.reserve (data.size() + SOCKET_READ_GROWTH); 00868 data.insert (data.end(), &buf[0], &buf[0] + sz); 00869 } 00870 while (sz == sizeof (buf)); 00871 } 00872 catch (Error &e) 00873 { 00874 SystemError *next = dynamic_cast<SystemError *>(e.next()); 00875 if (next && next->portable() == SysErr::ErrTryAgain) 00876 sz = 1; // Ignore it, and fake no end of data. 00877 else 00878 // Houston we have a problem. 00879 return losePeer("WARNING: failed to read from peer ", 00880 p, ev, &e); 00881 } 00882 00883 // Process fully received messages as long as we can. 00884 size_t consumed = 0; 00885 DataBlob &data = p->incoming; 00886 while (data.size()-consumed >= sizeof(uint32_t) 00887 && p->waiting < MAX_PEER_WAITREQS) 00888 { 00889 uint32_t msglen; 00890 memcpy (&msglen, &data[0]+consumed, sizeof(msglen)); 00891 00892 if (msglen >= MESSAGE_SIZE_LIMIT) 00893 return losePeer("WARNING: excessively large message from ", p, ev); 00894 00895 if (data.size()-consumed >= msglen) 00896 { 00897 bool valid = true; 00898 if (msglen < 2*sizeof(uint32_t)) 00899 { 00900 logme() 00901 << "ERROR: corrupt peer message of length " << msglen 00902 << " from peer " << p->peeraddr << std::endl; 00903 valid = false; 00904 } 00905 else 00906 { 00907 // Decode and process this message. 00908 Bucket msg; 00909 msg.next = 0; 00910 valid = onMessage(&msg, p, &data[0]+consumed, msglen); 00911 00912 // If we created a response, chain it to the write queue. 00913 if (! 
msg.data.empty()) 00914 { 00915 Bucket **prev = &p->sendq; 00916 while (*prev) 00917 prev = &(*prev)->next; 00918 00919 *prev = new Bucket; 00920 (*prev)->next = 0; 00921 (*prev)->data.swap(msg.data); 00922 } 00923 } 00924 00925 if (! valid) 00926 return losePeer("WARNING: data stream error with ", p, ev); 00927 00928 consumed += msglen; 00929 } 00930 else 00931 break; 00932 } 00933 00934 data.erase(data.begin(), data.begin()+consumed); 00935 00936 // If the client has closed the connection, shut down our end. If 00937 // we have something to send back still, leave the write direction 00938 // open. Otherwise close the shop for this client. 00939 if (sz == 0) 00940 sel_.setMask(p->socket, p->mask &= ~IORead); 00941 } 00942 00943 // Yes, please keep processing events for this socket. 00944 return false; 00945 }
int DQMNet::receive | ( | DQMStore * | store | ) | [virtual] |
Reimplemented in DQMBasicNet.
Definition at line 1394 of file DQMNet.cc.
References logme().
Referenced by DQMOldReceiver::doMonitoring().
01395 { 01396 logme() << "ERROR: receive() method is not supported.\n"; 01397 return 0; 01398 }
bool DQMNet::reconstructObject | ( | Object & | o | ) | [protected] |
Definition at line 217 of file DQMNet.cc.
References abortReconstructObject(), DQMNet::QValue::code, extractNextObject(), label, logme(), m, lat::Regexp::match(), lat::RegexpMatch::matchString(), DQMNet::QValue::message, DQMNet::CoreObject::name, DQMNet::CoreObject::object, p, parseInt(), DQMNet::CoreObject::qreports, DQMNet::QValue::qtname, DQMNet::Object::rawdata, DQMNet::CoreObject::reference, lat::RegexpMatch::reset(), s_rxmeqr, s_rxmeval(), and value.
Referenced by reinstateObject().
00218 { 00219 DQMRootBuffer buf(DQMRootBuffer::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE); 00220 buf.Reset(); 00221 00222 // Extract the main object. 00223 if (! (o.object = extractNextObject(buf))) 00224 return false; 00225 00226 // Extract the reference object. 00227 o.reference = extractNextObject(buf); 00228 00229 // Calculate quality report base name. 00230 int slash = StringOps::rfind(o.name, '/'); 00231 std::string qrbase; 00232 qrbase.reserve(o.name.size()+2); 00233 qrbase = (slash == -1 ? o.name : o.name.substr(slash+1, std::string::npos)); 00234 qrbase += "."; 00235 00236 // Extract quality reports. 00237 while (TObjString *qrstr = dynamic_cast<TObjString *>(extractNextObject(buf))) 00238 { 00239 RegexpMatch m; 00240 if (! s_rxmeval.match(qrstr->GetName(), 0, 0, &m)) 00241 { 00242 logme() 00243 << "ERROR: unexpected quality report string '" 00244 << qrstr->GetName() << "' for object '" 00245 << o.name << "'\n"; 00246 return abortReconstructObject(o); 00247 } 00248 00249 std::string label = m.matchString(qrstr->GetName(), 1); 00250 std::string type = m.matchString(qrstr->GetName(), 2); 00251 std::string value = m.matchString(qrstr->GetName(), 3); 00252 00253 if (type != "qr") 00254 { 00255 logme() 00256 << "ERROR: expected a 'qr' for a quality report '" 00257 << qrstr->GetName() << "' but found '" << type 00258 << "' instead\n"; 00259 return abortReconstructObject(o); 00260 } 00261 00262 std::string qrname = label; 00263 qrname.replace(0, qrbase.size(), ""); 00264 if (qrname == label) 00265 { 00266 logme() 00267 << "ERROR: quality report label in '" 00268 << qrstr->GetName() 00269 << "' does not match object name '" 00270 << o.name << "'\n"; 00271 return abortReconstructObject(o); 00272 } 00273 00274 m.reset(); 00275 if (! 
s_rxmeqr.match(value, 0, 0, &m)) 00276 { 00277 logme() 00278 << "ERROR: quality test value '" 00279 << value << "' is incorrectly formatted\n"; 00280 return abortReconstructObject(o); 00281 } 00282 00283 QValue qval; 00284 qval.code = 0; 00285 qval.qtname = qrname; 00286 qval.message = m.matchString(value, 2); 00287 std::string strcode = m.matchString(value, 1); 00288 const char *p = strcode.c_str(); 00289 if (! parseInt(p, "", 0, qval.code) || *p) 00290 { 00291 logme() 00292 << "ERROR: failed to determine quality test code from '" 00293 << value << "'\n"; 00294 return abortReconstructObject(o); 00295 } 00296 00297 o.qreports.push_back(qval); 00298 } 00299 00300 return true; 00301 }
Definition at line 304 of file DQMNet.cc.
References DQMStore::book1D(), DQMStore::book1S(), DQMStore::book2D(), DQMStore::book2S(), DQMStore::book3D(), DQMStore::bookFloat(), DQMStore::bookInt(), DQMStore::bookProfile(), DQMStore::bookProfile2D(), DQMStore::bookString(), e, MonitorElement::Fill(), i, logme(), m, lat::Regexp::match(), lat::RegexpMatch::matchString(), name, DQMNet::CoreObject::name, DQMNet::CoreObject::object, reconstructObject(), s_rxmeval(), DQMStore::setCurrentFolder(), t, DQMStore::tag(), DQMNet::CoreObject::tags, and value.
Referenced by DQMBasicNet::receive().
00305 { 00306 if (! reconstructObject (o)) 00307 return false; 00308 00309 // Reconstruct the main object 00310 std::string folder = o.name; 00311 std::string name = o.name; 00312 folder.erase(folder.rfind('/'), std::string::npos); 00313 name.erase(0, name.rfind('/')+1); 00314 store->setCurrentFolder(folder); 00315 if (TProfile2D *t = dynamic_cast<TProfile2D *>(o.object)) 00316 store->bookProfile2D(name, t); 00317 else if (TProfile *t = dynamic_cast<TProfile *>(o.object)) 00318 store->bookProfile(name, t); 00319 else if (TH3F *t = dynamic_cast<TH3F *>(o.object)) 00320 store->book3D(name, t); 00321 else if (TH2F *t = dynamic_cast<TH2F *>(o.object)) 00322 store->book2D(name, t); 00323 else if (TH2S *t = dynamic_cast<TH2S *>(o.object)) 00324 store->book2S(name, t); 00325 else if (TH1F *t = dynamic_cast<TH1F *>(o.object)) 00326 store->book1D(name, t); 00327 else if (TH1S *t = dynamic_cast<TH1S *>(o.object)) 00328 store->book1S(name, t); 00329 else if (TObjString *t = dynamic_cast<TObjString *>(o.object)) 00330 { 00331 RegexpMatch m; 00332 if (! s_rxmeval.match(t->GetName(), 0, 0, &m)) 00333 { 00334 logme() 00335 << "ERROR: unexpected monitor element string '" 00336 << t->GetName() << "' for object '" 00337 << o.name << "'\n"; 00338 return false; 00339 } 00340 00341 // std::string label = m.matchString(t->GetName(), 1); 00342 std::string type = m.matchString(t->GetName(), 2); 00343 std::string value = m.matchString(t->GetName(), 3); 00344 00345 if (type == "i") 00346 store->bookInt(name)->Fill(atoi(value.c_str())); 00347 else if (type == "f") 00348 store->bookFloat(name)->Fill(atof(value.c_str())); 00349 else if (type == "s") 00350 store->bookString(name, value); 00351 else 00352 { 00353 logme() 00354 << "ERROR: unexpected string monitor element of type '" 00355 << type << "' (from '" << t->GetName() << "') for object '" 00356 << o.name << "'\n"; 00357 return false; 00358 } 00359 } 00360 00361 // Reconstruct tags. (FIXME: untag old tags first?) 
00362 for (size_t i = 0, e = o.tags.size(); i < e; ++i) 00363 store->tag(o.name, o.tags[i]); 00364 00365 // FIXME: Reference and quality reports? 00366 00367 // Inidicate success. 00368 return true; 00369 }
Definition at line 163 of file DQMNet.cc.
References alivecheck_mergeAndRegister::msg, DQMNet::Bucket::next, releaseFromWait(), and waiting_.
00164 { 00165 Bucket **msg = &i->peer->sendq; 00166 while (*msg) 00167 msg = &(*msg)->next; 00168 *msg = new Bucket; 00169 (*msg)->next = 0; 00170 00171 releaseFromWait(*msg, *i, o); 00172 00173 assert(i->peer->waiting > 0); 00174 i->peer->waiting--; 00175 waiting_.erase(i); 00176 }
void DQMNet::releaseFromWait | ( | Bucket * | msg, | |
WaitObject & | w, | |||
Object * | o | |||
) | [protected, virtual] |
Definition at line 382 of file DQMNet.cc.
References copydata(), DQMNet::Bucket::data, DQM_REPLY_NONE, DQMNet::WaitObject::name, sendObjectToPeer(), and sendScalarAsText_.
Referenced by releaseFromWait(), releaseWaiters(), and run().
00383 { 00384 if (o) 00385 sendObjectToPeer (msg, *o, true, sendScalarAsText_); 00386 else 00387 { 00388 uint32_t words [3]; 00389 words[0] = sizeof(words) + w.name.size(); 00390 words[1] = DQM_REPLY_NONE; 00391 words[2] = w.name.size(); 00392 00393 msg->data.reserve(msg->data.size() + words[0]); 00394 copydata(msg, &words[0], sizeof(words)); 00395 copydata(msg, &w.name[0], w.name.size()); 00396 } 00397 }
Definition at line 180 of file DQMNet.cc.
References e, i, DQMNet::CoreObject::name, releaseFromWait(), and waiting_.
Referenced by onMessage().
00181 { 00182 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 00183 if (i->name == o->name) 00184 releaseFromWait(i++, o); 00185 else 00186 ++i; 00187 }
void DQMNet::removeLocalObject | ( | const std::string & | name | ) | [virtual] |
Reimplemented in DQMBasicNet.
Definition at line 1407 of file DQMNet.cc.
References logme().
Referenced by DQMService::flush().
01408 { 01409 logme() << "ERROR: removeLocalObject() method is not supported.\n"; 01410 }
virtual void DQMNet::removePeer | ( | Peer * | p, | |
lat::Socket * | s | |||
) | [protected, pure virtual] |
Enable or disable requests for full updates.
Enable this to get the "old" DQM networking behaviour, which automatically fetches all upstream content whenever it changes instead of fetching it lazily on demand. You must call this method if you use receive(); any other use is strongly discouraged. Must be called before run() or start().
Definition at line 1116 of file DQMNet.cc.
References requestFullUpdates_.
Referenced by DQMOldReceiver::DQMOldReceiver().
01117 { 01118 requestFullUpdates_ = doit; 01119 }
Implemented in DQMImplNet< ObjType >, DQMBasicNet, and DQMImplNet< DQMNet::Object >.
Referenced by onMessage().
Queue an object request to the data server.
Definition at line 128 of file DQMNet.cc.
References copydata(), DQM_MSG_GET_OBJECT, alivecheck_mergeAndRegister::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.
Referenced by onMessage(), and waitForData().
00129 { 00130 Bucket **msg = &p->sendq; 00131 while (*msg) 00132 msg = &(*msg)->next; 00133 *msg = new Bucket; 00134 (*msg)->next = 0; 00135 00136 uint32_t words[3]; 00137 words[0] = sizeof(words) + len; 00138 words[1] = DQM_MSG_GET_OBJECT; 00139 words[2] = len; 00140 copydata(*msg, words, sizeof(words)); 00141 copydata(*msg, name, len); 00142 }
Run the actual I/O processing loop.
Definition at line 1254 of file DQMNet.cc.
References lat::Socket::abort(), arg, lat::IOSelector::attach(), DQMNet::Peer::automatic, lat::InetSocket::connect(), copydata(), lat::CreateHook(), createPeer(), debug_, delay_, lat::IOSelector::dispatch(), downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, e, lat::endl(), lat::SysErr::ErrOperationInProgress, lat::Error::explain(), findObject(), flush_, DQMNet::AutoPeer::host, lat::InetAddress::hostname(), i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, lat::Error::next(), onPeerData(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, lat::InetAddress::port(), lat::SystemError::portable(), purgeDeadObjects(), releaseFromWait(), requestFullUpdates_, s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, lat::IOChannel::setBlocking(), lat::Socket::setopt(), shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, lat::SocketConst::TypeStream, unlock(), DQMNet::AutoPeer::update, DQMNet::Peer::update, updatePeerMasks(), upstream_, waiting_, and DQMNet::AutoPeer::warned.
01255 { 01256 Time now; 01257 Time nextFlush = 0; 01258 AutoPeer *automatic[2] = { &upstream_, &downstream_ }; 01259 01260 // Perform I/O. Every once in a while flush updates to peers. 01261 while (! shouldStop()) 01262 { 01263 for (int i = 0; i < 2; ++i) 01264 { 01265 AutoPeer *ap = automatic[i]; 01266 01267 // If we need a server connection and don't have one yet, 01268 // initiate asynchronous connection creation. Swallow errors 01269 // in case the server won't talk to us. 01270 if (! ap->host.empty() 01271 && ! ap->peer 01272 && (now = Time::current()) > ap->next) 01273 { 01274 ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0); 01275 InetSocket *s = 0; 01276 try 01277 { 01278 s = new InetSocket (SocketConst::TypeStream); 01279 s->setBlocking (false); 01280 s->connect(InetAddress (ap->host.c_str(), ap->port)); 01281 s->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 01282 s->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 01283 } 01284 catch (Error &e) 01285 { 01286 SystemError *sys = dynamic_cast<SystemError *>(e.next()); 01287 if (! sys || sys->portable() != SysErr::ErrOperationInProgress) 01288 { 01289 // "In progress" just means the connection is in progress. 01290 // The connection is ready when the socket is writeable. 01291 // Anything else is a real problem. 01292 if (! ap->warned) 01293 { 01294 logme() 01295 << "NOTE: DQM server at " << ap->host << ":" << ap->port 01296 << " is unavailable. Connection will be established" 01297 << " automatically on the background once the server" 01298 << " becomes available. Error from the attempt was: " 01299 << e.explain() << '\n'; 01300 ap->warned = true; 01301 } 01302 01303 if (s) 01304 s->abort(); 01305 delete s; 01306 s = 0; 01307 } 01308 } 01309 01310 // Set up with the selector if we were successful. If this is 01311 // the upstream collector, queue a request for updates. 
01312 if (s) 01313 { 01314 lock(); 01315 Peer *p = createPeer(s); 01316 ap->peer = p; 01317 ap->warned = false; 01318 unlock(); 01319 01320 InetAddress peeraddr = ((InetSocket *) s)->peername(); 01321 InetAddress myaddr = ((InetSocket *) s)->sockname(); 01322 p->peeraddr = StringFormat("%1:%2") 01323 .arg(peeraddr.hostname()) 01324 .arg(peeraddr.port()); 01325 p->mask = IORead|IOWrite|IOUrgent; 01326 p->update = ap->update; 01327 p->automatic = ap; 01328 p->socket = s; 01329 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p)); 01330 if (ap == &upstream_) 01331 { 01332 uint32_t words[5] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 01333 3*sizeof(uint32_t), DQM_MSG_UPDATE_ME, 01334 requestFullUpdates_ }; 01335 p->sendq = new Bucket; 01336 p->sendq->next = 0; 01337 copydata(p->sendq, words, sizeof(words)); 01338 } 01339 01340 // Report the new connection. 01341 if (debug_) 01342 logme() 01343 << "INFO: now connected to " << p->peeraddr << " from " 01344 << myaddr.hostname() << ":" << myaddr.port() << std::endl; 01345 } 01346 } 01347 } 01348 01349 // Pump events for a while. 01350 sel_.dispatch(delay_); 01351 now = Time::current(); 01352 01353 // Check if flush is required. Flush only if one is needed. 01354 // Always sends the full object list, but only rarely. 01355 // Compact objects no longer in active use before sending 01356 // out the update. 01357 if (flush_ && now > nextFlush) 01358 { 01359 flush_ = false; 01360 nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0); 01361 01362 lock(); 01363 purgeDeadObjects(now - TimeSpan(0, 0, 2 /* minutes */, 0, 0), 01364 now - TimeSpan(0, 0, 20 /* minutes */, 0, 0)); 01365 sendObjectListToPeers(true); 01366 unlock(); 01367 } 01368 01369 // Update the data server and peer selection masks. If we 01370 // have no more data to send and listening for writes, remove 01371 // the write mask. 
If we have something to write and aren't 01372 // listening for writes, start listening so we can send off 01373 // the data. 01374 updatePeerMasks(); 01375 01376 // Release peers that have been waiting for data for too long. 01377 lock(); 01378 Time waitold = now - TimeSpan(0, 0, 2 /* minutes */, 0, 0); 01379 for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; ) 01380 { 01381 // If the peer has waited for too long, send something. 01382 if (i->time < waitold) 01383 releaseFromWait(i++, findObject(0, i->name)); 01384 01385 // Keep it for now. 01386 else 01387 ++i; 01388 } 01389 unlock(); 01390 } 01391 }
Definition at line 1415 of file DQMNet.cc.
References lat::Pipe::sink(), wakeup_, and lat::IOChannel::write().
Referenced by DQMService::flush(), and DQMImplNet< DQMNet::Object >::removePeer().
Definition at line 432 of file DQMNet.cc.
References copydata(), DQMNet::Bucket::data, DQM_FLAG_SCALAR, DQM_FLAG_TEXT, DQM_FLAG_ZOMBIE, DQM_REPLY_OBJECT, extractScalarData(), DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::name, DQMNet::Object::rawdata, DQMNet::CoreObject::tags, and DQMNet::CoreObject::version.
Referenced by onMessage(), releaseFromWait(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
00433 { 00434 uint32_t flags = o.flags & ~DQM_FLAG_ZOMBIE; 00435 DataBlob objdata; 00436 00437 if (text && extractScalarData(objdata, o)) 00438 flags |= DQM_FLAG_TEXT; 00439 else if (data || (flags & DQM_FLAG_SCALAR)) 00440 objdata.insert(objdata.end(), 00441 &o.rawdata[0], 00442 &o.rawdata[0] + o.rawdata.size()); 00443 00444 uint32_t words [8]; 00445 uint32_t namelen = o.name.size(); 00446 uint32_t taglen = o.tags.size() * sizeof(uint32_t); 00447 uint32_t datalen = objdata.size(); 00448 00449 words[0] = 8*sizeof(uint32_t) + namelen + taglen + datalen; 00450 words[1] = DQM_REPLY_OBJECT; 00451 words[2] = flags; 00452 words[3] = (o.version >> 0 ) & 0xffffffff; 00453 words[4] = (o.version >> 32) & 0xffffffff; 00454 words[5] = namelen; 00455 words[6] = taglen / sizeof(uint32_t); 00456 words[7] = datalen; 00457 00458 msg->data.reserve(msg->data.size() + words[0]); 00459 copydata(msg, &words[0], 8*sizeof(uint32_t)); 00460 if (namelen) 00461 copydata(msg, &o.name[0], namelen); 00462 if (taglen) 00463 copydata(msg, &o.tags[0], taglen); 00464 if (datalen) 00465 copydata(msg, &objdata[0], datalen); 00466 }
Enable or disable sending scalar monitoring values as text rather than as their ROOT object values.
Must be called before run() or start().
Definition at line 1105 of file DQMNet.cc.
References sendScalarAsText_.
01106 { 01107 sendScalarAsText_ = doit; 01108 }
Stop the network layer and wait for it to finish.
Definition at line 1198 of file DQMNet.cc.
References communicate_, and shutdown_.
Referenced by DQMService::shutdown().
01199 { 01200 shutdown_ = 1; 01201 if (communicate_ != (pthread_t) -1) 01202 pthread_join(communicate_, 0); 01203 }
Start running the network layer in a new thread.
Use either this method or run(), not both: run() instead runs the network layer in the caller's thread.
Definition at line 1239 of file DQMNet.cc.
References communicate(), communicate_, lock_, logme(), and pthread_create.
Referenced by DQMService::DQMService().
01240 { 01241 if (communicate_ != (pthread_t) -1) 01242 { 01243 logme() 01244 << "ERROR: DQM networking thread has already been started\n"; 01245 return; 01246 } 01247 01248 pthread_mutex_init(&lock_, 0); 01249 pthread_create (&communicate_, 0, &communicate, this); 01250 }
Start a server socket for accessing this DQM node remotely.
Must be called before run() or start(). Throws a cms::Exception if the server socket cannot be initialised.
Definition at line 1125 of file DQMNet.cc.
References lat::IOSelector::attach(), lat::CreateHook(), e, lat::endl(), Exception, lat::Error::explain(), IOAccept, logme(), onPeerConnect(), lat::SocketConst::OptSockReceiveBuffer, lat::SocketConst::OptSockSendBuffer, sel_, server_, lat::IOChannel::setBlocking(), lat::Socket::setopt(), and SOCKET_BUF_SIZE.
01126 { 01127 if (server_) 01128 { 01129 logme() << "ERROR: DQM server was already started.\n"; 01130 return; 01131 } 01132 01133 try 01134 { 01135 server_ = new InetServerSocket(InetAddress (port), 10); 01136 server_->setopt(lat::SocketConst::OptSockSendBuffer, SOCKET_BUF_SIZE); 01137 server_->setopt(lat::SocketConst::OptSockReceiveBuffer, SOCKET_BUF_SIZE); 01138 server_->setBlocking(false); 01139 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect)); 01140 } 01141 catch (Error &e) 01142 { 01143 // FIXME: Do we need to do this when we throw an exception anyway? 01144 // FIXME: Abort instead? 01145 logme() 01146 << "ERROR: Failed to start server at port " << port << ": " 01147 << e.explain() << std::endl; 01148 01149 // FIXME: Throw something simpler that removes the dependency? 01150 throw cms::Exception("DQMNet::startLocalServer") 01151 << "Failed to start server at port " << port << ": " 01152 << e.explain(); 01153 } 01154 01155 logme() << "INFO: DQM server started at port " << port << std::endl; 01156 }
Release the lock on the DQM net layer.
Definition at line 1229 of file DQMNet.cc.
References communicate_, and lock_.
Referenced by DQMService::flush(), onMessage(), DQMBasicNet::receive(), and run().
01230 { 01231 if (communicate_ != (pthread_t) -1) 01232 pthread_mutex_unlock(&lock_); 01233 }
Reimplemented in DQMBasicNet.
Definition at line 1401 of file DQMNet.cc.
References logme().
Referenced by DQMService::flush().
01402 { 01403 logme() << "ERROR: updateLocalObject() method is not supported.\n"; 01404 }
Update the selector mask for a peer based on data queues.
Close the connection if there is no reason to keep it open.
Definition at line 1023 of file DQMNet.cc.
References debug_, lat::endl(), IOUrgent, IOWrite, logme(), losePeer(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, sel_, DQMNet::Peer::sendq, lat::IOSelector::setMask(), DQMNet::Peer::socket, and DQMNet::Peer::waiting.
Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().
01024 { 01025 if (! p->socket) 01026 return; 01027 01028 // Listen to writes iff we have data to send. 01029 unsigned oldmask = p->mask; 01030 if (! p->sendq && (p->mask & IOWrite)) 01031 sel_.setMask(p->socket, p->mask &= ~IOWrite); 01032 01033 if (p->sendq && ! (p->mask & IOWrite)) 01034 sel_.setMask(p->socket, p->mask |= IOWrite); 01035 01036 if (debug_ && oldmask != p->mask) 01037 logme() 01038 << "DEBUG: updating mask for " << p->peeraddr << " to " 01039 << p->mask << " from " << oldmask << std::endl; 01040 01041 // If we have nothing more to send and are no longer listening 01042 // for reads, close up the shop for this peer. 01043 if (p->mask == IOUrgent && ! p->waiting) 01044 { 01045 assert (! p->sendq); 01046 if (debug_) 01047 logme() << "INFO: connection closed to " << p->peeraddr << std::endl; 01048 losePeer(0, p, 0); 01049 } 01050 }
Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes.
Must be called before run() or start().
Definition at line 1162 of file DQMNet.cc.
References downstream_, lat::endl(), DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.
Referenced by DQMService::DQMService().
01163 { 01164 if (! downstream_.host.empty()) 01165 { 01166 logme() 01167 << "ERROR: Already updating another collector at " 01168 << downstream_.host << ":" << downstream_.port << std::endl; 01169 return; 01170 } 01171 01172 downstream_.update = true; 01173 downstream_.host = host; 01174 downstream_.port = port; 01175 }
void DQMNet::waitForData | ( | Peer * | p, | |
const std::string & | name, | |||
const std::string & | info, | |||
Peer * | owner | |||
) | [protected] |
Queue a request for an object to its owner peer, and put the requesting peer into the waiting state until the object data appears (or the wait times out).
Definition at line 147 of file DQMNet.cc.
References requestObject(), DQMNet::Peer::waiting, and waiting_.
Referenced by onMessage().
00148 { 00149 // FIXME: Should we automatically record which exact peer the waiter 00150 // is expecting to deliver data so we know to release the waiter if 00151 // the other peer vanishes? The current implementation stands a 00152 // chance for the waiter to wait indefinitely -- although we do 00153 // force terminate the wait after a while. 00154 requestObject(owner, name.size() ? &name[0] : 0, name.size()); 00155 WaitObject wo = { Time::current(), name, info, p }; 00156 waiting_.push_back(wo); 00157 p->waiting++; 00158 }
std::string DQMNet::appname_ [private] |
pthread_t DQMNet::communicate_ [private] |
bool DQMNet::debug_ [protected] |
Definition at line 175 of file DQMNet.h.
Referenced by debug(), onMessage(), onPeerConnect(), onPeerData(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), and updateMask().
int DQMNet::delay_ [private] |
AutoPeer DQMNet::downstream_ [private] |
const uint32_t DQMNet::DQM_FLAG_DEAD = 0x80000000 [static] |
Definition at line 42 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::markObjectsDead(), onMessage(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), and DQMBasicNet::receive().
const uint32_t DQMNet::DQM_FLAG_NEW = 0x40000000 [static] |
Definition at line 41 of file DQMNet.h.
Referenced by MonitorElement::MonitorElement(), onMessage(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), MonitorElement::update(), and MonitorElement::wasUpdated().
const uint32_t DQMNet::DQM_FLAG_RECEIVED = 0x20000000 [static] |
const uint32_t DQMNet::DQM_FLAG_REPORT_ERROR = 0x1 [static] |
const uint32_t DQMNet::DQM_FLAG_REPORT_OTHER = 0x4 [static] |
const uint32_t DQMNet::DQM_FLAG_REPORT_WARNING = 0x2 [static] |
const uint32_t DQMNet::DQM_FLAG_SCALAR = 0x8 [static] |
Definition at line 37 of file DQMNet.h.
Referenced by extractScalarData(), MonitorElement::initialise(), DQMImplNet< DQMNet::Object >::purgeDeadObjects(), and sendObjectToPeer().
const uint32_t DQMNet::DQM_FLAG_TEXT = 0x10000000 [static] |
const uint32_t DQMNet::DQM_FLAG_ZOMBIE = 0x08000000 [static] |
Definition at line 38 of file DQMNet.h.
Referenced by DQMImplNet< DQMNet::Object >::markObjectsDead(), DQMImplNet< DQMNet::Object >::markObjectsZombies(), and sendObjectToPeer().
const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3 [static] |
const uint32_t DQMNet::DQM_MSG_HELLO = 0 [static] |
const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2 [static] |
const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1 [static] |
Definition at line 25 of file DQMNet.h.
Referenced by onMessage(), DQMBasicNet::requestFullUpdatesFromPeers(), and run().
const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101 [static] |
Definition at line 29 of file DQMNet.h.
Referenced by onMessage(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
const uint32_t DQMNet::DQM_REPLY_LIST_END = 102 [static] |
Definition at line 30 of file DQMNet.h.
Referenced by onMessage(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().
const uint32_t DQMNet::DQM_REPLY_NONE = 103 [static] |
const uint32_t DQMNet::DQM_REPLY_OBJECT = 104 [static] |
bool DQMNet::flush_ [private] |
pthread_mutex_t DQMNet::lock_ [private] |
const uint32_t DQMNet::MAX_PEER_WAITREQS = 128 [static] |
int DQMNet::pid_ [private] |
bool DQMNet::requestFullUpdates_ [protected] |
Definition at line 177 of file DQMNet.h.
Referenced by onMessage(), requestFullUpdates(), and run().
lat::IOSelector DQMNet::sel_ [private] |
Definition at line 196 of file DQMNet.h.
Referenced by DQMNet(), losePeer(), onPeerConnect(), onPeerData(), run(), startLocalServer(), and updateMask().
bool DQMNet::sendScalarAsText_ [protected] |
Definition at line 176 of file DQMNet.h.
Referenced by onMessage(), releaseFromWait(), DQMImplNet< DQMNet::Object >::sendObjectListToPeer(), and sendScalarAsText().
lat::InetServerSocket* DQMNet::server_ [private] |
sig_atomic_t DQMNet::shutdown_ [private] |
AutoPeer DQMNet::upstream_ [private] |
lat::Time DQMNet::version_ [private] |
WaitList DQMNet::waiting_ [private] |
Definition at line 203 of file DQMNet.h.
Referenced by losePeer(), releaseFromWait(), releaseWaiters(), run(), and waitForData().
lat::Pipe DQMNet::wakeup_ [private] |