CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  QValue
 
struct  WaitObject
 

Public Types

typedef std::vector< unsigned char > DataBlob
 
typedef std::vector< QValue > QReports
 
typedef std::vector< uint32_t > TagList
 
typedef std::list< WaitObject > WaitList
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
void listenToCollector (const std::string &host, int port)
 
void lock (void)
 Acquire a lock on the DQM net layer. More...
 
void run (void)
 
void sendLocalChanges (void)
 
void shutdown (void)
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start (void)
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock (void)
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet (void)
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme (void)
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop (void)
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks (void)=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

 DQMNet (const DQMNet &)=delete
 
void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
DQMNet & operator= (const DQMNet &)=delete
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 23 of file DQMNet.h.

Member Typedef Documentation

typedef std::vector<unsigned char> DQMNet::DataBlob

Definition at line 81 of file DQMNet.h.

typedef std::vector<QValue> DQMNet::QReports

Definition at line 84 of file DQMNet.h.

typedef std::vector<uint32_t> DQMNet::TagList

Definition at line 85 of file DQMNet.h.

typedef std::list<WaitObject> DQMNet::WaitList

Definition at line 86 of file DQMNet.h.

Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 1069 of file DQMNet.cc.

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

1070  : debug_ (false),
1071  appname_ (appname.empty() ? "DQMNet" : appname.c_str()),
1072  pid_ (getpid()),
1073  server_ (nullptr),
1074  version_ (Time::current()),
1075  communicate_ ((pthread_t) -1),
1076  shutdown_ (0),
1077  delay_ (1000),
1078  waitStale_ (0, 0, 0, 0, 500000000 /* 500 ms */),
1079  waitMax_ (0, 0, 0, 5 /* seconds */, 0),
1080  flush_ (false)
1081 {
1082  // Create a pipe for the local DQM to tell the communicator
1083  // thread that local DQM data has changed and that the peers
1084  // should be notified.
1085  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
1086  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
1087 
1088  // Initialise the upstream and downstream to empty.
1089  upstream_.peer = downstream_.peer = nullptr;
1092  upstream_.update = downstream_.update = false;
1093 }
AutoPeer downstream_
Definition: DQMNet.h:343
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:1008
pthread_t communicate_
Definition: DQMNet.h:346
lat::Time version_
Definition: DQMNet.h:340
int delay_
Definition: DQMNet.h:349
AutoPeer upstream_
Definition: DQMNet.h:342
int pid_
Definition: DQMNet.h:335
std::string appname_
Definition: DQMNet.h:334
Peer * peer
Definition: DQMNet.h:154
sig_atomic_t shutdown_
Definition: DQMNet.h:347
lat::Time next
Definition: DQMNet.h:155
Definition: IOTypes.h:26
lat::Pipe wakeup_
Definition: DQMNet.h:339
bool debug_
Definition: DQMNet.h:318
bool flush_
Definition: DQMNet.h:352
lat::Socket * server_
Definition: DQMNet.h:338
#define O_NONBLOCK
Definition: SysFile.h:21
lat::TimeSpan waitMax_
Definition: DQMNet.h:351
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
lat::IOSelector sel_
Definition: DQMNet.h:337
DQMNet::~DQMNet ( void  )
virtual

Definition at line 1095 of file DQMNet.cc.

1096 {
1097  // FIXME
1098 }
DQMNet::DQMNet ( const DQMNet & )
private delete

Member Function Documentation

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
)
static protected

Definition at line 53 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by dqmhash(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

54 {
55  b->data.insert(b->data.end(),
56  (const unsigned char *)data,
57  (const unsigned char *)data + len);
58 }
double b
Definition: hdecay.h:120
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 1103 of file DQMNet.cc.

References debug_.

1104 {
1105  debug_ = doit;
1106 }
bool debug_
Definition: DQMNet.h:318
void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 1111 of file DQMNet.cc.

References delay_.

1112 {
1113  delay_ = delay;
1114 }
int delay_
Definition: DQMNet.h:349
void delay(int delay)
Definition: DQMNet.cc:1111
void DQMNet::discard ( Bucket *&  b)
static protected

Definition at line 62 of file DQMNet.cc.

References b, GetRecoTauVFromDQM_MC_cff::next, and DQMNet::Bucket::next.

Referenced by dqmhash(), and OrderedSet.OrderedSet::pop().

63 {
64  while (b)
65  {
66  Bucket *next = b->next;
67  delete b;
68  b = next;
69  }
70 }
double b
Definition: hdecay.h:120
static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inline static

Definition at line 217 of file DQMNet.h.

References a, Vispa.Plugins.EdmBrowser.EdmDataAccessor::all(), b, EnergyCorrector::c, hitfit::clear(), copydata(), createPeer(), data, discard(), dqmhashfinal, dqmhashmix, findObject(), flags, getPeer(), info(), gen::k, logme(), makeObject(), markObjectsDead(), mps_alisetup::msg, dataset::name, connectstrParser::o, onMessage(), AlCaHLTBitMon_ParallelJobs::p, packQualityData(), purgeDeadObjects(), releaseFromWait(), removePeer(), alignCSCRings::s, sendObjectListToPeer(), sendObjectListToPeers(), sendObjectToPeer(), shouldStop(), AlCaHLTBitMon_QueryRunRegistry::string, unpackQualityData(), updateMask(), updatePeerMasks(), w, and waitForData().

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

218  {
219  // Reduced version of Bob Jenkins' hash function at:
220  // http://www.burtleburtle.net/bob/c/lookup3.c
221 # define dqmhashrot(x,k) (((x)<<(k)) | ((x)>>(32-(k))))
222 # define dqmhashmix(a,b,c) { \
223  a -= c; a ^= dqmhashrot(c, 4); c += b; \
224  b -= a; b ^= dqmhashrot(a, 6); a += c; \
225  c -= b; c ^= dqmhashrot(b, 8); b += a; \
226  a -= c; a ^= dqmhashrot(c,16); c += b; \
227  b -= a; b ^= dqmhashrot(a,19); a += c; \
228  c -= b; c ^= dqmhashrot(b, 4); b += a; }
229 # define dqmhashfinal(a,b,c) { \
230  c ^= b; c -= dqmhashrot(b,14); \
231  a ^= c; a -= dqmhashrot(c,11); \
232  b ^= a; b -= dqmhashrot(a,25); \
233  c ^= b; c -= dqmhashrot(b,16); \
234  a ^= c; a -= dqmhashrot(c,4); \
235  b ^= a; b -= dqmhashrot(a,14); \
236  c ^= b; c -= dqmhashrot(b,24); }
237 
238  uint32_t a, b, c;
239  a = b = c = 0xdeadbeef + (uint32_t) keylen;
240  const unsigned char *k = (const unsigned char *) key;
241 
242  // all but the last block: affect some bits of (a, b, c)
243  while (keylen > 12)
244  {
245  a += k[0];
246  a += ((uint32_t)k[1]) << 8;
247  a += ((uint32_t)k[2]) << 16;
248  a += ((uint32_t)k[3]) << 24;
249  b += k[4];
250  b += ((uint32_t)k[5]) << 8;
251  b += ((uint32_t)k[6]) << 16;
252  b += ((uint32_t)k[7]) << 24;
253  c += k[8];
254  c += ((uint32_t)k[9]) << 8;
255  c += ((uint32_t)k[10]) << 16;
256  c += ((uint32_t)k[11]) << 24;
257  dqmhashmix(a,b,c);
258  keylen -= 12;
259  k += 12;
260  }
261 
262  // last block: affect all 32 bits of (c); all case statements fall through
263  switch (keylen)
264  {
265  case 12: c += ((uint32_t)k[11]) << 24;
266  case 11: c += ((uint32_t)k[10]) << 16;
267  case 10: c += ((uint32_t)k[9]) << 8;
268  case 9 : c += k[8];
269  case 8 : b += ((uint32_t)k[7]) << 24;
270  case 7 : b += ((uint32_t)k[6]) << 16;
271  case 6 : b += ((uint32_t)k[5]) << 8;
272  case 5 : b += k[4];
273  case 4 : a += ((uint32_t)k[3]) << 24;
274  case 3 : a += ((uint32_t)k[2]) << 16;
275  case 2 : a += ((uint32_t)k[1]) << 8;
276  case 1 : a += k[0];
277  break;
278  case 0 : return c;
279  }
280 
281  dqmhashfinal(a, b, c);
282  return c;
283 # undef dqmhashrot
284 # undef dqmhashmix
285 # undef dqmhashfinal
286  }
#define dqmhashmix(a, b, c)
int k[5][pyjets_maxn]
double b
Definition: hdecay.h:120
#define dqmhashfinal(a, b, c)
double a
Definition: hdecay.h:121
virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = nullptr 
)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1222 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1223 {
1224  if (! upstream_.host.empty())
1225  {
1226  logme()
1227  << "ERROR: Already receiving data from another collector at "
1228  << upstream_.host << ":" << upstream_.port << std::endl;
1229  return;
1230  }
1231 
1232  upstream_.update = false;
1233  upstream_.host = host;
1234  upstream_.port = port;
1235 }
host
Definition: query.py:114
AutoPeer upstream_
Definition: DQMNet.h:342
port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
std::string host
Definition: DQMNet.h:156
void DQMNet::lock ( void  )

Acquire a lock on the DQM net layer.

Definition at line 1262 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1263 {
1264  if (communicate_ != (pthread_t) -1)
1265  pthread_mutex_lock(&lock_);
1266 }
pthread_t communicate_
Definition: DQMNet.h:346
pthread_mutex_t lock_
Definition: DQMNet.h:319
std::ostream & DQMNet::logme ( void  )
protected

Definition at line 42 of file DQMNet.cc.

References gather_cfg::cout, and cmsPerfSuiteHarvest::now.

Referenced by dqmhash(), listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

43 {
44  Time now = Time::current();
45  return std::cout
46  << now.format(true, "%Y-%m-%d %H:%M:%S.")
47  << now.nanoformat(3, 3)
48  << " " << appname_ << "[" << pid_ << "]: ";
49 }
int pid_
Definition: DQMNet.h:335
std::string appname_
Definition: DQMNet.h:334
void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = nullptr 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 77 of file DQMNet.cc.

References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

81 {
82  if (reason)
83  logme ()
84  << reason << peer->peeraddr
85  << (err ? "; error was: " + err->explain() : std::string(""))
86  << std::endl;
87 
88  Socket *s = peer->socket;
89 
90  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
91  if (i->peer == peer)
92  waiting_.erase(i++);
93  else
94  ++i;
95 
96  if (ev)
97  ev->source = nullptr;
98 
99  discard(peer->sendq);
100  if (peer->automatic)
101  peer->automatic->peer = nullptr;
102 
103  sel_.detach (s);
104  s->close();
105  removePeer(peer, s);
106  delete s;
107 }
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
bool ev
std::ostream & logme(void)
Definition: DQMNet.cc:42
virtual void removePeer(Peer *p, lat::Socket *s)=0
WaitList waiting_
Definition: DQMNet.h:344
lat::IOSelector sel_
Definition: DQMNet.h:337
virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

virtual void DQMNet::markObjectsDead ( Peer *  p)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 1008 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

1009 {
1010  // Discard the data in the pipe, we care only about the wakeup.
1011  try
1012  {
1013  IOSize sz;
1014  unsigned char buf [1024];
1015  while ((sz = ev->source->read(buf, sizeof(buf))))
1016  ;
1017  }
1018  catch (Error &e)
1019  {
1020  SystemError *next = dynamic_cast<SystemError *>(e.next());
1021  if (next && next->portable() == SysErr::ErrTryAgain)
1022  ; // Ignore it
1023  else
1024  logme()
1025  << "WARNING: error reading from notification pipe: "
1026  << e.explain() << std::endl;
1027  }
1028 
1029  // Tell the main event pump to send an update in a little while.
1030  flush_ = true;
1031 
1032  // We are never done, always keep going.
1033  return false;
1034 }
bool ev
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool flush_
Definition: DQMNet.h:352
size_t IOSize
Definition: IOTypes.h:14
bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
)
protected virtual

Definition at line 468 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::flags, flags, reco::if(), DQMNet::Object::lastreq, fileCollector::logme(), dataset::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, AlCaHLTBitMon_QueryRunRegistry::string, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.

Referenced by dqmhash().

469 {
470  // Decode and process this message.
471  uint32_t type;
472  memcpy (&type, data + sizeof(uint32_t), sizeof (type));
473  switch (type)
474  {
475  case DQM_MSG_UPDATE_ME:
476  {
477  if (len != 2*sizeof(uint32_t))
478  {
479  logme()
480  << "ERROR: corrupt 'UPDATE_ME' message of length " << len
481  << " from peer " << p->peeraddr << std::endl;
482  return false;
483  }
484 
485  if (debug_)
486  logme()
487  << "DEBUG: received message 'UPDATE ME' from peer "
488  << p->peeraddr << ", size " << len << std::endl;
489 
490  p->update = true;
491  }
492  return true;
493 
494  case DQM_MSG_LIST_OBJECTS:
495  {
496  if (debug_)
497  logme()
498  << "DEBUG: received message 'LIST OBJECTS' from peer "
499  << p->peeraddr << ", size " << len << std::endl;
500 
501  // Send over current status: list of known objects.
502  sendObjectListToPeer(msg, true, false);
503  }
504  return true;
505 
506  case DQM_MSG_GET_OBJECT:
507  {
508  if (debug_)
509  logme()
510  << "DEBUG: received message 'GET OBJECT' from peer "
511  << p->peeraddr << ", size " << len << std::endl;
512 
513  if (len < 3*sizeof(uint32_t))
514  {
515  logme()
516  << "ERROR: corrupt 'GET IMAGE' message of length " << len
517  << " from peer " << p->peeraddr << std::endl;
518  return false;
519  }
520 
521  uint32_t namelen;
522  memcpy (&namelen, data + 2*sizeof(uint32_t), sizeof(namelen));
523  if (len != 3*sizeof(uint32_t) + namelen)
524  {
525  logme()
526  << "ERROR: corrupt 'GET OBJECT' message of length " << len
527  << " from peer " << p->peeraddr
528  << ", expected length " << (3*sizeof(uint32_t))
529  << " + " << namelen << std::endl;
530  return false;
531  }
532 
533  std::string name ((char *) data + 3*sizeof(uint32_t), namelen);
534  Peer *owner = nullptr;
535  Object *o = findObject(nullptr, name, &owner);
536  if (o)
537  {
538  o->lastreq = Time::current().ns();
539  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE))
540  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
541  waitForData(p, name, "", owner);
542  else
543  sendObjectToPeer(msg, *o, true);
544  }
545  else
546  {
547  uint32_t words [3];
548  words[0] = sizeof(words) + name.size();
549  words[1] = DQM_REPLY_NONE;
550  words[2] = name.size();
551 
552  msg->data.reserve(msg->data.size() + words[0]);
553  copydata(msg, &words[0], sizeof(words));
554  copydata(msg, &name[0], name.size());
555  }
556  }
557  return true;
558 
559  case DQM_REPLY_LIST_BEGIN:
560  {
561  if (len != 4*sizeof(uint32_t))
562  {
563  logme()
564  << "ERROR: corrupt 'LIST BEGIN' message of length " << len
565  << " from peer " << p->peeraddr << std::endl;
566  return false;
567  }
568 
569  // Get the update status: whether this is a full update.
570  uint32_t flags;
571  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
572 
573  if (debug_)
574  logme()
575  << "DEBUG: received message 'LIST BEGIN "
576  << (flags ? "FULL" : "INCREMENTAL")
577  << "' from " << p->peeraddr
578  << ", size " << len << std::endl;
579 
580  // If we are about to receive a full list of objects, flag all
581  // objects as possibly dead. Subsequent object notifications
582  // will undo this for the live objects. We cannot delete
583  // objects quite yet, as we may get inquiry from another client
584  // while we are processing the incoming list, so we keep the
585  // objects tentatively alive as long as we've not seen the end.
586  if (flags)
587  markObjectsDead(p);
588  }
589  return true;
590 
591  case DQM_REPLY_LIST_END:
592  {
593  if (len != 4*sizeof(uint32_t))
594  {
595  logme()
596  << "ERROR: corrupt 'LIST END' message of length " << len
597  << " from peer " << p->peeraddr << std::endl;
598  return false;
599  }
600 
601  // Get the update status: whether this is a full update.
602  uint32_t flags;
603  memcpy(&flags, data + 3*sizeof(uint32_t), sizeof(uint32_t));
604 
605  // If we received a full list of objects, now purge all dead
606  // objects. We need to do this in two stages in case we receive
607  // updates in many parts, and end up sending updates to others in
608  // between; this avoids us lying live objects are dead.
609  if (flags)
610  purgeDeadObjects(p);
611 
612  if (debug_)
613  logme()
614  << "DEBUG: received message 'LIST END "
615  << (flags ? "FULL" : "INCREMENTAL")
616  << "' from " << p->peeraddr
617  << ", size " << len << std::endl;
618 
619  // Indicate we have received another update from this peer.
620  // Also indicate we should flush to our clients.
621  flush_ = true;
622  p->updates++;
623  }
624  return true;
625 
626  case DQM_REPLY_OBJECT:
627  {
628  uint32_t words[9];
629  if (len < sizeof(words))
630  {
631  logme()
632  << "ERROR: corrupt 'OBJECT' message of length " << len
633  << " from peer " << p->peeraddr << std::endl;
634  return false;
635  }
636 
637  memcpy (&words[0], data, sizeof(words));
638  uint32_t &namelen = words[6];
639  uint32_t &datalen = words[7];
640  uint32_t &qlen = words[8];
641 
642  if (len != sizeof(words) + namelen + datalen + qlen)
643  {
644  logme()
645  << "ERROR: corrupt 'OBJECT' message of length " << len
646  << " from peer " << p->peeraddr
647  << ", expected length " << sizeof(words)
648  << " + " << namelen
649  << " + " << datalen
650  << " + " << qlen
651  << std::endl;
652  return false;
653  }
654 
655  unsigned char *namedata = data + sizeof(words);
656  unsigned char *objdata = namedata + namelen;
657  unsigned char *qdata = objdata + datalen;
658  unsigned char *enddata = qdata + qlen;
659  std::string name ((char *) namedata, namelen);
660  assert (enddata == data + len);
661 
662  if (debug_)
663  logme()
664  << "DEBUG: received message 'OBJECT " << name
665  << "' from " << p->peeraddr
666  << ", size " << len << std::endl;
667 
668  // Mark the peer as a known object source.
669  p->source = true;
670 
671  // Initialise or update an object entry.
672  Object *o = findObject(p, name);
673  if (! o)
674  o = makeObject(p, name);
675 
676  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
677  o->tag = words[5];
678  o->version = ((uint64_t) words[4] << 32 | words[3]);
679  o->scalar.clear();
680  o->qdata.clear();
681  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
682  {
683  o->rawdata.clear();
684  o->scalar.insert(o->scalar.end(), objdata, qdata);
685  }
686  else if (datalen)
687  {
688  o->rawdata.clear();
689  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
690  }
691  else if (! o->rawdata.empty())
692  o->flags |= DQM_PROP_STALE;
693  o->qdata.insert(o->qdata.end(), qdata, enddata);
694 
695  // If we had an object for this one already and this is a list
696  // update without data, issue an immediate data get request.
697  if (o->lastreq
698  && ! datalen
699  && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
700  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
701 
702  // If we have the object data, release from wait.
703  if (datalen)
704  releaseWaiters(name, o);
705  }
706  return true;
707 
708  case DQM_REPLY_NONE:
709  {
710  uint32_t words[3];
711  if (len < sizeof(words))
712  {
713  logme()
714  << "ERROR: corrupt 'NONE' message of length " << len
715  << " from peer " << p->peeraddr << std::endl;
716  return false;
717  }
718 
719  memcpy (&words[0], data, sizeof(words));
720  uint32_t &namelen = words[2];
721 
722  if (len != sizeof(words) + namelen)
723  {
724  logme()
725  << "ERROR: corrupt 'NONE' message of length " << len
726  << " from peer " << p->peeraddr
727  << ", expected length " << sizeof(words)
728  << " + " << namelen << std::endl;
729  return false;
730  }
731 
732  unsigned char *namedata = data + sizeof(words);
733  std::string name((char *) namedata, namelen);
734 
735  if (debug_)
736  logme()
737  << "DEBUG: received message 'NONE " << name
738  << "' from " << p->peeraddr
739  << ", size " << len << std::endl;
740 
741  // Mark the peer as a known object source.
742  p->source = true;
743 
744  // If this was a known object, kill it.
745  if (Object *o = findObject(p, name))
746  {
747  o->flags |= DQM_PROP_DEAD;
748  purgeDeadObjects(p);
749  }
750 
751  // If someone was waiting for this, let them go.
752  releaseWaiters(name, nullptr);
753  }
754  return true;
755 
756  default:
757  logme()
758  << "ERROR: unrecognised message of length " << len
759  << " and type " << type << " from peer " << p->peeraddr
760  << std::endl;
761  return false;
762  }
763 }
type
Definition: HCALResponse.h:21
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:73
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:27
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:164
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:131
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
std::ostream & logme(void)
Definition: DQMNet.cc:42
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
unsigned long long uint64_t
Definition: Time.h:15
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:72
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:318
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
if(dp >Float(M_PI)) dp-
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
bool flush_
Definition: DQMNet.h:352
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:59
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:26
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:60
bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 951 of file DQMNet.cc.

References IORead, IOUrgent, DTRecHitQuality_cfi::local, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::socket, AlCaHLTBitMon_QueryRunRegistry::string, and relativeConstraints::value.

Referenced by startLocalServer().

952 {
953  // Recover the server socket.
954  assert (ev->source == server_);
955 
956  // Accept the connection.
957  Socket *s = server_->accept();
958  assert (s);
959  assert (! s->isBlocking());
960 
961  // Record it to our list of peers.
962  lock();
963  Peer *p = createPeer(s);
964  std::string localaddr;
965  if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
966  {
967  InetAddress peeraddr = inet->peername();
968  InetAddress myaddr = inet->sockname();
969  p->peeraddr = StringFormat("%1:%2")
970  .arg(peeraddr.hostname())
971  .arg(peeraddr.port()).value();
972  localaddr = StringFormat("%1:%2")
973  .arg(myaddr.hostname())
974  .arg(myaddr.port()).value();
975  }
976  else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
977  {
978  p->peeraddr = local->peername().path();
979  localaddr = local->sockname().path();
980  }
981  else
982  assert(false);
983 
984  p->mask = IORead|IOUrgent;
985  p->socket = s;
986 
987  // Report the new connection.
988  if (debug_)
989  logme()
990  << "INFO: new peer " << p->peeraddr << " is now connected to "
991  << localaddr << std::endl;
992 
993  // Attach it to the listener.
994  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
995  unlock();
996 
997  // We are never done.
998  return false;
999 }
virtual Peer * createPeer(lat::Socket *s)=0
bool ev
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
A arg
Definition: Factorize.h:37
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
Definition: IOTypes.h:26
bool debug_
Definition: DQMNet.h:318
lat::Socket * server_
Definition: DQMNet.h:338
lat::IOSelector sel_
Definition: DQMNet.h:337
bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
)
private

Handle communication to a particular client.

Definition at line 768 of file DQMNet.cc.

References DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, MillePedeFileConverter_cfg::e, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, mps_alisetup::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and DQMNet::Peer::waiting.

Referenced by onPeerConnect(), and run().

769 {
770  lock();
771  assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
772 
773  // If there is a problem with the peer socket, discard the peer
774  // and tell the selector to stop prcessing events for it. If
775  // this is a server connection, we will eventually recreate
776  // everything if/when the data server comes back.
777  if (ev->events & IOUrgent)
778  {
779  if (p->automatic)
780  {
781  logme()
782  << "WARNING: connection to the DQM server at " << p->peeraddr
783  << " lost (will attempt to reconnect in 15 seconds)\n";
784  losePeer(nullptr, p, ev);
785  }
786  else
787  losePeer("WARNING: lost peer connection ", p, ev);
788 
789  unlock();
790  return true;
791  }
792 
793  // If we can write to the peer socket, pump whatever we can into it.
794  if (ev->events & IOWrite)
795  {
796  while (Bucket *b = p->sendq)
797  {
798  IOSize len = b->data.size() - p->sendpos;
799  const void *data = (len ? (const void *)&b->data[p->sendpos]
800  : (const void *)&data);
801  IOSize done;
802 
803  try
804  {
805  done = (len ? ev->source->write (data, len) : 0);
806  if (debug_ && len)
807  logme()
808  << "DEBUG: sent " << done << " bytes to peer "
809  << p->peeraddr << std::endl;
810  }
811  catch (Error &e)
812  {
813  losePeer("WARNING: unable to write to peer ", p, ev, &e);
814  unlock();
815  return true;
816  }
817 
818  p->sendpos += done;
819  if (p->sendpos == b->data.size())
820  {
821  Bucket *old = p->sendq;
822  p->sendq = old->next;
823  p->sendpos = 0;
824  old->next = nullptr;
825  discard(old);
826  }
827 
828  if (! done && len)
829  // Cannot write any more.
830  break;
831  }
832  }
833 
834  // If there is data to be read from the peer, first receive what we
835  // can get out the socket, the process all complete requests.
836  if (ev->events & IORead)
837  {
838  // First build up the incoming buffer of data in the socket.
839  // Remember the last size returned by the socket; we need
840  // it to determine if the remote end closed the connection.
841  IOSize sz;
842  try
843  {
844  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
845  do
846  if ((sz = ev->source->read(&buf[0], buf.size())))
847  {
848  if (debug_)
849  logme()
850  << "DEBUG: received " << sz << " bytes from peer "
851  << p->peeraddr << std::endl;
852  DataBlob &data = p->incoming;
853  if (data.capacity () < data.size () + sz)
854  data.reserve (data.size() + SOCKET_READ_GROWTH);
855  data.insert (data.end(), &buf[0], &buf[0] + sz);
856  }
857  while (sz == sizeof (buf));
858  }
859  catch (Error &e)
860  {
861  SystemError *next = dynamic_cast<SystemError *>(e.next());
862  if (next && next->portable() == SysErr::ErrTryAgain)
863  sz = 1; // Ignore it, and fake no end of data.
864  else
865  {
866  // Houston we have a problem.
867  losePeer("WARNING: failed to read from peer ", p, ev, &e);
868  unlock();
869  return true;
870  }
871  }
872 
873  // Process fully received messages as long as we can.
874  size_t consumed = 0;
875  DataBlob &data = p->incoming;
876  while (data.size()-consumed >= sizeof(uint32_t)
877  && p->waiting < MAX_PEER_WAITREQS)
878  {
879  uint32_t msglen;
880  memcpy (&msglen, &data[0]+consumed, sizeof(msglen));
881 
882  if (msglen >= MESSAGE_SIZE_LIMIT)
883  {
884  losePeer("WARNING: excessively large message from ", p, ev);
885  unlock();
886  return true;
887  }
888 
889  if (data.size()-consumed >= msglen)
890  {
891  bool valid = true;
892  if (msglen < 2*sizeof(uint32_t))
893  {
894  logme()
895  << "ERROR: corrupt peer message of length " << msglen
896  << " from peer " << p->peeraddr << std::endl;
897  valid = false;
898  }
899  else
900  {
901  // Decode and process this message.
902  Bucket msg;
903  msg.next = nullptr;
904  valid = onMessage(&msg, p, &data[0]+consumed, msglen);
905 
906  // If we created a response, chain it to the write queue.
907  if (! msg.data.empty())
908  {
909  Bucket **prev = &p->sendq;
910  while (*prev)
911  prev = &(*prev)->next;
912 
913  *prev = new Bucket;
914  (*prev)->next = nullptr;
915  (*prev)->data.swap(msg.data);
916  }
917  }
918 
919  if (! valid)
920  {
921  losePeer("WARNING: data stream error with ", p, ev);
922  unlock();
923  return true;
924  }
925 
926  consumed += msglen;
927  }
928  else
929  break;
930  }
931 
932  data.erase(data.begin(), data.begin()+consumed);
933 
934  // If the client has closed the connection, shut down our end. If
935  // we have something to send back still, leave the write direction
936  // open. Otherwise close the shop for this client.
937  if (sz == 0)
938  sel_.setMask(p->socket, p->mask &= ~IORead);
939  }
940 
941  // Yes, please keep processing events for this socket.
942  unlock();
943  return false;
944 }
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
bool ev
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:468
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:81
std::ostream & logme(void)
Definition: DQMNet.cc:42
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:77
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
Definition: IOTypes.h:26
double b
Definition: hdecay.h:120
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:77
bool debug_
Definition: DQMNet.h:318
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
lat::IOSelector sel_
Definition: DQMNet.h:337
DQMNet& DQMNet::operator= ( const DQMNet & )
private, delete
void DQMNet::packQualityData ( std::string &  into,
const QReports &  qr 
)
static

Pack the quality results in `qr` into the string `into` for persistent storage, such as network transfer or archival.

Definition at line 177 of file DQMNet.cc.

Referenced by dqmhash(), DQMService::flushStandalone(), and MonitorElement::packQualityData().

178 {
179  char buf[64];
180  std::ostringstream qrs;
181  QReports::const_iterator qi, qe;
182  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
183  {
184  int pos = 0;
185  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
186  qrs << buf << '\0'
187  << buf+pos << '\0'
188  << qi->qtname << '\0'
189  << qi->algorithm << '\0'
190  << qi->message << '\0'
191  << '\0';
192  }
193  into = qrs.str();
194 }
virtual void DQMNet::purgeDeadObjects ( Peer *  p)
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::releaseFromWait ( Bucket *  msg,
WaitObject &  w,
Object *  o 
)
protected, virtual

Definition at line 397 of file DQMNet.cc.

References DQMNet::Bucket::data, and DQMNet::WaitObject::name.

Referenced by dqmhash(), and run().

398 {
399  if (o)
400  sendObjectToPeer(msg, *o, true);
401  else
402  {
403  uint32_t words [3];
404  words[0] = sizeof(words) + w.name.size();
405  words[1] = DQM_REPLY_NONE;
406  words[2] = w.name.size();
407 
408  msg->data.reserve(msg->data.size() + words[0]);
409  copydata(msg, &words[0], sizeof(words));
410  copydata(msg, &w.name[0], w.name.size());
411  }
412 }
const double w
Definition: UKUtility.cc:23
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:418
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:74
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
)
private

Definition at line 147 of file DQMNet.cc.

References mps_alisetup::msg, and DQMNet::Bucket::next.

148 {
149  Bucket **msg = &i->peer->sendq;
150  while (*msg)
151  msg = &(*msg)->next;
152  *msg = new Bucket;
153  (*msg)->next = nullptr;
154 
155  releaseFromWait(*msg, *i, o);
156 
157  assert(i->peer->waiting > 0);
158  i->peer->waiting--;
159  waiting_.erase(i);
160 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
WaitList waiting_
Definition: DQMNet.h:344
void DQMNet::releaseWaiters ( const std::string &  name,
Object *  o 
)
private

Definition at line 164 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, and mps_fire::i.

165 {
166  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
167  if (i->name == name)
168  releaseFromWait(i++, o);
169  else
170  ++i;
171 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
WaitList waiting_
Definition: DQMNet.h:344
virtual void DQMNet::removePeer ( Peer *  p,
lat::Socket *  s 
)
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::requestObjectData ( Peer *  p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 111 of file DQMNet.cc.

References mps_alisetup::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

112 {
113  // Issue request to peer.
114  Bucket **msg = &p->sendq;
115  while (*msg)
116  msg = &(*msg)->next;
117  *msg = new Bucket;
118  (*msg)->next = nullptr;
119 
120  uint32_t words[3];
121  words[0] = sizeof(words) + len;
122  words[1] = DQM_MSG_GET_OBJECT;
123  words[2] = len;
124  copydata(*msg, words, sizeof(words));
125  copydata(*msg, name, len);
126 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:70
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
void DQMNet::run ( void  )

Run the actual I/O processing loop.

Definition at line 1295 of file DQMNet.cc.

References DQMNet::Peer::automatic, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), DQMNet::CoreObject::flags, flush_, DQMNet::AutoPeer::host, mps_fire::i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, cmsPerfSuiteHarvest::now, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, DQMNet::AutoPeer::port, DQMNet::Object::rawdata, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, unlock(), DQMNet::Peer::update, DQMNet::AutoPeer::update, updatePeerMasks(), upstream_, relativeConstraints::value, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

1296 {
1297  Time now;
1298  Time nextFlush = 0;
1299  AutoPeer *automatic[2] = { &upstream_, &downstream_ };
1300 
1301  // Perform I/O. Every once in a while flush updates to peers.
1302  while (! shouldStop())
1303  {
1304  for (int i = 0; i < 2; ++i)
1305  {
1306  AutoPeer *ap = automatic[i];
1307 
1308  // If we need a server connection and don't have one yet,
1309  // initiate asynchronous connection creation. Swallow errors
1310  // in case the server won't talk to us.
1311  if (! ap->host.empty()
1312  && ! ap->peer
1313  && (now = Time::current()) > ap->next)
1314  {
1315  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1316  InetSocket *s = nullptr;
1317  try
1318  {
1319  InetAddress addr(ap->host.c_str(), ap->port);
1320  s = new InetSocket (SOCK_STREAM, 0, addr.family());
1321  s->setBlocking(false);
1322  s->connect(addr);
1323  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1324  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1325  }
1326  catch (Error &e)
1327  {
1328  SystemError *sys = dynamic_cast<SystemError *>(e.next());
1329  if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1330  {
1331  // "In progress" just means the connection is in progress.
1332  // The connection is ready when the socket is writeable.
1333  // Anything else is a real problem.
1334  if (s)
1335  s->abort();
1336  delete s;
1337  s = nullptr;
1338  }
1339  }
1340 
1341  // Set up with the selector if we were successful. If this is
1342  // the upstream collector, queue a request for updates.
1343  if (s)
1344  {
1345  Peer *p = createPeer(s);
1346  ap->peer = p;
1347 
1348  InetAddress peeraddr = ((InetSocket *) s)->peername();
1349  InetAddress myaddr = ((InetSocket *) s)->sockname();
1350  p->peeraddr = StringFormat("%1:%2")
1351  .arg(peeraddr.hostname())
1352  .arg(peeraddr.port()).value();
1353  p->mask = IORead|IOWrite|IOUrgent;
1354  p->update = ap->update;
1355  p->automatic = ap;
1356  p->socket = s;
1357  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1358  if (ap == &upstream_)
1359  {
1360  uint32_t words[4] = { 2*sizeof(uint32_t), DQM_MSG_LIST_OBJECTS,
1361  2*sizeof(uint32_t), DQM_MSG_UPDATE_ME };
1362  p->sendq = new Bucket;
1363  p->sendq->next = nullptr;
1364  copydata(p->sendq, words, sizeof(words));
1365  }
1366 
1367  // Report the new connection.
1368  if (debug_)
1369  logme()
1370  << "INFO: now connected to " << p->peeraddr << " from "
1371  << myaddr.hostname() << ":" << myaddr.port() << std::endl;
1372  }
1373  }
1374  }
1375 
1376  // Pump events for a while.
1377  sel_.dispatch(delay_);
1378  now = Time::current();
1379  lock();
1380 
1381  // Check if flush is required. Flush only if one is needed.
1382  // Always sends the full object list, but only rarely.
1383  if (flush_ && now > nextFlush)
1384  {
1385  flush_ = false;
1386  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1387  sendObjectListToPeers(true);
1388  }
1389 
1390  // Update the data server and peer selection masks. If we
1391  // have no more data to send and listening for writes, remove
1392  // the write mask. If we have something to write and aren't
1393  // listening for writes, start listening so we can send off
1394  // the data.
1395  updatePeerMasks();
1396 
1397  // Release peers that have been waiting for data for too long.
1398  Time waitold = now - waitMax_;
1399  Time waitstale = now - waitStale_;
1400  for (WaitList::iterator i = waiting_.begin(), e = waiting_.end(); i != e; )
1401  {
1402  Object *o = findObject(nullptr, i->name);
1403 
1404  // If we have (stale) object data, wait only up to stale limit.
1405  // Otherwise if we have no data at all, wait up to the max limit.
1406  if (i->time < waitold)
1407  {
1408  logme ()
1409  << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9)
1410  << "s to retrieval, releasing '" << i->name << "' from wait, have "
1411  << (o ? o->rawdata.size() : 0) << " data available\n";
1412  releaseFromWait(i++, o);
1413  }
1414  else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE))
1415  {
1416  logme ()
1417  << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9)
1418  << "s to update, releasing '" << i->name << "' from wait, have "
1419  << o->rawdata.size() << " data available\n";
1420  releaseFromWait(i++, o);
1421  }
1422 
1423  // Keep it for now.
1424  else
1425  ++i;
1426  }
1427 
1428  unlock();
1429  }
1430 }
AutoPeer downstream_
Definition: DQMNet.h:343
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:68
virtual bool shouldStop(void)
Definition: DQMNet.cc:389
virtual void sendObjectListToPeers(bool all)=0
int delay_
Definition: DQMNet.h:349
virtual Peer * createPeer(lat::Socket *s)=0
void lock(void)
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1262
AutoPeer upstream_
Definition: DQMNet.h:342
A arg
Definition: Factorize.h:37
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:768
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:397
void unlock(void)
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1270
lat::Time next
Definition: DQMNet.h:155
Definition: IOTypes.h:26
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:63
virtual void updatePeerMasks(void)=0
bool debug_
Definition: DQMNet.h:318
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:69
bool flush_
Definition: DQMNet.h:352
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
lat::TimeSpan waitMax_
Definition: DQMNet.h:351
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
WaitList waiting_
Definition: DQMNet.h:344
lat::IOSelector sel_
Definition: DQMNet.h:337
void DQMNet::sendLocalChanges ( void  )

Definition at line 1435 of file DQMNet.cc.

References wakeup_.

Referenced by DQMImplNet< DQMNet::Object >::removePeer().

1436 {
1437  char byte = 0;
1438  wakeup_.sink()->write(&byte, 1);
1439 }
lat::Pipe wakeup_
Definition: DQMNet.h:339
virtual void DQMNet::sendObjectListToPeer ( Bucket *  msg,
bool  all,
bool  clear 
)
protected, pure virtual
virtual void DQMNet::sendObjectListToPeers ( bool  all)
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::sendObjectToPeer ( Bucket *  msg,
Object &  o,
bool  data 
)
protected, virtual

Definition at line 418 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.

Referenced by dqmhash(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

419 {
420  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
421  DataBlob objdata;
422 
423  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
424  objdata.insert(objdata.end(),
425  &o.scalar[0],
426  &o.scalar[0] + o.scalar.size());
427  else if (data)
428  objdata.insert(objdata.end(),
429  &o.rawdata[0],
430  &o.rawdata[0] + o.rawdata.size());
431 
432  uint32_t words [9];
433  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
434  uint32_t datalen = objdata.size();
435  uint32_t qlen = o.qdata.size();
436 
437  if (o.dirname->empty())
438  --namelen;
439 
440  words[0] = 9*sizeof(uint32_t) + namelen + datalen + qlen;
441  words[1] = DQM_REPLY_OBJECT;
442  words[2] = flags;
443  words[3] = (o.version >> 0 ) & 0xffffffff;
444  words[4] = (o.version >> 32) & 0xffffffff;
445  words[5] = o.tag;
446  words[6] = namelen;
447  words[7] = datalen;
448  words[8] = qlen;
449 
450  msg->data.reserve(msg->data.size() + words[0]);
451  copydata(msg, &words[0], 9*sizeof(uint32_t));
452  if (namelen)
453  {
454  copydata(msg, &(*o.dirname)[0], o.dirname->size());
455  if (! o.dirname->empty())
456  copydata(msg, "/", 1);
457  copydata(msg, &o.objname[0], o.objname.size());
458  }
459  if (datalen)
460  copydata(msg, &objdata[0], datalen);
461  if (qlen)
462  copydata(msg, &o.qdata[0], qlen);
463 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:27
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:62
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:81
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:75
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:26
static bool DQMNet::setOrder ( const CoreObject &  a,
const CoreObject &  b 
)
inline, static

Definition at line 180 of file DQMNet.h.

References DQMNet::CoreObject::dirname, DQMNet::CoreObject::lumi, DQMNet::CoreObject::moduleId, DQMNet::CoreObject::objname, DQMNet::CoreObject::run, and DQMNet::CoreObject::streamId.

Referenced by MonitorElement::operator<().

181  {
182  if (a.run == b.run) {
183  if (a.lumi == b.lumi) {
184  if (a.streamId == b.streamId) {
185  if (a.moduleId == b.moduleId) {
186  if (*a.dirname == *b.dirname) {
187  return a.objname < b.objname;
188  }
189  return *a.dirname < *b.dirname;
190  }
191  return a.moduleId < b.moduleId;
192  }
193  return a.streamId < b.streamId;
194  }
195  return a.lumi < b.lumi;
196  }
197  return a.run < b.run;
198  }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
bool DQMNet::shouldStop ( void  )
protected, virtual

Definition at line 389 of file DQMNet.cc.

Referenced by dqmhash(), and run().

390 {
391  return shutdown_;
392 }
sig_atomic_t shutdown_
Definition: DQMNet.h:347
void DQMNet::shutdown ( void  )

Stop the network layer and wait for it to finish.

Definition at line 1239 of file DQMNet.cc.

References communicate_, and shutdown_.

1240 {
1241  shutdown_ = 1;
1242  if (communicate_ != (pthread_t) -1)
1243  pthread_join(communicate_, nullptr);
1244 }
pthread_t communicate_
Definition: DQMNet.h:346
sig_atomic_t shutdown_
Definition: DQMNet.h:347
void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This limit applies only when some data has already been received; a separate time limit applies when no data payload has been received at all.

Definition at line 1121 of file DQMNet.cc.

References ntuplemaker::time, and waitStale_.

1122 {
1123  waitStale_ = time;
1124 }
lat::TimeSpan waitStale_
Definition: DQMNet.h:350
void DQMNet::start ( void  )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1280 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), and Types.EventRange::cppID().

1281 {
1282  if (communicate_ != (pthread_t) -1)
1283  {
1284  logme()
1285  << "ERROR: DQM networking thread has already been started\n";
1286  return;
1287  }
1288 
1289  pthread_mutex_init(&lock_, nullptr);
1290  pthread_create (&communicate_, nullptr, &communicate, this);
1291 }
pthread_t communicate_
Definition: DQMNet.h:346
pthread_mutex_t lock_
Definition: DQMNet.h:319
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
std::ostream & logme(void)
Definition: DQMNet.cc:42
void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1130 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

1131 {
1132  if (server_)
1133  {
1134  logme() << "ERROR: DQM server was already started.\n";
1135  return;
1136  }
1137 
1138  try
1139  {
1140  InetAddress addr("0.0.0.0", port);
1141  InetSocket *s = new InetSocket(SOCK_STREAM, 0, addr.family());
1142  s->bind(addr);
1143  s->listen(10);
1144  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1145  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1146  s->setBlocking(false);
1147  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1148  }
1149  catch (Error &e)
1150  {
1151  // FIXME: Do we need to do this when we throw an exception anyway?
1152  // FIXME: Abort instead?
1153  logme()
1154  << "ERROR: Failed to start server at port " << port << ": "
1155  << e.explain() << std::endl;
1156 
1157  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at port"
1158  " %d: %s", port, e.explain().c_str());
1159  }
1160 
1161  logme() << "INFO: DQM server started at port " << port << std::endl;
1162 }
port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:338
lat::IOSelector sel_
Definition: DQMNet.h:337
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 1168 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.

1169 {
1170  if (server_)
1171  {
1172  logme() << "ERROR: DQM server was already started.\n";
1173  return;
1174  }
1175 
1176  try
1177  {
1178  server_ = new LocalServerSocket(path, 10);
1179  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1180  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1181  server_->setBlocking(false);
1182  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1183  }
1184  catch (Error &e)
1185  {
1186  // FIXME: Do we need to do this when we throw an exception anyway?
1187  // FIXME: Abort instead?
1188  logme()
1189  << "ERROR: Failed to start server at path " << path << ": "
1190  << e.explain() << std::endl;
1191 
1192  raiseDQMError("DQMNet::startLocalServer", "Failed to start server at path"
1193  " %s: %s", path, e.explain().c_str());
1194  }
1195 
1196  logme() << "INFO: DQM server started at path " << path << std::endl;
1197 }
std::ostream & logme(void)
Definition: DQMNet.cc:42
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:951
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:338
lat::IOSelector sel_
Definition: DQMNet.h:337
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:11
void DQMNet::unlock ( void  )

Release the lock on the DQM net layer.

Definition at line 1270 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by run().

1271 {
1272  if (communicate_ != (pthread_t) -1)
1273  pthread_mutex_unlock(&lock_);
1274 }
pthread_t communicate_
Definition: DQMNet.h:346
pthread_mutex_t lock_
Definition: DQMNet.h:319
void DQMNet::unpackQualityData ( QReports &  qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from the string `from` into `qr`. Assumes the data was saved with packQualityData().

Definition at line 199 of file DQMNet.cc.

References DQMNet::QValue::algorithm, EnergyCorrector::c, DQMNet::QValue::code, MonitorElement::data_, DQMNet::CoreObject::dirname, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, fastjetJetProducer_validation_cfg::DQMStore, dqm::qstatus::ERROR, extractNextObject(), MonitorElement::Fill(), DQMNet::CoreObject::flags, fileCollector::logme(), DQMNet::QValue::message, dataset::name, connectstrParser::o, MuonAssociatorByHits_cfi::obj, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::CoreObject::qreports, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, DQMNet::Object::rawdata, DQMNet::Object::scalar, dqm::qstatus::STATUS_OK, DQMNet::CoreObject::tag, and dqm::qstatus::WARNING.

Referenced by dqmhash().

200 {
201  const char *qdata = from;
202 
203  // Count how many qresults there are.
204  size_t nqv = 0;
205  while (*qdata)
206  {
207  ++nqv;
208  while (*qdata) ++qdata;
209  ++qdata;
210  while (*qdata) ++qdata;
211  ++qdata;
212  while (*qdata) ++qdata;
213  ++qdata;
214  while (*qdata) ++qdata;
215  ++qdata;
216  while (*qdata) ++qdata;
217  ++qdata;
218  }
219 
220  // Now extract the qreports.
221  qdata = from;
222  qr.reserve(nqv);
223  while (*qdata)
224  {
225  qr.push_back(DQMNet::QValue());
226  DQMNet::QValue &qv = qr.back();
227 
228  qv.code = atoi(qdata);
229  while (*qdata) ++qdata;
230  switch (qv.code)
231  {
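232  case dqm::qstatus::STATUS_OK:
233  break;
234  case dqm::qstatus::WARNING:
235  flags |= DQMNet::DQM_PROP_REPORT_WARN;
236  break;
237  case dqm::qstatus::ERROR:
238  flags |= DQMNet::DQM_PROP_REPORT_ERROR;
239  break;
240  default:
241  flags |= DQMNet::DQM_PROP_REPORT_OTHER;
242  break;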
243  }
244 
245  qv.qtresult = atof(++qdata);
246  while (*qdata) ++qdata;
247 
248  qv.qtname = ++qdata;
249  while (*qdata) ++qdata;
250 
251  qv.algorithm = ++qdata;
252  while (*qdata) ++qdata;
253 
254  qv.message = ++qdata;
255  while (*qdata) ++qdata;
256  ++qdata;
257  }
258 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:48
std::string algorithm
Definition: DQMNet.h:94
static const int WARNING
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:47
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:49
std::string qtname
Definition: DQMNet.h:93
std::string message
Definition: DQMNet.h:92
static const int STATUS_OK
float qtresult
Definition: DQMNet.h:91
static const int ERROR
void DQMNet::updateMask ( Peer *  p)
protected

Update the selector mask for a peer based on its data queues. Close the connection if there is no reason to keep it open.

Definition at line 1039 of file DQMNet.cc.

References IOUrgent, IOWrite, fileCollector::logme(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by dqmhash(), and DQMImplNet< DQMNet::Object >::updatePeerMasks().

1040 {
1041  if (! p->socket)
1042  return;
1043 
1044  // Listen to writes iff we have data to send.
1045  unsigned oldmask = p->mask;
1046  if (! p->sendq && (p->mask & IOWrite))
1047  sel_.setMask(p->socket, p->mask &= ~IOWrite);
1048 
1049  if (p->sendq && ! (p->mask & IOWrite))
1050  sel_.setMask(p->socket, p->mask |= IOWrite);
1051 
1052  if (debug_ && oldmask != p->mask)
1053  logme()
1054  << "DEBUG: updating mask for " << p->peeraddr << " to "
1055  << p->mask << " from " << oldmask << std::endl;
1056 
1057  // If we have nothing more to send and are no longer listening
1058  // for reads, close up the shop for this peer.
1059  if (p->mask == IOUrgent && ! p->waiting)
1060  {
1061  assert (! p->sendq);
1062  if (debug_)
1063  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
1064  losePeer(nullptr, p, nullptr);
1065  }
1066 }
std::ostream & logme(void)
Definition: DQMNet.cc:42
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:77
bool debug_
Definition: DQMNet.h:318
lat::IOSelector sel_
Definition: DQMNet.h:337
virtual void DQMNet::updatePeerMasks ( void  )
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1203 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

1204 {
1205  if (! downstream_.host.empty())
1206  {
1207  logme()
1208  << "ERROR: Already updating another collector at "
1209  << downstream_.host << ":" << downstream_.port << std::endl;
1210  return;
1211  }
1212 
1213  downstream_.update = true;
1214  downstream_.host = host;
1215  downstream_.port = port;
1216 }
AutoPeer downstream_
Definition: DQMNet.h:343
host
Definition: query.py:114
port
Definition: query.py:115
std::ostream & logme(void)
Definition: DQMNet.cc:42
std::string host
Definition: DQMNet.h:156
void DQMNet::waitForData ( Peer * p,
const std::string &  name,
const std::string &  info,
Peer * owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 131 of file DQMNet.cc.

References info(), dataset::name, and DQMNet::Peer::waiting.

Referenced by dqmhash().

132 {
133  // FIXME: Should we automatically record which exact peer the waiter
134  // is expecting to deliver data so we know to release the waiter if
135  // the other peer vanishes? The current implementation stands a
136  // chance for the waiter to wait indefinitely -- although we do
137  // force terminate the wait after a while.
138  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
139  WaitObject wo = { Time::current(), name, info, p };
140  waiting_.push_back(wo);
141  p->waiting++;
142 }
static const TGPicture * info(bool iBackgroundIsBlack)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:111
WaitList waiting_
Definition: DQMNet.h:344

Member Data Documentation

std::string DQMNet::appname_
private

Definition at line 334 of file DQMNet.h.

pthread_t DQMNet::communicate_
private

Definition at line 346 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_
protected

Definition at line 318 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

int DQMNet::delay_
private

Definition at line 349 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_
private

Definition at line 343 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 70 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 67 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 69 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 68 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 56 of file DQMNet.h.

Referenced by MonitorElement::isAccumulateEnabled(), and MonitorElement::setAccumulate().

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static
const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 54 of file DQMNet.h.

Referenced by DQMStore::book(), DQMStore::extract(), and MonitorElement::initialise().

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static
const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static

Definition at line 65 of file DQMNet.h.

Referenced by MonitorElement::markedToDelete(), and MonitorElement::markToDelete().

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 60 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM
static
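Initial value: (initializer expression missing from this extract; see the definition in DQMNet.h referenced below)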

Definition at line 50 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 46 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 45 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 57 of file DQMNet.h.

Referenced by MonitorElement::resetMe(), and MonitorElement::setResetMe().

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 63 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static
const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 43 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 29 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 28 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

Definition at line 26 of file DQMNet.h.

Referenced by MonitorElement::kind().

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 30 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 27 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 31 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 34 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

Definition at line 32 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 33 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 37 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

Definition at line 35 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 36 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 40 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 38 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 39 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 41 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 42 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 72 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 73 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 74 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 75 of file DQMNet.h.

bool DQMNet::flush_
private

Definition at line 352 of file DQMNet.h.

Referenced by run().

pthread_mutex_t DQMNet::lock_
protected

Definition at line 319 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 77 of file DQMNet.h.

int DQMNet::pid_
private

Definition at line 335 of file DQMNet.h.

lat::IOSelector DQMNet::sel_
private

Definition at line 337 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

lat::Socket* DQMNet::server_
private

Definition at line 338 of file DQMNet.h.

Referenced by startLocalServer().

sig_atomic_t DQMNet::shutdown_
private

Definition at line 347 of file DQMNet.h.

Referenced by shutdown().

AutoPeer DQMNet::upstream_
private

Definition at line 342 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_
private

Definition at line 340 of file DQMNet.h.

WaitList DQMNet::waiting_
private

Definition at line 344 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 351 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 350 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

lat::Pipe DQMNet::wakeup_
private

Definition at line 339 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().