CMS 3D CMS Logo

Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes

DQMNet Class Reference

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

List of all members.

Classes

struct  AutoPeer
struct  Bucket
struct  CoreObject
struct  HashEqual
struct  HashOp
struct  Object
struct  Peer
struct  QValue
struct  WaitObject

Public Types

typedef std::vector< unsigned char > DataBlob
typedef std::vector< QValue > QReports
typedef std::vector< uint32_t > TagList
typedef std::list< WaitObject > WaitList

Public Member Functions

void debug (bool doit)
void delay (int delay)
 DQMNet (const std::string &appname="")
void listenToCollector (const std::string &host, int port)
void lock (void)
 Acquire a lock on the DQM net layer.
void run (void)
void sendLocalChanges (void)
void shutdown (void)
 Stop the network layer and wait for it to finish.
void staleObjectWaitLimit (lat::TimeSpan time)
void start (void)
void startLocalServer (int port)
void startLocalServer (const char *path)
void unlock (void)
 Release the lock on the DQM net layer.
void updateToCollector (const std::string &host, int port)
virtual ~DQMNet (void)

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
static void packQualityData (std::string &into, const QReports &qr)
static bool setOrder (const CoreObject &a, const CoreObject &b)
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
static const uint32_t DQM_MSG_HELLO = 0
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
static const uint32_t DQM_MSG_UPDATE_ME = 1
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
static const uint32_t DQM_PROP_DEAD = 0x00080000
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
static const uint32_t DQM_PROP_LUMI = 0x00040000
static const uint32_t DQM_PROP_NEW = 0x00010000
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
static const uint32_t DQM_PROP_REPORT_ALARM
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
static const uint32_t DQM_PROP_RESET = 0x00008000
static const uint32_t DQM_PROP_STALE = 0x00100000
static const uint32_t DQM_PROP_TAGGED = 0x00002000
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
static const uint32_t DQM_REPLY_LIST_END = 102
static const uint32_t DQM_REPLY_NONE = 103
static const uint32_t DQM_REPLY_OBJECT = 104
static const uint32_t MAX_PEER_WAITREQS = 128

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)=0
virtual Peer * getPeer (lat::Socket *s)=0
std::ostream & logme (void)
virtual Object * makeObject (Peer *p, const std::string &name)=0
virtual void markObjectsDead (Peer *p)=0
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
virtual void purgeDeadObjects (Peer *p)=0
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
virtual void removePeer (Peer *p, lat::Socket *s)=0
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
virtual void sendObjectListToPeers (bool all)=0
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
virtual bool shouldStop (void)
void updateMask (Peer *p)
virtual void updatePeerMasks (void)=0
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
static void discard (Bucket *&b)

Protected Attributes

bool debug_
pthread_mutex_t lock_

Private Member Functions

 DQMNet (const DQMNet &)
void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
bool onLocalNotify (lat::IOSelectEvent *ev)
bool onPeerConnect (lat::IOSelectEvent *ev)
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
DQMNet & operator= (const DQMNet &)
void releaseFromWait (WaitList::iterator i, Object *o)
void releaseWaiters (const std::string &name, Object *o)
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server.

Private Attributes

std::string appname_
pthread_t communicate_
int delay_
AutoPeer downstream_
bool flush_
int pid_
lat::IOSelector sel_
lat::Socket * server_
sig_atomic_t shutdown_
AutoPeer upstream_
lat::Time version_
WaitList waiting_
lat::TimeSpan waitMax_
lat::TimeSpan waitStale_
lat::Pipe wakeup_

Detailed Description

Definition at line 22 of file DQMNet.h.


Member Typedef Documentation

typedef std::vector<unsigned char> DQMNet::DataBlob

Definition at line 79 of file DQMNet.h.

typedef std::vector<QValue> DQMNet::QReports

Definition at line 82 of file DQMNet.h.

typedef std::vector<uint32_t> DQMNet::TagList

Definition at line 83 of file DQMNet.h.

typedef std::list<WaitObject> DQMNet::WaitList

Definition at line 84 of file DQMNet.h.


Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 1064 of file DQMNet.cc.

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

  : debug_ (false),
    appname_ (appname.empty() ? "DQMNet" : appname.c_str()), // fall back to "DQMNet" when no name is given
    pid_ (getpid()),
    server_ (0),
    version_ (Time::current()),
    communicate_ ((pthread_t) -1), // sentinel value; lock() only takes lock_ once this has changed
    shutdown_ (0),
    delay_ (1000),
    waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
    waitMax_ (0, 0, 0, 5 /* seconds */, 0),
    flush_ (false)
{
  // Create a pipe for the local DQM to tell the communicator
  // thread that local DQM data has changed and that the peers
  // should be notified.
  // The pipe's source end is switched to non-blocking mode and attached
  // to the selector for read events; onLocalNotify() is the handler that
  // drains it.
  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));

  // Initialise the upstream and downstream to empty.
  // Both AutoPeer records start with no peer, zeroed next/port fields
  // and the update flag cleared.
  upstream_.peer   = downstream_.peer   = 0;
  upstream_.next   = downstream_.next   = 0;
  upstream_.port   = downstream_.port   = 0;
  upstream_.update = downstream_.update = false;
}
DQMNet::~DQMNet ( void  ) [virtual]

Definition at line 1090 of file DQMNet.cc.

{
  // FIXME: the destructor body is empty, so nothing is released here.
  // NOTE(review): presumably server_, any remaining peers and the wait
  // list need cleaning up -- confirm ownership before implementing.
}
DQMNet::DQMNet ( const DQMNet &  ) [private]

Member Function Documentation

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
) [static, protected]

Definition at line 53 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

{
  // Append the 'len' bytes starting at 'data' to the end of the
  // bucket's byte buffer.
  const unsigned char *first = (const unsigned char *) data;
  const unsigned char *last = first + len;
  b->data.insert(b->data.end(), first, last);
}
virtual Peer* DQMNet::createPeer ( lat::Socket *  s) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 1098 of file DQMNet.cc.

References debug_.

{
  // Remember the requested verbosity; the message handlers consult
  // debug_ before emitting their "DEBUG:" log lines.
  this->debug_ = doit;
}
void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 1106 of file DQMNet.cc.

References delay_.

{
  // Store the caller-supplied I/O dispatching delay.
  this->delay_ = delay;
}
void DQMNet::discard ( Bucket *&  b) [static, protected]

Definition at line 62 of file DQMNet.cc.

References b, and DQMNet::Bucket::next.

{
  // Delete every bucket in the chain headed by 'b'.  'b' is advanced
  // before each delete, so it ends up null once the chain is gone.
  for (Bucket *doomed = b; doomed; doomed = b)
  {
    b = doomed->next;
    delete doomed;
  }
}
static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
) [inline, static]

Definition at line 199 of file DQMNet.h.

References a, b, trackerHits::c, dqmhashfinal, dqmhashmix, and gen::k.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flush(), and DQMImplNet< DQMNet::Object >::makeObject().

    {
      // Hash the 'keylen' bytes starting at 'key' into a 32-bit value,
      // returned widened to size_t.  The key is consumed in 12-byte
      // blocks; each block is assembled one byte at a time into the
      // three 32-bit accumulators a, b and c (k[0] is the lowest byte).
      //
      // Reduced version of Bob Jenkins' hash function at:
      //   https://www.burtleburtle.net/bob/c/lookup3.c
      //
      // The three helper macros below are local to this function and are
      // #undef'd again at the end of the body.
#     define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
#     define dqmhashmix(a,b,c) { \
        a -= c; a ^= dqmhashrot(c, 4); c += b; \
        b -= a; b ^= dqmhashrot(a, 6); a += c; \
        c -= b; c ^= dqmhashrot(b, 8); b += a; \
        a -= c; a ^= dqmhashrot(c,16); c += b; \
        b -= a; b ^= dqmhashrot(a,19); a += c; \
        c -= b; c ^= dqmhashrot(b, 4); b += a; }
#     define dqmhashfinal(a,b,c) { \
        c ^= b; c -= dqmhashrot(b,14); \
        a ^= c; a -= dqmhashrot(c,11); \
        b ^= a; b -= dqmhashrot(a,25); \
        c ^= b; c -= dqmhashrot(b,16); \
        a ^= c; a -= dqmhashrot(c,4);  \
        b ^= a; b -= dqmhashrot(a,14); \
        c ^= b; c -= dqmhashrot(b,24); }

      // All three accumulators start from the same seed, which depends
      // only on the key length.
      uint32_t a, b, c;
      a = b = c = 0xdeadbeef + (uint32_t) keylen;
      const unsigned char *k = (const unsigned char *) key;

      // all but the last block: affect some bits of (a, b, c)
      // (the loop condition is '> 12', so a key of exactly 12 bytes is
      // handled by the switch below rather than here)
      while (keylen > 12)
      {
        a += k[0];
        a += ((uint32_t)k[1]) << 8;
        a += ((uint32_t)k[2]) << 16;
        a += ((uint32_t)k[3]) << 24;
        b += k[4];
        b += ((uint32_t)k[5]) << 8;
        b += ((uint32_t)k[6]) << 16;
        b += ((uint32_t)k[7]) << 24;
        c += k[8];
        c += ((uint32_t)k[9]) << 8;
        c += ((uint32_t)k[10]) << 16;
        c += ((uint32_t)k[11]) << 24;
        dqmhashmix(a,b,c);
        keylen -= 12;
        k += 12;
      }

      // last block: affect all 32 bits of (c); all case statements fall through
      // (the fall-through is intentional: entering at case N adds bytes
      // k[N-1] down to k[0])
      switch (keylen)
      {
      case 12: c += ((uint32_t)k[11]) << 24;
      case 11: c += ((uint32_t)k[10]) << 16;
      case 10: c += ((uint32_t)k[9]) << 8;
      case 9 : c += k[8];
      case 8 : b += ((uint32_t)k[7]) << 24;
      case 7 : b += ((uint32_t)k[6]) << 16;
      case 6 : b += ((uint32_t)k[5]) << 8;
      case 5 : b += k[4];
      case 4 : a += ((uint32_t)k[3]) << 24;
      case 3 : a += ((uint32_t)k[2]) << 16;
      case 2 : a += ((uint32_t)k[1]) << 8;
      case 1 : a += k[0];
               break;
      // Nothing left to add: return c as it stands, skipping the final mix.
      case 0 : return c;
      }

      dqmhashfinal(a, b, c);
      return c;
      // Preprocessor directives are handled at compile time, so these
      // still take effect even though they follow the return statement.
#     undef dqmhashrot
#     undef dqmhashmix
#     undef dqmhashfinal
    }
virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s) [protected, pure virtual]
void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1217 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, query::port, DQMNet::AutoPeer::update, and upstream_.

{
  // Record the collector to listen to, unless an upstream host has
  // already been configured; in that case only report the conflict and
  // leave the existing settings untouched.
  if (upstream_.host.empty())
  {
    upstream_.update = false;
    upstream_.host = host;
    upstream_.port = port;
  }
  else
    logme()
      << "ERROR: Already receiving data from another collector at "
      << upstream_.host << ":" << upstream_.port << std::endl;
}
void DQMNet::lock ( void  )

Acquire a lock on the DQM net layer.

Definition at line 1257 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

{
  // While communicate_ still holds the (pthread_t) -1 placeholder set by
  // the constructor there is nothing to lock against.
  if (communicate_ == (pthread_t) -1)
    return;

  pthread_mutex_lock(&lock_);
}
std::ostream & DQMNet::logme ( void  ) [protected]

Definition at line 42 of file DQMNet.cc.

References gather_cfg::cout, cond::rpcobimon::current, and cmsPerfSuiteHarvest::now.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

{
  // Write the standard log prefix -- timestamp, application name and
  // process id -- to std::cout and hand the stream back so the caller
  // can append the actual message.
  Time stamp = Time::current();
  std::ostream &out = std::cout;
  out << stamp.format(true, "%Y-%m-%d %H:%M:%S.");
  out << stamp.nanoformat(3, 3);
  out << " " << appname_ << "[" << pid_ << "]: ";
  return out;
}
void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
) [private]

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 77 of file DQMNet.cc.

References DQMNet::Peer::automatic, alignCSCRings::e, i, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::sendq, and DQMNet::Peer::socket.

{
  // Log why the peer is being dropped, when the caller gave a reason.
  if (reason)
  {
    logme ()
      << reason << peer->peeraddr
      << (err ? "; error was: " + err->explain() : std::string(""))
      << std::endl;
  }

  Socket *sock = peer->socket;

  // Forget every pending wait request that belongs to this peer.
  WaitList::iterator w = waiting_.begin();
  while (w != waiting_.end())
  {
    if (w->peer == peer)
      w = waiting_.erase(w);
    else
      ++w;
  }

  // Clear the source on the event, if one was passed in.
  if (ev)
    ev->source = 0;

  // Drop whatever was still queued for sending, and unlink the peer from
  // its automatic-connection record if it has one.
  discard(peer->sendq);
  if (peer->automatic)
    peer->automatic->peer = 0;

  // Finally take the socket out of the selector, close it, remove the
  // peer record and free the socket object.
  sel_.detach (sock);
  sock->close();
  removePeer(peer, sock);
  delete sock;
}
virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
) [protected, pure virtual]
virtual void DQMNet::markObjectsDead ( Peer *  p) [protected, pure virtual]
bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev) [private]

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 1003 of file DQMNet.cc.

References alignCSCRings::e.

Referenced by DQMNet().

{
  // Drain the notification pipe: the bytes themselves carry no
  // information, only the fact that something was written matters.
  try
  {
    unsigned char scratch [1024];
    IOSize got;
    do
      got = ev->source->read(scratch, sizeof(scratch));
    while (got);
  }
  catch (Error &e)
  {
    // A "try again" error is not reported; anything else is logged as a
    // warning.
    SystemError *cause = dynamic_cast<SystemError *>(e.next());
    bool tryAgain = (cause && cause->portable() == SysErr::ErrTryAgain);
    if (! tryAgain)
      logme()
        << "WARNING: error reading from notification pipe: "
        << e.explain() << std::endl;
  }

  // Ask the main event pump to send an update a little later.
  flush_ = true;

  // Returning false keeps this handler attached.
  return false;
}
bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
) [protected, virtual]

Definition at line 463 of file DQMNet.cc.

References cond::rpcobimon::current, DQMNet::Bucket::data, DQMNet::CoreObject::flags, flags, if(), DQMNet::Object::lastreq, mergeVDriftHistosByStation::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.

{
  // Decode one complete message of 'len' bytes at 'data', received from
  // peer 'p', and act on it.  Any reply is appended to 'msg'.  Returns
  // true when the message was well-formed and handled, false when it was
  // corrupt or of an unknown type.
  //
  // The message type is read from the second 32-bit word of 'data', so
  // this function requires len >= 2*sizeof(uint32_t).

  // Decode and process this message.
  uint32_t type;
  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
  switch (type)
  {
  case DQM_MSG_UPDATE_ME:
    {
      // Two header words only, no payload.
      if (len != 2*sizeof(uint32_t))
      {
        logme()
          << "ERROR: corrupt 'UPDATE_ME' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      if (debug_)
        logme()
          << "DEBUG: received message 'UPDATE ME' from peer "
          << p->peeraddr << ", size " << len << std::endl;

      p->update = true;
    }
    return true;

  case DQM_MSG_LIST_OBJECTS:
    {
      if (debug_)
        logme()
          << "DEBUG: received message 'LIST OBJECTS' from peer "
          << p->peeraddr << ", size " << len << std::endl;

      // Send over current status: list of known objects.
      sendObjectListToPeer(msg, true, false);
    }
    return true;

  case DQM_MSG_GET_OBJECT:
    {
      if (debug_)
        logme()
          << "DEBUG: received message 'GET OBJECT' from peer "
          << p->peeraddr << ", size " << len << std::endl;

      // Three header words (the third is the name length) followed by
      // the object name.
      if (len < 3*sizeof(uint32_t))
      {
        // The message name here used to read 'GET IMAGE'; it now matches
        // the other 'GET OBJECT' diagnostics in this case.
        logme()
          << "ERROR: corrupt 'GET OBJECT' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      uint32_t namelen;
      memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
      if (len != 3*sizeof(uint32_t) + namelen)
      {
        logme()
          << "ERROR: corrupt 'GET OBJECT' message of length " << len
          << " from peer " << p->peeraddr
          << ", expected length " << (3*sizeof(uint32_t))
          << " + " << namelen << std::endl;
        return false;
      }

      std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
      Peer *owner = 0;
      Object *o = findObject(0, name, &owner);
      if (o)
      {
        // Record the time of this request.  A non-scalar object with no
        // raw data, or with stale data, cannot be answered yet: park the
        // request until the data arrives.  Otherwise reply right away.
        o->lastreq = Time::current().ns();
        if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
            && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
          waitForData(p, name, "", owner);
        else
          sendObjectToPeer(msg, *o, true);
      }
      else
      {
        // Unknown object: reply with a DQM_REPLY_NONE message carrying
        // the requested name (total length, type, name length, name).
        uint32_t words [3];
        words[0] = sizeof(words) + name.size();
        words[1] = DQM_REPLY_NONE;
        words[2] = name.size();

        msg->data.reserve(msg->data.size() + words[0]);
        copydata(msg, &words[0], sizeof(words));
        copydata(msg, &name[0], name.size());
      }
    }
    return true;

  case DQM_REPLY_LIST_BEGIN:
    {
      // Exactly four words; the fourth carries the full/incremental flag.
      if (len != 4*sizeof(uint32_t))
      {
        logme()
          << "ERROR: corrupt 'LIST BEGIN' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      // Get the update status: whether this is a full update.
      uint32_t flags;
      memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));

      if (debug_)
        logme()
          << "DEBUG: received message 'LIST BEGIN "
          << (flags ? "FULL" : "INCREMENTAL")
          << "' from " << p->peeraddr
          << ", size " << len << std::endl;

      // If we are about to receive a full list of objects, flag all
      // objects as possibly dead.  Subsequent object notifications
      // will undo this for the live objects.  We cannot delete
      // objects quite yet, as we may get inquiry from another client
      // while we are processing the incoming list, so we keep the
      // objects tentatively alive as long as we've not seen the end.
      if (flags)
        markObjectsDead(p);
    }
    return true;

  case DQM_REPLY_LIST_END:
    {
      // Same layout as 'LIST BEGIN'.
      if (len != 4*sizeof(uint32_t))
      {
        logme()
          << "ERROR: corrupt 'LIST END' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      // Get the update status: whether this is a full update.
      uint32_t flags;
      memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));

      // If we received a full list of objects, now purge all dead
      // objects. We need to do this in two stages in case we receive
      // updates in many parts, and end up sending updates to others in
      // between; this avoids us lying live objects are dead.
      if (flags)
        purgeDeadObjects(p);

      if (debug_)
        logme()
          << "DEBUG: received message 'LIST END "
          << (flags ? "FULL" : "INCREMENTAL")
          << "' from " << p->peeraddr
          << ", size " << len << std::endl;

      // Indicate we have received another update from this peer.
      // Also indicate we should flush to our clients.
      flush_ = true;
      p->updates++;
    }
    return true;

  case DQM_REPLY_OBJECT:
    {
      // Nine header words followed by name, object data and quality
      // data.  Words used below: [2] flags, [3]/[4] low/high halves of
      // the version, [5] tag, [6] name length, [7] data length,
      // [8] quality data length.
      uint32_t words[9];
      if (len < sizeof(words))
      {
        logme()
          << "ERROR: corrupt 'OBJECT' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      memcpy (&words[0], data, sizeof(words));
      uint32_t &namelen = words[6];
      uint32_t &datalen = words[7];
      uint32_t &qlen = words[8];

      if (len != sizeof(words) + namelen + datalen + qlen)
      {
        logme()
          << "ERROR: corrupt 'OBJECT' message of length " << len
          << " from peer " << p->peeraddr
          << ", expected length " << sizeof(words)
          << " + " << namelen
          << " + " << datalen
          << " + " << qlen
          << std::endl;
        return false;
      }

      // Locate the three variable-length sections inside the message.
      unsigned char *namedata = data + sizeof(words);
      unsigned char *objdata = namedata + namelen;
      unsigned char *qdata = objdata + datalen;
      unsigned char *enddata = qdata + qlen;
      std::string name ((char *) namedata, namelen);
      assert (enddata == data + len);

      if (debug_)
        logme()
          << "DEBUG: received message 'OBJECT " << name
          << "' from " << p->peeraddr
          << ", size " << len << std::endl;

      // Mark the peer as a known object source.
      p->source = true;

      // Initialise or update an object entry.
      Object *o = findObject(p, name);
      if (! o)
        o = makeObject(p, name);

      o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
      o->tag = words[5];
      o->version = ((uint64_t) words[4] << 32 | words[3]);
      o->scalar.clear();
      o->qdata.clear();
      if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
      {
        // Scalar types keep their payload in 'scalar', not 'rawdata'.
        o->rawdata.clear();
        o->scalar.insert(o->scalar.end(), objdata, qdata);
      }
      else if (datalen)
      {
        o->rawdata.clear();
        o->rawdata.insert(o->rawdata.end(), objdata, qdata);
      }
      else if (! o->rawdata.empty())
        // No new data came with this update, so what we hold is stale.
        o->flags |= DQM_PROP_STALE;
      o->qdata.insert(o->qdata.end(), qdata, enddata);

      // If we had an object for this one already and this is a list
      // update without data, issue an immediate data get request.
      if (o->lastreq
          && ! datalen
          && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
        requestObjectData(p, (namelen ? &name[0] : 0), namelen);

      // If we have the object data, release from wait.
      if (datalen)
        releaseWaiters(name, o);
    }
    return true;

  case DQM_REPLY_NONE:
    {
      // Three header words (the third is the name length) followed by
      // the object name.
      uint32_t words[3];
      if (len < sizeof(words))
      {
        logme()
          << "ERROR: corrupt 'NONE' message of length " << len
          << " from peer " << p->peeraddr << std::endl;
        return false;
      }

      memcpy (&words[0], data, sizeof(words));
      uint32_t &namelen = words[2];

      if (len != sizeof(words) + namelen)
      {
        logme()
          << "ERROR: corrupt 'NONE' message of length " << len
          << " from peer " << p->peeraddr
          << ", expected length " << sizeof(words)
          << " + " << namelen << std::endl;
        return false;
      }

      unsigned char *namedata = data + sizeof(words);
      std::string name((char *) namedata, namelen);

      if (debug_)
        logme()
          << "DEBUG: received message 'NONE " << name
          << "' from " << p->peeraddr
          << ", size " << len << std::endl;

      // Mark the peer as a known object source.
      p->source = true;

      // If this was a known object, kill it.
      if (Object *o = findObject(p, name))
      {
        o->flags |= DQM_PROP_DEAD;
        purgeDeadObjects(p);
      }

      // If someone was waiting for this, let them go.
      releaseWaiters(name, 0);
    }
    return true;

  default:
    logme()
      << "ERROR: unrecognised message of length " << len
      << " and type " << type << " from peer " << p->peeraddr
      << std::endl;
    return false;
  }
}
bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev) [private]

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 946 of file DQMNet.cc.

References cmsCodeRulesChecker::arg, IORead, IOUrgent, CommonMethods::lock(), DQMNet::Peer::mask, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, alignCSCRings::s, and DQMNet::Peer::socket.

Referenced by startLocalServer().

{
  // This handler is only ever attached to the server socket.
  assert (ev->source == server_);

  // Take the pending connection; the accepted socket is expected to be
  // non-blocking already.
  Socket *sock = server_->accept();
  assert (sock);
  assert (! sock->isBlocking());

  // Register the new peer while holding the net-layer lock.
  lock();
  Peer *peer = createPeer(sock);
  std::string ourEnd;
  if (InetSocket *inet = dynamic_cast<InetSocket *>(sock))
  {
    // Internet socket: addresses are rendered as "host:port".
    InetAddress theirs = inet->peername();
    InetAddress ours = inet->sockname();
    peer->peeraddr = StringFormat("%1:%2")
                     .arg(theirs.hostname())
                     .arg(theirs.port());
    ourEnd = StringFormat("%1:%2")
             .arg(ours.hostname())
             .arg(ours.port());
  }
  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(sock))
  {
    // Local socket: addresses are file system paths.
    peer->peeraddr = local->peername().path();
    ourEnd = local->sockname().path();
  }
  else
    assert(false);

  peer->socket = sock;
  peer->mask = IORead|IOUrgent;

  // Announce the connection when debugging.
  if (debug_)
    logme()
      << "INFO: new peer " << peer->peeraddr << " is now connected to "
      << ourEnd << std::endl;

  // From now on onPeerData() handles events for this socket.
  sel_.attach(sock, peer->mask, CreateHook(this, &DQMNet::onPeerData, peer));
  unlock();

  // Keep listening for further connections.
  return false;
}
bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
) [private]

Handle communication to a particular client.

Definition at line 763 of file DQMNet.cc.

References DQMNet::Peer::automatic, b, DQMNet::Bucket::data, AlCaHLTBitMon_QueryRunRegistry::data, generateEDF::done, alignCSCRings::e, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, lumiQueryAPI::msg, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, TrackValidation_HighPurity_cff::valid, and DQMNet::Peer::waiting.

Referenced by onPeerConnect(), and run().

{
  // Service one selector event for peer 'p': handle urgent conditions,
  // flush queued output, read and dispatch incoming messages.  Returns
  // true after the peer has been dropped via losePeer(), false when the
  // socket should keep receiving events.  The net-layer lock is held for
  // the whole call and released on every exit path.
  lock();
  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);

  // If there is a problem with the peer socket, discard the peer
  // and tell the selector to stop processing events for it.  If
  // this is a server connection, we will eventually recreate
  // everything if/when the data server comes back.
  if (ev->events & IOUrgent)
  {
    if (p->automatic)
    {
      logme()
        << "WARNING: connection to the DQM server at " << p->peeraddr
        << " lost (will attempt to reconnect in 15 seconds)\n";
      losePeer(0, p, ev);
    }
    else
      losePeer("WARNING: lost peer connection ", p, ev);

    unlock();
    return true;
  }

  // If we can write to the peer socket, pump whatever we can into it.
  if (ev->events & IOWrite)
  {
    while (Bucket *b = p->sendq)
    {
      // Bytes of the head bucket that have not been sent yet.  For an
      // empty remainder 'data' is only a placeholder and is never
      // passed to write().
      IOSize len = b->data.size() - p->sendpos;
      const void *data = (len ? (const void *)&b->data[p->sendpos]
                          : (const void *)&data);
      IOSize done;

      try
      {
        done = (len ? ev->source->write (data, len) : 0);
        if (debug_ && len)
          logme()
            << "DEBUG: sent " << done << " bytes to peer "
            << p->peeraddr << std::endl;
      }
      catch (Error &e)
      {
        losePeer("WARNING: unable to write to peer ", p, ev, &e);
        unlock();
        return true;
      }

      // Advance the send position; once the bucket is fully sent,
      // unlink it from the queue and free it.
      p->sendpos += done;
      if (p->sendpos == b->data.size())
      {
        Bucket *old = p->sendq;
        p->sendq = old->next;
        p->sendpos = 0;
        old->next = 0;
        discard(old);
      }

      if (! done && len)
        // Cannot write any more.
        break;
    }
  }

  // If there is data to be read from the peer, first receive what we
  // can get out the socket, then process all complete requests.
  if (ev->events & IORead)
  {
    // First build up the incoming buffer of data in the socket.
    // Remember the last size returned by the socket; we need
    // it to determine if the remote end closed the connection.
    IOSize sz;
    try
    {
      std::vector<unsigned char> buf(SOCKET_READ_SIZE);
      do
        if ((sz = ev->source->read(&buf[0], buf.size())))
        {
          if (debug_)
            logme()
              << "DEBUG: received " << sz << " bytes from peer "
              << p->peeraddr << std::endl;
          DataBlob &data = p->incoming;
          if (data.capacity () < data.size () + sz)
            data.reserve (data.size() + SOCKET_READ_GROWTH);
          data.insert (data.end(), &buf[0], &buf[0] + sz);
        }
      // Keep reading for as long as a read fills the whole buffer.  This
      // must compare against buf.size(): 'buf' is a std::vector, so
      // sizeof(buf) is the size of the vector object itself, not the
      // number of bytes it holds.
      while (sz == buf.size());
    }
    catch (Error &e)
    {
      SystemError *next = dynamic_cast<SystemError *>(e.next());
      if (next && next->portable() == SysErr::ErrTryAgain)
        sz = 1; // Ignore it, and fake no end of data.
      else
      {
        // Houston we have a problem.
        losePeer("WARNING: failed to read from peer ", p, ev, &e);
        unlock();
        return true;
      }
    }

    // Process fully received messages as long as we can.  Each message
    // starts with a 32-bit word holding its total length.  Processing
    // pauses once the peer has MAX_PEER_WAITREQS requests waiting.
    size_t consumed = 0;
    DataBlob &data = p->incoming;
    while (data.size()-consumed >= sizeof(uint32_t)
           && p->waiting < MAX_PEER_WAITREQS)
    {
      uint32_t msglen;
      memcpy (&msglen, &data[0]+consumed, sizeof(msglen));

      if (msglen >= MESSAGE_SIZE_LIMIT)
      {
        losePeer("WARNING: excessively large message from ", p, ev);
        unlock();
        return true;
      }

      if (data.size()-consumed >= msglen)
      {
        bool valid = true;
        // A message must at least hold its length and type words.
        if (msglen < 2*sizeof(uint32_t))
        {
          logme()
            << "ERROR: corrupt peer message of length " << msglen
            << " from peer " << p->peeraddr << std::endl;
          valid = false;
        }
        else
        {
          // Decode and process this message.
          Bucket msg;
          msg.next = 0;
          valid = onMessage(&msg, p, &data[0]+consumed, msglen);

          // If we created a response, chain it to the write queue.
          if (! msg.data.empty())
          {
            Bucket **prev = &p->sendq;
            while (*prev)
              prev = &(*prev)->next;

            *prev = new Bucket;
            (*prev)->next = 0;
            (*prev)->data.swap(msg.data);
          }
        }

        if (! valid)
        {
          losePeer("WARNING: data stream error with ", p, ev);
          unlock();
          return true;
        }

        consumed += msglen;
      }
      else
        // The message is not complete yet; wait for more data.
        break;
    }

    // Drop the bytes of all messages handled above.
    data.erase(data.begin(), data.begin()+consumed);

    // If the client has closed the connection, shut down our end.  If
    // we have something to send back still, leave the write direction
    // open.  Otherwise close the shop for this client.
    if (sz == 0)
      sel_.setMask(p->socket, p->mask &= ~IORead);
  }

  // Yes, please keep processing events for this socket.
  unlock();
  return false;
}
DQMNet& DQMNet::operator= ( const DQMNet &  ) [private]
void DQMNet::packQualityData ( std::string &  into,
const QReports qr 
) [static]

Pack the quality results in qr into the string into for persistent storage, such as network transfer or archival.

Definition at line 177 of file DQMNet.cc.

References pos.

Referenced by DQMService::flush().

{
  // Serialise every quality report as a run of NUL-terminated text
  // fields, in this order: code, result, test name, algorithm and
  // message.  One further NUL is written after each report.
  // NOTE(review): unpackQualityData() stops at the first empty field,
  // so that extra NUL looks like it ends parsing after the first
  // report -- confirm against the readers of this format.
  std::ostringstream packed;
  for (QReports::const_iterator report = qr.begin(), last = qr.end();
       report != last; ++report)
  {
    // A single sprintf() formats both numbers.  %c emits a NUL after
    // the code and %n records where the result text begins.
    char numbers[64];
    int resultAt = 0;
    sprintf(numbers, "%d%c%n%.*g",
            report->code, 0, &resultAt, DBL_DIG+2, report->qtresult);

    packed << numbers << '\0';
    packed << numbers+resultAt << '\0';
    packed << report->qtname << '\0';
    packed << report->algorithm << '\0';
    packed << report->message << '\0';
    packed << '\0';
  }
  into = packed.str();
}
virtual void DQMNet::purgeDeadObjects ( Peer p) [protected, pure virtual]
void DQMNet::releaseFromWait ( Bucket msg,
WaitObject w,
Object o 
) [protected, virtual]

Definition at line 392 of file DQMNet.cc.

References DQMNet::Bucket::data, and DQMNet::WaitObject::name.

Referenced by run().

{
  // Object data is available: send it, payload included.
  if (o)
  {
    sendObjectToPeer(msg, *o, true);
    return;
  }

  // Nothing to deliver: reply DQM_REPLY_NONE followed by the name.
  // Header words: total message length, reply code, name length.
  const size_t namelen = w.name.size();
  uint32_t header [3];
  header[0] = sizeof(header) + namelen;
  header[1] = DQM_REPLY_NONE;
  header[2] = namelen;

  msg->data.reserve(msg->data.size() + header[0]);
  copydata(msg, &header[0], sizeof(header));
  copydata(msg, &w.name[0], namelen);
}
void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object o 
) [private]

Definition at line 147 of file DQMNet.cc.

References lumiQueryAPI::msg, and DQMNet::Bucket::next.

{
  // Find the tail of the waiting peer's send queue and append a new,
  // empty bucket that will hold the reply.
  Bucket **tail = &i->peer->sendq;
  for ( ; *tail; tail = &(*tail)->next)
    ;
  *tail = new Bucket;
  (*tail)->next = 0;
  Bucket *reply = *tail;

  // Fill the bucket with either the object or a "none" reply.
  releaseFromWait(reply, *i, o);

  // The peer has one fewer outstanding wait; drop the wait entry.
  assert(i->peer->waiting > 0);
  --i->peer->waiting;
  waiting_.erase(i);
}
void DQMNet::releaseWaiters ( const std::string &  name,
Object o 
) [private]

Definition at line 164 of file DQMNet.cc.

References alignCSCRings::e, and i.

{
  // Release every waiter queued for `name`.  The iterator is advanced
  // before the call because releaseFromWait() erases the entry it is
  // handed from waiting_.
  WaitList::iterator cur = waiting_.begin();
  const WaitList::iterator last = waiting_.end();
  while (cur != last)
  {
    WaitList::iterator entry = cur++;
    if (entry->name == name)
      releaseFromWait(entry, o);
  }
}
virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
) [protected, pure virtual]
void DQMNet::requestObjectData ( Peer p,
const char *  name,
size_t  len 
) [private]

Queue an object request to the data server.

Definition at line 111 of file DQMNet.cc.

References lumiQueryAPI::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

{
  // Append a new bucket at the tail of the peer's send queue.
  Bucket **tail = &p->sendq;
  for ( ; *tail; tail = &(*tail)->next)
    ;
  *tail = new Bucket;
  (*tail)->next = 0;
  Bucket *request = *tail;

  // Header words: total message length, DQM_MSG_GET_OBJECT, name
  // length.  The name bytes follow the header.
  uint32_t header[3];
  header[0] = sizeof(header) + len;
  header[1] = DQM_MSG_GET_OBJECT;
  header[2] = len;
  copydata(request, header, sizeof(header));
  copydata(request, name, len);
}
void DQMNet::run ( void  )

Run the actual I/O processing loop.

Definition at line 1290 of file DQMNet.cc.

References cmsCodeRulesChecker::arg, DQMNet::Peer::automatic, copydata(), createPeer(), cond::rpcobimon::current, debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, alignCSCRings::e, findObject(), DQMNet::CoreObject::flags, flush_, DQMNet::AutoPeer::host, i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, cmsPerfSuiteHarvest::now, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, DQMNet::Object::rawdata, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, unlock(), DQMNet::AutoPeer::update, DQMNet::Peer::update, updatePeerMasks(), upstream_, waiting_, waitMax_, and waitStale_.

{
  // Main I/O loop of the network layer.  Until shouldStop() says
  // otherwise, each iteration (1) tries to (re)connect the automatic
  // upstream_/downstream_ peers, (2) dispatches selector events for
  // up to delay_, and then, holding the lock, (3) flushes the object
  // list to peers if requested, (4) refreshes the peer selector masks
  // and (5) releases waiters whose time limit has passed.
  Time now;
  Time nextFlush = 0;
  AutoPeer *automatic[2] = { &upstream_, &downstream_ };

  // Perform I/O.  Every once in a while flush updates to peers.
  while (! shouldStop())
  {
    for (int i = 0; i < 2; ++i)
    {
      AutoPeer *ap = automatic[i];

      // If we need a server connection and don't have one yet,
      // initiate asynchronous connection creation.  Swallow errors
      // in case the server won't talk to us.
      if (! ap->host.empty()
          && ! ap->peer
          && (now = Time::current()) > ap->next)
      {
        // Schedule the next attempt 15 seconds from now, whether or
        // not this one succeeds.
        ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
        InetSocket *s = 0;
        try
        {
          InetAddress addr(ap->host.c_str(), ap->port);
          s = new InetSocket (SOCK_STREAM, 0, addr.family());
          s->setBlocking(false);
          // NOTE(review): when connect() reports "in progress" by
          // throwing, the two setopt() calls below are skipped for
          // this socket -- confirm that is intended.
          s->connect(addr);
          s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
          s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
        }
        catch (Error &e)
        {
          SystemError *sys = dynamic_cast<SystemError *>(e.next());
          if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
          {
            // "In progress" just means the connection is in progress.
            // The connection is ready when the socket is writeable.
            // Anything else is a real problem.
            if (s)
              s->abort();
            delete s;
            s = 0;
          }
        }

        // Set up with the selector if we were successful.  If this is
        // the upstream collector, queue a request for updates.
        if (s)
        {
          Peer *p = createPeer(s);
          ap->peer = p;

          InetAddress peeraddr = ((InetSocket *) s)->peername();
          InetAddress myaddr = ((InetSocket *) s)->sockname();
          p->peeraddr = StringFormat("%1:%2")
                        .arg(peeraddr.hostname())
                        .arg(peeraddr.port());
          p->mask = IORead|IOWrite|IOUrgent;
          p->update = ap->update;
          p->automatic = ap;
          p->socket = s;
          sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
          if (ap == &upstream_)
          {
            // Two back-to-back messages, each just a length word (8
            // bytes) and a message code: DQM_MSG_LIST_OBJECTS, then
            // DQM_MSG_UPDATE_ME.
            uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
                                  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
            p->sendq = new Bucket;
            p->sendq->next = 0;
            copydata(p->sendq, words, sizeof(words));
          }

          // Report the new connection.
          if (debug_)
            logme()
              << "INFO: now connected to " << p->peeraddr << " from "
              << myaddr.hostname() << ":" << myaddr.port() << std::endl;
        }
      }
    }

    // Pump events for a while.
    sel_.dispatch(delay_);
    now = Time::current();
    // Everything from here to unlock() at the end of the iteration
    // runs with the DQM net lock held.
    lock();

    // Check if flush is required.  Flush only if one is needed.
    // Always sends the full object list, but only rarely.
    if (flush_ && now > nextFlush)
    {
      flush_ = false;
      nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
      sendObjectListToPeers(true);
    }

    // Update the data server and peer selection masks.  If we
    // have no more data to send and listening for writes, remove
    // the write mask.  If we have something to write and aren't
    // listening for writes, start listening so we can send off
    // the data.
    updatePeerMasks();

    // Release peers that have been waiting for data for too long.
    // Waits that began before these two cut-off times have exceeded
    // waitMax_ and waitStale_ respectively.
    Time waitold = now - waitMax_;
    Time waitstale = now - waitStale_;
    for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
    {
      Object *o = findObject(0, i->name);

      // If we have (stale) object data, wait only up to stale limit.
      // Otherwise if we have no data at all, wait up to the max limit.
      if (i->time < waitold)
      {
        logme ()
          << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
          << "s to retrieval, releasing '" << i->name << "' from wait, have "
          << (o ? o->rawdata.size() : 0) << " data available\n";
        // i++ steps past the entry before releaseFromWait() erases it.
        releaseFromWait(i++, o);
      }
      else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
      {
        logme ()
          << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
          << "s to update, releasing '" << i->name << "' from wait, have "
          << o->rawdata.size() << " data available\n";
        releaseFromWait(i++, o);
      }

      // Keep it for now.
      else
        ++i;
    }

    unlock();
  }
}
void DQMNet::sendLocalChanges ( void  )

Definition at line 1430 of file DQMNet.cc.

References INSTR::byte, and wakeup_.

Referenced by DQMImplNet< DQMNet::Object >::removePeer().

{
  // Write a single zero byte to the sink end of the wakeup_ pipe.
  char wake = 0;
  wakeup_.sink()->write(&wake, 1);
}
virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
) [protected, pure virtual]
virtual void DQMNet::sendObjectListToPeers ( bool  all) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::sendObjectToPeer ( Bucket msg,
Object o,
bool  data 
) [protected, virtual]

Definition at line 413 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

{
  // Append object `o` to `msg` as one DQM_REPLY_OBJECT message: a
  // nine-word header followed by the name, the payload and the
  // quality data.  The DQM_PROP_DEAD flag is masked out of what is
  // sent.
  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
  DataBlob objdata;

  // Scalar-typed objects always carry their scalar text; other types
  // carry the raw data, and only when `data` is set.  Iterator ranges
  // are used so that an empty rawdata (presumably a DataBlob vector)
  // is never indexed with [0], which would be undefined behaviour.
  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
    objdata.insert(objdata.end(), o.scalar.begin(), o.scalar.end());
  else if (data)
    objdata.insert(objdata.end(), o.rawdata.begin(), o.rawdata.end());

  uint32_t words [9];
  // The name sent is "dirname/objname"; the +1 accounts for the
  // slash, which is omitted when the directory name is empty.
  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
  uint32_t datalen = objdata.size();
  uint32_t qlen = o.qdata.size();

  if (o.dirname->empty())
    --namelen;

  // Header: total length, reply code, flags, 64-bit version split
  // into low and high words, tag, then the three section lengths.
  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
  words[1] = DQM_REPLY_OBJECT;
  words[2] = flags;
  words[3] = (o.version >> 0 ) & 0xffffffff;
  words[4] = (o.version >> 32) & 0xffffffff;
  words[5] = o.tag;
  words[6] = namelen;
  words[7] = datalen;
  words[8] = qlen;

  msg->data.reserve(msg->data.size() + words[0]);
  copydata(msg, &words[0], 9*sizeof(uint32_t));
  if (namelen)
  {
    copydata(msg, &(*o.dirname)[0], o.dirname->size());
    if (! o.dirname->empty())
      copydata(msg, "/", 1);
    copydata(msg, &o.objname[0], o.objname.size());
  }
  if (datalen)
    copydata(msg, &objdata[0], datalen);
  if (qlen)
    copydata(msg, &o.qdata[0], qlen);
}
static bool DQMNet::setOrder ( const CoreObject a,
const CoreObject b 
) [inline, static]

Definition at line 174 of file DQMNet.h.

References diffTreeTool::diff, DQMNet::CoreObject::dirname, and DQMNet::CoreObject::objname.

Referenced by MonitorElement::operator<().

    {
      // Order by directory name first; within the same directory,
      // order by object name.
      const int cmp = a.dirname->compare(*b.dirname);
      if (cmp != 0)
        return cmp < 0;
      return a.objname < b.objname;
    }
bool DQMNet::shouldStop ( void  ) [protected, virtual]

Definition at line 384 of file DQMNet.cc.

Referenced by run().

{
  // A non-zero shutdown_ flag means a stop has been requested.
  return shutdown_ != 0;
}
void DQMNet::shutdown ( void  )

Stop the network layer and wait for it to finish.

Definition at line 1234 of file DQMNet.cc.

References communicate_, and shutdown_.

{
  // Raise the stop flag that shouldStop() reports.
  shutdown_ = 1;

  // Without a networking thread there is nothing to wait for.
  if (communicate_ == (pthread_t) -1)
    return;

  // Block until the networking thread has finished.
  pthread_join(communicate_, 0);
}
void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has been received; a different time limit is applied when no data payload has been received at all.

Definition at line 1116 of file DQMNet.cc.

References cond::rpcobgas::time, and waitStale_.

void DQMNet::start ( void  )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1275 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

{
  // Refuse to start twice: communicate_ holds (pthread_t) -1 until a
  // networking thread has been created here.
  if (communicate_ != (pthread_t) -1)
  {
    logme()
      << "ERROR: DQM networking thread has already been started\n";
    return;
  }

  // Initialise the mutex before the thread exists.
  int err = pthread_mutex_init(&lock_, 0);
  if (err != 0)
  {
    logme()
      << "ERROR: failed to initialise DQM networking lock, error "
      << err << "\n";
    return;
  }

  err = pthread_create (&communicate_, 0, &communicate, this);
  if (err != 0)
  {
    // The thread id is not usable after a failed create.  Put back
    // the "not started" marker so lock(), unlock() and shutdown() do
    // not act on a thread that does not exist, and destroy the mutex
    // so that a later start() can initialise it again.
    communicate_ = (pthread_t) -1;
    pthread_mutex_destroy(&lock_);
    logme()
      << "ERROR: failed to start DQM networking thread, error "
      << err << "\n";
  }
}
void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1125 of file DQMNet.cc.

References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

{
  // Only one local server may be started per DQMNet instance.
  if (server_)
  {
    logme() << "ERROR: DQM server was already started.\n";
    return;
  }

  // Declared outside the try block so the handler can release the
  // socket if setup fails before it has been handed to server_.
  InetSocket *s = 0;
  try
  {
    // Listen on all interfaces at the requested port.
    InetAddress addr("0.0.0.0", port);
    s = new InetSocket(SOCK_STREAM, 0, addr.family());
    s->bind(addr);
    s->listen(10);
    s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
    s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
    s->setBlocking(false);
    sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
  }
  catch (Error &e)
  {
    // A socket that never reached server_ has no other owner: close
    // and free it, in the same way run() discards a failed connect.
    if (s && server_ != s)
    {
      s->abort();
      delete s;
    }

    // FIXME: Do we need to do this when we throw an exception anyway?
    // FIXME: Abort instead?
    logme()
      << "ERROR: Failed to start server at port " << port << ": "
      << e.explain() << std::endl;

    raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
                  " %d: %s", port, e.explain().c_str());
  }
  
  logme() << "INFO: DQM server started at port " << port << std::endl;
}
void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1163 of file DQMNet.cc.

References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.

{
  // Only one local server may be started per DQMNet instance.
  if (server_)
  {
    logme() << "ERROR: DQM server was already started.\n";
    return;
  }

  // Build the socket in a local first.  server_ is assigned only when
  // the socket is handed to the selector, so a failed setup no longer
  // leaves server_ pointing at an unattached socket (which made every
  // later call report "already started").
  LocalServerSocket *s = 0;
  try
  {
    s = new LocalServerSocket(path, 10);
    s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
    s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
    s->setBlocking(false);
    sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
  }
  catch (Error &e)
  {
    // A socket that never reached server_ has no other owner: close
    // and free it, in the same way run() discards a failed connect.
    // NOTE(review): assumes LocalServerSocket provides abort() as
    // InetSocket does -- confirm.
    if (s && server_ != s)
    {
      s->abort();
      delete s;
    }

    // FIXME: Do we need to do this when we throw an exception anyway?
    // FIXME: Abort instead?
    logme()
      << "ERROR: Failed to start server at path " << path << ": "
      << e.explain() << std::endl;

    raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
                  " %s: %s", path, e.explain().c_str());
  }
  
  logme() << "INFO: DQM server started at path " << path << std::endl;
}
void DQMNet::unlock ( void  )

Release the lock on the DQM net layer.

Definition at line 1265 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

{
  // Leave the mutex alone while communicate_ still holds the "no
  // thread started" value (pthread_t) -1.
  if (communicate_ == (pthread_t) -1)
    return;

  pthread_mutex_unlock(&lock_);
}
void DQMNet::unpackQualityData ( QReports qr,
uint32_t &  flags,
const char *  from 
) [static]

Unpack the quality results from string from into qr. Assumes the data was saved with packQualityData().

Definition at line 199 of file DQMNet.cc.

References DQMNet::QValue::algorithm, DQMNet::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, Capri::details::from(), DQMNet::QValue::message, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

{
  // Pass 1: count the reports so the vector can be sized once.  Each
  // report is five NUL-terminated fields; an empty first field marks
  // the end of the data.
  size_t count = 0;
  for (const char *scan = from; *scan; ++count)
    for (int field = 0; field < 5; ++field)
    {
      while (*scan)
        ++scan;
      ++scan;
    }
  qr.reserve(count);

  // Pass 2: decode the fields of each report in order.
  const char *cursor = from;
  while (*cursor)
  {
    qr.push_back(DQMNet::QValue());
    DQMNet::QValue &report = qr.back();

    // Field 1: numeric status code.  Anything other than STATUS_OK
    // also raises the matching report bit in `flags`.
    report.code = atoi(cursor);
    while (*cursor)
      ++cursor;
    if (report.code != dqm::qstatus::STATUS_OK)
    {
      if (report.code == dqm::qstatus::WARNING)
        flags |= DQMNet::DQM_PROP_REPORT_WARN;
      else if (report.code == dqm::qstatus::ERROR)
        flags |= DQMNet::DQM_PROP_REPORT_ERROR;
      else
        flags |= DQMNet::DQM_PROP_REPORT_OTHER;
    }

    // Field 2: numeric test result.  Each ++cursor below steps over
    // the NUL that ended the previous field.
    ++cursor;
    report.qtresult = atof(cursor);
    while (*cursor)
      ++cursor;

    // Field 3: quality test name.
    ++cursor;
    report.qtname = cursor;
    while (*cursor)
      ++cursor;

    // Field 4: algorithm name.
    ++cursor;
    report.algorithm = cursor;
    while (*cursor)
      ++cursor;

    // Field 5: message text, then step past its terminating NUL.
    ++cursor;
    report.message = cursor;
    while (*cursor)
      ++cursor;
    ++cursor;
  }
}
void DQMNet::updateMask ( Peer p) [protected]

Update the selector mask for a peer based on data queues. Close the connection if there is no reason to maintain it open.

Definition at line 1034 of file DQMNet.cc.

References IOUrgent, IOWrite, DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

{
  // Nothing to update for a peer without a socket.
  if (! p->socket)
    return;

  // Listen for writes if and only if there is queued data to send.
  const unsigned before = p->mask;
  const bool listeningForWrites = (p->mask & IOWrite) != 0;
  if (p->sendq)
  {
    if (! listeningForWrites)
      sel_.setMask(p->socket, p->mask |= IOWrite);
  }
  else if (listeningForWrites)
    sel_.setMask(p->socket, p->mask &= ~IOWrite);

  if (debug_ && before != p->mask)
    logme()
      << "DEBUG: updating mask for " << p->peeraddr << " to "
      << p->mask << " from " << before << std::endl;

  // Keep the peer while it is still selected for anything besides
  // urgent data, or while it has outstanding waits.
  if (p->mask != IOUrgent || p->waiting)
    return;

  // Nothing left to send and no longer reading: drop the peer.
  assert (! p->sendq);
  if (debug_)
    logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
  losePeer(0, p, 0);
}
virtual void DQMNet::updatePeerMasks ( void  ) [protected, pure virtual]

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1198 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), DQMNet::AutoPeer::port, query::port, and DQMNet::AutoPeer::update.

{
  // No downstream collector configured yet: record this one and mark
  // it as wanting updates.
  if (downstream_.host.empty())
  {
    downstream_.update = true;
    downstream_.host = host;
    downstream_.port = port;
    return;
  }

  // A collector is already configured; report it and ignore the new
  // request.
  logme()
    << "ERROR: Already updating another collector at "
    << downstream_.host << ":" << downstream_.port << std::endl;
}
void DQMNet::waitForData ( Peer p,
const std::string &  name,
const std::string &  info,
Peer owner 
) [protected]

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 131 of file DQMNet.cc.

References cond::rpcobimon::current, info, mergeVDriftHistosByStation::name, and DQMNet::Peer::waiting.

{
  // FIXME: Should we automatically record which exact peer the waiter
  // is expecting to deliver data so we know to release the waiter if
  // the other peer vanishes?  The current implementation stands a
  // chance for the waiter to wait indefinitely -- although we do
  // force terminate the wait after a while.

  // Ask the owning peer for the object; an empty name is passed as a
  // null pointer with length zero.
  const size_t namelen = name.size();
  requestObjectData(owner, namelen ? &name[0] : 0, namelen);

  // Record the wait, stamped with the current time, and count it
  // against the waiting peer.
  WaitObject wo = { Time::current(), name, info, p };
  waiting_.push_back(wo);
  ++p->waiting;
}

Member Data Documentation

std::string DQMNet::appname_ [private]

Definition at line 316 of file DQMNet.h.

pthread_t DQMNet::communicate_ [private]

Definition at line 328 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_ [protected]

Definition at line 300 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

int DQMNet::delay_ [private]

Definition at line 331 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_ [private]

Definition at line 325 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3 [static]

Definition at line 68 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_HELLO = 0 [static]

Definition at line 65 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2 [static]

Definition at line 67 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1 [static]

Definition at line 66 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000 [static]

Definition at line 55 of file DQMNet.h.

Referenced by MonitorElement::isAccumulateEnabled(), and MonitorElement::setAccumulate().

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000 [static]
const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000 [static]
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000 [static]

Definition at line 53 of file DQMNet.h.

Referenced by DQMStore::book(), DQMStore::extract(), and MonitorElement::initialise().

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000 [static]

Definition at line 60 of file DQMNet.h.

Referenced by MonitorElement::getLumiFlag(), and MonitorElement::setLumiFlag().

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000 [static]
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000 [static]

Definition at line 59 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM [static]
Initial value:

Definition at line 49 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000 [static]

Definition at line 45 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100 [static]
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00 [static]

Definition at line 44 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400 [static]
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200 [static]
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000 [static]

Definition at line 56 of file DQMNet.h.

Referenced by MonitorElement::resetMe(), and MonitorElement::setResetMe().

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000 [static]

Definition at line 62 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000 [static]
const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050 [static]

Definition at line 42 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001 [static]

Definition at line 28 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000 [static]

Definition at line 27 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff [static]

Definition at line 25 of file DQMNet.h.

Referenced by MonitorElement::kind().

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002 [static]

Definition at line 29 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f [static]

Definition at line 26 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003 [static]

Definition at line 30 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012 [static]

Definition at line 33 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010 [static]

Definition at line 31 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011 [static]

Definition at line 32 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022 [static]

Definition at line 36 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020 [static]

Definition at line 34 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021 [static]

Definition at line 35 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032 [static]

Definition at line 39 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030 [static]

Definition at line 37 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031 [static]

Definition at line 38 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040 [static]

Definition at line 40 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041 [static]

Definition at line 41 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101 [static]

Definition at line 70 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102 [static]

Definition at line 71 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103 [static]

Definition at line 72 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104 [static]

Definition at line 73 of file DQMNet.h.

bool DQMNet::flush_ [private]

Definition at line 334 of file DQMNet.h.

Referenced by run().

pthread_mutex_t DQMNet::lock_ [protected]

Definition at line 301 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128 [static]

Definition at line 75 of file DQMNet.h.

int DQMNet::pid_ [private]

Definition at line 317 of file DQMNet.h.

lat::IOSelector DQMNet::sel_ [private]

Definition at line 319 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

lat::Socket* DQMNet::server_ [private]

Definition at line 320 of file DQMNet.h.

Referenced by startLocalServer().

sig_atomic_t DQMNet::shutdown_ [private]

Definition at line 329 of file DQMNet.h.

Referenced by shutdown().

AutoPeer DQMNet::upstream_ [private]

Definition at line 324 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_ [private]

Definition at line 322 of file DQMNet.h.

WaitList DQMNet::waiting_ [private]

Definition at line 326 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitMax_ [private]

Definition at line 333 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitStale_ [private]

Definition at line 332 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

lat::Pipe DQMNet::wakeup_ [private]

Definition at line 321 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().