CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  QValue
 
struct  WaitObject
 

Public Types

typedef std::vector< unsigned
char > 
DataBlob
 
typedef std::vector< QValue > QReports
 
typedef std::vector< uint32_t > TagList
 
typedef std::list< WaitObject > WaitList
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
void listenToCollector (const std::string &host, int port)
 
void lock (void)
 Acquire a lock on the DQM net layer. More...
 
void run (void)
 
void sendLocalChanges (void)
 
void shutdown (void)
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start (void)
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock (void)
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet (void)
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=0)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme (void)
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop (void)
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks (void)=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

 DQMNet (const DQMNet &)
 
void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
DQMNet & operator= (const DQMNet &)
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 22 of file DQMNet.h.

Member Typedef Documentation

typedef std::vector<unsigned char> DQMNet::DataBlob

Definition at line 78 of file DQMNet.h.

typedef std::vector<QValue> DQMNet::QReports

Definition at line 81 of file DQMNet.h.

typedef std::vector<uint32_t> DQMNet::TagList

Definition at line 82 of file DQMNet.h.

typedef std::list<WaitObject> DQMNet::WaitList

Definition at line 83 of file DQMNet.h.

Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 1059 of file DQMNet.cc.

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

1060  : debug_ (false),
1061  appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
1062  pid_ (getpid()),
1063  server_ (0),
1064  version_ (Time::current()),
1065  communicate_ ((pthread_t) -1),
1066  shutdown_ (0),
1067  delay_ (1000),
1068  waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
1069  waitMax_ (0, 0, 0, 5 /* seconds */, 0),
1070  flush_ (false)
1071 {
1072  // Create a pipe for the local DQM to tell the communicator
1073  // thread that local DQM data has changed and that the peers
1074  // should be notified.
1075  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
1076  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
1077 
1078  // Initialise the upstream and downstream to empty.
1082  upstream_.update = downstream_.update = false;
1083 }
AutoPeer downstream_
Definition: DQMNet.h:324
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:998
pthread_t communicate_
Definition: DQMNet.h:327
lat::Time version_
Definition: DQMNet.h:321
int delay_
Definition: DQMNet.h:330
AutoPeer upstream_
Definition: DQMNet.h:323
int pid_
Definition: DQMNet.h:316
std::string appname_
Definition: DQMNet.h:315
Peer * peer
Definition: DQMNet.h:147
sig_atomic_t shutdown_
Definition: DQMNet.h:328
lat::Time next
Definition: DQMNet.h:148
Definition: IOTypes.h:26
lat::Pipe wakeup_
Definition: DQMNet.h:320
bool debug_
Definition: DQMNet.h:299
bool flush_
Definition: DQMNet.h:333
lat::Socket * server_
Definition: DQMNet.h:319
#define O_NONBLOCK
Definition: SysFile.h:21
lat::TimeSpan waitMax_
Definition: DQMNet.h:332
lat::TimeSpan waitStale_
Definition: DQMNet.h:331
lat::IOSelector sel_
Definition: DQMNet.h:318
DQMNet::~DQMNet ( void  )
virtual

Definition at line 1085 of file DQMNet.cc.

1086 {
1087  // FIXME
1088 }
DQMNet::DQMNet ( const DQMNet & )
private

Member Function Documentation

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
)
staticprotected

Definition at line 48 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

49 {
50  b->data.insert(b->data.end(),
51  (const unsigned char *)data,
52  (const unsigned char *)data + len);
53 }
double b
Definition: hdecay.h:120
virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 1093 of file DQMNet.cc.

References debug_.

1094 {
1095  debug_ = doit;
1096 }
bool debug_
Definition: DQMNet.h:299
void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 1101 of file DQMNet.cc.

References delay_.

1102 {
1103  delay_ = delay;
1104 }
int delay_
Definition: DQMNet.h:330
void delay(int delay)
Definition: DQMNet.cc:1101
void DQMNet::discard ( Bucket *&  b)
staticprotected

Definition at line 57 of file DQMNet.cc.

References b, and DQMNet::Bucket::next.

58 {
59  while (b)
60  {
61  Bucket *next = b->next;
62  delete b;
63  b = next;
64  }
65 }
double b
Definition: hdecay.h:120
static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inlinestatic

Definition at line 198 of file DQMNet.h.

References a, b, trackerHits::c, dqmhashfinal, dqmhashmix, and gen::k.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flush(), and DQMImplNet< DQMNet::Object >::makeObject().

199  {
200  // Reduced version of Bob Jenkins' hash function at:
201  // http://www.burtleburtle.net/bob/c/lookup3.c
202 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
203 # define dqmhashmix(a,b,c) { \
204  a -= c; a ^= dqmhashrot(c, 4); c += b; \
205  b -= a; b ^= dqmhashrot(a, 6); a += c; \
206  c -= b; c ^= dqmhashrot(b, 8); b += a; \
207  a -= c; a ^= dqmhashrot(c,16); c += b; \
208  b -= a; b ^= dqmhashrot(a,19); a += c; \
209  c -= b; c ^= dqmhashrot(b, 4); b += a; }
210 # define dqmhashfinal(a,b,c) { \
211  c ^= b; c -= dqmhashrot(b,14); \
212  a ^= c; a -= dqmhashrot(c,11); \
213  b ^= a; b -= dqmhashrot(a,25); \
214  c ^= b; c -= dqmhashrot(b,16); \
215  a ^= c; a -= dqmhashrot(c,4); \
216  b ^= a; b -= dqmhashrot(a,14); \
217  c ^= b; c -= dqmhashrot(b,24); }
218 
219  uint32_t a, b, c;
220  a = b = c = 0xdeadbeef + (uint32_t) keylen;
221  const unsigned char *k = (const unsigned char *) key;
222 
223  // all but the last block: affect some bits of (a, b, c)
224  while (keylen > 12)
225  {
226  a += k[0];
227  a += ((uint32_t)k[1]) << 8;
228  a += ((uint32_t)k[2]) << 16;
229  a += ((uint32_t)k[3]) << 24;
230  b += k[4];
231  b += ((uint32_t)k[5]) << 8;
232  b += ((uint32_t)k[6]) << 16;
233  b += ((uint32_t)k[7]) << 24;
234  c += k[8];
235  c += ((uint32_t)k[9]) << 8;
236  c += ((uint32_t)k[10]) << 16;
237  c += ((uint32_t)k[11]) << 24;
238  dqmhashmix(a,b,c);
239  keylen -= 12;
240  k += 12;
241  }
242 
243  // last block: affect all 32 bits of (c); all case statements fall through
244  switch (keylen)
245  {
246  case 12: c += ((uint32_t)k[11]) << 24;
247  case 11: c += ((uint32_t)k[10]) << 16;
248  case 10: c += ((uint32_t)k[9]) << 8;
249  case 9 : c += k[8];
250  case 8 : b += ((uint32_t)k[7]) << 24;
251  case 7 : b += ((uint32_t)k[6]) << 16;
252  case 6 : b += ((uint32_t)k[5]) << 8;
253  case 5 : b += k[4];
254  case 4 : a += ((uint32_t)k[3]) << 24;
255  case 3 : a += ((uint32_t)k[2]) << 16;
256  case 2 : a += ((uint32_t)k[1]) << 8;
257  case 1 : a += k[0];
258  break;
259  case 0 : return c;
260  }
261 
262  dqmhashfinal(a, b, c);
263  return c;
264 # undef dqmhashrot
265 # undef dqmhashmix
266 # undef dqmhashfinal
267  }
#define dqmhashmix(a, b, c)
int k[5][pyjets_maxn]
double b
Definition: hdecay.h:120
#define dqmhashfinal(a, b, c)
list key
Definition: combine.py:13
double a
Definition: hdecay.h:121
virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = 0 
)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protectedpure virtual
void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1212 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1213 {
1214  if (! upstream_.host.empty())
1215  {
1216  logme()
1217  << "ERROR: Already receiving data from another collector at "
1218  << upstream_.host << ":" << upstream_.port << std::endl;
1219  return;
1220  }
1221 
1222  upstream_.update = false;
1223  upstream_.host = host;
1224  upstream_.port = port;
1225 }
AutoPeer upstream_
Definition: DQMNet.h:323
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:37
string host
Definition: query.py:114
std::string host
Definition: DQMNet.h:149
void DQMNet::lock ( void  )

Acquire a lock on the DQM net layer.

Definition at line 1252 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1253 {
1254  if (communicate_ != (pthread_t) -1)
1255  pthread_mutex_lock(&lock_);
1256 }
pthread_t communicate_
Definition: DQMNet.h:327
pthread_mutex_t lock_
Definition: DQMNet.h:300
std::ostream & DQMNet::logme ( void  )
protected

Definition at line 37 of file DQMNet.cc.

References gather_cfg::cout, cond::rpcobimon::current, and cmsPerfSuiteHarvest::now.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

38 {
39  Time now = Time::current();
40  return std::cout
41  << now.format(true, "%Y-%m-%d %H:%M:%S.")
42  << now.nanoformat(3, 3)
43  << " " << appname_ << "[" << pid_ << "]: ";
44 }
int pid_
Definition: DQMNet.h:316
std::string appname_
Definition: DQMNet.h:315
tuple cout
Definition: gather_cfg.py:41
void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = 0 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 72 of file DQMNet.cc.

References DQMNet::Peer::automatic, i, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, asciidump::s, DQMNet::Peer::sendq, and DQMNet::Peer::socket.

76 {
77  if (reason)
78  logme ()
79  << reason << peer->peeraddr
80  << (err ? "; error was: " + err->explain() : std::string(""))
81  << std::endl;
82 
83  Socket *s = peer->socket;
84 
85  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
86  if (i->peer == peer)
87  waiting_.erase(i++);
88  else
89  ++i;
90 
91  if (ev)
92  ev->source = 0;
93 
94  discard(peer->sendq);
95  if (peer->automatic)
96  peer->automatic->peer = 0;
97 
98  sel_.detach (s);
99  s->close();
100  removePeer(peer, s);
101  delete s;
102 }
int i
Definition: DBlmapReader.cc:9
static void discard(Bucket *&b)
Definition: DQMNet.cc:57
std::ostream & logme(void)
Definition: DQMNet.cc:37
virtual void removePeer(Peer *p, lat::Socket *s)=0
string s
Definition: asciidump.py:422
WaitList waiting_
Definition: DQMNet.h:325
lat::IOSelector sel_
Definition: DQMNet.h:318
virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
)
protectedpure virtual
virtual void DQMNet::markObjectsDead ( Peer *  p)
protectedpure virtual
bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 998 of file DQMNet.cc.

Referenced by DQMNet().

999 {
1000  // Discard the data in the pipe, we care only about the wakeup.
1001  try
1002  {
1003  IOSize sz;
1004  unsigned char buf [1024];
1005  while ((sz = ev->source->read(buf, sizeof(buf))))
1006  ;
1007  }
1008  catch (Error &e)
1009  {
1010  SystemError *next = dynamic_cast<SystemError *>(e.next());
1011  if (next && next->portable() == SysErr::ErrTryAgain)
1012  ; // Ignore it
1013  else
1014  logme()
1015  << "WARNING: error reading from notification pipe: "
1016  << e.explain() << std::endl;
1017  }
1018 
1019  // Tell the main event pump to send an update in a little while.
1020  flush_ = true;
1021 
1022  // We are never done, always keep going.
1023  return false;
1024 }
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool flush_
Definition: DQMNet.h:333
size_t IOSize
Definition: IOTypes.h:14
bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
)
protectedvirtual

Definition at line 458 of file DQMNet.cc.

References cond::rpcobimon::current, DQMNet::Bucket::data, DQMNet::CoreObject::flags, flags, if(), DQMNet::Object::lastreq, mergeVDriftHistosByStation::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.

459 {
460  // Decode and process this message.
461  uint32_t type;
462  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
463  switch (type)
464  {
465  case DQM_MSG_UPDATE_ME:
466  {
467  if (len != 2*sizeof(uint32_t))
468  {
469  logme()
470  << "ERROR: corrupt 'UPDATE_ME' message of length " << len
471  << " from peer " << p->peeraddr << std::endl;
472  return false;
473  }
474 
475  if (debug_)
476  logme()
477  << "DEBUG: received message 'UPDATE ME' from peer "
478  << p->peeraddr << ", size " << len << std::endl;
479 
480  p->update = true;
481  }
482  return true;
483 
484  case DQM_MSG_LIST_OBJECTS:
485  {
486  if (debug_)
487  logme()
488  << "DEBUG: received message 'LIST OBJECTS' from peer "
489  << p->peeraddr << ", size " << len << std::endl;
490 
491  // Send over current status: list of known objects.
492  sendObjectListToPeer(msg, true, false);
493  }
494  return true;
495 
496  case DQM_MSG_GET_OBJECT:
497  {
498  if (debug_)
499  logme()
500  << "DEBUG: received message 'GET OBJECT' from peer "
501  << p->peeraddr << ", size " << len << std::endl;
502 
503  if (len < 3*sizeof(uint32_t))
504  {
505  logme()
506  << "ERROR: corrupt 'GET IMAGE' message of length " << len
507  << " from peer " << p->peeraddr << std::endl;
508  return false;
509  }
510 
511  uint32_t namelen;
512  memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
513  if (len != 3*sizeof(uint32_t) + namelen)
514  {
515  logme()
516  << "ERROR: corrupt 'GET OBJECT' message of length " << len
517  << " from peer " << p->peeraddr
518  << ", expected length " << (3*sizeof(uint32_t))
519  << " + " << namelen << std::endl;
520  return false;
521  }
522 
523  std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
524  Peer *owner = 0;
525  Object *o = findObject(0, name, &owner);
526  if (o)
527  {
528  o->lastreq = Time::current().ns();
529  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
530  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
531  waitForData(p, name, "", owner);
532  else
533  sendObjectToPeer(msg, *o, true);
534  }
535  else
536  {
537  uint32_t words [3];
538  words[0] = sizeof(words) + name.size();
539  words[1] = DQM_REPLY_NONE;
540  words[2] = name.size();
541 
542  msg->data.reserve(msg->data.size() + words[0]);
543  copydata(msg, &words[0], sizeof(words));
544  copydata(msg, &name[0], name.size());
545  }
546  }
547  return true;
548 
549  case DQM_REPLY_LIST_BEGIN:
550  {
551  if (len != 4*sizeof(uint32_t))
552  {
553  logme()
554  << "ERROR: corrupt 'LIST BEGIN' message of length " << len
555  << " from peer " << p->peeraddr << std::endl;
556  return false;
557  }
558 
559  // Get the update status: whether this is a full update.
560  uint32_t flags;
561  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
562 
563  if (debug_)
564  logme()
565  << "DEBUG: received message 'LIST BEGIN "
566  << (flags ? "FULL" : "INCREMENTAL")
567  << "' from " << p->peeraddr
568  << ", size " << len << std::endl;
569 
570  // If we are about to receive a full list of objects, flag all
571  // objects as possibly dead. Subsequent object notifications
572  // will undo this for the live objects. We cannot delete
573  // objects quite yet, as we may get inquiry from another client
574  // while we are processing the incoming list, so we keep the
575  // objects tentatively alive as long as we've not seen the end.
576  if (flags)
577  markObjectsDead(p);
578  }
579  return true;
580 
581  case DQM_REPLY_LIST_END:
582  {
583  if (len != 4*sizeof(uint32_t))
584  {
585  logme()
586  << "ERROR: corrupt 'LIST END' message of length " << len
587  << " from peer " << p->peeraddr << std::endl;
588  return false;
589  }
590 
591  // Get the update status: whether this is a full update.
592  uint32_t flags;
593  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
594 
595  // If we received a full list of objects, now purge all dead
596  // objects. We need to do this in two stages in case we receive
597  // updates in many parts, and end up sending updates to others in
598  // between; this avoids us lying live objects are dead.
599  if (flags)
600  purgeDeadObjects(p);
601 
602  if (debug_)
603  logme()
604  << "DEBUG: received message 'LIST END "
605  << (flags ? "FULL" : "INCREMENTAL")
606  << "' from " << p->peeraddr
607  << ", size " << len << std::endl;
608 
609  // Indicate we have received another update from this peer.
610  // Also indicate we should flush to our clients.
611  flush_ = true;
612  p->updates++;
613  }
614  return true;
615 
616  case DQM_REPLY_OBJECT:
617  {
618  uint32_t words[9];
619  if (len < sizeof(words))
620  {
621  logme()
622  << "ERROR: corrupt 'OBJECT' message of length " << len
623  << " from peer " << p->peeraddr << std::endl;
624  return false;
625  }
626 
627  memcpy (&words[0], data, sizeof(words));
628  uint32_t &namelen = words[6];
629  uint32_t &datalen = words[7];
630  uint32_t &qlen = words[8];
631 
632  if (len != sizeof(words) + namelen + datalen + qlen)
633  {
634  logme()
635  << "ERROR: corrupt 'OBJECT' message of length " << len
636  << " from peer " << p->peeraddr
637  << ", expected length " << sizeof(words)
638  << " + " << namelen
639  << " + " << datalen
640  << " + " << qlen
641  << std::endl;
642  return false;
643  }
644 
645  unsigned char *namedata = data + sizeof(words);
646  unsigned char *objdata = namedata + namelen;
647  unsigned char *qdata = objdata + datalen;
648  unsigned char *enddata = qdata + qlen;
649  std::string name ((char *) namedata, namelen);
650  assert (enddata == data + len);
651 
652  if (debug_)
653  logme()
654  << "DEBUG: received message 'OBJECT " << name
655  << "' from " << p->peeraddr
656  << ", size " << len << std::endl;
657 
658  // Mark the peer as a known object source.
659  p->source = true;
660 
661  // Initialise or update an object entry.
662  Object *o = findObject(p, name);
663  if (! o)
664  o = makeObject(p, name);
665 
666  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
667  o->tag = words[5];
668  o->version = ((uint64_t) words[4] << 32 | words[3]);
669  o->scalar.clear();
670  o->qdata.clear();
671  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
672  {
673  o->rawdata.clear();
674  o->scalar.insert(o->scalar.end(), objdata, qdata);
675  }
676  else if (datalen)
677  {
678  o->rawdata.clear();
679  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
680  }
681  else if (! o->rawdata.empty())
682  o->flags |= DQM_PROP_STALE;
683  o->qdata.insert(o->qdata.end(), qdata, enddata);
684 
685  // If we had an object for this one already and this is a list
686  // update without data, issue an immediate data get request.
687  if (o->lastreq
688  && ! datalen
689  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
690  requestObjectData(p, (namelen ? &name[0] : 0), namelen);
691 
692  // If we have the object data, release from wait.
693  if (datalen)
694  releaseWaiters(name, o);
695  }
696  return true;
697 
698  case DQM_REPLY_NONE:
699  {
700  uint32_t words[3];
701  if (len < sizeof(words))
702  {
703  logme()
704  << "ERROR: corrupt 'NONE' message of length " << len
705  << " from peer " << p->peeraddr << std::endl;
706  return false;
707  }
708 
709  memcpy (&words[0], data, sizeof(words));
710  uint32_t &namelen = words[2];
711 
712  if (len != sizeof(words) + namelen)
713  {
714  logme()
715  << "ERROR: corrupt 'NONE' message of length " << len
716  << " from peer " << p->peeraddr
717  << ", expected length " << sizeof(words)
718  << " + " << namelen << std::endl;
719  return false;
720  }
721 
722  unsigned char *namedata = data + sizeof(words);
723  std::string name((char *) namedata, namelen);
724 
725  if (debug_)
726  logme()
727  << "DEBUG: received message 'NONE " << name
728  << "' from " << p->peeraddr
729  << ", size " << len << std::endl;
730 
731  // Mark the peer as a known object source.
732  p->source = true;
733 
734  // If this was a known object, kill it.
735  if (Object *o = findObject(p, name))
736  {
737  o->flags |= DQM_PROP_DEAD;
739  }
740 
741  // If someone was waiting for this, let them go.
742  releaseWaiters(name, 0);
743  }
744  return true;
745 
746  default:
747  logme()
748  << "ERROR: unrecognised message of length " << len
749  << " and type " << type << " from peer " << p->peeraddr
750  << std::endl;
751  return false;
752  }
753 }
type
Definition: HCALResponse.h:22
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:70
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:159
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:126
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:106
std::ostream & logme(void)
Definition: DQMNet.cc:37
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:72
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:408
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:67
unsigned long long uint64_t
Definition: Time.h:15
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:69
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:299
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:71
perl if(1 lt scalar(@::datatypes))
Definition: edlooper.cc:31
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
bool flush_
Definition: DQMNet.h:333
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:58
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:59
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 941 of file DQMNet.cc.

References IORead, IOUrgent, CommonMethods::lock(), DQMNet::Peer::mask, onPeerData(), L1TEmulatorMonitor_cff::p, DQMNet::Peer::peeraddr, asciidump::s, and DQMNet::Peer::socket.

Referenced by startLocalServer().

942 {
943  // Recover the server socket.
944  assert (ev->source == server_);
945 
946  // Accept the connection.
947  Socket *s = server_->accept();
948  assert (s);
949  assert (! s->isBlocking());
950 
951  // Record it to our list of peers.
952  lock();
953  Peer *p = createPeer(s);
954  std::string localaddr;
955  if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
956  {
957  InetAddress peeraddr = inet->peername();
958  InetAddress myaddr = inet->sockname();
959  p->peeraddr = StringFormat("%1:%2")
960  .arg(peeraddr.hostname())
961  .arg(peeraddr.port());
962  localaddr = StringFormat("%1:%2")
963  .arg(myaddr.hostname())
964  .arg(myaddr.port());
965  }
966  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
967  {
968  p->peeraddr = local->peername().path();
969  localaddr = local->sockname().path();
970  }
971  else
972  assert(false);
973 
974  p->mask = IORead|IOUrgent;
975  p->socket = s;
976 
977  // Report the new connection.
978  if (debug_)
979  logme()
980  << "INFO: new peer " << p->peeraddr << " is now connected to "
981  << localaddr << std::endl;
982 
983  // Attach it to the listener.
984  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
985  unlock();
986 
987  // We are never done.
988  return false;
989 }
virtual Peer * createPeer(lat::Socket *s)=0
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1252
A arg
Definition: Factorize.h:36
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:758
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1260
Definition: IOTypes.h:26
bool debug_
Definition: DQMNet.h:299
lat::Socket * server_
Definition: DQMNet.h:319
string s
Definition: asciidump.py:422
lat::IOSelector sel_
Definition: DQMNet.h:318
bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer p 
)
private

Handle communication to a particular client.

Definition at line 758 of file DQMNet.cc.

References DQMNet::Peer::automatic, b, DQMNet::Bucket::data, runTheMatrix::data, generateEDF::done, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, runTheMatrix::msg, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, TrackValidation_HighPurity_cff::valid, and DQMNet::Peer::waiting.

Referenced by onPeerConnect(), and run().

759 {
760  lock();
761  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
762 
763  // If there is a problem with the peer socket, discard the peer
764  // and tell the selector to stop processing events for it. If
765  // this is a server connection, we will eventually recreate
766  // everything if/when the data server comes back.
767  if (ev->events & IOUrgent)
768  {
769  if (p->automatic)
770  {
771  logme()
772  << "WARNING: connection to the DQM server at " << p->peeraddr
773  << " lost (will attempt to reconnect in 15 seconds)\n";
774  losePeer(0, p, ev);
775  }
776  else
777  losePeer("WARNING: lost peer connection ", p, ev);
778 
779  unlock();
780  return true;
781  }
782 
783  // If we can write to the peer socket, pump whatever we can into it.
784  if (ev->events & IOWrite)
785  {
786  while (Bucket *b = p->sendq)
787  {
788  IOSize len = b->data.size() - p->sendpos;
789  const void *data = (len ? (const void *)&b->data[p->sendpos]
790  : (const void *)&data);
791  IOSize done;
792 
793  try
794  {
795  done = (len ? ev->source->write (data, len) : 0);
796  if (debug_ && len)
797  logme()
798  << "DEBUG: sent " << done << " bytes to peer "
799  << p->peeraddr << std::endl;
800  }
801  catch (Error &e)
802  {
803  losePeer("WARNING: unable to write to peer ", p, ev, &e);
804  unlock();
805  return true;
806  }
807 
808  p->sendpos += done;
809  if (p->sendpos == b->data.size())
810  {
811  Bucket *old = p->sendq;
812  p->sendq = old->next;
813  p->sendpos = 0;
814  old->next = 0;
815  discard(old);
816  }
817 
818  if (! done && len)
819  // Cannot write any more.
820  break;
821  }
822  }
823 
824  // If there is data to be read from the peer, first receive what we
825  // can get out of the socket, then process all complete requests.
826  if (ev->events & IORead)
827  {
828  // First build up the incoming buffer of data in the socket.
829  // Remember the last size returned by the socket; we need
830  // it to determine if the remote end closed the connection.
831  IOSize sz;
832  try
833  {
834  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
835  do
836  if ((sz = ev->source->read(&buf[0], buf.size())))
837  {
838  if (debug_)
839  logme()
840  << "DEBUG: received " << sz << " bytes from peer "
841  << p->peeraddr << std::endl;
842  DataBlob &data = p->incoming;
843  if (data.capacity () < data.size () + sz)
844  data.reserve (data.size() + SOCKET_READ_GROWTH);
845  data.insert (data.end(), &buf[0], &buf[0] + sz);
846  }
847  while (sz == sizeof (buf));
848  }
849  catch (Error &e)
850  {
851  SystemError *next = dynamic_cast<SystemError *>(e.next());
852  if (next && next->portable() == SysErr::ErrTryAgain)
853  sz = 1; // Ignore it, and fake no end of data.
854  else
855  {
856  // Houston we have a problem.
857  losePeer("WARNING: failed to read from peer ", p, ev, &e);
858  unlock();
859  return true;
860  }
861  }
862 
863  // Process fully received messages as long as we can.
864  size_t consumed = 0;
865  DataBlob &data = p->incoming;
866  while (data.size()-consumed >= sizeof(uint32_t)
867  && p->waiting < MAX_PEER_WAITREQS)
868  {
869  uint32_t msglen;
870  memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
871 
872  if (msglen >= MESSAGE_SIZE_LIMIT)
873  {
874  losePeer("WARNING: excessively large message from ", p, ev);
875  unlock();
876  return true;
877  }
878 
879  if (data.size()-consumed >= msglen)
880  {
881  bool valid = true;
882  if (msglen < 2*sizeof(uint32_t))
883  {
884  logme()
885  << "ERROR: corrupt peer message of length " << msglen
886  << " from peer " << p->peeraddr << std::endl;
887  valid = false;
888  }
889  else
890  {
891  // Decode and process this message.
892  Bucket msg;
893  msg.next = 0;
894  valid = onMessage(&msg, p, &data[0]+consumed, msglen);
895 
896  // If we created a response, chain it to the write queue.
897  if (! msg.data.empty())
898  {
899  Bucket **prev = &p->sendq;
900  while (*prev)
901  prev = &(*prev)->next;
902 
903  *prev = new Bucket;
904  (*prev)->next = 0;
905  (*prev)->data.swap(msg.data);
906  }
907  }
908 
909  if (! valid)
910  {
911  losePeer("WARNING: data stream error with ", p, ev);
912  unlock();
913  return true;
914  }
915 
916  consumed += msglen;
917  }
918  else
919  break;
920  }
921 
922  data.erase(data.begin(), data.begin()+consumed);
923 
924  // If the client has closed the connection, shut down our end. If
925  // we have something to send back still, leave the write direction
926  // open. Otherwise close the shop for this client.
927  if (sz == 0)
928  sel_.setMask(p->socket, p->mask &= ~IORead);
929  }
930 
931  // Yes, please keep processing events for this socket.
932  unlock();
933  return false;
934 }
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:25
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:27
static void discard(Bucket *&b)
Definition: DQMNet.cc:57
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1252
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:458
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:78
std::ostream & logme(void)
Definition: DQMNet.cc:37
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:28
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:74
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:72
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1260
Definition: IOTypes.h:26
double b
Definition: hdecay.h:120
bool debug_
Definition: DQMNet.h:299
size_t IOSize
Definition: IOTypes.h:14
lat::IOSelector sel_
Definition: DQMNet.h:318
DQMNet& DQMNet::operator= ( const DQMNet )
private
void DQMNet::packQualityData ( std::string &  into,
const QReports qr 
)
static

Pack the quality results in qr into the string into for persistent storage, such as network transfer or archival.

Definition at line 172 of file DQMNet.cc.

References pos.

Referenced by DQMService::flush(), and MonitorElement::packQualityData().

173 {
174  char buf[64];
175  std::ostringstream qrs;
176  QReports::const_iterator qi, qe;
177  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
178  {
179  int pos = 0;
180  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
181  qrs << buf << '\0'
182  << buf+pos << '\0'
183  << qi->qtname << '\0'
184  << qi->algorithm << '\0'
185  << qi->message << '\0'
186  << '\0';
187  }
188  into = qrs.str();
189 }
virtual void DQMNet::purgeDeadObjects ( Peer p)
protectedpure virtual
void DQMNet::releaseFromWait ( Bucket msg,
WaitObject w,
Object o 
)
protectedvirtual

Definition at line 387 of file DQMNet.cc.

References DQMNet::Bucket::data, and DQMNet::WaitObject::name.

Referenced by run().

388 {
389  if (o)
390  sendObjectToPeer(msg, *o, true);
391  else
392  {
393  uint32_t words [3];
394  words[0] = sizeof(words) + w.name.size();
395  words[1] = DQM_REPLY_NONE;
396  words[2] = w.name.size();
397 
398  msg->data.reserve(msg->data.size() + words[0]);
399  copydata(msg, &words[0], sizeof(words));
400  copydata(msg, &w.name[0], w.name.size());
401  }
402 }
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:408
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:71
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object o 
)
private

Definition at line 142 of file DQMNet.cc.

References runTheMatrix::msg, and DQMNet::Bucket::next.

143 {
144  Bucket **msg = &i->peer->sendq;
145  while (*msg)
146  msg = &(*msg)->next;
147  *msg = new Bucket;
148  (*msg)->next = 0;
149 
150  releaseFromWait(*msg, *i, o);
151 
152  assert(i->peer->waiting > 0);
153  i->peer->waiting--;
154  waiting_.erase(i);
155 }
int i
Definition: DBlmapReader.cc:9
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:387
WaitList waiting_
Definition: DQMNet.h:325
void DQMNet::releaseWaiters ( const std::string &  name,
Object o 
)
private

Definition at line 159 of file DQMNet.cc.

References i.

160 {
161  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
162  if (i->name == name)
163  releaseFromWait(i++, o);
164  else
165  ++i;
166 }
int i
Definition: DBlmapReader.cc:9
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:387
WaitList waiting_
Definition: DQMNet.h:325
virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual
void DQMNet::requestObjectData ( Peer p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 106 of file DQMNet.cc.

References runTheMatrix::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

107 {
108  // Issue request to peer.
109  Bucket **msg = &p->sendq;
110  while (*msg)
111  msg = &(*msg)->next;
112  *msg = new Bucket;
113  (*msg)->next = 0;
114 
115  uint32_t words[3];
116  words[0] = sizeof(words) + len;
117  words[1] = DQM_MSG_GET_OBJECT;
118  words[2] = len;
119  copydata(*msg, words, sizeof(words));
120  copydata(*msg, name, len);
121 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:67
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void DQMNet::run ( void  )

Run the actual I/O processing loop.

Definition at line 1285 of file DQMNet.cc.

References DQMNet::Peer::automatic, copydata(), createPeer(), cond::rpcobimon::current, debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, findObject(), DQMNet::CoreObject::flags, flush_, DQMNet::AutoPeer::host, i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, cmsPerfSuiteHarvest::now, onPeerData(), L1TEmulatorMonitor_cff::p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, DQMNet::Object::rawdata, releaseFromWait(), asciidump::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, unlock(), DQMNet::Peer::update, DQMNet::AutoPeer::update, updatePeerMasks(), upstream_, waiting_, waitMax_, and waitStale_.

Referenced by Types.LuminosityBlockID::cppID().

1286 {
1287  Time now;
1288  Time nextFlush = 0;
1289  AutoPeer *automatic[2] = { &upstream_, &downstream_ };
1290 
1291  // Perform I/O. Every once in a while flush updates to peers.
1292  while (! shouldStop())
1293  {
1294  for (int i = 0; i < 2; ++i)
1295  {
1296  AutoPeer *ap = automatic[i];
1297 
1298  // If we need a server connection and don't have one yet,
1299  // initiate asynchronous connection creation. Swallow errors
1300  // in case the server won't talk to us.
1301  if (! ap->host.empty()
1302  && ! ap->peer
1303  && (now = Time::current()) > ap->next)
1304  {
1305  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1306  InetSocket *s = 0;
1307  try
1308  {
1309  InetAddress addr(ap->host.c_str(), ap->port);
1310  s = new InetSocket (SOCK_STREAM, 0, addr.family());
1311  s->setBlocking(false);
1312  s->connect(addr);
1313  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1314  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1315  }
1316  catch (Error &e)
1317  {
1318  SystemError *sys = dynamic_cast<SystemError *>(e.next());
1319  if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1320  {
1321  // "In progress" just means the connection is in progress.
1322  // The connection is ready when the socket is writeable.
1323  // Anything else is a real problem.
1324  if (s)
1325  s->abort();
1326  delete s;
1327  s = 0;
1328  }
1329  }
1330 
1331  // Set up with the selector if we were successful. If this is
1332  // the upstream collector, queue a request for updates.
1333  if (s)
1334  {
1335  Peer *p = createPeer(s);
1336  ap->peer = p;
1337 
1338  InetAddress peeraddr = ((InetSocket *) s)->peername();
1339  InetAddress myaddr = ((InetSocket *) s)->sockname();
1340  p->peeraddr = StringFormat("%1:%2")
1341  .arg(peeraddr.hostname())
1342  .arg(peeraddr.port());
1343  p->mask = IORead|IOWrite|IOUrgent;
1344  p->update = ap->update;
1345  p->automatic = ap;
1346  p->socket = s;
1347  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1348  if (ap == &upstream_)
1349  {
1350  uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
1351  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
1352  p->sendq = new Bucket;
1353  p->sendq->next = 0;
1354  copydata(p->sendq, words, sizeof(words));
1355  }
1356 
1357  // Report the new connection.
1358  if (debug_)
1359  logme()
1360  << "INFO: now connected to " << p->peeraddr << " from "
1361  << myaddr.hostname() << ":" << myaddr.port() << std::endl;
1362  }
1363  }
1364  }
1365 
1366  // Pump events for a while.
1367  sel_.dispatch(delay_);
1368  now = Time::current();
1369  lock();
1370 
1371  // Check if flush is required. Flush only if one is needed.
1372  // Always sends the full object list, but only rarely.
1373  if (flush_ && now > nextFlush)
1374  {
1375  flush_ = false;
1376  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1377  sendObjectListToPeers(true);
1378  }
1379 
1380  // Update the data server and peer selection masks. If we
1381  // have no more data to send and listening for writes, remove
1382  // the write mask. If we have something to write and aren't
1383  // listening for writes, start listening so we can send off
1384  // the data.
1385  updatePeerMasks();
1386 
1387  // Release peers that have been waiting for data for too long.
1388  Time waitold = now - waitMax_;
1389  Time waitstale = now - waitStale_;
1390  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
1391  {
1392  Object *o = findObject(0, i->name);
1393 
1394  // If we have (stale) object data, wait only up to stale limit.
1395  // Otherwise if we have no data at all, wait up to the max limit.
1396  if (i->time < waitold)
1397  {
1398  logme ()
1399  << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
1400  << "s to retrieval, releasing '" << i->name << "' from wait, have "
1401  << (o ? o->rawdata.size() : 0) << " data available\n";
1402  releaseFromWait(i++, o);
1403  }
1404  else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
1405  {
1406  logme ()
1407  << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
1408  << "s to update, releasing '" << i->name << "' from wait, have "
1409  << o->rawdata.size() << " data available\n";
1410  releaseFromWait(i++, o);
1411  }
1412 
1413  // Keep it for now.
1414  else
1415  ++i;
1416  }
1417 
1418  unlock();
1419  }
1420 }
AutoPeer downstream_
Definition: DQMNet.h:324
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
int i
Definition: DBlmapReader.cc:9
virtual bool shouldStop(void)
Definition: DQMNet.cc:379
virtual void sendObjectListToPeers(bool all)=0
int delay_
Definition: DQMNet.h:330
virtual Peer * createPeer(lat::Socket *s)=0
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1252
AutoPeer upstream_
Definition: DQMNet.h:323
A arg
Definition: Factorize.h:36
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:758
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:387
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1260
lat::Time next
Definition: DQMNet.h:148
Definition: IOTypes.h:26
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:62
virtual void updatePeerMasks(void)=0
bool debug_
Definition: DQMNet.h:299
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
bool flush_
Definition: DQMNet.h:333
string s
Definition: asciidump.py:422
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
lat::TimeSpan waitMax_
Definition: DQMNet.h:332
lat::TimeSpan waitStale_
Definition: DQMNet.h:331
WaitList waiting_
Definition: DQMNet.h:325
lat::IOSelector sel_
Definition: DQMNet.h:318
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
void DQMNet::sendLocalChanges ( void  )

Definition at line 1425 of file DQMNet.cc.

References wakeup_.

Referenced by DQMImplNet< DQMNet::Object >::removePeer().

1426 {
1427  char byte = 0;
1428  wakeup_.sink()->write(&byte, 1);
1429 }
unsigned char byte
lat::Pipe wakeup_
Definition: DQMNet.h:320
virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual
virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::sendObjectToPeer ( Bucket msg,
Object o,
bool  data 
)
protectedvirtual

Definition at line 408 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

409 {
410  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
411  DataBlob objdata;
412 
413  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
414  objdata.insert(objdata.end(),
415  &o.scalar[0],
416  &o.scalar[0] + o.scalar.size());
417  else if (data)
418  objdata.insert(objdata.end(),
419  &o.rawdata[0],
420  &o.rawdata[0] + o.rawdata.size());
421 
422  uint32_t words [9];
423  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
424  uint32_t datalen = objdata.size();
425  uint32_t qlen = o.qdata.size();
426 
427  if (o.dirname->empty())
428  --namelen;
429 
430  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
431  words[1] = DQM_REPLY_OBJECT;
432  words[2] = flags;
433  words[3] = (o.version >> 0 ) & 0xffffffff;
434  words[4] = (o.version >> 32) & 0xffffffff;
435  words[5] = o.tag;
436  words[6] = namelen;
437  words[7] = datalen;
438  words[8] = qlen;
439 
440  msg->data.reserve(msg->data.size() + words[0]);
441  copydata(msg, &words[0], 9*sizeof(uint32_t));
442  if (namelen)
443  {
444  copydata(msg, &(*o.dirname)[0], o.dirname->size());
445  if (! o.dirname->empty())
446  copydata(msg, "/", 1);
447  copydata(msg, &o.objname[0], o.objname.size());
448  }
449  if (datalen)
450  copydata(msg, &objdata[0], datalen);
451  if (qlen)
452  copydata(msg, &o.qdata[0], qlen);
453 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:61
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:78
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:72
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
static bool DQMNet::setOrder ( const CoreObject a,
const CoreObject b 
)
inlinestatic

Definition at line 173 of file DQMNet.h.

References diffTreeTool::diff, DQMNet::CoreObject::dirname, and DQMNet::CoreObject::objname.

Referenced by MonitorElement::operator<().

174  {
175  int diff = a.dirname->compare(*b.dirname);
176  return (diff < 0 ? true
177  : diff == 0 ? a.objname < b.objname
178  : false);
179  }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
bool DQMNet::shouldStop ( void  )
protectedvirtual

Definition at line 379 of file DQMNet.cc.

Referenced by run().

380 {
381  return shutdown_;
382 }
sig_atomic_t shutdown_
Definition: DQMNet.h:328
void DQMNet::shutdown ( void  )

Stop the network layer and wait for it to finish.

Definition at line 1229 of file DQMNet.cc.

References communicate_, and shutdown_.

1230 {
1231  shutdown_ = 1;
1232  if (communicate_ != (pthread_t) -1)
1233  pthread_join(communicate_, 0);
1234 }
pthread_t communicate_
Definition: DQMNet.h:327
sig_atomic_t shutdown_
Definition: DQMNet.h:328
void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when some data has already been received; a separate time limit applies when no data payload has been received at all.

Definition at line 1111 of file DQMNet.cc.

References cond::rpcobgas::time, and waitStale_.

1112 {
1113  waitStale_ = time;
1114 }
lat::TimeSpan waitStale_
Definition: DQMNet.h:331
void DQMNet::start ( void  )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1270 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by Types.LuminosityBlockRange::cppID(), and Types.EventRange::cppID().

1271 {
1272  if (communicate_ != (pthread_t) -1)
1273  {
1274  logme()
1275  << "ERROR: DQM networking thread has already been started\n";
1276  return;
1277  }
1278 
1279  pthread_mutex_init(&lock_, 0);
1280  pthread_create (&communicate_, 0, &communicate, this);
1281 }
pthread_t communicate_
Definition: DQMNet.h:327
pthread_mutex_t lock_
Definition: DQMNet.h:300
static void * communicate(void *obj)
Definition: DQMNet.cc:1241
std::ostream & logme(void)
Definition: DQMNet.cc:37
void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1120 of file DQMNet.cc.

References IOAccept, logme(), onPeerConnect(), raiseDQMError(), asciidump::s, sel_, server_, and SOCKET_BUF_SIZE.

1121 {
1122  if (server_)
1123  {
1124  logme() << "ERROR: DQM server was already started.\n";
1125  return;
1126  }
1127 
1128  try
1129  {
1130  InetAddress addr("0.0.0.0", port);
1131  InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
1132  s->bind(addr);
1133  s->listen(10);
1134  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1135  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1136  s->setBlocking(false);
1137  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1138  }
1139  catch (Error &e)
1140  {
1141  // FIXME: Do we need to do this when we throw an exception anyway?
1142  // FIXME: Abort instead?
1143  logme()
1144  << "ERROR: Failed to start server at port " << port << ": "
1145  << e.explain() << std::endl;
1146 
1147  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
1148  " %d: %s", port, e.explain().c_str());
1149  }
1150 
1151  logme() << "INFO: DQM server started at port " << port << std::endl;
1152 }
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:941
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
lat::Socket * server_
Definition: DQMNet.h:319
string s
Definition: asciidump.py:422
lat::IOSelector sel_
Definition: DQMNet.h:318
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1158 of file DQMNet.cc.

References IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.

1159 {
1160  if (server_)
1161  {
1162  logme() << "ERROR: DQM server was already started.\n";
1163  return;
1164  }
1165 
1166  try
1167  {
1168  server_ = new LocalServerSocket(path, 10);
1169  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1170  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1171  server_->setBlocking(false);
1172  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1173  }
1174  catch (Error &e)
1175  {
1176  // FIXME: Do we need to do this when we throw an exception anyway?
1177  // FIXME: Abort instead?
1178  logme()
1179  << "ERROR: Failed to start server at path " << path << ": "
1180  << e.explain() << std::endl;
1181 
1182  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
1183  " %s: %s", path, e.explain().c_str());
1184  }
1185 
1186  logme() << "INFO: DQM server started at path " << path << std::endl;
1187 }
int path() const
Definition: HLTadd.h:3
std::ostream & logme(void)
Definition: DQMNet.cc:37
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:941
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
lat::Socket * server_
Definition: DQMNet.h:319
lat::IOSelector sel_
Definition: DQMNet.h:318
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::unlock ( void  )

Release the lock on the DQM net layer.

Definition at line 1260 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1261 {
1262  if (communicate_ != (pthread_t) -1)
1263  pthread_mutex_unlock(&lock_);
1264 }
pthread_t communicate_
Definition: DQMNet.h:327
pthread_mutex_t lock_
Definition: DQMNet.h:300
void DQMNet::unpackQualityData ( QReports qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from the string from into qr. Assumes the data was saved with packQualityData().

Definition at line 194 of file DQMNet.cc.

References DQMNet::QValue::algorithm, DQMNet::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, Capri::details::from(), DQMNet::QValue::message, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

195 {
196  const char *qdata = from;
197 
198  // Count how many qresults there are.
199  size_t nqv = 0;
200  while (*qdata)
201  {
202  ++nqv;
203  while (*qdata) ++qdata; ++qdata;
204  while (*qdata) ++qdata; ++qdata;
205  while (*qdata) ++qdata; ++qdata;
206  while (*qdata) ++qdata; ++qdata;
207  while (*qdata) ++qdata; ++qdata;
208  }
209 
210  // Now extract the qreports.
211  qdata = from;
212  qr.reserve(nqv);
213  while (*qdata)
214  {
215  qr.push_back(DQMNet::QValue());
216  DQMNet::QValue &qv = qr.back();
217 
218  qv.code = atoi(qdata);
219  while (*qdata) ++qdata;
220  switch (qv.code)
221  {
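222  case dqm::qstatus::STATUS_OK:
223  break;
224  case dqm::qstatus::WARNING:
225  flags |= DQMNet::DQM_PROP_REPORT_WARN;
226  break;
227  case dqm::qstatus::ERROR:
228  flags |= DQMNet::DQM_PROP_REPORT_ERROR;
229  break;
230  default:
231  flags |= DQMNet::DQM_PROP_REPORT_OTHER;
232  break;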
233  }
234 
235  qv.qtresult = atof(++qdata);
236  while (*qdata) ++qdata;
237 
238  qv.qtname = ++qdata;
239  while (*qdata) ++qdata;
240 
241  qv.algorithm = ++qdata;
242  while (*qdata) ++qdata;
243 
244  qv.message = ++qdata;
245  while (*qdata) ++qdata;
246  ++qdata;
247  }
248 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
std::string algorithm
Definition: DQMNet.h:91
static const int WARNING
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
std::string qtname
Definition: DQMNet.h:90
static std::string from(" from ")
std::string message
Definition: DQMNet.h:89
static const int STATUS_OK
float qtresult
Definition: DQMNet.h:88
static const int ERROR
void DQMNet::updateMask ( Peer p)
protected

Update the selector mask for a peer based on data queues. Close the connection if there is no reason to keep it open.

Definition at line 1029 of file DQMNet.cc.

References IOUrgent, IOWrite, DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

1030 {
1031  if (! p->socket)
1032  return;
1033 
1034  // Listen to writes iff we have data to send.
1035  unsigned oldmask = p->mask;
1036  if (! p->sendq && (p->mask & IOWrite))
1037  sel_.setMask(p->socket, p->mask &= ~IOWrite);
1038 
1039  if (p->sendq && ! (p->mask & IOWrite))
1040  sel_.setMask(p->socket, p->mask |= IOWrite);
1041 
1042  if (debug_ && oldmask != p->mask)
1043  logme()
1044  << "DEBUG: updating mask for " << p->peeraddr << " to "
1045  << p->mask << " from " << oldmask << std::endl;
1046 
1047  // If we have nothing more to send and are no longer listening
1048  // for reads, close up the shop for this peer.
1049  if (p->mask == IOUrgent && ! p->waiting)
1050  {
1051  assert (! p->sendq);
1052  if (debug_)
1053  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
1054  losePeer(0, p, 0);
1055  }
1056 }
std::ostream & logme(void)
Definition: DQMNet.cc:37
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
Definition: DQMNet.cc:72
bool debug_
Definition: DQMNet.h:299
lat::IOSelector sel_
Definition: DQMNet.h:318
virtual void DQMNet::updatePeerMasks ( void  )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1193 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

1194 {
1195  if (! downstream_.host.empty())
1196  {
1197  logme()
1198  << "ERROR: Already updating another collector at "
1199  << downstream_.host << ":" << downstream_.port << std::endl;
1200  return;
1201  }
1202 
1203  downstream_.update = true;
1204  downstream_.host = host;
1205  downstream_.port = port;
1206 }
AutoPeer downstream_
Definition: DQMNet.h:324
int port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:37
string host
Definition: query.py:114
std::string host
Definition: DQMNet.h:149
void DQMNet::waitForData ( Peer p,
const std::string &  name,
const std::string &  info,
Peer owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 126 of file DQMNet.cc.

References cond::rpcobimon::current, info, mergeVDriftHistosByStation::name, and DQMNet::Peer::waiting.

127 {
128  // FIXME: Should we automatically record which exact peer the waiter
129  // is expecting to deliver data so we know to release the waiter if
130  // the other peer vanishes? The current implementation stands a
131  // chance for the waiter to wait indefinitely -- although we do
132  // force terminate the wait after a while.
133  requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
134  WaitObject wo = { Time::current(), name, info, p };
135  waiting_.push_back(wo);
136  p->waiting++;
137 }
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:106
WaitList waiting_
Definition: DQMNet.h:325

Member Data Documentation

std::string DQMNet::appname_
private

Definition at line 315 of file DQMNet.h.

pthread_t DQMNet::communicate_
private

Definition at line 327 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_
protected

Definition at line 299 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

int DQMNet::delay_
private

Definition at line 330 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_
private

Definition at line 324 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 67 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 64 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 66 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 65 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 55 of file DQMNet.h.

Referenced by MonitorElement::isAccumulateEnabled(), and MonitorElement::setAccumulate().

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 53 of file DQMNet.h.

Referenced by DQMStore::book(), DQMStore::extract(), and MonitorElement::initialise().

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static

Definition at line 60 of file DQMNet.h.

Referenced by MonitorElement::getLumiFlag(), and MonitorElement::setLumiFlag().

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 59 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM
static
Initial value: = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)

Definition at line 49 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 45 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 44 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 56 of file DQMNet.h.

Referenced by MonitorElement::resetMe(), and MonitorElement::setResetMe().

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 62 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static
const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 42 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 28 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 27 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

Definition at line 25 of file DQMNet.h.

Referenced by MonitorElement::kind().

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 29 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 26 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 30 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 33 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

Definition at line 31 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 32 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 36 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

Definition at line 34 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 35 of file DQMNet.h.

Referenced by HLTMonSimpleBTag::doEffCalc().

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 39 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 37 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 38 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 40 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 41 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 69 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 70 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 71 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 72 of file DQMNet.h.

bool DQMNet::flush_
private

Definition at line 333 of file DQMNet.h.

Referenced by run().

pthread_mutex_t DQMNet::lock_
protected

Definition at line 300 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 74 of file DQMNet.h.

int DQMNet::pid_
private

Definition at line 316 of file DQMNet.h.

lat::IOSelector DQMNet::sel_
private

Definition at line 318 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

lat::Socket* DQMNet::server_
private

Definition at line 319 of file DQMNet.h.

Referenced by startLocalServer().

sig_atomic_t DQMNet::shutdown_
private

Definition at line 328 of file DQMNet.h.

Referenced by shutdown().

AutoPeer DQMNet::upstream_
private

Definition at line 323 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_
private

Definition at line 321 of file DQMNet.h.

WaitList DQMNet::waiting_
private

Definition at line 325 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 332 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 331 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

lat::Pipe DQMNet::wakeup_
private

Definition at line 320 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().