CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  QValue
 
struct  WaitObject
 

Public Types

using DataBlob = std::vector< unsigned char >
 
using QReports = std::vector< QValue >
 
using TagList = std::vector< uint32_t >
 
using WaitList = std::list< WaitObject >
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
 DQMNet (const DQMNet &)=delete
 
void listenToCollector (const std::string &host, int port)
 
void lock ()
 Acquire a lock on the DQM net layer. More...
 
DQMNet & operator= (const DQMNet &)=delete
 
void run ()
 
void sendLocalChanges ()
 
void shutdown ()
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start ()
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock ()
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet ()
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme ()
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop ()
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks ()=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 23 of file DQMNet.h.

Member Typedef Documentation

using DQMNet::DataBlob = std::vector<unsigned char>

Definition at line 80 of file DQMNet.h.

using DQMNet::QReports = std::vector<QValue>

Definition at line 81 of file DQMNet.h.

using DQMNet::TagList = std::vector<uint32_t>

Definition at line 82 of file DQMNet.h.

using DQMNet::WaitList = std::list<WaitObject>

Definition at line 83 of file DQMNet.h.

Constructor & Destructor Documentation

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 897 of file DQMNet.cc.

References downstream_, IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

898  : debug_(false),
899  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
900  pid_(getpid()),
901  server_(nullptr),
902  version_(Time::current()),
903  communicate_((pthread_t)-1),
904  shutdown_(0),
905  delay_(1000),
906  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
907  waitMax_(0, 0, 0, 5 /* seconds */, 0),
908  flush_(false) {
909  // Create a pipe for the local DQM to tell the communicator
910  // thread that local DQM data has changed and that the peers
911  // should be notified.
912  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
913  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
914 
915  // Initialise the upstream and downstream to empty.
916  upstream_.peer = downstream_.peer = nullptr;
920 }
AutoPeer downstream_
Definition: DQMNet.h:366
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:847
pthread_t communicate_
Definition: DQMNet.h:369
lat::Time version_
Definition: DQMNet.h:363
int delay_
Definition: DQMNet.h:372
AutoPeer upstream_
Definition: DQMNet.h:365
int pid_
Definition: DQMNet.h:358
std::string appname_
Definition: DQMNet.h:357
Peer * peer
Definition: DQMNet.h:144
sig_atomic_t shutdown_
Definition: DQMNet.h:370
lat::Time next
Definition: DQMNet.h:145
Definition: IOTypes.h:26
lat::Pipe wakeup_
Definition: DQMNet.h:362
bool debug_
Definition: DQMNet.h:344
bool flush_
Definition: DQMNet.h:375
lat::Socket * server_
Definition: DQMNet.h:361
#define O_NONBLOCK
Definition: SysFile.h:21
lat::TimeSpan waitMax_
Definition: DQMNet.h:374
lat::TimeSpan waitStale_
Definition: DQMNet.h:373
lat::IOSelector sel_
Definition: DQMNet.h:360
DQMNet::~DQMNet ( )
virtual

Definition at line 922 of file DQMNet.cc.

922  {
923  // FIXME
924 }
DQMNet::DQMNet ( const DQMNet & )
delete

Member Function Documentation

void DQMNet::copydata ( Bucket * b,
const void *  data,
size_t  len 
)
staticprotected

Definition at line 48 of file DQMNet.cc.

References DQMNet::Bucket::data.

Referenced by dqmhash(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

48  {
49  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
50 }
double b
Definition: hdecay.h:118
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 928 of file DQMNet.cc.

References debug_.

Referenced by DQMService::DQMService().

928 { debug_ = doit; }
bool debug_
Definition: DQMNet.h:344
void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 932 of file DQMNet.cc.

References delay_.

932 { delay_ = delay; }
int delay_
Definition: DQMNet.h:372
void delay(int delay)
Definition: DQMNet.cc:932
void DQMNet::discard ( Bucket *&  b)
staticprotected

Definition at line 53 of file DQMNet.cc.

References b, GetRecoTauVFromDQM_MC_cff::next, and DQMNet::Bucket::next.

Referenced by dqmhash(), and OrderedSet.OrderedSet::pop().

53  {
54  while (b) {
55  Bucket *next = b->next;
56  delete b;
57  b = next;
58  }
59 }
double b
Definition: hdecay.h:118
static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inlinestatic

Definition at line 199 of file DQMNet.h.

References a, Vispa.Plugins.EdmBrowser.EdmDataAccessor::all(), b, HltBtagPostValidation_cff::c, hitfit::clear(), copydata(), createPeer(), data, discard(), dqmhashfinal, dqmhashmix, findObject(), HLT_2018_cff::flags, getPeer(), info(), dqmdumpme::k, logme(), makeObject(), markObjectsDead(), mps_check::msg, Skims_PA_cff::name, EcalTangentSkim_cfg::o, onMessage(), AlCaHLTBitMon_ParallelJobs::p, packQualityData(), purgeDeadObjects(), releaseFromWait(), removePeer(), alignCSCRings::s, sendObjectListToPeer(), sendObjectListToPeers(), sendObjectToPeer(), shouldStop(), AlCaHLTBitMon_QueryRunRegistry::string, unpackQualityData(), updateMask(), updatePeerMasks(), w, and waitForData().

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

199  {
200  // Reduced version of Bob Jenkins' hash function at:
201  // http://www.burtleburtle.net/bob/c/lookup3.c
202 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
203 #define dqmhashmix(a, b, c) \
204  { \
205  a -= c; \
206  a ^= dqmhashrot(c, 4); \
207  c += b; \
208  b -= a; \
209  b ^= dqmhashrot(a, 6); \
210  a += c; \
211  c -= b; \
212  c ^= dqmhashrot(b, 8); \
213  b += a; \
214  a -= c; \
215  a ^= dqmhashrot(c, 16); \
216  c += b; \
217  b -= a; \
218  b ^= dqmhashrot(a, 19); \
219  a += c; \
220  c -= b; \
221  c ^= dqmhashrot(b, 4); \
222  b += a; \
223  }
224 #define dqmhashfinal(a, b, c) \
225  { \
226  c ^= b; \
227  c -= dqmhashrot(b, 14); \
228  a ^= c; \
229  a -= dqmhashrot(c, 11); \
230  b ^= a; \
231  b -= dqmhashrot(a, 25); \
232  c ^= b; \
233  c -= dqmhashrot(b, 16); \
234  a ^= c; \
235  a -= dqmhashrot(c, 4); \
236  b ^= a; \
237  b -= dqmhashrot(a, 14); \
238  c ^= b; \
239  c -= dqmhashrot(b, 24); \
240  }
241 
242  uint32_t a, b, c;
243  a = b = c = 0xdeadbeef + (uint32_t)keylen;
244  const auto *k = (const unsigned char *)key;
245 
246  // all but the last block: affect some bits of (a, b, c)
247  while (keylen > 12) {
248  a += k[0];
249  a += ((uint32_t)k[1]) << 8;
250  a += ((uint32_t)k[2]) << 16;
251  a += ((uint32_t)k[3]) << 24;
252  b += k[4];
253  b += ((uint32_t)k[5]) << 8;
254  b += ((uint32_t)k[6]) << 16;
255  b += ((uint32_t)k[7]) << 24;
256  c += k[8];
257  c += ((uint32_t)k[9]) << 8;
258  c += ((uint32_t)k[10]) << 16;
259  c += ((uint32_t)k[11]) << 24;
260  dqmhashmix(a, b, c);
261  keylen -= 12;
262  k += 12;
263  }
264 
265  // last block: affect all 32 bits of (c); all case statements fall through
266  switch (keylen) {
267  case 12:
268  c += ((uint32_t)k[11]) << 24;
269  [[fallthrough]];
270  case 11:
271  c += ((uint32_t)k[10]) << 16;
272  [[fallthrough]];
273  case 10:
274  c += ((uint32_t)k[9]) << 8;
275  [[fallthrough]];
276  case 9:
277  c += k[8];
278  [[fallthrough]];
279  case 8:
280  b += ((uint32_t)k[7]) << 24;
281  [[fallthrough]];
282  case 7:
283  b += ((uint32_t)k[6]) << 16;
284  [[fallthrough]];
285  case 6:
286  b += ((uint32_t)k[5]) << 8;
287  [[fallthrough]];
288  case 5:
289  b += k[4];
290  [[fallthrough]];
291  case 4:
292  a += ((uint32_t)k[3]) << 24;
293  [[fallthrough]];
294  case 3:
295  a += ((uint32_t)k[2]) << 16;
296  [[fallthrough]];
297  case 2:
298  a += ((uint32_t)k[1]) << 8;
299  [[fallthrough]];
300  case 1:
301  a += k[0];
302  break;
303  case 0:
304  return c;
305  }
306 
307  dqmhashfinal(a, b, c);
308  return c;
309 #undef dqmhashrot
310 #undef dqmhashmix
311 #undef dqmhashfinal
312  }
#define dqmhashmix(a, b, c)
double b
Definition: hdecay.h:118
#define dqmhashfinal(a, b, c)
double a
Definition: hdecay.h:119
virtual Object* DQMNet::findObject ( Peer * p,
const std::string &  name,
Peer **  owner = nullptr 
)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1021 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1021  {
1022  if (!upstream_.host.empty()) {
1023  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1024  << std::endl;
1025  return;
1026  }
1027 
1028  upstream_.update = false;
1029  upstream_.host = host;
1030  upstream_.port = port;
1031 }
host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:41
AutoPeer upstream_
Definition: DQMNet.h:365
port
Definition: query.py:116
std::string host
Definition: DQMNet.h:146
void DQMNet::lock ( )

Acquire a lock on the DQM net layer.

Definition at line 1054 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1054  {
1055  if (communicate_ != (pthread_t)-1)
1056  pthread_mutex_lock(&lock_);
1057 }
pthread_t communicate_
Definition: DQMNet.h:369
pthread_mutex_t lock_
Definition: DQMNet.h:345
std::ostream & DQMNet::logme ( )
protected

Definition at line 41 of file DQMNet.cc.

References gather_cfg::cout, fileCollector::now, and RecoSummaryTask_cfi::Time.

Referenced by dqmhash(), listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

41  {
42  Time now = Time::current();
43  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
44  << "]: ";
45 }
int pid_
Definition: DQMNet.h:358
std::string appname_
Definition: DQMNet.h:357
void DQMNet::losePeer ( const char *  reason,
Peer * peer,
lat::IOSelectEvent *  event,
lat::Error *  err = nullptr 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 65 of file DQMNet.cc.

References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

65  {
66  if (reason)
67  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
68 
69  Socket *s = peer->socket;
70 
71  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
72  if (i->peer == peer)
73  waiting_.erase(i++);
74  else
75  ++i;
76 
77  if (ev)
78  ev->source = nullptr;
79 
80  discard(peer->sendq);
81  if (peer->automatic)
82  peer->automatic->peer = nullptr;
83 
84  sel_.detach(s);
85  s->close();
86  removePeer(peer, s);
87  delete s;
88 }
std::ostream & logme()
Definition: DQMNet.cc:41
static void discard(Bucket *&b)
Definition: DQMNet.cc:53
bool ev
virtual void removePeer(Peer *p, lat::Socket *s)=0
WaitList waiting_
Definition: DQMNet.h:367
lat::IOSelector sel_
Definition: DQMNet.h:360
virtual Object* DQMNet::makeObject ( Peer * p,
const std::string &  name 
)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

virtual void DQMNet::markObjectsDead ( Peer * p)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 847 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

847  {
848  // Discard the data in the pipe, we care only about the wakeup.
849  try {
850  IOSize sz;
851  unsigned char buf[1024];
852  while ((sz = ev->source->read(buf, sizeof(buf))))
853  ;
854  } catch (Error &e) {
855  auto *next = dynamic_cast<SystemError *>(e.next());
856  if (next && next->portable() == SysErr::ErrTryAgain)
857  ; // Ignore it
858  else
859  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
860  }
861 
862  // Tell the main event pump to send an update in a little while.
863  flush_ = true;
864 
865  // We are never done, always keep going.
866  return false;
867 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:41
bool ev
bool flush_
Definition: DQMNet.h:375
size_t IOSize
Definition: IOTypes.h:14
bool DQMNet::onMessage ( Bucket * msg,
Peer * p,
unsigned char *  data,
size_t  len 
)
protectedvirtual

Definition at line 424 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::flags, HLT_2018_cff::flags, DQMNet::Object::lastreq, fileCollector::logme(), Skims_PA_cff::name, DQMNet::Peer::peeraddr, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::Peer::source, AlCaHLTBitMon_QueryRunRegistry::string, DQMNet::CoreObject::tag, DQMNet::Peer::update, DQMNet::Peer::updates, and DQMNet::CoreObject::version.

Referenced by dqmhash().

424  {
425  // Decode and process this message.
426  uint32_t type;
427  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
428  switch (type) {
429  case DQM_MSG_UPDATE_ME: {
430  if (len != 2 * sizeof(uint32_t)) {
431  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
432  return false;
433  }
434 
435  if (debug_)
436  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
437 
438  p->update = true;
439  }
440  return true;
441 
442  case DQM_MSG_LIST_OBJECTS: {
443  if (debug_)
444  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
445 
446  // Send over current status: list of known objects.
447  sendObjectListToPeer(msg, true, false);
448  }
449  return true;
450 
451  case DQM_MSG_GET_OBJECT: {
452  if (debug_)
453  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  if (len < 3 * sizeof(uint32_t)) {
456  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
457  return false;
458  }
459 
460  uint32_t namelen;
461  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
462  if (len != 3 * sizeof(uint32_t) + namelen) {
463  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
464  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
465  return false;
466  }
467 
468  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
469  Peer *owner = nullptr;
470  Object *o = findObject(nullptr, name, &owner);
471  if (o) {
472  o->lastreq = Time::current().ns();
473  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
475  waitForData(p, name, "", owner);
476  else
477  sendObjectToPeer(msg, *o, true);
478  } else {
479  uint32_t words[3];
480  words[0] = sizeof(words) + name.size();
481  words[1] = DQM_REPLY_NONE;
482  words[2] = name.size();
483 
484  msg->data.reserve(msg->data.size() + words[0]);
485  copydata(msg, &words[0], sizeof(words));
486  copydata(msg, &name[0], name.size());
487  }
488  }
489  return true;
490 
491  case DQM_REPLY_LIST_BEGIN: {
492  if (len != 4 * sizeof(uint32_t)) {
493  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
494  return false;
495  }
496 
497  // Get the update status: whether this is a full update.
498  uint32_t flags;
499  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
500 
501  if (debug_)
502  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
503  << p->peeraddr << ", size " << len << std::endl;
504 
505  // If we are about to receive a full list of objects, flag all
506  // objects as possibly dead. Subsequent object notifications
507  // will undo this for the live objects. We cannot delete
508  // objects quite yet, as we may get inquiry from another client
509  // while we are processing the incoming list, so we keep the
510  // objects tentatively alive as long as we've not seen the end.
511  if (flags)
513  }
514  return true;
515 
516  case DQM_REPLY_LIST_END: {
517  if (len != 4 * sizeof(uint32_t)) {
518  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
519  return false;
520  }
521 
522  // Get the update status: whether this is a full update.
523  uint32_t flags;
524  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
525 
526  // If we received a full list of objects, now purge all dead
527  // objects. We need to do this in two stages in case we receive
528  // updates in many parts, and end up sending updates to others in
529  // between; this avoids us lying live objects are dead.
530  if (flags)
532 
533  if (debug_)
534  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
535  << ", size " << len << std::endl;
536 
537  // Indicate we have received another update from this peer.
538  // Also indicate we should flush to our clients.
539  flush_ = true;
540  p->updates++;
541  }
542  return true;
543 
544  case DQM_REPLY_OBJECT: {
545  uint32_t words[9];
546  if (len < sizeof(words)) {
547  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
548  return false;
549  }
550 
551  memcpy(&words[0], data, sizeof(words));
552  uint32_t &namelen = words[6];
553  uint32_t &datalen = words[7];
554  uint32_t &qlen = words[8];
555 
556  if (len != sizeof(words) + namelen + datalen + qlen) {
557  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
558  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
559  << std::endl;
560  return false;
561  }
562 
563  unsigned char *namedata = data + sizeof(words);
564  unsigned char *objdata = namedata + namelen;
565  unsigned char *qdata = objdata + datalen;
566  unsigned char *enddata = qdata + qlen;
567  std::string name((char *)namedata, namelen);
568  assert(enddata == data + len);
569 
570  if (debug_)
571  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
572  << std::endl;
573 
574  // Mark the peer as a known object source.
575  p->source = true;
576 
577  // Initialise or update an object entry.
578  Object *o = findObject(p, name);
579  if (!o)
580  o = makeObject(p, name);
581 
582  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
583  o->tag = words[5];
584  o->version = ((uint64_t)words[4] << 32 | words[3]);
585  o->scalar.clear();
586  o->qdata.clear();
587  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
588  o->rawdata.clear();
589  o->scalar.insert(o->scalar.end(), objdata, qdata);
590  } else if (datalen) {
591  o->rawdata.clear();
592  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
593  } else if (!o->rawdata.empty())
594  o->flags |= DQM_PROP_STALE;
595  o->qdata.insert(o->qdata.end(), qdata, enddata);
596 
597  // If we had an object for this one already and this is a list
598  // update without data, issue an immediate data get request.
599  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
600  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
601 
602  // If we have the object data, release from wait.
603  if (datalen)
604  releaseWaiters(name, o);
605  }
606  return true;
607 
608  case DQM_REPLY_NONE: {
609  uint32_t words[3];
610  if (len < sizeof(words)) {
611  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
612  return false;
613  }
614 
615  memcpy(&words[0], data, sizeof(words));
616  uint32_t &namelen = words[2];
617 
618  if (len != sizeof(words) + namelen) {
619  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
620  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
621  return false;
622  }
623 
624  unsigned char *namedata = data + sizeof(words);
625  std::string name((char *)namedata, namelen);
626 
627  if (debug_)
628  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
629  << std::endl;
630 
631  // Mark the peer as a known object source.
632  p->source = true;
633 
634  // If this was a known object, kill it.
635  if (Object *o = findObject(p, name)) {
636  o->flags |= DQM_PROP_DEAD;
638  }
639 
640  // If someone was waiting for this, let them go.
641  releaseWaiters(name, nullptr);
642  }
643  return true;
644 
645  default:
646  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
647  << std::endl;
648  return false;
649  }
650 }
type
Definition: HCALResponse.h:21
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:70
std::ostream & logme()
Definition: DQMNet.cc:41
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:138
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:59
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:109
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:91
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:72
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:381
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:67
unsigned long long uint64_t
Definition: Time.h:13
tuple msg
Definition: mps_check.py:285
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:60
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:69
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:344
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:71
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
bool flush_
Definition: DQMNet.h:375
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:56
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:57
bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 802 of file DQMNet.cc.

References IORead, IOUrgent, DTRecHitClients_cfi::local, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, alignCSCRings::s, DQMNet::Peer::socket, AlCaHLTBitMon_QueryRunRegistry::string, and relativeConstraints::value.

Referenced by startLocalServer().

802  {
803  // Recover the server socket.
804  assert(ev->source == server_);
805 
806  // Accept the connection.
807  Socket *s = server_->accept();
808  assert(s);
809  assert(!s->isBlocking());
810 
811  // Record it to our list of peers.
812  lock();
813  Peer *p = createPeer(s);
814  std::string localaddr;
815  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
816  InetAddress peeraddr = inet->peername();
817  InetAddress myaddr = inet->sockname();
818  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
819  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
820  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
821  p->peeraddr = local->peername().path();
822  localaddr = local->sockname().path();
823  } else
824  assert(false);
825 
826  p->mask = IORead | IOUrgent;
827  p->socket = s;
828 
829  // Report the new connection.
830  if (debug_)
831  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
832 
833  // Attach it to the listener.
834  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
835  unlock();
836 
837  // We are never done.
838  return false;
839 }
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1054
std::ostream & logme()
Definition: DQMNet.cc:41
virtual Peer * createPeer(lat::Socket *s)=0
bool ev
A arg
Definition: Factorize.h:36
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:654
Definition: IOTypes.h:26
bool debug_
Definition: DQMNet.h:344
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1060
lat::Socket * server_
Definition: DQMNet.h:361
lat::IOSelector sel_
Definition: DQMNet.h:360
bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p
)
private

Handle communication to a particular client.

Definition at line 654 of file DQMNet.cc.

References DQMNet::Peer::automatic, b, data, DQMNet::Bucket::data, fileCollector::done, MillePedeFileConverter_cfg::e, DQMNet::Peer::incoming, IORead, IOUrgent, IOWrite, CommonMethods::lock(), fileCollector::logme(), DQMNet::Peer::mask, MESSAGE_SIZE_LIMIT, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, DQMNet::Peer::peeraddr, DQMNet::Peer::sendpos, DQMNet::Peer::sendq, DQMNet::Peer::socket, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, validateGeometry_cfg::valid, and DQMNet::Peer::waiting.

Referenced by onPeerConnect(), and run().

654  {
655  lock();
656  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
657 
658  // If there is a problem with the peer socket, discard the peer
659  // and tell the selector to stop prcessing events for it. If
660  // this is a server connection, we will eventually recreate
661  // everything if/when the data server comes back.
662  if (ev->events & IOUrgent) {
663  if (p->automatic) {
664  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
665  << " lost (will attempt to reconnect in 15 seconds)\n";
666  losePeer(nullptr, p, ev);
667  } else
668  losePeer("WARNING: lost peer connection ", p, ev);
669 
670  unlock();
671  return true;
672  }
673 
674  // If we can write to the peer socket, pump whatever we can into it.
675  if (ev->events & IOWrite) {
676  while (Bucket *b = p->sendq) {
677  IOSize len = b->data.size() - p->sendpos;
678  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
679  IOSize done;
680 
681  try {
682  done = (len ? ev->source->write(data, len) : 0);
683  if (debug_ && len)
684  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
685  } catch (Error &e) {
686  losePeer("WARNING: unable to write to peer ", p, ev, &e);
687  unlock();
688  return true;
689  }
690 
691  p->sendpos += done;
692  if (p->sendpos == b->data.size()) {
693  Bucket *old = p->sendq;
694  p->sendq = old->next;
695  p->sendpos = 0;
696  old->next = nullptr;
697  discard(old);
698  }
699 
700  if (!done && len)
701  // Cannot write any more.
702  break;
703  }
704  }
705 
706  // If there is data to be read from the peer, first receive what we
707  // can get out of the socket, then process all complete requests.
708  if (ev->events & IORead) {
709  // First build up the incoming buffer of data in the socket.
710  // Remember the last size returned by the socket; we need
711  // it to determine if the remote end closed the connection.
712  IOSize sz;
713  try {
714  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
715  do
716  if ((sz = ev->source->read(&buf[0], buf.size()))) {
717  if (debug_)
718  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
719  DataBlob &data = p->incoming;
720  if (data.capacity() < data.size() + sz)
721  data.reserve(data.size() + SOCKET_READ_GROWTH);
722  data.insert(data.end(), &buf[0], &buf[0] + sz);
723  }
724  while (sz == sizeof(buf));
725  } catch (Error &e) {
726  auto *next = dynamic_cast<SystemError *>(e.next());
727  if (next && next->portable() == SysErr::ErrTryAgain)
728  sz = 1; // Ignore it, and fake no end of data.
729  else {
730  // Houston we have a problem.
731  losePeer("WARNING: failed to read from peer ", p, ev, &e);
732  unlock();
733  return true;
734  }
735  }
736 
737  // Process fully received messages as long as we can.
738  size_t consumed = 0;
739  DataBlob &data = p->incoming;
740  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
741  uint32_t msglen;
742  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
743 
744  if (msglen >= MESSAGE_SIZE_LIMIT) {
745  losePeer("WARNING: excessively large message from ", p, ev);
746  unlock();
747  return true;
748  }
749 
750  if (data.size() - consumed >= msglen) {
751  bool valid = true;
752  if (msglen < 2 * sizeof(uint32_t)) {
753  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
754  valid = false;
755  } else {
756  // Decode and process this message.
757  Bucket msg;
758  msg.next = nullptr;
759  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
760 
761  // If we created a response, chain it to the write queue.
762  if (!msg.data.empty()) {
763  Bucket **prev = &p->sendq;
764  while (*prev)
765  prev = &(*prev)->next;
766 
767  *prev = new Bucket;
768  (*prev)->next = nullptr;
769  (*prev)->data.swap(msg.data);
770  }
771  }
772 
773  if (!valid) {
774  losePeer("WARNING: data stream error with ", p, ev);
775  unlock();
776  return true;
777  }
778 
779  consumed += msglen;
780  } else
781  break;
782  }
783 
784  data.erase(data.begin(), data.begin() + consumed);
785 
786  // If the client has closed the connection, shut down our end. If
787  // we have something to send back still, leave the write direction
788  // open. Otherwise close the shop for this client.
789  if (sz == 0)
790  sel_.setMask(p->socket, p->mask &= ~IORead);
791  }
792 
793  // Yes, please keep processing events for this socket.
794  unlock();
795  return false;
796 }
edm::ErrorSummaryEntry Error
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1054
std::ostream & logme()
Definition: DQMNet.cc:41
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:53
bool ev
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:424
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:74
Definition: IOTypes.h:26
double b
Definition: hdecay.h:118
tuple msg
Definition: mps_check.py:285
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:65
bool debug_
Definition: DQMNet.h:344
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1060
size_t IOSize
Definition: IOTypes.h:14
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
lat::IOSelector sel_
Definition: DQMNet.h:360
DQMNet& DQMNet::operator= ( const DQMNet & )
delete
void DQMNet::packQualityData ( std::string &  into,
const QReports &  qr
)
static

Pack the quality results in qr into the string into for persistent storage, such as network transfer or archival.

Definition at line 149 of file DQMNet.cc.

Referenced by dqmhash(), DQMService::flushStandalone(), and dqm::impl::MonitorElement::packQualityData().

149  {
150  char buf[64];
151  std::ostringstream qrs;
152  QReports::const_iterator qi, qe;
153  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
154  int pos = 0;
155  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
156  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
157  << '\0';
158  }
159  into = qrs.str();
160 }
virtual void DQMNet::purgeDeadObjects ( Peer *  p)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::releaseFromWait ( Bucket *  msg,
WaitObject &  w,
Object *  o
)
protectedvirtual

Definition at line 363 of file DQMNet.cc.

References DQMNet::Bucket::data, and DQMNet::WaitObject::name.

Referenced by dqmhash(), and run().

363  {
364  if (o)
365  sendObjectToPeer(msg, *o, true);
366  else {
367  uint32_t words[3];
368  words[0] = sizeof(words) + w.name.size();
369  words[1] = DQM_REPLY_NONE;
370  words[2] = w.name.size();
371 
372  msg->data.reserve(msg->data.size() + words[0]);
373  copydata(msg, &words[0], sizeof(words));
374  copydata(msg, &w.name[0], w.name.size());
375  }
376 }
const double w
Definition: UKUtility.cc:23
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:381
tuple msg
Definition: mps_check.py:285
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:71
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object *  o
)
private

Definition at line 123 of file DQMNet.cc.

References mps_check::msg, and DQMNet::Bucket::next.

123  {
124  Bucket **msg = &i->peer->sendq;
125  while (*msg)
126  msg = &(*msg)->next;
127  *msg = new Bucket;
128  (*msg)->next = nullptr;
129 
130  releaseFromWait(*msg, *i, o);
131 
132  assert(i->peer->waiting > 0);
133  i->peer->waiting--;
134  waiting_.erase(i);
135 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:363
tuple msg
Definition: mps_check.py:285
WaitList waiting_
Definition: DQMNet.h:367
void DQMNet::releaseWaiters ( const std::string &  name,
Object *  o
)
private

Definition at line 138 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, and mps_fire::i.

138  {
139  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
140  if (i->name == name)
141  releaseFromWait(i++, o);
142  else
143  ++i;
144 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:363
WaitList waiting_
Definition: DQMNet.h:367
virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash().

void DQMNet::requestObjectData ( Peer *  p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 91 of file DQMNet.cc.

References mps_check::msg, DQMNet::Bucket::next, and DQMNet::Peer::sendq.

91  {
92  // Issue request to peer.
93  Bucket **msg = &p->sendq;
94  while (*msg)
95  msg = &(*msg)->next;
96  *msg = new Bucket;
97  (*msg)->next = nullptr;
98 
99  uint32_t words[3];
100  words[0] = sizeof(words) + len;
101  words[1] = DQM_MSG_GET_OBJECT;
102  words[2] = len;
103  copydata(*msg, words, sizeof(words));
104  copydata(*msg, name, len);
105 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:67
tuple msg
Definition: mps_check.py:285
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
void DQMNet::run ( )

Run the actual I/O processing loop.

Definition at line 1079 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, DQMNet::Peer::automatic, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), DQMNet::CoreObject::flags, flush_, mps_fire::i, IORead, IOUrgent, IOWrite, lock(), logme(), DQMNet::Peer::mask, DQMNet::Bucket::next, DQMNet::AutoPeer::next, fileCollector::now, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, DQMNet::Peer::peeraddr, DQMNet::Object::rawdata, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), DQMNet::Peer::sendq, shouldStop(), DQMNet::Peer::socket, SOCKET_BUF_SIZE, RecoSummaryTask_cfi::Time, unlock(), DQMNet::Peer::update, updatePeerMasks(), upstream_, relativeConstraints::value, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

1079  {
1080  Time now;
1081  Time nextFlush = 0;
1082  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1083 
1084  // Perform I/O. Every once in a while flush updates to peers.
1085  while (!shouldStop()) {
1086  for (auto ap : automatic) {
1087  // If we need a server connection and don't have one yet,
1088  // initiate asynchronous connection creation. Swallow errors
1089  // in case the server won't talk to us.
1090  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1091  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1092  InetSocket *s = nullptr;
1093  try {
1094  InetAddress addr(ap->host.c_str(), ap->port);
1095  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1096  s->setBlocking(false);
1097  s->connect(addr);
1098  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1099  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1100  } catch (Error &e) {
1101  auto *sys = dynamic_cast<SystemError *>(e.next());
1102  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1103  // "In progress" just means the connection is in progress.
1104  // The connection is ready when the socket is writeable.
1105  // Anything else is a real problem.
1106  if (s)
1107  s->abort();
1108  delete s;
1109  s = nullptr;
1110  }
1111  }
1112 
1113  // Set up with the selector if we were successful. If this is
1114  // the upstream collector, queue a request for updates.
1115  if (s) {
1116  Peer *p = createPeer(s);
1117  ap->peer = p;
1118 
1119  InetAddress peeraddr = ((InetSocket *)s)->peername();
1120  InetAddress myaddr = ((InetSocket *)s)->sockname();
1121  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1122  p->mask = IORead | IOWrite | IOUrgent;
1123  p->update = ap->update;
1124  p->automatic = ap;
1125  p->socket = s;
1126  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1127  if (ap == &upstream_) {
1128  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1129  p->sendq = new Bucket;
1130  p->sendq->next = nullptr;
1131  copydata(p->sendq, words, sizeof(words));
1132  }
1133 
1134  // Report the new connection.
1135  if (debug_)
1136  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1137  << std::endl;
1138  }
1139  }
1140  }
1141 
1142  // Pump events for a while.
1143  sel_.dispatch(delay_);
1144  now = Time::current();
1145  lock();
1146 
1147  // Check if flush is required. Flush only if one is needed.
1148  // Always sends the full object list, but only rarely.
1149  if (flush_ && now > nextFlush) {
1150  flush_ = false;
1151  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1152  sendObjectListToPeers(true);
1153  }
1154 
1155  // Update the data server and peer selection masks. If we
1156  // have no more data to send and listening for writes, remove
1157  // the write mask. If we have something to write and aren't
1158  // listening for writes, start listening so we can send off
1159  // the data.
1160  updatePeerMasks();
1161 
1162  // Release peers that have been waiting for data for too long.
1163  Time waitold = now - waitMax_;
1164  Time waitstale = now - waitStale_;
1165  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1166  Object *o = findObject(nullptr, i->name);
1167 
1168  // If we have (stale) object data, wait only up to stale limit.
1169  // Otherwise if we have no data at all, wait up to the max limit.
1170  if (i->time < waitold) {
1171  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1172  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1173  releaseFromWait(i++, o);
1174  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1175  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1176  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1177  releaseFromWait(i++, o);
1178  }
1179 
1180  // Keep it for now.
1181  else
1182  ++i;
1183  }
1184 
1185  unlock();
1186  }
1187 }
edm::ErrorSummaryEntry Error
AutoPeer downstream_
Definition: DQMNet.h:366
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:65
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1054
std::ostream & logme()
Definition: DQMNet.cc:41
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:372
virtual Peer * createPeer(lat::Socket *s)=0
AutoPeer upstream_
Definition: DQMNet.h:365
A arg
Definition: Factorize.h:36
virtual bool shouldStop()
Definition: DQMNet.cc:359
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:654
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:363
lat::Time next
Definition: DQMNet.h:145
Definition: IOTypes.h:26
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:60
bool debug_
Definition: DQMNet.h:344
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1060
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:66
bool flush_
Definition: DQMNet.h:375
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
lat::TimeSpan waitMax_
Definition: DQMNet.h:374
lat::TimeSpan waitStale_
Definition: DQMNet.h:373
WaitList waiting_
Definition: DQMNet.h:367
lat::IOSelector sel_
Definition: DQMNet.h:360
void DQMNet::sendLocalChanges ( )

Definition at line 1191 of file DQMNet.cc.

References wakeup_.

Referenced by DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::removePeer().

1191  {
1192  char byte = 0;
1193  wakeup_.sink()->write(&byte, 1);
1194 }
lat::Pipe wakeup_
Definition: DQMNet.h:362
virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual
virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::sendObjectToPeer ( Bucket *  msg,
Object &  o,
bool  data 
)
protectedvirtual

Definition at line 381 of file DQMNet.cc.

References DQMNet::Bucket::data, DQMNet::CoreObject::dirname, DQMNet::CoreObject::flags, HLT_2018_cff::flags, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::Object::rawdata, DQMNet::Object::scalar, DQMNet::CoreObject::tag, and DQMNet::CoreObject::version.

Referenced by dqmhash(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

381  {
382  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
383  DataBlob objdata;
384 
385  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
386  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
387  else if (data)
388  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
389 
390  uint32_t words[9];
391  uint32_t namelen = o.dirname->size() + o.objname.size() + 1;
392  uint32_t datalen = objdata.size();
393  uint32_t qlen = o.qdata.size();
394 
395  if (o.dirname->empty())
396  --namelen;
397 
398  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
399  words[1] = DQM_REPLY_OBJECT;
400  words[2] = flags;
401  words[3] = (o.version >> 0) & 0xffffffff;
402  words[4] = (o.version >> 32) & 0xffffffff;
403  words[5] = o.tag;
404  words[6] = namelen;
405  words[7] = datalen;
406  words[8] = qlen;
407 
408  msg->data.reserve(msg->data.size() + words[0]);
409  copydata(msg, &words[0], 9 * sizeof(uint32_t));
410  if (namelen) {
411  copydata(msg, &(*o.dirname)[0], o.dirname->size());
412  if (!o.dirname->empty())
413  copydata(msg, "/", 1);
414  copydata(msg, &o.objname[0], o.objname.size());
415  }
416  if (datalen)
417  copydata(msg, &objdata[0], datalen);
418  if (qlen)
419  copydata(msg, &o.qdata[0], qlen);
420 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:26
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:59
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:72
tuple msg
Definition: mps_check.py:285
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:48
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:25
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:80
static bool DQMNet::setOrder ( const CoreObject &  a,
const CoreObject &  b
)
inlinestatic

Definition at line 170 of file DQMNet.h.

References DQMNet::CoreObject::dirname, DQMNet::CoreObject::lumi, DQMNet::CoreObject::moduleId, DQMNet::CoreObject::objname, DQMNet::CoreObject::run, and DQMNet::CoreObject::streamId.

Referenced by dqm::impl::MonitorElement::operator<().

170  {
171  if (a.run == b.run) {
172  if (a.lumi == b.lumi) {
173  if (a.streamId == b.streamId) {
174  if (a.moduleId == b.moduleId) {
175  if (*a.dirname == *b.dirname) {
176  return a.objname < b.objname;
177  }
178  return *a.dirname < *b.dirname;
179  }
180  return a.moduleId < b.moduleId;
181  }
182  return a.streamId < b.streamId;
183  }
184  return a.lumi < b.lumi;
185  }
186  return a.run < b.run;
187  }
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
bool DQMNet::shouldStop ( )
protectedvirtual

Definition at line 359 of file DQMNet.cc.

Referenced by dqmhash(), and run().

359 { return shutdown_; }
sig_atomic_t shutdown_
Definition: DQMNet.h:370
void DQMNet::shutdown ( )

Stop the network layer and wait for it to finish.

Definition at line 1034 of file DQMNet.cc.

References communicate_, and shutdown_.

Referenced by DQMService::shutdown().

1034  {
1035  shutdown_ = 1;
1036  if (communicate_ != (pthread_t)-1)
1037  pthread_join(communicate_, nullptr);
1038 }
pthread_t communicate_
Definition: DQMNet.h:369
sig_atomic_t shutdown_
Definition: DQMNet.h:370
void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. Applies only when data has been received; another time limit is applied when no data payload has been received at all.

Definition at line 938 of file DQMNet.cc.

References ntuplemaker::time, and waitStale_.

938 { waitStale_ = time; }
lat::TimeSpan waitStale_
Definition: DQMNet.h:373
void DQMNet::start ( )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1068 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), Types.EventRange::cppID(), and DQMService::DQMService().

1068  {
1069  if (communicate_ != (pthread_t)-1) {
1070  logme() << "ERROR: DQM networking thread has already been started\n";
1071  return;
1072  }
1073 
1074  pthread_mutex_init(&lock_, nullptr);
1075  pthread_create(&communicate_, nullptr, &communicate, this);
1076 }
pthread_t communicate_
Definition: DQMNet.h:369
std::ostream & logme()
Definition: DQMNet.cc:41
pthread_mutex_t lock_
Definition: DQMNet.h:345
static void * communicate(void *obj)
Definition: DQMNet.cc:1045
void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 943 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, MillePedeFileConverter_cfg::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

943  {
944  if (server_) {
945  logme() << "ERROR: DQM server was already started.\n";
946  return;
947  }
948 
949  try {
950  InetAddress addr("0.0.0.0", port);
951  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
952  s->bind(addr);
953  s->listen(10);
954  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
955  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
956  s->setBlocking(false);
957  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
958  } catch (Error &e) {
959  // FIXME: Do we need to do this when we throw an exception anyway?
960  // FIXME: Abort instead?
961  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
962 
963  raiseDQMError("DQMNet::startLocalServer",
964  "Failed to start server at port"
965  " %d: %s",
966  port,
967  e.explain().c_str());
968  }
969 
970  logme() << "INFO: DQM server started at port " << port << std::endl;
971 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:41
port
Definition: query.py:116
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:802
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:361
lat::IOSelector sel_
Definition: DQMNet.h:360
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:10
void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 976 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, IOAccept, logme(), onPeerConnect(), raiseDQMError(), sel_, server_, and SOCKET_BUF_SIZE.

976  {
977  if (server_) {
978  logme() << "ERROR: DQM server was already started.\n";
979  return;
980  }
981 
982  try {
983  server_ = new LocalServerSocket(path, 10);
984  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
985  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
986  server_->setBlocking(false);
987  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
988  } catch (Error &e) {
989  // FIXME: Do we need to do this when we throw an exception anyway?
990  // FIXME: Abort instead?
991  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
992 
993  raiseDQMError("DQMNet::startLocalServer",
994  "Failed to start server at path"
995  " %s: %s",
996  path,
997  e.explain().c_str());
998  }
999 
1000  logme() << "INFO: DQM server started at path " << path << std::endl;
1001 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:41
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:802
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:361
lat::IOSelector sel_
Definition: DQMNet.h:360
void raiseDQMError(const char *context, const char *fmt,...)
Definition: DQMError.cc:10
void DQMNet::unlock ( )

Release the lock on the DQM net layer.

Definition at line 1060 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1060  {
1061  if (communicate_ != (pthread_t)-1)
1062  pthread_mutex_unlock(&lock_);
1063 }
pthread_t communicate_
Definition: DQMNet.h:369
pthread_mutex_t lock_
Definition: DQMNet.h:345
void DQMNet::unpackQualityData ( QReports &  qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from the string from into qr. Assumes the data was saved with packQualityData().

Definition at line 164 of file DQMNet.cc.

References DQMNet::QValue::algorithm, HltBtagPostValidation_cff::c, DQMNet::QValue::code, dqm::impl::MonitorElement::data_, DQMNet::CoreObject::dirname, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, extractNextObject(), dqm::impl::MonitorElement::Fill(), DQMNet::CoreObject::flags, fileCollector::logme(), DQMNet::QValue::message, Skims_PA_cff::name, EcalTangentSkim_cfg::o, getGTfromDQMFile::obj, DQMNet::CoreObject::objname, DQMNet::Object::qdata, DQMNet::CoreObject::qreports, DQMNet::QValue::qtname, DQMNet::QValue::qtresult, DQMNet::Object::rawdata, DQMNet::Object::scalar, dqm::qstatus::STATUS_OK, DQMNet::CoreObject::tag, and dqm::qstatus::WARNING.

Referenced by dqmhash().

164  {
165  const char *qdata = from;
166 
167  // Count how many qresults there are.
168  size_t nqv = 0;
169  while (*qdata) {
170  ++nqv;
171  while (*qdata)
172  ++qdata;
173  ++qdata;
174  while (*qdata)
175  ++qdata;
176  ++qdata;
177  while (*qdata)
178  ++qdata;
179  ++qdata;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  }
187 
188  // Now extract the qreports.
189  qdata = from;
190  qr.reserve(nqv);
191  while (*qdata) {
192  qr.emplace_back();
193  DQMNet::QValue &qv = qr.back();
194 
195  qv.code = atoi(qdata);
196  while (*qdata)
197  ++qdata;
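198  switch (qv.code) {
199  case dqm::qstatus::STATUS_OK:
200  break;
201  case dqm::qstatus::WARNING:
202  flags |= DQMNet::DQM_PROP_REPORT_WARN;
203  break;
204  case dqm::qstatus::ERROR:
205  flags |= DQMNet::DQM_PROP_REPORT_ERROR;
206  break;
207  default:
208  flags |= DQMNet::DQM_PROP_REPORT_OTHER;
209  break;
210  }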
211 
212  qv.qtresult = atof(++qdata);
213  while (*qdata)
214  ++qdata;
215 
216  qv.qtname = ++qdata;
217  while (*qdata)
218  ++qdata;
219 
220  qv.algorithm = ++qdata;
221  while (*qdata)
222  ++qdata;
223 
224  qv.message = ++qdata;
225  while (*qdata)
226  ++qdata;
227  ++qdata;
228  }
229 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:47
std::string algorithm
Definition: DQMNet.h:90
static const int WARNING
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:46
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:48
std::string qtname
Definition: DQMNet.h:89
std::string message
Definition: DQMNet.h:88
static const int STATUS_OK
float qtresult
Definition: DQMNet.h:87
static const int ERROR
void DQMNet::updateMask ( Peer *  p)
protected

Update the selector mask for a peer based on data queues. Close the connection if there is no reason to keep it open.

Definition at line 871 of file DQMNet.cc.

References IOUrgent, IOWrite, fileCollector::logme(), DQMNet::Peer::mask, DQMNet::Peer::peeraddr, DQMNet::Peer::sendq, DQMNet::Peer::socket, and DQMNet::Peer::waiting.

Referenced by dqmhash(), and DQMImplNet< DQMNet::Object >::updatePeerMasks().

871  {
872  if (!p->socket)
873  return;
874 
875  // Listen to writes iff we have data to send.
876  unsigned oldmask = p->mask;
877  if (!p->sendq && (p->mask & IOWrite))
878  sel_.setMask(p->socket, p->mask &= ~IOWrite);
879 
880  if (p->sendq && !(p->mask & IOWrite))
881  sel_.setMask(p->socket, p->mask |= IOWrite);
882 
883  if (debug_ && oldmask != p->mask)
884  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
885 
886  // If we have nothing more to send and are no longer listening
887  // for reads, close up the shop for this peer.
888  if (p->mask == IOUrgent && !p->waiting) {
889  assert(!p->sendq);
890  if (debug_)
891  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
892  losePeer(nullptr, p, nullptr);
893  }
894 }
std::ostream & logme()
Definition: DQMNet.cc:41
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:65
bool debug_
Definition: DQMNet.h:344
lat::IOSelector sel_
Definition: DQMNet.h:360
virtual void DQMNet::updatePeerMasks ( )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by dqmhash(), and run().

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1006 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

Referenced by DQMService::DQMService().

1006  {
1007  if (!downstream_.host.empty()) {
1008  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1009  << std::endl;
1010  return;
1011  }
1012 
1013  downstream_.update = true;
1014  downstream_.host = host;
1015  downstream_.port = port;
1016 }
AutoPeer downstream_
Definition: DQMNet.h:366
host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:41
port
Definition: query.py:116
std::string host
Definition: DQMNet.h:146
void DQMNet::waitForData ( Peer * p,
const std::string &  name,
const std::string &  info,
Peer * owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 109 of file DQMNet.cc.

References info(), Skims_PA_cff::name, and DQMNet::Peer::waiting.

Referenced by dqmhash().

109  {
110  // FIXME: Should we automatically record which exact peer the waiter
111  // is expecting to deliver data so we know to release the waiter if
112  // the other peer vanishes? The current implementation stands a
113  // chance for the waiter to wait indefinitely -- although we do
114  // force terminate the wait after a while.
115  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
116  WaitObject wo = {Time::current(), name, info, p};
117  waiting_.push_back(wo);
118  p->waiting++;
119 }
static const TGPicture * info(bool iBackgroundIsBlack)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:91
WaitList waiting_
Definition: DQMNet.h:367

Member Data Documentation

std::string DQMNet::appname_
private

Definition at line 357 of file DQMNet.h.

pthread_t DQMNet::communicate_
private

Definition at line 369 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

bool DQMNet::debug_
protected

Definition at line 344 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

int DQMNet::delay_
private

Definition at line 372 of file DQMNet.h.

Referenced by delay(), and run().

AutoPeer DQMNet::downstream_
private

Definition at line 366 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 67 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 64 of file DQMNet.h.

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 66 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 65 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 53 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static
const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static
const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static
const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static
const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static
const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static
const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 57 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
static

Definition at line 49 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 45 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static
const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 44 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static
const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static
const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static
const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 60 of file DQMNet.h.

Referenced by run().

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static

Definition at line 52 of file DQMNet.h.

Referenced by dqm::dqmstoreimpl::DQMStore::saveMonitorElementToROOT().

const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 42 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 28 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 27 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

Definition at line 25 of file DQMNet.h.

Referenced by dqm::impl::MonitorElement::kind().

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 29 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 26 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 30 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 33 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

Definition at line 31 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 32 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 36 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

Definition at line 34 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 35 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 39 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 37 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 38 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 40 of file DQMNet.h.

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 41 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 69 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 70 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 71 of file DQMNet.h.

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 72 of file DQMNet.h.

bool DQMNet::flush_
private

Definition at line 375 of file DQMNet.h.

Referenced by run().

pthread_mutex_t DQMNet::lock_
protected

Definition at line 345 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 74 of file DQMNet.h.

int DQMNet::pid_
private

Definition at line 358 of file DQMNet.h.

lat::IOSelector DQMNet::sel_
private

Definition at line 360 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

lat::Socket* DQMNet::server_
private

Definition at line 361 of file DQMNet.h.

Referenced by startLocalServer().

sig_atomic_t DQMNet::shutdown_
private

Definition at line 370 of file DQMNet.h.

Referenced by shutdown().

AutoPeer DQMNet::upstream_
private

Definition at line 365 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

lat::Time DQMNet::version_
private

Definition at line 363 of file DQMNet.h.

WaitList DQMNet::waiting_
private

Definition at line 367 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 374 of file DQMNet.h.

Referenced by run().

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 373 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

lat::Pipe DQMNet::wakeup_
private

Definition at line 362 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().