CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  WaitObject
 

Public Types

using DataBlob = std::vector< unsigned char >
 
using QReports = std::vector< QValue >
 
using QValue = MonitorElementData::QReport::QValue
 
using TagList = std::vector< uint32_t >
 
using WaitList = std::list< WaitObject >
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
 DQMNet (const DQMNet &)=delete
 
void listenToCollector (const std::string &host, int port)
 
void lock ()
 Acquire a lock on the DQM net layer. More...
 
DQMNet & operator= (const DQMNet &)=delete
 
void run ()
 
void sendLocalChanges ()
 
void shutdown ()
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start ()
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock ()
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet ()
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme ()
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop ()
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks ()=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 26 of file DQMNet.h.

Member Typedef Documentation

◆ DataBlob

using DQMNet::DataBlob = std::vector<unsigned char>

Definition at line 85 of file DQMNet.h.

◆ QReports

using DQMNet::QReports = std::vector<QValue>

Definition at line 86 of file DQMNet.h.

◆ QValue

using DQMNet::QValue = MonitorElementData::QReport::QValue

Definition at line 84 of file DQMNet.h.

◆ TagList

using DQMNet::TagList = std::vector<uint32_t>

Definition at line 87 of file DQMNet.h.

◆ WaitList

using DQMNet::WaitList = std::list<WaitObject>

Definition at line 88 of file DQMNet.h.

Constructor & Destructor Documentation

◆ DQMNet() [1/2]

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 914 of file DQMNet.cc.

References downstream_, edm::storage::IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

915  : debug_(false),
916  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
917  pid_(getpid()),
918  server_(nullptr),
919  version_(Time::current()),
920  communicate_((pthread_t)-1),
921  shutdown_(0),
922  delay_(1000),
923  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
924  waitMax_(0, 0, 0, 5 /* seconds */, 0),
925  flush_(false) {
926  // Create a pipe for the local DQM to tell the communicator
927  // thread that local DQM data has changed and that the peers
928  // should be notified.
929  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
930  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
931 
932  // Initialise the upstream and downstream to empty.
933  upstream_.peer = downstream_.peer = nullptr;
937 }
AutoPeer downstream_
Definition: DQMNet.h:363
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:864
pthread_t communicate_
Definition: DQMNet.h:366
lat::Time version_
Definition: DQMNet.h:360
int delay_
Definition: DQMNet.h:369
AutoPeer upstream_
Definition: DQMNet.h:362
int pid_
Definition: DQMNet.h:355
std::string appname_
Definition: DQMNet.h:354
Peer * peer
Definition: DQMNet.h:141
sig_atomic_t shutdown_
Definition: DQMNet.h:367
lat::Time next
Definition: DQMNet.h:142
lat::Pipe wakeup_
Definition: DQMNet.h:359
bool debug_
Definition: DQMNet.h:341
bool flush_
Definition: DQMNet.h:372
lat::Socket * server_
Definition: DQMNet.h:358
#define O_NONBLOCK
Definition: SysFile.h:23
lat::TimeSpan waitMax_
Definition: DQMNet.h:371
lat::TimeSpan waitStale_
Definition: DQMNet.h:370
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ ~DQMNet()

DQMNet::~DQMNet ( )
virtual

Definition at line 939 of file DQMNet.cc.

939  {
940  // FIXME
941 }

◆ DQMNet() [2/2]

DQMNet::DQMNet ( const DQMNet & )
delete

Member Function Documentation

◆ copydata()

void DQMNet::copydata ( Bucket *  b,
const void *  data,
size_t  len 
)
static protected

Definition at line 57 of file DQMNet.cc.

References b, and data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

57  {
58  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
59 }
double b
Definition: hdecay.h:120
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80

◆ createPeer()

virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ debug()

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 945 of file DQMNet.cc.

References debug_.

Referenced by DQMService::DQMService().

945 { debug_ = doit; }
bool debug_
Definition: DQMNet.h:341

◆ delay()

void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 949 of file DQMNet.cc.

References delay_.

949 { delay_ = delay; }
int delay_
Definition: DQMNet.h:369
void delay(int delay)
Definition: DQMNet.cc:949

◆ discard()

void DQMNet::discard ( Bucket *&  b)
static protected

Definition at line 62 of file DQMNet.cc.

References b, and GetRecoTauVFromDQM_MC_cff::next.

Referenced by OrderedSet.OrderedSet::pop().

62  {
63  while (b) {
64  Bucket *next = b->next;
65  delete b;
66  b = next;
67  }
68 }
double b
Definition: hdecay.h:120

◆ dqmhash()

static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inline static

Definition at line 196 of file DQMNet.h.

References a, b, HltBtagPostValidation_cff::c, dqmhashfinal, dqmhashmix, dqmdumpme::k, and crabWrapper::key.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

196  {
197  // Reduced version of Bob Jenkins' hash function at:
198  // http://www.burtleburtle.net/bob/c/lookup3.c
199 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
200 #define dqmhashmix(a, b, c) \
201  { \
202  a -= c; \
203  a ^= dqmhashrot(c, 4); \
204  c += b; \
205  b -= a; \
206  b ^= dqmhashrot(a, 6); \
207  a += c; \
208  c -= b; \
209  c ^= dqmhashrot(b, 8); \
210  b += a; \
211  a -= c; \
212  a ^= dqmhashrot(c, 16); \
213  c += b; \
214  b -= a; \
215  b ^= dqmhashrot(a, 19); \
216  a += c; \
217  c -= b; \
218  c ^= dqmhashrot(b, 4); \
219  b += a; \
220  }
221 #define dqmhashfinal(a, b, c) \
222  { \
223  c ^= b; \
224  c -= dqmhashrot(b, 14); \
225  a ^= c; \
226  a -= dqmhashrot(c, 11); \
227  b ^= a; \
228  b -= dqmhashrot(a, 25); \
229  c ^= b; \
230  c -= dqmhashrot(b, 16); \
231  a ^= c; \
232  a -= dqmhashrot(c, 4); \
233  b ^= a; \
234  b -= dqmhashrot(a, 14); \
235  c ^= b; \
236  c -= dqmhashrot(b, 24); \
237  }
238 
239  uint32_t a, b, c;
240  a = b = c = 0xdeadbeef + (uint32_t)keylen;
241  const auto *k = (const unsigned char *)key;
242 
243  // all but the last block: affect some bits of (a, b, c)
244  while (keylen > 12) {
245  a += k[0];
246  a += ((uint32_t)k[1]) << 8;
247  a += ((uint32_t)k[2]) << 16;
248  a += ((uint32_t)k[3]) << 24;
249  b += k[4];
250  b += ((uint32_t)k[5]) << 8;
251  b += ((uint32_t)k[6]) << 16;
252  b += ((uint32_t)k[7]) << 24;
253  c += k[8];
254  c += ((uint32_t)k[9]) << 8;
255  c += ((uint32_t)k[10]) << 16;
256  c += ((uint32_t)k[11]) << 24;
257  dqmhashmix(a, b, c);
258  keylen -= 12;
259  k += 12;
260  }
261 
262  // last block: affect all 32 bits of (c); all case statements fall through
263  switch (keylen) {
264  case 12:
265  c += ((uint32_t)k[11]) << 24;
266  [[fallthrough]];
267  case 11:
268  c += ((uint32_t)k[10]) << 16;
269  [[fallthrough]];
270  case 10:
271  c += ((uint32_t)k[9]) << 8;
272  [[fallthrough]];
273  case 9:
274  c += k[8];
275  [[fallthrough]];
276  case 8:
277  b += ((uint32_t)k[7]) << 24;
278  [[fallthrough]];
279  case 7:
280  b += ((uint32_t)k[6]) << 16;
281  [[fallthrough]];
282  case 6:
283  b += ((uint32_t)k[5]) << 8;
284  [[fallthrough]];
285  case 5:
286  b += k[4];
287  [[fallthrough]];
288  case 4:
289  a += ((uint32_t)k[3]) << 24;
290  [[fallthrough]];
291  case 3:
292  a += ((uint32_t)k[2]) << 16;
293  [[fallthrough]];
294  case 2:
295  a += ((uint32_t)k[1]) << 8;
296  [[fallthrough]];
297  case 1:
298  a += k[0];
299  break;
300  case 0:
301  return c;
302  }
303 
304  dqmhashfinal(a, b, c);
305  return c;
306 #undef dqmhashrot
307 #undef dqmhashmix
308 #undef dqmhashfinal
309  }
#define dqmhashmix(a, b, c)
double b
Definition: hdecay.h:120
#define dqmhashfinal(a, b, c)
double a
Definition: hdecay.h:121

◆ findObject()

virtual Object* DQMNet::findObject ( Peer *  p,
const std::string &  name,
Peer **  owner = nullptr 
)
protected pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ getPeer()

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protected pure virtual

◆ listenToCollector()

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1033 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1033  {
1034  if (!upstream_.host.empty()) {
1035  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1036  << std::endl;
1037  return;
1038  }
1039 
1040  upstream_.update = false;
1041  upstream_.host = host;
1042  upstream_.port = port;
1043 }
string host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:50
AutoPeer upstream_
Definition: DQMNet.h:362
int port
Definition: query.py:116
std::string host
Definition: DQMNet.h:143

◆ lock()

void DQMNet::lock ( )

Acquire a lock on the DQM net layer.

Definition at line 1066 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1066  {
1067  if (communicate_ != (pthread_t)-1)
1068  pthread_mutex_lock(&lock_);
1069 }
pthread_t communicate_
Definition: DQMNet.h:366
pthread_mutex_t lock_
Definition: DQMNet.h:342

◆ logme()

std::ostream & DQMNet::logme ( )
protected

Definition at line 50 of file DQMNet.cc.

References gather_cfg::cout, submitPVValidationJobs::now, and RecoSummaryTask_cfi::Time.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

50  {
51  Time now = Time::current();
52  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
53  << "]: ";
54 }
int pid_
Definition: DQMNet.h:355
std::string appname_
Definition: DQMNet.h:354

◆ losePeer()

void DQMNet::losePeer ( const char *  reason,
Peer *  peer,
lat::IOSelectEvent *  event,
lat::Error *  err = nullptr 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 74 of file DQMNet.cc.

References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, submitPVResolutionJobs::err, makeMEIFBenchmarkPlots::ev, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, PixelMapPlotter::reason, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

74  {
75  if (reason)
76  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
77 
78  Socket *s = peer->socket;
79 
80  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
81  if (i->peer == peer)
82  waiting_.erase(i++);
83  else
84  ++i;
85 
86  if (ev)
87  ev->source = nullptr;
88 
89  discard(peer->sendq);
90  if (peer->automatic)
91  peer->automatic->peer = nullptr;
92 
93  sel_.detach(s);
94  s->close();
95  removePeer(peer, s);
96  delete s;
97 }
std::ostream & logme()
Definition: DQMNet.cc:50
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
virtual void removePeer(Peer *p, lat::Socket *s)=0
WaitList waiting_
Definition: DQMNet.h:364
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ makeObject()

virtual Object* DQMNet::makeObject ( Peer *  p,
const std::string &  name 
)
protected pure virtual

◆ markObjectsDead()

virtual void DQMNet::markObjectsDead ( Peer *  p)
protected pure virtual

◆ onLocalNotify()

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 864 of file DQMNet.cc.

References visDQMUpload::buf, MillePedeFileConverter_cfg::e, makeMEIFBenchmarkPlots::ev, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

864  {
865  // Discard the data in the pipe, we care only about the wakeup.
866  try {
867  IOSize sz;
868  unsigned char buf[1024];
869  while ((sz = ev->source->read(buf, sizeof(buf))))
870  ;
871  } catch (Error &e) {
872  auto *next = dynamic_cast<SystemError *>(e.next());
873  if (next && next->portable() == SysErr::ErrTryAgain)
874  ; // Ignore it
875  else
876  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
877  }
878 
879  // Tell the main event pump to send an update in a little while.
880  flush_ = true;
881 
882  // We are never done, always keep going.
883  return false;
884 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:50
size_t IOSize
Definition: IOTypes.h:15
bool flush_
Definition: DQMNet.h:372

◆ onMessage()

bool DQMNet::onMessage ( Bucket *  msg,
Peer *  p,
unsigned char *  data,
size_t  len 
)
protected virtual

Definition at line 441 of file DQMNet.cc.

References cms::cuda::assert(), data, HLT_2023v12_cff::flags, caHitNtupletGeneratorKernels::if(), fileCollector::logme(), mps_check::msg, Skims_PA_cff::name, EcalTangentSkim_cfg::o, AlCaHLTBitMon_ParallelJobs::p, and AlCaHLTBitMon_QueryRunRegistry::string.

441  {
442  // Decode and process this message.
443  uint32_t type;
444  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
445  switch (type) {
446  case DQM_MSG_UPDATE_ME: {
447  if (len != 2 * sizeof(uint32_t)) {
448  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
449  return false;
450  }
451 
452  if (debug_)
453  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
454 
455  p->update = true;
456  }
457  return true;
458 
459  case DQM_MSG_LIST_OBJECTS: {
460  if (debug_)
461  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
462 
463  // Send over current status: list of known objects.
464  sendObjectListToPeer(msg, true, false);
465  }
466  return true;
467 
468  case DQM_MSG_GET_OBJECT: {
469  if (debug_)
470  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
471 
472  if (len < 3 * sizeof(uint32_t)) {
473  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
474  return false;
475  }
476 
477  uint32_t namelen;
478  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
479  if (len != 3 * sizeof(uint32_t) + namelen) {
480  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
481  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
482  return false;
483  }
484 
485  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
486  Peer *owner = nullptr;
487  Object *o = findObject(nullptr, name, &owner);
488  if (o) {
489  o->lastreq = Time::current().ns();
490  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
492  waitForData(p, name, "", owner);
493  else
494  sendObjectToPeer(msg, *o, true);
495  } else {
496  uint32_t words[3];
497  words[0] = sizeof(words) + name.size();
498  words[1] = DQM_REPLY_NONE;
499  words[2] = name.size();
500 
501  msg->data.reserve(msg->data.size() + words[0]);
502  copydata(msg, &words[0], sizeof(words));
503  copydata(msg, &name[0], name.size());
504  }
505  }
506  return true;
507 
508  case DQM_REPLY_LIST_BEGIN: {
509  if (len != 4 * sizeof(uint32_t)) {
510  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
511  return false;
512  }
513 
514  // Get the update status: whether this is a full update.
515  uint32_t flags;
516  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
517 
518  if (debug_)
519  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
520  << p->peeraddr << ", size " << len << std::endl;
521 
522  // If we are about to receive a full list of objects, flag all
523  // objects as possibly dead. Subsequent object notifications
524  // will undo this for the live objects. We cannot delete
525  // objects quite yet, as we may get inquiry from another client
526  // while we are processing the incoming list, so we keep the
527  // objects tentatively alive as long as we've not seen the end.
528  if (flags)
530  }
531  return true;
532 
533  case DQM_REPLY_LIST_END: {
534  if (len != 4 * sizeof(uint32_t)) {
535  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
536  return false;
537  }
538 
539  // Get the update status: whether this is a full update.
540  uint32_t flags;
541  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
542 
543  // If we received a full list of objects, now purge all dead
544  // objects. We need to do this in two stages in case we receive
545  // updates in many parts, and end up sending updates to others in
546  // between; this avoids us lying live objects are dead.
547  if (flags)
549 
550  if (debug_)
551  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
552  << ", size " << len << std::endl;
553 
554  // Indicate we have received another update from this peer.
555  // Also indicate we should flush to our clients.
556  flush_ = true;
557  p->updates++;
558  }
559  return true;
560 
561  case DQM_REPLY_OBJECT: {
562  uint32_t words[9];
563  if (len < sizeof(words)) {
564  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
565  return false;
566  }
567 
568  memcpy(&words[0], data, sizeof(words));
569  uint32_t &namelen = words[6];
570  uint32_t &datalen = words[7];
571  uint32_t &qlen = words[8];
572 
573  if (len != sizeof(words) + namelen + datalen + qlen) {
574  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
575  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
576  << std::endl;
577  return false;
578  }
579 
580  unsigned char *namedata = data + sizeof(words);
581  unsigned char *objdata = namedata + namelen;
582  unsigned char *qdata = objdata + datalen;
583  unsigned char *enddata = qdata + qlen;
584  std::string name((char *)namedata, namelen);
585  assert(enddata == data + len);
586 
587  if (debug_)
588  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
589  << std::endl;
590 
591  // Mark the peer as a known object source.
592  p->source = true;
593 
594  // Initialise or update an object entry.
595  Object *o = findObject(p, name);
596  if (!o)
597  o = makeObject(p, name);
598 
599  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
600  o->tag = words[5];
601  o->version = ((uint64_t)words[4] << 32 | words[3]);
602  o->scalar.clear();
603  o->qdata.clear();
604  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
605  o->rawdata.clear();
606  o->scalar.insert(o->scalar.end(), objdata, qdata);
607  } else if (datalen) {
608  o->rawdata.clear();
609  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
610  } else if (!o->rawdata.empty())
611  o->flags |= DQM_PROP_STALE;
612  o->qdata.insert(o->qdata.end(), qdata, enddata);
613 
614  // If we had an object for this one already and this is a list
615  // update without data, issue an immediate data get request.
616  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
617  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
618 
619  // If we have the object data, release from wait.
620  if (datalen)
622  }
623  return true;
624 
625  case DQM_REPLY_NONE: {
626  uint32_t words[3];
627  if (len < sizeof(words)) {
628  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
629  return false;
630  }
631 
632  memcpy(&words[0], data, sizeof(words));
633  uint32_t &namelen = words[2];
634 
635  if (len != sizeof(words) + namelen) {
636  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
637  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
638  return false;
639  }
640 
641  unsigned char *namedata = data + sizeof(words);
642  std::string name((char *)namedata, namelen);
643 
644  if (debug_)
645  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
646  << std::endl;
647 
648  // Mark the peer as a known object source.
649  p->source = true;
650 
651  // If this was a known object, kill it.
652  if (Object *o = findObject(p, name)) {
653  o->flags |= DQM_PROP_DEAD;
655  }
656 
657  // If someone was waiting for this, let them go.
658  releaseWaiters(name, nullptr);
659  }
660  return true;
661 
662  default:
663  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
664  << std::endl;
665  return false;
666  }
667 }
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:70
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:75
std::ostream & logme()
Definition: DQMNet.cc:50
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:29
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:147
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:64
assert(be >=bs)
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:118
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:77
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:398
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:72
unsigned long long uint64_t
Definition: Time.h:13
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:65
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:74
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:341
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:76
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:71
bool flush_
Definition: DQMNet.h:372
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:61
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
virtual void purgeDeadObjects(Peer *p)=0
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:62

◆ onPeerConnect()

bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 819 of file DQMNet.cc.

References cms::cuda::assert(), makeMEIFBenchmarkPlots::ev, edm::storage::IORead, edm::storage::IOUrgent, DTRecHitClients_cfi::local, CommonMethods::lock(), fileCollector::logme(), onPeerData(), AlCaHLTBitMon_ParallelJobs::p, alignCSCRings::s, AlCaHLTBitMon_QueryRunRegistry::string, and relativeConstraints::value.

Referenced by startLocalServer().

819  {
820  // Recover the server socket.
821  assert(ev->source == server_);
822 
823  // Accept the connection.
824  Socket *s = server_->accept();
825  assert(s);
826  assert(!s->isBlocking());
827 
828  // Record it to our list of peers.
829  lock();
830  Peer *p = createPeer(s);
831  std::string localaddr;
832  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
833  InetAddress peeraddr = inet->peername();
834  InetAddress myaddr = inet->sockname();
835  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
836  localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
837  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
838  p->peeraddr = local->peername().path();
839  localaddr = local->sockname().path();
840  } else
841  assert(false);
842 
843  p->mask = IORead | IOUrgent;
844  p->socket = s;
845 
846  // Report the new connection.
847  if (debug_)
848  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
849 
850  // Attach it to the listener.
851  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
852  unlock();
853 
854  // We are never done.
855  return false;
856 }
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
std::ostream & logme()
Definition: DQMNet.cc:50
virtual Peer * createPeer(lat::Socket *s)=0
assert(be >=bs)
A arg
Definition: Factorize.h:31
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:671
bool debug_
Definition: DQMNet.h:341
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
lat::Socket * server_
Definition: DQMNet.h:358
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ onPeerData()

bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
)
private

Handle communication to a particular client.

Definition at line 671 of file DQMNet.cc.

References cms::cuda::assert(), b, visDQMUpload::buf, data, DQMNet::Bucket::data, fileCollector::done, MillePedeFileConverter_cfg::e, makeMEIFBenchmarkPlots::ev, edm::storage::IORead, edm::storage::IOUrgent, edm::storage::IOWrite, CommonMethods::lock(), fileCollector::logme(), MESSAGE_SIZE_LIMIT, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, AlCaHLTBitMon_ParallelJobs::p, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and validateGeometry_cfg::valid.

Referenced by onPeerConnect(), and run().

671  {
672  lock();
673  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
674 
675  // If there is a problem with the peer socket, discard the peer
676  // and tell the selector to stop processing events for it. If
677  // this is a server connection, we will eventually recreate
678  // everything if/when the data server comes back.
679  if (ev->events & IOUrgent) {
680  if (p->automatic) {
681  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
682  << " lost (will attempt to reconnect in 15 seconds)\n";
683  losePeer(nullptr, p, ev);
684  } else
685  losePeer("WARNING: lost peer connection ", p, ev);
686 
687  unlock();
688  return true;
689  }
690 
691  // If we can write to the peer socket, pump whatever we can into it.
692  if (ev->events & IOWrite) {
693  while (Bucket *b = p->sendq) {
694  IOSize len = b->data.size() - p->sendpos;
695  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
696  IOSize done;
697 
698  try {
699  done = (len ? ev->source->write(data, len) : 0);
700  if (debug_ && len)
701  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
702  } catch (Error &e) {
703  losePeer("WARNING: unable to write to peer ", p, ev, &e);
704  unlock();
705  return true;
706  }
707 
708  p->sendpos += done;
709  if (p->sendpos == b->data.size()) {
710  Bucket *old = p->sendq;
711  p->sendq = old->next;
712  p->sendpos = 0;
713  old->next = nullptr;
714  discard(old);
715  }
716 
717  if (!done && len)
718  // Cannot write any more.
719  break;
720  }
721  }
722 
723  // If there is data to be read from the peer, first receive what we
724  // can get out of the socket, then process all complete requests.
725  if (ev->events & IORead) {
726  // First build up the incoming buffer of data in the socket.
727  // Remember the last size returned by the socket; we need
728  // it to determine if the remote end closed the connection.
729  IOSize sz;
730  try {
731  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
732  do
733  if ((sz = ev->source->read(&buf[0], buf.size()))) {
734  if (debug_)
735  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
736  DataBlob &data = p->incoming;
737  if (data.capacity() < data.size() + sz)
738  data.reserve(data.size() + SOCKET_READ_GROWTH);
739  data.insert(data.end(), &buf[0], &buf[0] + sz);
740  }
741  while (sz == sizeof(buf));
742  } catch (Error &e) {
743  auto *next = dynamic_cast<SystemError *>(e.next());
744  if (next && next->portable() == SysErr::ErrTryAgain)
745  sz = 1; // Ignore it, and fake no end of data.
746  else {
747  // Houston we have a problem.
748  losePeer("WARNING: failed to read from peer ", p, ev, &e);
749  unlock();
750  return true;
751  }
752  }
753 
754  // Process fully received messages as long as we can.
755  size_t consumed = 0;
756  DataBlob &data = p->incoming;
757  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
758  uint32_t msglen;
759  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
760 
761  if (msglen >= MESSAGE_SIZE_LIMIT) {
762  losePeer("WARNING: excessively large message from ", p, ev);
763  unlock();
764  return true;
765  }
766 
767  if (data.size() - consumed >= msglen) {
768  bool valid = true;
769  if (msglen < 2 * sizeof(uint32_t)) {
770  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
771  valid = false;
772  } else {
773  // Decode and process this message.
774  Bucket msg;
775  msg.next = nullptr;
776  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
777 
778  // If we created a response, chain it to the write queue.
779  if (!msg.data.empty()) {
780  Bucket **prev = &p->sendq;
781  while (*prev)
782  prev = &(*prev)->next;
783 
784  *prev = new Bucket;
785  (*prev)->next = nullptr;
786  (*prev)->data.swap(msg.data);
787  }
788  }
789 
790  if (!valid) {
791  losePeer("WARNING: data stream error with ", p, ev);
792  unlock();
793  return true;
794  }
795 
796  consumed += msglen;
797  } else
798  break;
799  }
800 
801  data.erase(data.begin(), data.begin() + consumed);
802 
803  // If the client has closed the connection, shut down our end. If
804  // we have something to send back still, leave the write direction
805  // open. Otherwise close the shop for this client.
806  if (sz == 0)
807  sel_.setMask(p->socket, p->mask &= ~IORead);
808  }
809 
810  // Yes, please keep processing events for this socket.
811  unlock();
812  return false;
813 }
edm::ErrorSummaryEntry Error
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
std::ostream & logme()
Definition: DQMNet.cc:50
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:29
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:32
static void discard(Bucket *&b)
Definition: DQMNet.cc:62
assert(be >=bs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:441
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:33
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:79
size_t IOSize
Definition: IOTypes.h:15
double b
Definition: hdecay.h:120
tuple msg
Definition: mps_check.py:286
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
bool debug_
Definition: DQMNet.h:341
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:85
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ operator=()

DQMNet& DQMNet::operator= ( const DQMNet & )
delete

◆ packQualityData()

void DQMNet::packQualityData ( std::string &  into,
const QReports &  qr 
)
static

Pack the quality results in qr into the string into for persistent storage, such as network transfer or archival.

Definition at line 158 of file DQMNet.cc.

References visDQMUpload::buf.

Referenced by DQMService::flushStandalone(), and dqm::impl::MonitorElement::packQualityData().

158  {
159  char buf[64];
160  std::ostringstream qrs;
161  QReports::const_iterator qi, qe;
162  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
163  int pos = 0;
164  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
165  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
166  << '\0';
167  }
168  into = qrs.str();
169 }

◆ purgeDeadObjects()

virtual void DQMNet::purgeDeadObjects ( Peer p)
protectedpure virtual

◆ releaseFromWait() [1/2]

void DQMNet::releaseFromWait ( Bucket *  msg,
WaitObject &  w,
Object *  o 
)
protectedvirtual

Definition at line 380 of file DQMNet.cc.

References mps_check::msg, EcalTangentSkim_cfg::o, and w().

Referenced by run().

380  {
381  if (o)
382  sendObjectToPeer(msg, *o, true);
383  else {
384  uint32_t words[3];
385  words[0] = sizeof(words) + w.name.size();
386  words[1] = DQM_REPLY_NONE;
387  words[2] = w.name.size();
388 
389  msg->data.reserve(msg->data.size() + words[0]);
390  copydata(msg, &words[0], sizeof(words));
391  copydata(msg, &w.name[0], w.name.size());
392  }
393 }
T w() const
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:398
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:76
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57

◆ releaseFromWait() [2/2]

void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
)
private

Definition at line 132 of file DQMNet.cc.

References cms::cuda::assert(), mps_fire::i, mps_check::msg, DQMNet::Bucket::next, and EcalTangentSkim_cfg::o.

132  {
133  Bucket **msg = &i->peer->sendq;
134  while (*msg)
135  msg = &(*msg)->next;
136  *msg = new Bucket;
137  (*msg)->next = nullptr;
138 
139  releaseFromWait(*msg, *i, o);
140 
141  assert(i->peer->waiting > 0);
142  i->peer->waiting--;
143  waiting_.erase(i);
144 }
assert(be >=bs)
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:380
tuple msg
Definition: mps_check.py:286
WaitList waiting_
Definition: DQMNet.h:364

◆ releaseWaiters()

void DQMNet::releaseWaiters ( const std::string &  name,
Object *  o 
)
private

Definition at line 147 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, mps_fire::i, Skims_PA_cff::name, and EcalTangentSkim_cfg::o.

147  {
148  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
149  if (i->name == name)
150  releaseFromWait(i++, o);
151  else
152  ++i;
153 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:380
WaitList waiting_
Definition: DQMNet.h:364

◆ removePeer()

virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual

◆ requestObjectData()

void DQMNet::requestObjectData ( Peer *  p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 100 of file DQMNet.cc.

References mps_check::msg, Skims_PA_cff::name, DQMNet::Bucket::next, and AlCaHLTBitMon_ParallelJobs::p.

100  {
101  // Issue request to peer.
102  Bucket **msg = &p->sendq;
103  while (*msg)
104  msg = &(*msg)->next;
105  *msg = new Bucket;
106  (*msg)->next = nullptr;
107 
108  uint32_t words[3];
109  words[0] = sizeof(words) + len;
110  words[1] = DQM_MSG_GET_OBJECT;
111  words[2] = len;
112  copydata(*msg, words, sizeof(words));
113  copydata(*msg, name, len);
114 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:72
tuple msg
Definition: mps_check.py:286
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57

◆ run()

void DQMNet::run ( )

Run the actual I/O processing loop.

Definition at line 1091 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), flush_, mps_fire::i, edm::storage::IORead, edm::storage::IOUrgent, edm::storage::IOWrite, lock(), logme(), DQMNet::AutoPeer::next, submitPVValidationJobs::now, EcalTangentSkim_cfg::o, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), shouldStop(), SOCKET_BUF_SIZE, RecoSummaryTask_cfi::Time, unlock(), updatePeerMasks(), upstream_, relativeConstraints::value, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

1091  {
1092  Time now;
1093  Time nextFlush = 0;
1094  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1095 
1096  // Perform I/O. Every once in a while flush updates to peers.
1097  while (!shouldStop()) {
1098  for (auto ap : automatic) {
1099  // If we need a server connection and don't have one yet,
1100  // initiate asynchronous connection creation. Swallow errors
1101  // in case the server won't talk to us.
1102  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1103  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1104  InetSocket *s = nullptr;
1105  try {
1106  InetAddress addr(ap->host.c_str(), ap->port);
1107  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1108  s->setBlocking(false);
1109  s->connect(addr);
1110  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1111  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1112  } catch (Error &e) {
1113  auto *sys = dynamic_cast<SystemError *>(e.next());
1114  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1115  // "In progress" just means the connection is in progress.
1116  // The connection is ready when the socket is writeable.
1117  // Anything else is a real problem.
1118  if (s)
1119  s->abort();
1120  delete s;
1121  s = nullptr;
1122  }
1123  }
1124 
1125  // Set up with the selector if we were successful. If this is
1126  // the upstream collector, queue a request for updates.
1127  if (s) {
1128  Peer *p = createPeer(s);
1129  ap->peer = p;
1130 
1131  InetAddress peeraddr = ((InetSocket *)s)->peername();
1132  InetAddress myaddr = ((InetSocket *)s)->sockname();
1133  p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1134  p->mask = IORead | IOWrite | IOUrgent;
1135  p->update = ap->update;
1136  p->automatic = ap;
1137  p->socket = s;
1138  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1139  if (ap == &upstream_) {
1140  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1141  p->sendq = new Bucket;
1142  p->sendq->next = nullptr;
1143  copydata(p->sendq, words, sizeof(words));
1144  }
1145 
1146  // Report the new connection.
1147  if (debug_)
1148  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1149  << std::endl;
1150  }
1151  }
1152  }
1153 
1154  // Pump events for a while.
1155  sel_.dispatch(delay_);
1156  now = Time::current();
1157  lock();
1158 
1159  // Check if flush is required. Flush only if one is needed.
1160  // Always sends the full object list, but only rarely.
1161  if (flush_ && now > nextFlush) {
1162  flush_ = false;
1163  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1164  sendObjectListToPeers(true);
1165  }
1166 
1167  // Update the data server and peer selection masks. If we
1168  // have no more data to send and listening for writes, remove
1169  // the write mask. If we have something to write and aren't
1170  // listening for writes, start listening so we can send off
1171  // the data.
1172  updatePeerMasks();
1173 
1174  // Release peers that have been waiting for data for too long.
1175  Time waitold = now - waitMax_;
1176  Time waitstale = now - waitStale_;
1177  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1178  Object *o = findObject(nullptr, i->name);
1179 
1180  // If we have (stale) object data, wait only up to stale limit.
1181  // Otherwise if we have no data at all, wait up to the max limit.
1182  if (i->time < waitold) {
1183  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1184  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1185  releaseFromWait(i++, o);
1186  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1187  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1188  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1189  releaseFromWait(i++, o);
1190  }
1191 
1192  // Keep it for now.
1193  else
1194  ++i;
1195  }
1196 
1197  unlock();
1198  }
1199 }
edm::ErrorSummaryEntry Error
AutoPeer downstream_
Definition: DQMNet.h:363
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:70
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
std::ostream & logme()
Definition: DQMNet.cc:50
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:369
virtual Peer * createPeer(lat::Socket *s)=0
AutoPeer upstream_
Definition: DQMNet.h:362
A arg
Definition: Factorize.h:31
virtual bool shouldStop()
Definition: DQMNet.cc:376
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:671
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:380
lat::Time next
Definition: DQMNet.h:142
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:65
bool debug_
Definition: DQMNet.h:341
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:71
bool flush_
Definition: DQMNet.h:372
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
lat::TimeSpan waitMax_
Definition: DQMNet.h:371
lat::TimeSpan waitStale_
Definition: DQMNet.h:370
WaitList waiting_
Definition: DQMNet.h:364
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ sendLocalChanges()

void DQMNet::sendLocalChanges ( )

Definition at line 1203 of file DQMNet.cc.

References wakeup_.

Referenced by DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::removePeer().

1203  {
1204  char byte = 0;
1205  wakeup_.sink()->write(&byte, 1);
1206 }
lat::Pipe wakeup_
Definition: DQMNet.h:359

◆ sendObjectListToPeer()

virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual

◆ sendObjectListToPeers()

virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ sendObjectToPeer()

void DQMNet::sendObjectToPeer ( Bucket *  msg,
Object &  o,
bool  data 
)
protectedvirtual

Definition at line 398 of file DQMNet.cc.

References data, HLT_2023v12_cff::flags, mps_check::msg, and EcalTangentSkim_cfg::o.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

398  {
399  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
400  DataBlob objdata;
401 
402  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
403  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
404  else if (data)
405  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
406 
407  uint32_t words[9];
408  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
409  uint32_t datalen = objdata.size();
410  uint32_t qlen = o.qdata.size();
411 
412  if (o.dirname.empty())
413  --namelen;
414 
415  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
416  words[1] = DQM_REPLY_OBJECT;
417  words[2] = flags;
418  words[3] = (o.version >> 0) & 0xffffffff;
419  words[4] = (o.version >> 32) & 0xffffffff;
420  words[5] = o.tag;
421  words[6] = namelen;
422  words[7] = datalen;
423  words[8] = qlen;
424 
425  msg->data.reserve(msg->data.size() + words[0]);
426  copydata(msg, &words[0], 9 * sizeof(uint32_t));
427  if (namelen) {
428  copydata(msg, &(o.dirname)[0], o.dirname.size());
429  if (!o.dirname.empty())
430  copydata(msg, "/", 1);
431  copydata(msg, &o.objname[0], o.objname.size());
432  }
433  if (datalen)
434  copydata(msg, &objdata[0], datalen);
435  if (qlen)
436  copydata(msg, &o.qdata[0], qlen);
437 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:29
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:64
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:77
tuple msg
Definition: mps_check.py:286
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:57
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:28
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:85

◆ setOrder()

static bool DQMNet::setOrder ( const CoreObject &  a,
const CoreObject &  b 
)
inlinestatic

Definition at line 167 of file DQMNet.h.

References a, and b.

Referenced by dqm::impl::MonitorElement::operator<().

167  {
168  if (a.run == b.run) {
169  if (a.lumi == b.lumi) {
170  if (a.streamId == b.streamId) {
171  if (a.moduleId == b.moduleId) {
172  if (a.dirname == b.dirname) {
173  return a.objname < b.objname;
174  }
175  return a.dirname < b.dirname;
176  }
177  return a.moduleId < b.moduleId;
178  }
179  return a.streamId < b.streamId;
180  }
181  return a.lumi < b.lumi;
182  }
183  return a.run < b.run;
184  }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121

◆ shouldStop()

bool DQMNet::shouldStop ( )
protectedvirtual

Definition at line 376 of file DQMNet.cc.

Referenced by run().

376 { return shutdown_; }
sig_atomic_t shutdown_
Definition: DQMNet.h:367

◆ shutdown()

void DQMNet::shutdown ( )

Stop the network layer and wait for it to finish.

Definition at line 1046 of file DQMNet.cc.

References communicate_, and shutdown_.

Referenced by DQMService::shutdown().

1046  {
1047  shutdown_ = 1;
1048  if (communicate_ != (pthread_t)-1)
1049  pthread_join(communicate_, nullptr);
1050 }
pthread_t communicate_
Definition: DQMNet.h:366
sig_atomic_t shutdown_
Definition: DQMNet.h:367

◆ staleObjectWaitLimit()

void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when data has been received; a different time limit is applied when no data payload has been received at all.

Definition at line 955 of file DQMNet.cc.

References hcalRecHitTable_cff::time, and waitStale_.

955 { waitStale_ = time; }
lat::TimeSpan waitStale_
Definition: DQMNet.h:370

◆ start()

void DQMNet::start ( )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1080 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), Types.EventRange::cppID(), and DQMService::DQMService().

1080  {
1081  if (communicate_ != (pthread_t)-1) {
1082  logme() << "ERROR: DQM networking thread has already been started\n";
1083  return;
1084  }
1085 
1086  pthread_mutex_init(&lock_, nullptr);
1087  pthread_create(&communicate_, nullptr, &communicate, this);
1088 }
pthread_t communicate_
Definition: DQMNet.h:366
std::ostream & logme()
Definition: DQMNet.cc:50
pthread_mutex_t lock_
Definition: DQMNet.h:342
static void * communicate(void *obj)
Definition: DQMNet.cc:1057

◆ startLocalServer() [1/2]

void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 960 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, MillePedeFileConverter_cfg::e, Exception, edm::storage::IOAccept, logme(), onPeerConnect(), query::port, alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

960  {
961  if (server_) {
962  logme() << "ERROR: DQM server was already started.\n";
963  return;
964  }
965 
966  try {
967  InetAddress addr("0.0.0.0", port);
968  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
969  s->bind(addr);
970  s->listen(10);
971  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
972  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
973  s->setBlocking(false);
974  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
975  } catch (Error &e) {
976  // FIXME: Do we need to do this when we throw an exception anyway?
977  // FIXME: Abort instead?
978  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
979 
980  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
981 
982  port << ": " << e.explain().c_str();
983  }
984 
985  logme() << "INFO: DQM server started at port " << port << std::endl;
986 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:50
int port
Definition: query.py:116
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:819
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:358
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ startLocalServer() [2/2]

void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 991 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, Exception, edm::storage::IOAccept, logme(), onPeerConnect(), castor_dqm_sourceclient_file_cfg::path, sel_, server_, and SOCKET_BUF_SIZE.

991  {
992  if (server_) {
993  logme() << "ERROR: DQM server was already started.\n";
994  return;
995  }
996 
997  try {
998  server_ = new LocalServerSocket(path, 10);
999  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1000  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1001  server_->setBlocking(false);
1002  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1003  } catch (Error &e) {
1004  // FIXME: Do we need to do this when we throw an exception anyway?
1005  // FIXME: Abort instead?
1006  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1007 
1008  throw cms::Exception("DQMNet::startLocalServer")
1009  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1010  }
1011 
1012  logme() << "INFO: DQM server started at path " << path << std::endl;
1013 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:50
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:819
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:30
lat::Socket * server_
Definition: DQMNet.h:358
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ unlock()

void DQMNet::unlock ( )

Release the lock on the DQM net layer.

Definition at line 1072 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1072  {
1073  if (communicate_ != (pthread_t)-1)
1074  pthread_mutex_unlock(&lock_);
1075 }
pthread_t communicate_
Definition: DQMNet.h:366
pthread_mutex_t lock_
Definition: DQMNet.h:342

◆ unpackQualityData()

void DQMNet::unpackQualityData ( QReports &  qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from string from into qr. Assumes the data was saved with packQualityData().

Definition at line 173 of file DQMNet.cc.

References MonitorElementData::QReport::QValue::algorithm, MonitorElementData::QReport::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, HLT_2023v12_cff::flags, MonitorElementData::QReport::QValue::message, MonitorElementData::QReport::QValue::qtname, MonitorElementData::QReport::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

173  {
174  const char *qdata = from;
175 
176  // Count how many qresults there are.
177  size_t nqv = 0;
178  while (*qdata) {
179  ++nqv;
180  while (*qdata)
181  ++qdata;
182  ++qdata;
183  while (*qdata)
184  ++qdata;
185  ++qdata;
186  while (*qdata)
187  ++qdata;
188  ++qdata;
189  while (*qdata)
190  ++qdata;
191  ++qdata;
192  while (*qdata)
193  ++qdata;
194  ++qdata;
195  }
196 
197  // Now extract the qreports.
198  qdata = from;
199  qr.reserve(nqv);
200  while (*qdata) {
201  qr.emplace_back();
202  DQMNet::QValue &qv = qr.back();
203 
204  qv.code = atoi(qdata);
205  while (*qdata)
206  ++qdata;
207  switch (qv.code) {
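208  case dqm::qstatus::STATUS_OK:
209  break;
210  case dqm::qstatus::WARNING:
211  flags |= DQMNet::DQM_PROP_REPORT_WARN;
212  break;
213  case dqm::qstatus::ERROR:
214  flags |= DQMNet::DQM_PROP_REPORT_ERROR;
215  break;
216  default:
217  flags |= DQMNet::DQM_PROP_REPORT_OTHER;
218  break;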
219  }
220 
221  qv.qtresult = atof(++qdata);
222  while (*qdata)
223  ++qdata;
224 
225  qv.qtname = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.algorithm = ++qdata;
230  while (*qdata)
231  ++qdata;
232 
233  qv.message = ++qdata;
234  while (*qdata)
235  ++qdata;
236  ++qdata;
237  }
238 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:52
static const int WARNING
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:51
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:53
static const int STATUS_OK
static const int ERROR

◆ updateMask()

void DQMNet::updateMask ( Peer *  p)
protected

Update the selector mask for a peer based on its data queues. Close the connection if there is no reason to keep it open.

Definition at line 888 of file DQMNet.cc.

References cms::cuda::assert(), edm::storage::IOUrgent, edm::storage::IOWrite, fileCollector::logme(), and AlCaHLTBitMon_ParallelJobs::p.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

888  {
889  if (!p->socket)
890  return;
891 
892  // Listen to writes iff we have data to send.
893  unsigned oldmask = p->mask;
894  if (!p->sendq && (p->mask & IOWrite))
895  sel_.setMask(p->socket, p->mask &= ~IOWrite);
896 
897  if (p->sendq && !(p->mask & IOWrite))
898  sel_.setMask(p->socket, p->mask |= IOWrite);
899 
900  if (debug_ && oldmask != p->mask)
901  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
902 
903  // If we have nothing more to send and are no longer listening
904  // for reads, close up the shop for this peer.
905  if (p->mask == IOUrgent && !p->waiting) {
906  assert(!p->sendq);
907  if (debug_)
908  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
909  losePeer(nullptr, p, nullptr);
910  }
911 }
std::ostream & logme()
Definition: DQMNet.cc:50
assert(be >=bs)
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:74
bool debug_
Definition: DQMNet.h:341
lat::IOSelector sel_
Definition: DQMNet.h:357

◆ updatePeerMasks()

virtual void DQMNet::updatePeerMasks ( )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ updateToCollector()

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1018 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

Referenced by DQMService::DQMService().

1018  {
1019  if (!downstream_.host.empty()) {
1020  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1021  << std::endl;
1022  return;
1023  }
1024 
1025  downstream_.update = true;
1026  downstream_.host = host;
1027  downstream_.port = port;
1028 }
AutoPeer downstream_
Definition: DQMNet.h:363
string host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:50
int port
Definition: query.py:116
std::string host
Definition: DQMNet.h:143

◆ waitForData()

void DQMNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 118 of file DQMNet.cc.

References info(), Skims_PA_cff::name, and AlCaHLTBitMon_ParallelJobs::p.

118  {
119  // FIXME: Should we automatically record which exact peer the waiter
120  // is expecting to deliver data so we know to release the waiter if
121  // the other peer vanishes? The current implementation stands a
122  // chance for the waiter to wait indefinitely -- although we do
123  // force terminate the wait after a while.
124  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
125  WaitObject wo = {Time::current(), name, info, p};
126  waiting_.push_back(wo);
127  p->waiting++;
128 }
static const TGPicture * info(bool iBackgroundIsBlack)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:100
WaitList waiting_
Definition: DQMNet.h:364

Member Data Documentation

◆ appname_

std::string DQMNet::appname_
private

Definition at line 354 of file DQMNet.h.

◆ communicate_

pthread_t DQMNet::communicate_
private

Definition at line 366 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

◆ debug_

bool DQMNet::debug_
protected

Definition at line 341 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

◆ delay_

int DQMNet::delay_
private

Definition at line 369 of file DQMNet.h.

Referenced by delay(), and run().

◆ downstream_

AutoPeer DQMNet::downstream_
private

Definition at line 363 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

◆ DQM_MSG_GET_OBJECT

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 72 of file DQMNet.h.

◆ DQM_MSG_HELLO

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 69 of file DQMNet.h.

◆ DQM_MSG_LIST_OBJECTS

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 71 of file DQMNet.h.

Referenced by run().

◆ DQM_MSG_UPDATE_ME

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 70 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_ACCUMULATE

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 58 of file DQMNet.h.

◆ DQM_PROP_DEAD

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static

◆ DQM_PROP_EFFICIENCY_PLOT

const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static

◆ DQM_PROP_HAS_REFERENCE

const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 56 of file DQMNet.h.

◆ DQM_PROP_LUMI

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static

◆ DQM_PROP_MARKTODELETE

const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static

Definition at line 67 of file DQMNet.h.

◆ DQM_PROP_NEW

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static

◆ DQM_PROP_RECEIVED

const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 62 of file DQMNet.h.

◆ DQM_PROP_REPORT_ALARM

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
static

Definition at line 54 of file DQMNet.h.

◆ DQM_PROP_REPORT_CLEAR

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 50 of file DQMNet.h.

◆ DQM_PROP_REPORT_ERROR

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static

◆ DQM_PROP_REPORT_MASK

const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 49 of file DQMNet.h.

◆ DQM_PROP_REPORT_OTHER

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static

◆ DQM_PROP_REPORT_WARN

const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static

◆ DQM_PROP_RESET

const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 59 of file DQMNet.h.

◆ DQM_PROP_STALE

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 65 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_TAGGED

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static

Definition at line 57 of file DQMNet.h.

◆ DQM_PROP_TYPE_DATABLOB

const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 47 of file DQMNet.h.

◆ DQM_PROP_TYPE_INT

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 31 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_INVALID

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 30 of file DQMNet.h.

◆ DQM_PROP_TYPE_MASK

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

◆ DQM_PROP_TYPE_REAL

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 32 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_SCALAR

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 29 of file DQMNet.h.

◆ DQM_PROP_TYPE_STRING

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 33 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1D

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 36 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1F

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

◆ DQM_PROP_TYPE_TH1I

const uint32_t DQMNet::DQM_PROP_TYPE_TH1I = 0x00000013
static

Definition at line 37 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1S

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 35 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2D

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 40 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2F

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

◆ DQM_PROP_TYPE_TH2I

const uint32_t DQMNet::DQM_PROP_TYPE_TH2I = 0x00000023
static

Definition at line 41 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2S

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 39 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3D

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 44 of file DQMNet.h.

◆ DQM_PROP_TYPE_TH3F

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 42 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3S

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 43 of file DQMNet.h.

◆ DQM_PROP_TYPE_TPROF

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 45 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TPROF2D

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 46 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_REPLY_LIST_BEGIN

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 74 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_LIST_END

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 75 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_NONE

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 76 of file DQMNet.h.

◆ DQM_REPLY_OBJECT

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 77 of file DQMNet.h.

◆ flush_

bool DQMNet::flush_
private

Definition at line 372 of file DQMNet.h.

Referenced by run().

◆ lock_

pthread_mutex_t DQMNet::lock_
protected

Definition at line 342 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

◆ MAX_PEER_WAITREQS

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 79 of file DQMNet.h.

◆ pid_

int DQMNet::pid_
private

Definition at line 355 of file DQMNet.h.

◆ sel_

lat::IOSelector DQMNet::sel_
private

Definition at line 357 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

◆ server_

lat::Socket* DQMNet::server_
private

Definition at line 358 of file DQMNet.h.

Referenced by startLocalServer().

◆ shutdown_

sig_atomic_t DQMNet::shutdown_
private

Definition at line 367 of file DQMNet.h.

Referenced by shutdown().

◆ upstream_

AutoPeer DQMNet::upstream_
private

Definition at line 362 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

◆ version_

lat::Time DQMNet::version_
private

Definition at line 360 of file DQMNet.h.

◆ waiting_

WaitList DQMNet::waiting_
private

Definition at line 364 of file DQMNet.h.

Referenced by run().

◆ waitMax_

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 371 of file DQMNet.h.

Referenced by run().

◆ waitStale_

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 370 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

◆ wakeup_

lat::Pipe DQMNet::wakeup_
private

Definition at line 359 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().