CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  QValue
 
struct  WaitObject
 

Public Types

typedef std::vector< unsigned char > DataBlob
 
typedef std::vector< QValue > QReports
 
typedef std::vector< uint32_t > TagList
 
typedef std::list< WaitObject > WaitList
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
void listenToCollector (const std::string &host, int port)
 
void lock (void)
 Acquire a lock on the DQM net layer. More...
 
void run (void)
 
void sendLocalChanges (void)
 
void shutdown (void)
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start (void)
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock (void)
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet (void)
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme (void)
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop (void)
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks (void)=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

 DQMNet (const DQMNet &)
 
void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
DQMNet & operator= (const DQMNet &)
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 22 of file DQMNet.h.

Member Typedef Documentation

typedef std::vector<unsigned char> DQMNet::DataBlob

Definition at line 80 of file DQMNet.h.

typedef std::vector<QValue> DQMNet::QReports

Definition at line 83 of file DQMNet.h.

typedef std::vector<uint32_t> DQMNet::TagList

Definition at line 84 of file DQMNet.h.

typedef std::list<WaitObject> DQMNet::WaitList

Definition at line 85 of file DQMNet.h.

Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 1064 of file DQMNet.cc.

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

1065  : debug_ (false),
1066  appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
1067  pid_ (getpid()),
1068  server_ (0),
1069  version_ (Time::current()),
1070  communicate_ ((pthread_t) -1),
1071  shutdown_ (0),
1072  delay_ (1000),
1073  waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
1074  waitMax_ (0, 0, 0, 5 /* seconds */, 0),
1075  flush_ (false)
1076 {
1077  // Create a pipe for the local DQM to tell the communicator
1078  // thread that local DQM data has changed and that the peers
1079  // should be notified.
1080  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
1081  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
1082 
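1083  // Initialise the upstream and downstream to empty.
1084  upstream_.peer = downstream_.peer = 0;
1085  upstream_.next = downstream_.next = 0;
1086  upstream_.port = downstream_.port = 0;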
1087  upstream_.update = downstream_.update = false;
1088 }
AutoPeer downstream_
Definition: DQMNet.h:342
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1003
pthread_t communicate_
Definition: DQMNet.h:345
lat::Time version_
Definition: DQMNet.h:339
int delay_
Definition: DQMNet.h:348
AutoPeer upstream_
Definition: DQMNet.h:341
int pid_
Definition: DQMNet.h:334
std::string appname_
Definition: DQMNet.h:333
Peer * peer
Definition: DQMNet.h:153
sig_atomic_t shutdown_
Definition: DQMNet.h:346
lat::Time next
Definition: DQMNet.h:154
Definition: IOTypes.h:26
lat::Pipe wakeup_
Definition: DQMNet.h:338
bool debug_
Definition: DQMNet.h:317
bool flush_
Definition: DQMNet.h:351
lat::Socket * server_
Definition: DQMNet.h:337
#define O_NONBLOCK
Definition: SysFile.h:21
lat::TimeSpan waitMax_
Definition: DQMNet.h:350
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
lat::IOSelector sel_
Definition: DQMNet.h:336
DQMNet::~DQMNet ( void  )
virtual

Definition at line 1090 of file DQMNet.cc.

1091 {
1092  // FIXME
1093 }
DQMNet::DQMNet ( const DQMNet &  )
private

Member Function Documentation

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
)
staticprotected

Definition at line 53 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

54 {
55  b->data.insert(b->data.end(),
56  (const unsigned char *)data,
57  (const unsigned char *)data + len);
58 }
double b
Definition: hdecay.h:120
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 1098 of file DQMNet.cc.

References debug_.

1099 {
1100  debug_ = doit;
1101 }
bool debug_
Definition: DQMNet.h:317
void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 1106 of file DQMNet.cc.

References delay_.

1107 {
1108  delay_ = delay;
1109 }
int delay_
Definition: DQMNet.h:348
void delay(int delay)
Definition: DQMNet.cc:1106
void DQMNet::discard ( Bucket *&  b)
staticprotected

Definition at line 62 of file DQMNet.cc.

References b, GetRecoTauVFromDQM_MC_cff::next, and DQMNet::Bucket::next.

63 {
64  while (b)
65  {
66  Bucket *next = b->next;
67  delete b;
68  b = next;
69  }
70 }
double b
Definition: hdecay.h:120
static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inlinestatic

Definition at line 216 of file DQMNet.h.

References a, b, EnergyCorrector::c, dqmhashfinal, dqmhashmix, and relval_steps::k.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

217  {
218  // Reduced version of Bob Jenkins' hash function at:
219  // http://www.burtleburtle.net/bob/c/lookup3.c
220 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
221 # define dqmhashmix(a,b,c) { \
222  a -= c; a ^= dqmhashrot(c, 4); c += b; \
223  b -= a; b ^= dqmhashrot(a, 6); a += c; \
224  c -= b; c ^= dqmhashrot(b, 8); b += a; \
225  a -= c; a ^= dqmhashrot(c,16); c += b; \
226  b -= a; b ^= dqmhashrot(a,19); a += c; \
227  c -= b; c ^= dqmhashrot(b, 4); b += a; }
228 # define dqmhashfinal(a,b,c) { \
229  c ^= b; c -= dqmhashrot(b,14); \
230  a ^= c; a -= dqmhashrot(c,11); \
231  b ^= a; b -= dqmhashrot(a,25); \
232  c ^= b; c -= dqmhashrot(b,16); \
233  a ^= c; a -= dqmhashrot(c,4); \
234  b ^= a; b -= dqmhashrot(a,14); \
235  c ^= b; c -= dqmhashrot(b,24); }
236 
237  uint32_t a, b, c;
238  a = b = c = 0xdeadbeef + (uint32_t) keylen;
239  const unsigned char *k = (const unsigned char *) key;
240 
241  // all but the last block: affect some bits of (a, b, c)
242  while (keylen > 12)
243  {
244  a += k[0];
245  a += ((uint32_t)k[1]) << 8;
246  a += ((uint32_t)k[2]) << 16;
247  a += ((uint32_t)k[3]) << 24;
248  b += k[4];
249  b += ((uint32_t)k[5]) << 8;
250  b += ((uint32_t)k[6]) << 16;
251  b += ((uint32_t)k[7]) << 24;
252  c += k[8];
253  c += ((uint32_t)k[9]) << 8;
254  c += ((uint32_t)k[10]) << 16;
255  c += ((uint32_t)k[11]) << 24;
256  dqmhashmix(a,b,c);
257  keylen -= 12;
258  k += 12;
259  }
260 
261  // last block: affect all 32 bits of (c); all case statements fall through
262  switch (keylen)
263  {
264  case 12: c += ((uint32_t)k[11]) << 24;
265  case 11: c += ((uint32_t)k[10]) << 16;
266  case 10: c += ((uint32_t)k[9]) << 8;
267  case 9 : c += k[8];
268  case 8 : b += ((uint32_t)k[7]) << 24;
269  case 7 : b += ((uint32_t)k[6]) << 16;
270  case 6 : b += ((uint32_t)k[5]) << 8;
271  case 5 : b += k[4];
272  case 4 : a += ((uint32_t)k[3]) << 24;
273  case 3 : a += ((uint32_t)k[2]) << 16;
274  case 2 : a += ((uint32_t)k[1]) << 8;
275  case 1 : a += k[0];
276  break;
277  case 0 : return c;
278  }
279 
280  dqmhashfinal(a, b, c);
281  return c;
282 # undef dqmhashrot
283 # undef dqmhashmix
284 # undef dqmhashfinal
285  }
#define dqmhashmix(a, b, c)
string key
FastSim: produces sample of signal events, overlayed with premixed minbias events.
double b
Definition: hdecay.h:120
#define dqmhashfinal(a, b, c)
double a
Definition: hdecay.h:121
virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protectedpure virtual
void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1217 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1218 {
1219  if (! upstream_.host.empty())
1220  {
1221  logme()
1222  << "ERROR: Already receiving data from another collector at "
1223  << upstream_.host << ":" << upstream_.port << std::endl;
1224  return;
1225  }
1226 
1227  upstream_.update = false;
1228  upstream_.host = host;
1229  upstream_.port = port;
1230 }
AutoPeer upstream_
Definition: DQMNet.h:341
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
string host
Definition: query.py:114
std::string host
Definition: DQMNet.h:155
void DQMNet::lock ( void  )

Acquire a lock on the DQM net layer.

Definition at line 1257 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1258 {
1259  if (communicate_ != (pthread_t) -1)
1260  pthread_mutex_lock(&lock_);
1261 }
pthread_t communicate_
Definition: DQMNet.h:345
pthread_mutex_t lock_
Definition: DQMNet.h:318
std::ostream & DQMNet::logme ( void  )
protected

Definition at line 42 of file DQMNet.cc.

References gather_cfg::cout, cond::rpcobimon::current, and fileCollector::now.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

43 {
44  Time now = Time::current();
45  return std::cout
46  << now.format(true, "%Y-%m-%d %H:%M:%S.")
47  << now.nanoformat(3, 3)
48  << " " << appname_ << "[" << pid_ << "]: ";
49 }
int pid_
Definition: DQMNet.h:334
std::string appname_
Definition: DQMNet.h:333
tuple cout
Definition: gather_cfg.py:121
void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 77 of file DQMNet.cc.

References DQMNet::Peer::automatic, alignCSCRings::e, i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

81 {
82  if (reason)
83  logme ()
84  << reason << peer->peeraddr
85  << (err ? "; error was: " + err->explain() : std::string(""))
86  << std::endl;
87 
88  Socket *s = peer->socket;
89 
90  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
91  if (i->peer == peer)
92  waiting_.erase(i++);
93  else
94  ++i;
95 
96  if (ev)
97  ev->source = 0;
98 
99  discard(peer->sendq);
100  if (peer->automatic)
101  peer->automatic->peer = 0;
102 
103  sel_.detach (s);
104  s->close();
105  removePeer(peer, s);
106  delete s;
107 }
int i
Definition: DBlmapReader.cc:9
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
bool ev
std::ostream & logme(void)
Definition: DQMNet.cc:42
virtual void removePeer(Peer *p, lat::Socket *s)=0
WaitList waiting_
Definition: DQMNet.h:343
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
)
protectedpure virtual
virtual void DQMNet::markObjectsDead ( Peer *  p)
protectedpure virtual
bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 1003 of file DQMNet.cc.

References alignCSCRings::e, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

1004 {
1005  // Discard the data in the pipe, we care only about the wakeup.
1006  try
1007  {
1008  IOSize sz;
1009  unsigned char buf [1024];
1010  while ((sz = ev->source->read(buf, sizeof(buf))))
1011  ;
1012  }
1013  catch (Error &e)
1014  {
1015  SystemError *next = dynamic_cast<SystemError *>(e.next());
1016  if (next && next->portable() == SysErr::ErrTryAgain)
1017  ; // Ignore it
1018  else
1019  logme()
1020  << "WARNING: error reading from notification pipe: "
1021  << e.explain() << std::endl;
1022  }
1023 
1024  // Tell the main event pump to send an update in a little while.
1025  flush_ = true;
1026 
1027  // We are never done, always keep going.
1028  return false;
1029 }
bool ev
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool flush_
Definition: DQMNet.h:351
size_t IOSize
Definition: IOTypes.h:14
bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
)
protectedvirtual

Definition at line 463 of file DQMNet.cc.

References assert(), cond::rpcobimon::current, DQMNet::Bucket::data, DQMNet::CoreObject::flags, flags, citk::if(), DQMNet::Object::lastreq, fileCollector::logme(), mergeVDriftHistosByStation::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, AlCaHLTBitMon_QueryRunRegistry::string, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.

464 {
465  // Decode and process this message.
466  uint32_t type;
467  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
468  switch (type)
469  {
470  case DQM_MSG_UPDATE_ME:
471  {
472  if (len != 2*sizeof(uint32_t))
473  {
474  logme()
475  << "ERROR: corrupt 'UPDATE_ME' message of length " << len
476  << " from peer " << p->peeraddr << std::endl;
477  return false;
478  }
479 
480  if (debug_)
481  logme()
482  << "DEBUG: received message 'UPDATE ME' from peer "
483  << p->peeraddr << ", size " << len << std::endl;
484 
485  p->update = true;
486  }
487  return true;
488 
489  case DQM_MSG_LIST_OBJECTS:
490  {
491  if (debug_)
492  logme()
493  << "DEBUG: received message 'LIST OBJECTS' from peer "
494  << p->peeraddr << ", size " << len << std::endl;
495 
496  // Send over current status: list of known objects.
497  sendObjectListToPeer(msg, true, false);
498  }
499  return true;
500 
501  case DQM_MSG_GET_OBJECT:
502  {
503  if (debug_)
504  logme()
505  << "DEBUG: received message 'GET OBJECT' from peer "
506  << p->peeraddr << ", size " << len << std::endl;
507 
508  if (len < 3*sizeof(uint32_t))
509  {
510  logme()
511  << "ERROR: corrupt 'GET IMAGE' message of length " << len
512  << " from peer " << p->peeraddr << std::endl;
513  return false;
514  }
515 
516  uint32_t namelen;
517  memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
518  if (len != 3*sizeof(uint32_t) + namelen)
519  {
520  logme()
521  << "ERROR: corrupt 'GET OBJECT' message of length " << len
522  << " from peer " << p->peeraddr
523  << ", expected length " << (3*sizeof(uint32_t))
524  << " + " << namelen << std::endl;
525  return false;
526  }
527 
528  std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
529  Peer *owner = 0;
530  Object *o = findObject(0, name, &owner);
531  if (o)
532  {
533  o->lastreq = Time::current().ns();
534  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
535  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
536  waitForData(p, name, "", owner);
537  else
538  sendObjectToPeer(msg, *o, true);
539  }
540  else
541  {
542  uint32_t words [3];
543  words[0] = sizeof(words) + name.size();
544  words[1] = DQM_REPLY_NONE;
545  words[2] = name.size();
546 
547  msg->data.reserve(msg->data.size() + words[0]);
548  copydata(msg, &words[0], sizeof(words));
549  copydata(msg, &name[0], name.size());
550  }
551  }
552  return true;
553 
554  case DQM_REPLY_LIST_BEGIN:
555  {
556  if (len != 4*sizeof(uint32_t))
557  {
558  logme()
559  << "ERROR: corrupt 'LIST BEGIN' message of length " << len
560  << " from peer " << p->peeraddr << std::endl;
561  return false;
562  }
563 
564  // Get the update status: whether this is a full update.
565  uint32_t flags;
566  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
567 
568  if (debug_)
569  logme()
570  << "DEBUG: received message 'LIST BEGIN "
571  << (flags ? "FULL" : "INCREMENTAL")
572  << "' from " << p->peeraddr
573  << ", size " << len << std::endl;
574 
575  // If we are about to receive a full list of objects, flag all
576  // objects as possibly dead. Subsequent object notifications
577  // will undo this for the live objects. We cannot delete
578  // objects quite yet, as we may get inquiry from another client
579  // while we are processing the incoming list, so we keep the
580  // objects tentatively alive as long as we've not seen the end.
581  if (flags)
582  markObjectsDead(p);
583  }
584  return true;
585 
586  case DQM_REPLY_LIST_END:
587  {
588  if (len != 4*sizeof(uint32_t))
589  {
590  logme()
591  << "ERROR: corrupt 'LIST END' message of length " << len
592  << " from peer " << p->peeraddr << std::endl;
593  return false;
594  }
595 
596  // Get the update status: whether this is a full update.
597  uint32_t flags;
598  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
599 
600  // If we received a full list of objects, now purge all dead
601  // objects. We need to do this in two stages in case we receive
602  // updates in many parts, and end up sending updates to others in
603  // between; this avoids us lying live objects are dead.
604  if (flags)
605  purgeDeadObjects(p);
606 
607  if (debug_)
608  logme()
609  << "DEBUG: received message 'LIST END "
610  << (flags ? "FULL" : "INCREMENTAL")
611  << "' from " << p->peeraddr
612  << ", size " << len << std::endl;
613 
614  // Indicate we have received another update from this peer.
615  // Also indicate we should flush to our clients.
616  flush_ = true;
617  p->updates++;
618  }
619  return true;
620 
621  case DQM_REPLY_OBJECT:
622  {
623  uint32_t words[9];
624  if (len < sizeof(words))
625  {
626  logme()
627  << "ERROR: corrupt 'OBJECT' message of length " << len
628  << " from peer " << p->peeraddr << std::endl;
629  return false;
630  }
631 
632  memcpy (&words[0], data, sizeof(words));
633  uint32_t &namelen = words[6];
634  uint32_t &datalen = words[7];
635  uint32_t &qlen = words[8];
636 
637  if (len != sizeof(words) + namelen + datalen + qlen)
638  {
639  logme()
640  << "ERROR: corrupt 'OBJECT' message of length " << len
641  << " from peer " << p->peeraddr
642  << ", expected length " << sizeof(words)
643  << " + " << namelen
644  << " + " << datalen
645  << " + " << qlen
646  << std::endl;
647  return false;
648  }
649 
650  unsigned char *namedata = data + sizeof(words);
651  unsigned char *objdata = namedata + namelen;
652  unsigned char *qdata = objdata + datalen;
653  unsigned char *enddata = qdata + qlen;
654  std::string name ((char *) namedata, namelen);
655  assert (enddata == data + len);
656 
657  if (debug_)
658  logme()
659  << "DEBUG: received message 'OBJECT " << name
660  << "' from " << p->peeraddr
661  << ", size " << len << std::endl;
662 
663  // Mark the peer as a known object source.
664  p->source = true;
665 
666  // Initialise or update an object entry.
667  Object *o = findObject(p, name);
668  if (! o)
669  o = makeObject(p, name);
670 
671  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
672  o->tag = words[5];
673  o->version = ((uint64_t) words[4] << 32 | words[3]);
674  o->scalar.clear();
675  o->qdata.clear();
676  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
677  {
678  o->rawdata.clear();
679  o->scalar.insert(o->scalar.end(), objdata, qdata);
680  }
681  else if (datalen)
682  {
683  o->rawdata.clear();
684  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
685  }
686  else if (! o->rawdata.empty())
687  o->flags |= DQM_PROP_STALE;
688  o->qdata.insert(o->qdata.end(), qdata, enddata);
689 
690  // If we had an object for this one already and this is a list
691  // update without data, issue an immediate data get request.
692  if (o->lastreq
693  && ! datalen
694  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
695  requestObjectData(p, (namelen ? &name[0] : 0), namelen);
696 
697  // If we have the object data, release from wait.
698  if (datalen)
699  releaseWaiters(name, o);
700  }
701  return true;
702 
703  case DQM_REPLY_NONE:
704  {
705  uint32_t words[3];
706  if (len < sizeof(words))
707  {
708  logme()
709  << "ERROR: corrupt 'NONE' message of length " << len
710  << " from peer " << p->peeraddr << std::endl;
711  return false;
712  }
713 
714  memcpy (&words[0], data, sizeof(words));
715  uint32_t &namelen = words[2];
716 
717  if (len != sizeof(words) + namelen)
718  {
719  logme()
720  << "ERROR: corrupt 'NONE' message of length " << len
721  << " from peer " << p->peeraddr
722  << ", expected length " << sizeof(words)
723  << " + " << namelen << std::endl;
724  return false;
725  }
726 
727  unsigned char *namedata = data + sizeof(words);
728  std::string name((char *) namedata, namelen);
729 
730  if (debug_)
731  logme()
732  << "DEBUG: received message 'NONE " << name
733  << "' from " << p->peeraddr
734  << ", size " << len << std::endl;
735 
736  // Mark the peer as a known object source.
737  p->source = true;
738 
739  // If this was a known object, kill it.
740  if (Object *o = findObject(p, name))
741  {
742  o->flags |= DQM_PROP_DEAD;
743  purgeDeadObjects(p);
744  }
745 
746  // If someone was waiting for this, let them go.
747  releaseWaiters(name, 0);
748  }
749  return true;
750 
751  default:
752  logme()
753  << "ERROR: unrecognised message of length " << len
754  << " and type " << type << " from peer " << p->peeraddr
755  << std::endl;
756  return false;
757  }
758 }
type
Definition: HCALResponse.h:21
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:67
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:72
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
assert(m_qm.get())
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
std::ostream & logme(void)
Definition: DQMNet.cc:42
if(c.getParameter< edm::InputTag >("puppiValueMap").label().size()!=0)
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:74
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:69
unsigned long long uint64_t
Definition: Time.h:15
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:71
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:317
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:73
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:68
bool flush_
Definition: DQMNet.h:351
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 946 of file DQMNet.cc.

References assert(), IORead, IOUrgent, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by startLocalServer().

947 {
948  // Recover the server socket.
949  assert (ev->source == server_);
950 
951  // Accept the connection.
952  Socket *s = server_->accept();
953  assert (s);
954  assert (! s->isBlocking());
955 
956  // Record it to our list of peers.
957  lock();
958  Peer *p = createPeer(s);
959  std::string localaddr;
960  if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
961  {
962  InetAddress peeraddr = inet->peername();
963  InetAddress myaddr = inet->sockname();
964  p->peeraddr = StringFormat("%1:%2")
965  .arg(peeraddr.hostname())
966  .arg(peeraddr.port());
967  localaddr = StringFormat("%1:%2")
968  .arg(myaddr.hostname())
969  .arg(myaddr.port());
970  }
971  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
972  {
973  p->peeraddr = local->peername().path();
974  localaddr = local->sockname().path();
975  }
976  else
977  assert(false);
978 
979  p->mask = IORead|IOUrgent;
980  p->socket = s;
981 
982  // Report the new connection.
983  if (debug_)
984  logme()
985  << "INFO: new peer " << p->peeraddr << " is now connected to "
986  << localaddr << std::endl;
987 
988  // Attach it to the listener.
989  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
990  unlock();
991 
992  // We are never done.
993  return false;
994 }
virtual Peer * createPeer(lat::Socket *s)=0
assert(m_qm.get())
bool ev
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
A arg
Definition: Factorize.h:36
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
Definition: IOTypes.h:26
bool debug_
Definition: DQMNet.h:317
lat::Socket * server_
Definition: DQMNet.h:337
lat::IOSelector sel_
Definition: DQMNet.h:336
bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer p 
)
private

Handle communication to a particular client.

Definition at line 763 of file DQMNet.cc.

References assert(), DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, run_regression::done, alignCSCRings::e, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, visualization-live-secondInstance_cfg::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and DQMNet::Peer::waiting.

Referenced by onPeerConnect(), and run().

764 {
765  lock();
766  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
767 
768  // If there is a problem with the peer socket, discard the peer
769  // and tell the selector to stop processing events for it. If
770  // this is a server connection, we will eventually recreate
771  // everything if/when the data server comes back.
772  if (ev->events & IOUrgent)
773  {
774  if (p->automatic)
775  {
776  logme()
777  << "WARNING: connection to the DQM server at " << p->peeraddr
778  << " lost (will attempt to reconnect in 15 seconds)\n";
779  losePeer(0, p, ev);
780  }
781  else
782  losePeer("WARNING: lost peer connection ", p, ev);
783 
784  unlock();
785  return true;
786  }
787 
788  // If we can write to the peer socket, pump whatever we can into it.
789  if (ev->events & IOWrite)
790  {
791  while (Bucket *b = p->sendq)
792  {
793  IOSize len = b->data.size() - p->sendpos;
794  const void *data = (len ? (const void *)&b->data[p->sendpos]
795  : (const void *)&data);
796  IOSize done;
797 
798  try
799  {
800  done = (len ? ev->source->write (data, len) : 0);
801  if (debug_ && len)
802  logme()
803  << "DEBUG: sent " << done << " bytes to peer "
804  << p->peeraddr << std::endl;
805  }
806  catch (Error &e)
807  {
808  losePeer("WARNING: unable to write to peer ", p, ev, &e);
809  unlock();
810  return true;
811  }
812 
813  p->sendpos += done;
814  if (p->sendpos == b->data.size())
815  {
816  Bucket *old = p->sendq;
817  p->sendq = old->next;
818  p->sendpos = 0;
819  old->next = 0;
820  discard(old);
821  }
822 
823  if (! done && len)
824  // Cannot write any more.
825  break;
826  }
827  }
828 
829  // If there is data to be read from the peer, first receive what we
830  // can get out of the socket, then process all complete requests.
831  if (ev->events & IORead)
832  {
833  // First build up the incoming buffer of data in the socket.
834  // Remember the last size returned by the socket; we need
835  // it to determine if the remote end closed the connection.
836  IOSize sz;
837  try
838  {
839  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
840  do
841  if ((sz = ev->source->read(&buf[0], buf.size())))
842  {
843  if (debug_)
844  logme()
845  << "DEBUG: received " << sz << " bytes from peer "
846  << p->peeraddr << std::endl;
847  DataBlob &data = p->incoming;
848  if (data.capacity () < data.size () + sz)
849  data.reserve (data.size() + SOCKET_READ_GROWTH);
850  data.insert (data.end(), &buf[0], &buf[0] + sz);
851  }
852  while (sz == sizeof (buf));
853  }
854  catch (Error &e)
855  {
856  SystemError *next = dynamic_cast<SystemError *>(e.next());
857  if (next && next->portable() == SysErr::ErrTryAgain)
858  sz = 1; // Ignore it, and fake no end of data.
859  else
860  {
861  // Houston we have a problem.
862  losePeer("WARNING: failed to read from peer ", p, ev, &e);
863  unlock();
864  return true;
865  }
866  }
867 
868  // Process fully received messages as long as we can.
869  size_t consumed = 0;
870  DataBlob &data = p->incoming;
871  while (data.size()-consumed >= sizeof(uint32_t)
872  && p->waiting < MAX_PEER_WAITREQS)
873  {
874  uint32_t msglen;
875  memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
876 
877  if (msglen >= MESSAGE_SIZE_LIMIT)
878  {
879  losePeer("WARNING: excessively large message from ", p, ev);
880  unlock();
881  return true;
882  }
883 
884  if (data.size()-consumed >= msglen)
885  {
886  bool valid = true;
887  if (msglen < 2*sizeof(uint32_t))
888  {
889  logme()
890  << "ERROR: corrupt peer message of length " << msglen
891  << " from peer " << p->peeraddr << std::endl;
892  valid = false;
893  }
894  else
895  {
896  // Decode and process this message.
897  Bucket msg;
898  msg.next = 0;
899  valid = onMessage(&msg, p, &data[0]+consumed, msglen);
900 
901  // If we created a response, chain it to the write queue.
902  if (! msg.data.empty())
903  {
904  Bucket **prev = &p->sendq;
905  while (*prev)
906  prev = &(*prev)->next;
907 
908  *prev = new Bucket;
909  (*prev)->next = 0;
910  (*prev)->data.swap(msg.data);
911  }
912  }
913 
914  if (! valid)
915  {
916  losePeer("WARNING: data stream error with ", p, ev);
917  unlock();
918  return true;
919  }
920 
921  consumed += msglen;
922  }
923  else
924  break;
925  }
926 
927  data.erase(data.begin(), data.begin()+consumed);
928 
929  // If the client has closed the connection, shut down our end. If
930  // we have something to send back still, leave the write direction
931  // open. Otherwise close the shop for this client.
932  if (sz == 0)
933  sel_.setMask(p->socket, p->mask &= ~IORead);
934  }
935 
936  // Yes, please keep processing events for this socket.
937  unlock();
938  return false;
939 }
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
assert(m_qm.get())
bool ev
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:463
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
std::ostream & logme(void)
Definition: DQMNet.cc:42
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:76
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
Definition: IOTypes.h:26
double b
Definition: hdecay.h:120
bool debug_
Definition: DQMNet.h:317
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
lat::IOSelector sel_
Definition: DQMNet.h:336
DQMNet& DQMNet::operator= ( const DQMNet )
private
void DQMNet::packQualityData ( std::string &  into,
const QReports qr 
)
static

Pack the quality results in qr into the string into for persistent storage, such as network transfer or archival.

Definition at line 177 of file DQMNet.cc.

Referenced by DQMService::flushStandalone(), and MonitorElement::packQualityData().

178 {
179  char buf[64];
180  std::ostringstream qrs;
181  QReports::const_iterator qi, qe;
182  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
183  {
184  int pos = 0;
185  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
186  qrs << buf << '\0'
187  << buf+pos << '\0'
188  << qi->qtname << '\0'
189  << qi->algorithm << '\0'
190  << qi->message << '\0'
191  << '\0';
192  }
193  into = qrs.str();
194 }
virtual void DQMNet::purgeDeadObjects ( Peer p)
protectedpure virtual
void DQMNet::releaseFromWait ( Bucket msg,
WaitObject w,
Object o 
)
protectedvirtual

Definition at line 392 of file DQMNet.cc.

References DQMNet::Bucket::data, and DQMNet::WaitObject::name.

Referenced by run().

393 {
394  if (o)
395  sendObjectToPeer(msg, *o, true);
396  else
397  {
398  uint32_t words [3];
399  words[0] = sizeof(words) + w.name.size();
400  words[1] = DQM_REPLY_NONE;
401  words[2] = w.name.size();
402 
403  msg->data.reserve(msg->data.size() + words[0]);
404  copydata(msg, &words[0], sizeof(words));
405  copydata(msg, &w.name[0], w.name.size());
406  }
407 }
const double w
Definition: UKUtility.cc:23
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:413
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:73
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object o 
)
private

Definition at line 147 of file DQMNet.cc.

References assert(), visualization-live-secondInstance_cfg::msg, and DQMNet::Bucket::next.

148 {
149  Bucket **msg = &i->peer->sendq;
150  while (*msg)
151  msg = &(*msg)->next;
152  *msg = new Bucket;
153  (*msg)->next = 0;
154 
155  releaseFromWait(*msg, *i, o);
156 
157  assert(i->peer->waiting > 0);
158  i->peer->waiting--;
159  waiting_.erase(i);
160 }
int i
Definition: DBlmapReader.cc:9
assert(m_qm.get())
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
WaitList waiting_
Definition: DQMNet.h:343
void DQMNet::releaseWaiters ( const std::string &  name,
Object o 
)
private

Definition at line 164 of file DQMNet.cc.

References alignCSCRings::e, and i.

165 {
166  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
167  if (i->name == name)
168  releaseFromWait(i++, o);
169  else
170  ++i;
171 }
int i
Definition: DBlmapReader.cc:9
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
WaitList waiting_
Definition: DQMNet.h:343
virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual
void DQMNet::requestObjectData ( Peer p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 111 of file DQMNet.cc.

References visualization-live-secondInstance_cfg::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

112 {
113  // Issue request to peer.
114  Bucket **msg = &p->sendq;
115  while (*msg)
116  msg = &(*msg)->next;
117  *msg = new Bucket;
118  (*msg)->next = 0;
119 
120  uint32_t words[3];
121  words[0] = sizeof(words) + len;
122  words[1] = DQM_MSG_GET_OBJECT;
123  words[2] = len;
124  copydata(*msg, words, sizeof(words));
125  copydata(*msg, name, len);
126 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:69
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void DQMNet::run ( void  )

Run the actual I/O processing loop.

Definition at line 1290 of file DQMNet.cc.

References DQMNet::Peer::automatic, copydata(), createPeer(), cond::rpcobimon::current, debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, alignCSCRings::e, findObject(), DQMNet::CoreObject::flags, flush_, DQMNet::AutoPeer::host, i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, fileCollector::now, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, DQMNet::Object::rawdata, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, unlock(), DQMNet::Peer::update, DQMNet::AutoPeer::update, updatePeerMasks(), upstream_, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

1291 {
1292  Time now;
1293  Time nextFlush = 0;
1294  AutoPeer *automatic[2] = { &upstream_, &downstream_ };
1295 
1296  // Perform I/O. Every once in a while flush updates to peers.
1297  while (! shouldStop())
1298  {
1299  for (int i = 0; i < 2; ++i)
1300  {
1301  AutoPeer *ap = automatic[i];
1302 
1303  // If we need a server connection and don't have one yet,
1304  // initiate asynchronous connection creation. Swallow errors
1305  // in case the server won't talk to us.
1306  if (! ap->host.empty()
1307  && ! ap->peer
1308  && (now = Time::current()) > ap->next)
1309  {
1310  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1311  InetSocket *s = 0;
1312  try
1313  {
1314  InetAddress addr(ap->host.c_str(), ap->port);
1315  s = new InetSocket (SOCK_STREAM, 0, addr.family());
1316  s->setBlocking(false);
1317  s->connect(addr);
1318  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1319  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1320  }
1321  catch (Error &e)
1322  {
1323  SystemError *sys = dynamic_cast<SystemError *>(e.next());
1324  if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1325  {
1326  // "In progress" just means the connection is in progress.
1327  // The connection is ready when the socket is writeable.
1328  // Anything else is a real problem.
1329  if (s)
1330  s->abort();
1331  delete s;
1332  s = 0;
1333  }
1334  }
1335 
1336  // Set up with the selector if we were successful. If this is
1337  // the upstream collector, queue a request for updates.
1338  if (s)
1339  {
1340  Peer *p = createPeer(s);
1341  ap->peer = p;
1342 
1343  InetAddress peeraddr = ((InetSocket *) s)->peername();
1344  InetAddress myaddr = ((InetSocket *) s)->sockname();
1345  p->peeraddr = StringFormat("%1:%2")
1346  .arg(peeraddr.hostname())
1347  .arg(peeraddr.port());
1348  p->mask = IORead|IOWrite|IOUrgent;
1349  p->update = ap->update;
1350  p->automatic = ap;
1351  p->socket = s;
1352  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1353  if (ap == &upstream_)
1354  {
1355  uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
1356  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
1357  p->sendq = new Bucket;
1358  p->sendq->next = 0;
1359  copydata(p->sendq, words, sizeof(words));
1360  }
1361 
1362  // Report the new connection.
1363  if (debug_)
1364  logme()
1365  << "INFO: now connected to " << p->peeraddr << " from "
1366  << myaddr.hostname() << ":" << myaddr.port() << std::endl;
1367  }
1368  }
1369  }
1370 
1371  // Pump events for a while.
1372  sel_.dispatch(delay_);
1373  now = Time::current();
1374  lock();
1375 
1376  // Check if flush is required. Flush only if one is needed.
1377  // Always sends the full object list, but only rarely.
1378  if (flush_ && now > nextFlush)
1379  {
1380  flush_ = false;
1381  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1382  sendObjectListToPeers(true);
1383  }
1384 
1385  // Update the data server and peer selection masks. If we
1386  // have no more data to send and listening for writes, remove
1387  // the write mask. If we have something to write and aren't
1388  // listening for writes, start listening so we can send off
1389  // the data.
1390  updatePeerMasks();
1391 
1392  // Release peers that have been waiting for data for too long.
1393  Time waitold = now - waitMax_;
1394  Time waitstale = now - waitStale_;
1395  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
1396  {
1397  Object *o = findObject(0, i->name);
1398 
1399  // If we have (stale) object data, wait only up to stale limit.
1400  // Otherwise if we have no data at all, wait up to the max limit.
1401  if (i->time < waitold)
1402  {
1403  logme ()
1404  << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
1405  << "s to retrieval, releasing '" << i->name << "' from wait, have "
1406  << (o ? o->rawdata.size() : 0) << " data available\n";
1407  releaseFromWait(i++, o);
1408  }
1409  else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
1410  {
1411  logme ()
1412  << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
1413  << "s to update, releasing '" << i->name << "' from wait, have "
1414  << o->rawdata.size() << " data available\n";
1415  releaseFromWait(i++, o);
1416  }
1417 
1418  // Keep it for now.
1419  else
1420  ++i;
1421  }
1422 
1423  unlock();
1424  }
1425 }
AutoPeer downstream_
Definition: DQMNet.h:342
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:67
int i
Definition: DBlmapReader.cc:9
virtual bool shouldStop(void)
Definition: DQMNet.cc:384
virtual void sendObjectListToPeers(bool all)=0
int delay_
Definition: DQMNet.h:348
virtual Peer * createPeer(lat::Socket *s)=0
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1257
AutoPeer upstream_
Definition: DQMNet.h:341
A arg
Definition: Factorize.h:36
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:763
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:392
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1265
lat::Time next
Definition: DQMNet.h:154
Definition: IOTypes.h:26
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
bool debug_
Definition: DQMNet.h:317
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:68
bool flush_
Definition: DQMNet.h:351
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
lat::TimeSpan waitMax_
Definition: DQMNet.h:350
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
WaitList waiting_
Definition: DQMNet.h:343
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
void DQMNet::sendLocalChanges ( void  )

Definition at line 1430 of file DQMNet.cc.

References wakeup_.

Referenced by DQMImplNet< DQMNet::Object >::removePeer().

1431 {
1432  char byte = 0;
1433  wakeup_.sink()->write(&byte, 1);
1434 }
lat::Pipe wakeup_
Definition: DQMNet.h:338
virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual
virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::sendObjectToPeer ( Bucket msg,
Object o,
bool  data 
)
protectedvirtual

Definition at line 413 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

414 {
415  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
416  DataBlob objdata;
417 
418  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
419  objdata.insert(objdata.end(),
420  &o.scalar[0],
421  &o.scalar[0] + o.scalar.size());
422  else if (data)
423  objdata.insert(objdata.end(),
424  &o.rawdata[0],
425  &o.rawdata[0] + o.rawdata.size());
426 
427  uint32_t words [9];
428  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
429  uint32_t datalen = objdata.size();
430  uint32_t qlen = o.qdata.size();
431 
432  if (o.dirname->empty())
433  --namelen;
434 
435  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
436  words[1] = DQM_REPLY_OBJECT;
437  words[2] = flags;
438  words[3] = (o.version >> 0 ) & 0xffffffff;
439  words[4] = (o.version >> 32) & 0xffffffff;
440  words[5] = o.tag;
441  words[6] = namelen;
442  words[7] = datalen;
443  words[8] = qlen;
444 
445  msg->data.reserve(msg->data.size() + words[0]);
446  copydata(msg, &words[0], 9*sizeof(uint32_t));
447  if (namelen)
448  {
449  copydata(msg, &(*o.dirname)[0], o.dirname->size());
450  if (! o.dirname->empty())
451  copydata(msg, "/", 1);
452  copydata(msg, &o.objname[0], o.objname.size());
453  }
454  if (datalen)
455  copydata(msg, &objdata[0], datalen);
456  if (qlen)
457  copydata(msg, &o.qdata[0], qlen);
458 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:74
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
static bool DQMNet::setOrder ( const CoreObject a,
const CoreObject b 
)
inlinestatic

Definition at line 179 of file DQMNet.h.

References DQMNet::CoreObject::dirname, DQMNet::CoreObject::lumi, DQMNet::CoreObject::moduleId, DQMNet::CoreObject::objname, DQMNet::CoreObject::run, and DQMNet::CoreObject::streamId.

Referenced by MonitorElement::operator<().

180  {
181  if (a.run == b.run) {
182  if (a.lumi == b.lumi) {
183  if (a.streamId == b.streamId) {
184  if (a.moduleId == b.moduleId) {
185  if (*a.dirname == *b.dirname) {
186  return a.objname < b.objname;
187  }
188  return *a.dirname < *b.dirname;
189  }
190  return a.moduleId < b.moduleId;
191  }
192  return a.streamId < b.streamId;
193  }
194  return a.lumi < b.lumi;
195  }
196  return a.run < b.run;
197  }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
bool DQMNet::shouldStop ( void  )
protectedvirtual

Definition at line 384 of file DQMNet.cc.

Referenced by run().

385 {
386  return shutdown_;
387 }
sig_atomic_t shutdown_
Definition: DQMNet.h:346
void DQMNet::shutdown ( void  )

Stop the network layer and wait for it to finish.

Definition at line 1234 of file DQMNet.cc.

References communicate_, and shutdown_.

1235 {
1236  shutdown_ = 1;
1237  if (communicate_ != (pthread_t) -1)
1238  pthread_join(communicate_, 0);
1239 }
pthread_t communicate_
Definition: DQMNet.h:345
sig_atomic_t shutdown_
Definition: DQMNet.h:346
void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has already been received; a different time limit applies when no data payload has been received at all.

Definition at line 1116 of file DQMNet.cc.

References cond::rpcobgas::time, and waitStale_.

1117 {
1118  waitStale_ = time;
1119 }
lat::TimeSpan waitStale_
Definition: DQMNet.h:349
void DQMNet::start ( void  )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1275 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), and Types.EventRange::cppID().

1276 {
1277  if (communicate_ != (pthread_t) -1)
1278  {
1279  logme()
1280  << "ERROR: DQM networking thread has already been started\n";
1281  return;
1282  }
1283 
1284  pthread_mutex_init(&lock_, 0);
1285  pthread_create (&communicate_, 0, &communicate, this);
1286 }
pthread_t communicate_
Definition: DQMNet.h:345
pthread_mutex_t lock_
Definition: DQMNet.h:318
static void * communicate(void *obj)
Definition: DQMNet.cc:1246
std::ostream & logme(void)
Definition: DQMNet.cc:42
void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1125 of file DQMNet.cc.

References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

1126 {
1127  if (server_)
1128  {
1129  logme() << "ERROR: DQM server was already started.\n";
1130  return;
1131  }
1132 
1133  try
1134  {
1135  InetAddress addr("0.0.0.0", port);
1136  InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
1137  s->bind(addr);
1138  s->listen(10);
1139  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1140  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1141  s->setBlocking(false);
1142  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1143  }
1144  catch (Error &e)
1145  {
1146  // FIXME: Do we need to do this when we throw an exception anyway?
1147  // FIXME: Abort instead?
1148  logme()
1149  << "ERROR: Failed to start server at port " << port << ": "
1150  << e.explain() << std::endl;
1151 
1152  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
1153  " %d: %s", port, e.explain().c_str());
1154  }
1155 
1156  logme() << "INFO: DQM server started at port " << port << std::endl;
1157 }
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:337
lat::IOSelector sel_
Definition: DQMNet.h:336
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1163 of file DQMNet.cc.

References alignCSCRings::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.

1164 {
1165  if (server_)
1166  {
1167  logme() << "ERROR: DQM server was already started.\n";
1168  return;
1169  }
1170 
1171  try
1172  {
1173  server_ = new LocalServerSocket(path, 10);
1174  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1175  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1176  server_->setBlocking(false);
1177  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1178  }
1179  catch (Error &e)
1180  {
1181  // FIXME: Do we need to do this when we throw an exception anyway?
1182  // FIXME: Abort instead?
1183  logme()
1184  << "ERROR: Failed to start server at path " << path << ": "
1185  << e.explain() << std::endl;
1186 
1187  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
1188  " %s: %s", path, e.explain().c_str());
1189  }
1190 
1191  logme() << "INFO: DQM server started at path " << path << std::endl;
1192 }
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:946
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:337
lat::IOSelector sel_
Definition: DQMNet.h:336
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::unlock ( void  )

Release the lock on the DQM net layer.

Definition at line 1265 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1266 {
1267  if (communicate_ != (pthread_t) -1)
1268  pthread_mutex_unlock(&lock_);
1269 }
pthread_t communicate_
Definition: DQMNet.h:345
pthread_mutex_t lock_
Definition: DQMNet.h:318
void DQMNet::unpackQualityData ( QReports qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from string from into qr. Assumes the data was saved with packQualityData().

Definition at line 199 of file DQMNet.cc.

References DQMNet::QValue::algorithm, DQMNet::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, DQMNet::QValue::message, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

200 {
201  const char *qdata = from;
202 
203  // Count how many qresults there are.
204  size_t nqv = 0;
205  while (*qdata)
206  {
207  ++nqv;
208  while (*qdata) ++qdata; ++qdata;
209  while (*qdata) ++qdata; ++qdata;
210  while (*qdata) ++qdata; ++qdata;
211  while (*qdata) ++qdata; ++qdata;
212  while (*qdata) ++qdata; ++qdata;
213  }
214 
215  // Now extract the qreports.
216  qdata = from;
217  qr.reserve(nqv);
218  while (*qdata)
219  {
220  qr.push_back(DQMNet::QValue());
221  DQMNet::QValue &qv = qr.back();
222 
223  qv.code = atoi(qdata);
224  while (*qdata) ++qdata;
225  switch (qv.code)
226  {
228  break;
231  break;
232  case dqm::qstatus::ERROR:
234  break;
235  default:
237  break;
238  }
239 
240  qv.qtresult = atof(++qdata);
241  while (*qdata) ++qdata;
242 
243  qv.qtname = ++qdata;
244  while (*qdata) ++qdata;
245 
246  qv.algorithm = ++qdata;
247  while (*qdata) ++qdata;
248 
249  qv.message = ++qdata;
250  while (*qdata) ++qdata;
251  ++qdata;
252  }
253 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
std::string algorithm
Definition: DQMNet.h:93
static const int WARNING
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
std::string qtname
Definition: DQMNet.h:92
std::string message
Definition: DQMNet.h:91
static const int STATUS_OK
float qtresult
Definition: DQMNet.h:90
static const int ERROR
void DQMNet::updateMask ( Peer p)
protected

Update the selector mask for a peer based on its data queues. Close the connection if there is no reason to keep it open.

Definition at line 1034 of file DQMNet.cc.

References assert(), IOUrgent, IOWrite, fileCollector::logme(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

1035 {
1036  if (! p->socket)
1037  return;
1038 
1039  // Listen to writes iff we have data to send.
1040  unsigned oldmask = p->mask;
1041  if (! p->sendq && (p->mask & IOWrite))
1042  sel_.setMask(p->socket, p->mask &= ~IOWrite);
1043 
1044  if (p->sendq && ! (p->mask & IOWrite))
1045  sel_.setMask(p->socket, p->mask |= IOWrite);
1046 
1047  if (debug_ && oldmask != p->mask)
1048  logme()
1049  << "DEBUG: updating mask for " << p->peeraddr << " to "
1050  << p->mask << " from " << oldmask << std::endl;
1051 
1052  // If we have nothing more to send and are no longer listening
1053  // for reads, close up the shop for this peer.
1054  if (p->mask == IOUrgent && ! p->waiting)
1055  {
1056  assert (! p->sendq);
1057  if (debug_)
1058  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
1059  losePeer(0, p, 0);
1060  }
1061 }
assert(m_qm.get())
std::ostream & logme(void)
Definition: DQMNet.cc:42
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:77
bool debug_
Definition: DQMNet.h:317
lat::IOSelector sel_
Definition: DQMNet.h:336
virtual void DQMNet::updatePeerMasks ( void  )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1198 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

1199 {
1200  if (! downstream_.host.empty())
1201  {
1202  logme()
1203  << "ERROR: Already updating another collector at "
1204  << downstream_.host << ":" << downstream_.port << std::endl;
1205  return;
1206  }
1207 
1208  downstream_.update = true;
1209  downstream_.host = host;
1210  downstream_.port = port;
1211 }
AutoPeer downstream_
Definition: DQMNet.h:342
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
string host
Definition: query.py:114
std::string host
Definition: DQMNet.h:155
void DQMNet::waitForData ( Peer p,
const std::string &  name,
const std::string &  info,
Peer owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 131 of file DQMNet.cc.

References cond::rpcobimon::current, info(), mergeVDriftHistosByStation::name, and DQMNet::Peer::waiting.

132 {
133  // FIXME: Should we automatically record which exact peer the waiter
134  // is expecting to deliver data so we know to release the waiter if
135  // the other peer vanishes? The current implementation stands a
136  // chance for the waiter to wait indefinitely -- although we do
137  // force terminate the wait after a while.
138  requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
139  WaitObject wo = { Time::current(), name, info, p };
140  waiting_.push_back(wo);
141  p->waiting++;
142 }
static const TGPicture * info(bool iBackgroundIsBlack)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
WaitList waiting_
Definition: DQMNet.h:343

Member Data Documentation

std::string DQMNet::appname_
private

Definition at line 333 of file DQMNet.h.

pthread_t DQMNet::communicate_
private

Definition at line 345 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_
protected

Definition at line 317 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

int DQMNet::delay_
private

Definition at line 348 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_
private

Definition at line 342 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 69 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 66 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 68 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 67 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 55 of file DQMNet.h.

Referenced by MonitorElement::isAccumulateEnabled(), and MonitorElement::setAccumulate().

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static
const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 53 of file DQMNet.h.

Referenced by DQMStore::book(), DQMStore::extract(), and MonitorElement::initialise().

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static
const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static

Definition at line 64 of file DQMNet.h.

Referenced by MonitorElement::markedToDelete(), and MonitorElement::markToDelete().

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 59 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM
static
Initial value:
= (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)

Definition at line 49 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 45 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 44 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 56 of file DQMNet.h.

Referenced by MonitorElement::resetMe(), and MonitorElement::setResetMe().

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 62 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static
const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 42 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 28 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 27 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

Definition at line 25 of file DQMNet.h.

Referenced by MonitorElement::kind().

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 29 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 26 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 30 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 33 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

Definition at line 31 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 32 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 36 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

Definition at line 34 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 35 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 39 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 37 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 38 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 40 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 41 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 71 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 72 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 73 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 74 of file DQMNet.h.

bool DQMNet::flush_
private

Definition at line 351 of file DQMNet.h.

Referenced by run().

pthread_mutex_t DQMNet::lock_
protected

Definition at line 318 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 76 of file DQMNet.h.

int DQMNet::pid_
private

Definition at line 334 of file DQMNet.h.

lat::IOSelector DQMNet::sel_
private

Definition at line 336 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

lat::Socket* DQMNet::server_
private

Definition at line 337 of file DQMNet.h.

Referenced by startLocalServer().

sig_atomic_t DQMNet::shutdown_
private

Definition at line 346 of file DQMNet.h.

Referenced by shutdown().

AutoPeer DQMNet::upstream_
private

Definition at line 341 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_
private

Definition at line 339 of file DQMNet.h.

WaitList DQMNet::waiting_
private

Definition at line 343 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 350 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 349 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

lat::Pipe DQMNet::wakeup_
private

Definition at line 338 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().