CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes
DQMNet Class Reference (abstract)

#include <DQMNet.h>

Inheritance diagram for DQMNet:
DQMImplNet< ObjType > DQMImplNet< DQMNet::Object > DQMBasicNet

Classes

struct  AutoPeer
 
struct  Bucket
 
struct  CoreObject
 
struct  HashEqual
 
struct  HashOp
 
struct  Object
 
struct  Peer
 
struct  WaitObject
 

Public Types

using DataBlob = std::vector< unsigned char >
 
using QReports = std::vector< QValue >
 
using QValue = MonitorElementData::QReport::QValue
 
using TagList = std::vector< uint32_t >
 
using WaitList = std::list< WaitObject >
 

Public Member Functions

void debug (bool doit)
 
void delay (int delay)
 
 DQMNet (const std::string &appname="")
 
 DQMNet (const DQMNet &)=delete
 
void listenToCollector (const std::string &host, int port)
 
void lock ()
 Acquire a lock on the DQM net layer. More...
 
DQMNet & operator= (const DQMNet &)=delete
 
void run ()
 
void sendLocalChanges ()
 
void shutdown ()
 Stop the network layer and wait for it to finish. More...
 
void staleObjectWaitLimit (lat::TimeSpan time)
 
void start ()
 
void startLocalServer (int port)
 
void startLocalServer (const char *path)
 
void unlock ()
 Release the lock on the DQM net layer. More...
 
void updateToCollector (const std::string &host, int port)
 
virtual ~DQMNet ()
 

Static Public Member Functions

static size_t dqmhash (const void *key, size_t keylen)
 
static void packQualityData (std::string &into, const QReports &qr)
 
static bool setOrder (const CoreObject &a, const CoreObject &b)
 
static void unpackQualityData (QReports &qr, uint32_t &flags, const char *from)
 

Static Public Attributes

static const uint32_t DQM_MSG_GET_OBJECT = 3
 
static const uint32_t DQM_MSG_HELLO = 0
 
static const uint32_t DQM_MSG_LIST_OBJECTS = 2
 
static const uint32_t DQM_MSG_UPDATE_ME = 1
 
static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000
 
static const uint32_t DQM_PROP_DEAD = 0x00080000
 
static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000
 
static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000
 
static const uint32_t DQM_PROP_LUMI = 0x00040000
 
static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000
 
static const uint32_t DQM_PROP_NEW = 0x00010000
 
static const uint32_t DQM_PROP_RECEIVED = 0x00020000
 
static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
 
static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000
 
static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100
 
static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00
 
static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400
 
static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200
 
static const uint32_t DQM_PROP_RESET = 0x00008000
 
static const uint32_t DQM_PROP_STALE = 0x00100000
 
static const uint32_t DQM_PROP_TAGGED = 0x00002000
 
static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050
 
static const uint32_t DQM_PROP_TYPE_INT = 0x00000001
 
static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000
 
static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff
 
static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002
 
static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f
 
static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003
 
static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012
 
static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010
 
static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013
 
static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011
 
static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022
 
static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020
 
static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023
 
static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021
 
static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032
 
static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030
 
static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031
 
static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040
 
static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041
 
static const uint32_t DQM_REPLY_LIST_BEGIN = 101
 
static const uint32_t DQM_REPLY_LIST_END = 102
 
static const uint32_t DQM_REPLY_NONE = 103
 
static const uint32_t DQM_REPLY_OBJECT = 104
 
static const uint32_t MAX_PEER_WAITREQS = 128
 

Protected Member Functions

virtual Peer * createPeer (lat::Socket *s)=0
 
virtual Object * findObject (Peer *p, const std::string &name, Peer **owner=nullptr)=0
 
virtual Peer * getPeer (lat::Socket *s)=0
 
std::ostream & logme ()
 
virtual Object * makeObject (Peer *p, const std::string &name)=0
 
virtual void markObjectsDead (Peer *p)=0
 
virtual bool onMessage (Bucket *msg, Peer *p, unsigned char *data, size_t len)
 
virtual void purgeDeadObjects (Peer *p)=0
 
virtual void releaseFromWait (Bucket *msg, WaitObject &w, Object *o)
 
virtual void removePeer (Peer *p, lat::Socket *s)=0
 
virtual void sendObjectListToPeer (Bucket *msg, bool all, bool clear)=0
 
virtual void sendObjectListToPeers (bool all)=0
 
virtual void sendObjectToPeer (Bucket *msg, Object &o, bool data)
 
virtual bool shouldStop ()
 
void updateMask (Peer *p)
 
virtual void updatePeerMasks ()=0
 
void waitForData (Peer *p, const std::string &name, const std::string &info, Peer *owner)
 

Static Protected Member Functions

static void copydata (Bucket *b, const void *data, size_t len)
 
static void discard (Bucket *&b)
 

Protected Attributes

bool debug_
 
pthread_mutex_t lock_
 

Private Member Functions

void losePeer (const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
 
bool onLocalNotify (lat::IOSelectEvent *ev)
 
bool onPeerConnect (lat::IOSelectEvent *ev)
 
bool onPeerData (lat::IOSelectEvent *ev, Peer *p)
 Handle communication to a particular client. More...
 
void releaseFromWait (WaitList::iterator i, Object *o)
 
void releaseWaiters (const std::string &name, Object *o)
 
void requestObjectData (Peer *p, const char *name, size_t len)
 Queue an object request to the data server. More...
 

Private Attributes

std::string appname_
 
pthread_t communicate_
 
int delay_
 
AutoPeer downstream_
 
bool flush_
 
int pid_
 
lat::IOSelector sel_
 
lat::Socket * server_
 
sig_atomic_t shutdown_
 
AutoPeer upstream_
 
lat::Time version_
 
WaitList waiting_
 
lat::TimeSpan waitMax_
 
lat::TimeSpan waitStale_
 
lat::Pipe wakeup_
 

Detailed Description

Definition at line 25 of file DQMNet.h.

Member Typedef Documentation

◆ DataBlob

using DQMNet::DataBlob = std::vector<unsigned char>

Definition at line 84 of file DQMNet.h.

◆ QReports

using DQMNet::QReports = std::vector<QValue>

Definition at line 85 of file DQMNet.h.

◆ QValue

using DQMNet::QValue = MonitorElementData::QReport::QValue

Definition at line 83 of file DQMNet.h.

◆ TagList

using DQMNet::TagList = std::vector<uint32_t>

Definition at line 86 of file DQMNet.h.

◆ WaitList

using DQMNet::WaitList = std::list<WaitObject>

Definition at line 87 of file DQMNet.h.

Constructor & Destructor Documentation

◆ DQMNet() [1/2]

DQMNet::DQMNet ( const std::string &  appname = "")

Definition at line 910 of file DQMNet.cc.

References downstream_, edm::storage::IORead, DQMNet::AutoPeer::next, O_NONBLOCK, onLocalNotify(), DQMNet::AutoPeer::peer, DQMNet::AutoPeer::port, sel_, DQMNet::AutoPeer::update, upstream_, and wakeup_.

911  : debug_(false),
912  appname_(appname.empty() ? "DQMNet" : appname.c_str()),
913  pid_(getpid()),
914  server_(nullptr),
915  version_(Time::current()),
916  communicate_((pthread_t)-1),
917  shutdown_(0),
918  delay_(1000),
919  waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
920  waitMax_(0, 0, 0, 5 /* seconds */, 0),
921  flush_(false) {
922  // Create a pipe for the local DQM to tell the communicator
923  // thread that local DQM data has changed and that the peers
924  // should be notified.
925  fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
926  sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
927 
928  // Initialise the upstream and downstream to empty.
929  upstream_.peer = downstream_.peer = nullptr;
933 }
AutoPeer downstream_
Definition: DQMNet.h:362
bool onLocalNotify(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:860
pthread_t communicate_
Definition: DQMNet.h:365
lat::Time version_
Definition: DQMNet.h:359
int delay_
Definition: DQMNet.h:368
AutoPeer upstream_
Definition: DQMNet.h:361
int pid_
Definition: DQMNet.h:354
std::string appname_
Definition: DQMNet.h:353
Peer * peer
Definition: DQMNet.h:140
sig_atomic_t shutdown_
Definition: DQMNet.h:366
lat::Time next
Definition: DQMNet.h:141
lat::Pipe wakeup_
Definition: DQMNet.h:358
bool debug_
Definition: DQMNet.h:340
bool flush_
Definition: DQMNet.h:371
lat::Socket * server_
Definition: DQMNet.h:357
#define O_NONBLOCK
Definition: SysFile.h:23
lat::TimeSpan waitMax_
Definition: DQMNet.h:370
lat::TimeSpan waitStale_
Definition: DQMNet.h:369
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ ~DQMNet()

DQMNet::~DQMNet ( )
virtual

Definition at line 935 of file DQMNet.cc.

935  {
936  // FIXME
937 }

◆ DQMNet() [2/2]

DQMNet::DQMNet ( const DQMNet & )
delete

Member Function Documentation

◆ copydata()

void DQMNet::copydata ( Bucket * b,
const void *  data,
size_t  len 
)
static, protected

Definition at line 53 of file DQMNet.cc.

References b, and data.

Referenced by run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

53  {
54  b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
55 }
double b
Definition: hdecay.h:120
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80

◆ createPeer()

virtual Peer* DQMNet::createPeer ( lat::Socket *  s)
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ debug()

void DQMNet::debug ( bool  doit)

Enable or disable verbose debugging. Must be called before calling run() or start().

Definition at line 941 of file DQMNet.cc.

References debug_.

Referenced by DQMService::DQMService().

941 { debug_ = doit; }
bool debug_
Definition: DQMNet.h:340

◆ delay()

void DQMNet::delay ( int  delay)

Set the I/O dispatching delay. Must be called before calling run() or start().

Definition at line 945 of file DQMNet.cc.

References delay_.

945 { delay_ = delay; }
int delay_
Definition: DQMNet.h:368
void delay(int delay)
Definition: DQMNet.cc:945

◆ discard()

void DQMNet::discard ( Bucket *&  b)
static, protected

Definition at line 58 of file DQMNet.cc.

References b, and GetRecoTauVFromDQM_MC_cff::next.

Referenced by OrderedSet.OrderedSet::pop().

58  {
59  while (b) {
60  Bucket *next = b->next;
61  delete b;
62  b = next;
63  }
64 }
double b
Definition: hdecay.h:120

◆ dqmhash()

static size_t DQMNet::dqmhash ( const void *  key,
size_t  keylen 
)
inline, static

Definition at line 195 of file DQMNet.h.

References a, b, HltBtagPostValidation_cff::c, dqmhashfinal, dqmhashmix, dqmdumpme::k, and submitPVResolutionJobs::key.

Referenced by DQMImplNet< DQMNet::Object >::findObject(), DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::makeObject().

195  {
196  // Reduced version of Bob Jenkins' hash function at:
197  // http://www.burtleburtle.net/bob/c/lookup3.c
198 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
199 #define dqmhashmix(a, b, c) \
200  { \
201  a -= c; \
202  a ^= dqmhashrot(c, 4); \
203  c += b; \
204  b -= a; \
205  b ^= dqmhashrot(a, 6); \
206  a += c; \
207  c -= b; \
208  c ^= dqmhashrot(b, 8); \
209  b += a; \
210  a -= c; \
211  a ^= dqmhashrot(c, 16); \
212  c += b; \
213  b -= a; \
214  b ^= dqmhashrot(a, 19); \
215  a += c; \
216  c -= b; \
217  c ^= dqmhashrot(b, 4); \
218  b += a; \
219  }
220 #define dqmhashfinal(a, b, c) \
221  { \
222  c ^= b; \
223  c -= dqmhashrot(b, 14); \
224  a ^= c; \
225  a -= dqmhashrot(c, 11); \
226  b ^= a; \
227  b -= dqmhashrot(a, 25); \
228  c ^= b; \
229  c -= dqmhashrot(b, 16); \
230  a ^= c; \
231  a -= dqmhashrot(c, 4); \
232  b ^= a; \
233  b -= dqmhashrot(a, 14); \
234  c ^= b; \
235  c -= dqmhashrot(b, 24); \
236  }
237 
238  uint32_t a, b, c;
239  a = b = c = 0xdeadbeef + (uint32_t)keylen;
240  const auto *k = (const unsigned char *)key;
241 
242  // all but the last block: affect some bits of (a, b, c)
243  while (keylen > 12) {
244  a += k[0];
245  a += ((uint32_t)k[1]) << 8;
246  a += ((uint32_t)k[2]) << 16;
247  a += ((uint32_t)k[3]) << 24;
248  b += k[4];
249  b += ((uint32_t)k[5]) << 8;
250  b += ((uint32_t)k[6]) << 16;
251  b += ((uint32_t)k[7]) << 24;
252  c += k[8];
253  c += ((uint32_t)k[9]) << 8;
254  c += ((uint32_t)k[10]) << 16;
255  c += ((uint32_t)k[11]) << 24;
256  dqmhashmix(a, b, c);
257  keylen -= 12;
258  k += 12;
259  }
260 
261  // last block: affect all 32 bits of (c); all case statements fall through
262  switch (keylen) {
263  case 12:
264  c += ((uint32_t)k[11]) << 24;
265  [[fallthrough]];
266  case 11:
267  c += ((uint32_t)k[10]) << 16;
268  [[fallthrough]];
269  case 10:
270  c += ((uint32_t)k[9]) << 8;
271  [[fallthrough]];
272  case 9:
273  c += k[8];
274  [[fallthrough]];
275  case 8:
276  b += ((uint32_t)k[7]) << 24;
277  [[fallthrough]];
278  case 7:
279  b += ((uint32_t)k[6]) << 16;
280  [[fallthrough]];
281  case 6:
282  b += ((uint32_t)k[5]) << 8;
283  [[fallthrough]];
284  case 5:
285  b += k[4];
286  [[fallthrough]];
287  case 4:
288  a += ((uint32_t)k[3]) << 24;
289  [[fallthrough]];
290  case 3:
291  a += ((uint32_t)k[2]) << 16;
292  [[fallthrough]];
293  case 2:
294  a += ((uint32_t)k[1]) << 8;
295  [[fallthrough]];
296  case 1:
297  a += k[0];
298  break;
299  case 0:
300  return c;
301  }
302 
303  dqmhashfinal(a, b, c);
304  return c;
305 #undef dqmhashrot
306 #undef dqmhashmix
307 #undef dqmhashfinal
308  }
#define dqmhashmix(a, b, c)
key
prepare the HTCondor submission files and eventually submit them
double b
Definition: hdecay.h:120
#define dqmhashfinal(a, b, c)
double a
Definition: hdecay.h:121

◆ findObject()

virtual Object* DQMNet::findObject ( Peer * p,
const std::string &  name,
Peer **  owner = nullptr 
)
protected, pure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ getPeer()

virtual Peer* DQMNet::getPeer ( lat::Socket *  s)
protected, pure virtual

◆ listenToCollector()

void DQMNet::listenToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically receive updates from upstream DQM sources. Must be called before calling run() or start().

Definition at line 1029 of file DQMNet.cc.

References query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, DQMNet::AutoPeer::update, and upstream_.

1029  {
1030  if (!upstream_.host.empty()) {
1031  logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1032  << std::endl;
1033  return;
1034  }
1035 
1036  upstream_.update = false;
1037  upstream_.host = host;
1038  upstream_.port = port;
1039 }
string host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:46
AutoPeer upstream_
Definition: DQMNet.h:361
int port
Definition: query.py:116
std::string host
Definition: DQMNet.h:142

◆ lock()

void DQMNet::lock ( )

Acquire a lock on the DQM net layer.

Definition at line 1062 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1062  {
1063  if (communicate_ != (pthread_t)-1)
1064  pthread_mutex_lock(&lock_);
1065 }
pthread_t communicate_
Definition: DQMNet.h:365
pthread_mutex_t lock_
Definition: DQMNet.h:341

◆ logme()

std::ostream & DQMNet::logme ( )
protected

Definition at line 46 of file DQMNet.cc.

References gather_cfg::cout, submitPVValidationJobs::now, and RecoSummaryTask_cfi::Time.

Referenced by listenToCollector(), run(), DQMImplNet< DQMNet::Object >::sendObjectListToPeers(), start(), startLocalServer(), and updateToCollector().

46  {
47  Time now = Time::current();
48  return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
49  << "]: ";
50 }
int pid_
Definition: DQMNet.h:354
std::string appname_
Definition: DQMNet.h:353

◆ losePeer()

void DQMNet::losePeer ( const char *  reason,
Peer * peer,
lat::IOSelectEvent *  event,
lat::Error *  err = nullptr 
)
private

Handle errors with a peer socket. Zaps the socket send queue, the socket itself, detaches the socket from the selector, and purges any pending wait requests linked to the socket.

Definition at line 70 of file DQMNet.cc.

References DQMNet::Peer::automatic, MillePedeFileConverter_cfg::e, submitPVResolutionJobs::err, makeMEIFBenchmarkPlots::ev, mps_fire::i, fileCollector::logme(), DQMNet::AutoPeer::peer, DQMNet::Peer::peeraddr, PixelMapPlotter::reason, alignCSCRings::s, DQMNet::Peer::sendq, DQMNet::Peer::socket, and AlCaHLTBitMon_QueryRunRegistry::string.

70  {
71  if (reason)
72  logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
73 
74  Socket *s = peer->socket;
75 
76  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
77  if (i->peer == peer)
78  waiting_.erase(i++);
79  else
80  ++i;
81 
82  if (ev)
83  ev->source = nullptr;
84 
85  discard(peer->sendq);
86  if (peer->automatic)
87  peer->automatic->peer = nullptr;
88 
89  sel_.detach(s);
90  s->close();
91  removePeer(peer, s);
92  delete s;
93 }
std::ostream & logme()
Definition: DQMNet.cc:46
static void discard(Bucket *&b)
Definition: DQMNet.cc:58
virtual void removePeer(Peer *p, lat::Socket *s)=0
WaitList waiting_
Definition: DQMNet.h:363
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ makeObject()

virtual Object* DQMNet::makeObject ( Peer * p,
const std::string &  name 
)
protected, pure virtual

◆ markObjectsDead()

virtual void DQMNet::markObjectsDead ( Peer * p)
protected, pure virtual

◆ onLocalNotify()

bool DQMNet::onLocalNotify ( lat::IOSelectEvent *  ev)
private

React to notifications from the DQM thread. This is a simple message to tell this thread to wake up and send unsolicited updates to the peers when new DQM data appears. We don't send the updates here, but just set a flag to tell the main event pump to send a notification later. This avoids sending unnecessarily frequent DQM object updates.

Definition at line 860 of file DQMNet.cc.

References visDQMUpload::buf, MillePedeFileConverter_cfg::e, makeMEIFBenchmarkPlots::ev, fileCollector::logme(), and GetRecoTauVFromDQM_MC_cff::next.

Referenced by DQMNet().

860  {
861  // Discard the data in the pipe, we care only about the wakeup.
862  try {
863  IOSize sz;
864  unsigned char buf[1024];
865  while ((sz = ev->source->read(buf, sizeof(buf))))
866  ;
867  } catch (Error &e) {
868  auto *next = dynamic_cast<SystemError *>(e.next());
869  if (next && next->portable() == SysErr::ErrTryAgain)
870  ; // Ignore it
871  else
872  logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
873  }
874 
875  // Tell the main event pump to send an update in a little while.
876  flush_ = true;
877 
878  // We are never done, always keep going.
879  return false;
880 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:46
size_t IOSize
Definition: IOTypes.h:15
bool flush_
Definition: DQMNet.h:371

◆ onMessage()

bool DQMNet::onMessage ( Bucket * msg,
Peer * p,
unsigned char *  data,
size_t  len 
)
protected, virtual

Definition at line 437 of file DQMNet.cc.

References cms::cuda::assert(), data, HLT_2024v12_cff::flags, ALPAKA_ACCELERATOR_NAMESPACE::caPixelDoublets::if(), fileCollector::logme(), mps_check::msg, Skims_PA_cff::name, EcalTangentSkim_cfg::o, AlCaHLTBitMon_ParallelJobs::p, and AlCaHLTBitMon_QueryRunRegistry::string.

437  {
438  // Decode and process this message.
439  uint32_t type;
440  memcpy(&type, data + sizeof(uint32_t), sizeof(type));
441  switch (type) {
442  case DQM_MSG_UPDATE_ME: {
443  if (len != 2 * sizeof(uint32_t)) {
444  logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
445  return false;
446  }
447 
448  if (debug_)
449  logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
450 
451  p->update = true;
452  }
453  return true;
454 
455  case DQM_MSG_LIST_OBJECTS: {
456  if (debug_)
457  logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
458 
459  // Send over current status: list of known objects.
460  sendObjectListToPeer(msg, true, false);
461  }
462  return true;
463 
464  case DQM_MSG_GET_OBJECT: {
465  if (debug_)
466  logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
467 
468  if (len < 3 * sizeof(uint32_t)) {
469  logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
470  return false;
471  }
472 
473  uint32_t namelen;
474  memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
475  if (len != 3 * sizeof(uint32_t) + namelen) {
476  logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
477  << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
478  return false;
479  }
480 
481  std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
482  Peer *owner = nullptr;
483  Object *o = findObject(nullptr, name, &owner);
484  if (o) {
485  o->lastreq = Time::current().ns();
486  if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
488  waitForData(p, name, "", owner);
489  else
490  sendObjectToPeer(msg, *o, true);
491  } else {
492  uint32_t words[3];
493  words[0] = sizeof(words) + name.size();
494  words[1] = DQM_REPLY_NONE;
495  words[2] = name.size();
496 
497  msg->data.reserve(msg->data.size() + words[0]);
498  copydata(msg, &words[0], sizeof(words));
499  copydata(msg, &name[0], name.size());
500  }
501  }
502  return true;
503 
504  case DQM_REPLY_LIST_BEGIN: {
505  if (len != 4 * sizeof(uint32_t)) {
506  logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
507  return false;
508  }
509 
510  // Get the update status: whether this is a full update.
511  uint32_t flags;
512  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
513 
514  if (debug_)
515  logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
516  << p->peeraddr << ", size " << len << std::endl;
517 
518  // If we are about to receive a full list of objects, flag all
519  // objects as possibly dead. Subsequent object notifications
520  // will undo this for the live objects. We cannot delete
521  // objects quite yet, as we may get inquiry from another client
522  // while we are processing the incoming list, so we keep the
523  // objects tentatively alive as long as we've not seen the end.
524  if (flags)
526  }
527  return true;
528 
529  case DQM_REPLY_LIST_END: {
530  if (len != 4 * sizeof(uint32_t)) {
531  logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
532  return false;
533  }
534 
535  // Get the update status: whether this is a full update.
536  uint32_t flags;
537  memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
538 
539  // If we received a full list of objects, now purge all dead
540  // objects. We need to do this in two stages in case we receive
541  // updates in many parts, and end up sending updates to others in
542  // between; this avoids us lying live objects are dead.
543  if (flags)
545 
546  if (debug_)
547  logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
548  << ", size " << len << std::endl;
549 
550  // Indicate we have received another update from this peer.
551  // Also indicate we should flush to our clients.
552  flush_ = true;
553  p->updates++;
554  }
555  return true;
556 
557  case DQM_REPLY_OBJECT: {
558  uint32_t words[9];
559  if (len < sizeof(words)) {
560  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
561  return false;
562  }
563 
564  memcpy(&words[0], data, sizeof(words));
565  uint32_t &namelen = words[6];
566  uint32_t &datalen = words[7];
567  uint32_t &qlen = words[8];
568 
569  if (len != sizeof(words) + namelen + datalen + qlen) {
570  logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
571  << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
572  << std::endl;
573  return false;
574  }
575 
576  unsigned char *namedata = data + sizeof(words);
577  unsigned char *objdata = namedata + namelen;
578  unsigned char *qdata = objdata + datalen;
579  unsigned char *enddata = qdata + qlen;
580  std::string name((char *)namedata, namelen);
581  assert(enddata == data + len);
582 
583  if (debug_)
584  logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
585  << std::endl;
586 
587  // Mark the peer as a known object source.
588  p->source = true;
589 
590  // Initialise or update an object entry.
591  Object *o = findObject(p, name);
592  if (!o)
593  o = makeObject(p, name);
594 
595  o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
596  o->tag = words[5];
597  o->version = ((uint64_t)words[4] << 32 | words[3]);
598  o->scalar.clear();
599  o->qdata.clear();
600  if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
601  o->rawdata.clear();
602  o->scalar.insert(o->scalar.end(), objdata, qdata);
603  } else if (datalen) {
604  o->rawdata.clear();
605  o->rawdata.insert(o->rawdata.end(), objdata, qdata);
606  } else if (!o->rawdata.empty())
607  o->flags |= DQM_PROP_STALE;
608  o->qdata.insert(o->qdata.end(), qdata, enddata);
609 
610  // If we had an object for this one already and this is a list
611  // update without data, issue an immediate data get request.
612  if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
613  requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
614 
615  // If we have the object data, release from wait.
616  if (datalen)
618  }
619  return true;
620 
621  case DQM_REPLY_NONE: {
622  uint32_t words[3];
623  if (len < sizeof(words)) {
624  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
625  return false;
626  }
627 
628  memcpy(&words[0], data, sizeof(words));
629  uint32_t &namelen = words[2];
630 
631  if (len != sizeof(words) + namelen) {
632  logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
633  << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
634  return false;
635  }
636 
637  unsigned char *namedata = data + sizeof(words);
638  std::string name((char *)namedata, namelen);
639 
640  if (debug_)
641  logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
642  << std::endl;
643 
644  // Mark the peer as a known object source.
645  p->source = true;
646 
647  // If this was a known object, kill it.
648  if (Object *o = findObject(p, name)) {
649  o->flags |= DQM_PROP_DEAD;
651  }
652 
653  // If someone was waiting for this, let them go.
654  releaseWaiters(name, nullptr);
655  }
656  return true;
657 
658  default:
659  logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
660  << std::endl;
661  return false;
662  }
663 }
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:69
static const uint32_t DQM_REPLY_LIST_END
Definition: DQMNet.h:74
std::ostream & logme()
Definition: DQMNet.cc:46
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:28
void releaseWaiters(const std::string &name, Object *o)
Definition: DQMNet.cc:143
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:63
assert(be >=bs)
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
Definition: DQMNet.cc:114
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:96
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual void markObjectsDead(Peer *p)=0
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:76
virtual Object * makeObject(Peer *p, const std::string &name)=0
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:394
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:71
unsigned long long uint64_t
Definition: Time.h:13
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:64
static const uint32_t DQM_REPLY_LIST_BEGIN
Definition: DQMNet.h:73
virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear)=0
bool debug_
Definition: DQMNet.h:340
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:75
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:70
bool flush_
Definition: DQMNet.h:371
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_NEW
Definition: DQMNet.h:60
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:27
virtual void purgeDeadObjects(Peer *p)=0
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
static const uint32_t DQM_PROP_RECEIVED
Definition: DQMNet.h:61

◆ onPeerConnect()

bool DQMNet::onPeerConnect ( lat::IOSelectEvent *  ev)
private

Respond to new connections on the server socket. Accepts the connection and creates a new socket for the peer, and sets it up for further communication. Returns false always to tell the IOSelector to keep processing events for the server socket.

Definition at line 815 of file DQMNet.cc.

References cms::cuda::assert(), makeMEIFBenchmarkPlots::ev, dqm-mbProfile::format, edm::storage::IORead, edm::storage::IOUrgent, DTRecHitClients_cfi::local, CommonMethods::lock(), fileCollector::logme(), onPeerData(), AlCaHLTBitMon_ParallelJobs::p, alignCSCRings::s, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by startLocalServer().

815  {
816  // Recover the server socket.
817  assert(ev->source == server_);
818 
819  // Accept the connection.
820  Socket *s = server_->accept();
821  assert(s);
822  assert(!s->isBlocking());
823 
824  // Record it to our list of peers.
825  lock();
826  Peer *p = createPeer(s);
827  std::string localaddr;
828  if (auto *inet = dynamic_cast<InetSocket *>(s)) {
829  InetAddress peeraddr = inet->peername();
830  InetAddress myaddr = inet->sockname();
831  p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
832  localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
833  } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
834  p->peeraddr = local->peername().path();
835  localaddr = local->sockname().path();
836  } else
837  assert(false);
838 
839  p->mask = IORead | IOUrgent;
840  p->socket = s;
841 
842  // Report the new connection.
843  if (debug_)
844  logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
845 
846  // Attach it to the listener.
847  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
848  unlock();
849 
850  // We are never done.
851  return false;
852 }
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
std::ostream & logme()
Definition: DQMNet.cc:46
virtual Peer * createPeer(lat::Socket *s)=0
assert(be >=bs)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:667
bool debug_
Definition: DQMNet.h:340
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
lat::Socket * server_
Definition: DQMNet.h:357
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ onPeerData()

bool DQMNet::onPeerData ( lat::IOSelectEvent *  ev,
Peer *  p 
)
private

Handle communication to a particular client.

Definition at line 667 of file DQMNet.cc.

References cms::cuda::assert(), b, visDQMUpload::buf, data, DQMNet::Bucket::data, fileCollector::done, MillePedeFileConverter_cfg::e, makeMEIFBenchmarkPlots::ev, edm::storage::IORead, edm::storage::IOUrgent, edm::storage::IOWrite, CommonMethods::lock(), fileCollector::logme(), MESSAGE_SIZE_LIMIT, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, DQMNet::Bucket::next, AlCaHLTBitMon_ParallelJobs::p, SOCKET_READ_GROWTH, SOCKET_READ_SIZE, and validateGeometry_cfg::valid.

Referenced by onPeerConnect(), and run().

667  {
668  lock();
669  assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
670 
671  // If there is a problem with the peer socket, discard the peer
672  // and tell the selector to stop processing events for it. If
673  // this is a server connection, we will eventually recreate
674  // everything if/when the data server comes back.
675  if (ev->events & IOUrgent) {
676  if (p->automatic) {
677  logme() << "WARNING: connection to the DQM server at " << p->peeraddr
678  << " lost (will attempt to reconnect in 15 seconds)\n";
679  losePeer(nullptr, p, ev);
680  } else
681  losePeer("WARNING: lost peer connection ", p, ev);
682 
683  unlock();
684  return true;
685  }
686 
687  // If we can write to the peer socket, pump whatever we can into it.
688  if (ev->events & IOWrite) {
689  while (Bucket *b = p->sendq) {
690  IOSize len = b->data.size() - p->sendpos;
691  const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
692  IOSize done;
693 
694  try {
695  done = (len ? ev->source->write(data, len) : 0);
696  if (debug_ && len)
697  logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
698  } catch (Error &e) {
699  losePeer("WARNING: unable to write to peer ", p, ev, &e);
700  unlock();
701  return true;
702  }
703 
704  p->sendpos += done;
705  if (p->sendpos == b->data.size()) {
706  Bucket *old = p->sendq;
707  p->sendq = old->next;
708  p->sendpos = 0;
709  old->next = nullptr;
710  discard(old);
711  }
712 
713  if (!done && len)
714  // Cannot write any more.
715  break;
716  }
717  }
718 
719  // If there is data to be read from the peer, first receive what we
720  // can get out of the socket, then process all complete requests.
721  if (ev->events & IORead) {
722  // First build up the incoming buffer of data in the socket.
723  // Remember the last size returned by the socket; we need
724  // it to determine if the remote end closed the connection.
725  IOSize sz;
726  try {
727  std::vector<unsigned char> buf(SOCKET_READ_SIZE);
728  do
729  if ((sz = ev->source->read(&buf[0], buf.size()))) {
730  if (debug_)
731  logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
732  DataBlob &data = p->incoming;
733  if (data.capacity() < data.size() + sz)
734  data.reserve(data.size() + SOCKET_READ_GROWTH);
735  data.insert(data.end(), &buf[0], &buf[0] + sz);
736  }
737  while (sz == sizeof(buf));
738  } catch (Error &e) {
739  auto *next = dynamic_cast<SystemError *>(e.next());
740  if (next && next->portable() == SysErr::ErrTryAgain)
741  sz = 1; // Ignore it, and fake no end of data.
742  else {
743  // Houston we have a problem.
744  losePeer("WARNING: failed to read from peer ", p, ev, &e);
745  unlock();
746  return true;
747  }
748  }
749 
750  // Process fully received messages as long as we can.
751  size_t consumed = 0;
752  DataBlob &data = p->incoming;
753  while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
754  uint32_t msglen;
755  memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
756 
757  if (msglen >= MESSAGE_SIZE_LIMIT) {
758  losePeer("WARNING: excessively large message from ", p, ev);
759  unlock();
760  return true;
761  }
762 
763  if (data.size() - consumed >= msglen) {
764  bool valid = true;
765  if (msglen < 2 * sizeof(uint32_t)) {
766  logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
767  valid = false;
768  } else {
769  // Decode and process this message.
770  Bucket msg;
771  msg.next = nullptr;
772  valid = onMessage(&msg, p, &data[0] + consumed, msglen);
773 
774  // If we created a response, chain it to the write queue.
775  if (!msg.data.empty()) {
776  Bucket **prev = &p->sendq;
777  while (*prev)
778  prev = &(*prev)->next;
779 
780  *prev = new Bucket;
781  (*prev)->next = nullptr;
782  (*prev)->data.swap(msg.data);
783  }
784  }
785 
786  if (!valid) {
787  losePeer("WARNING: data stream error with ", p, ev);
788  unlock();
789  return true;
790  }
791 
792  consumed += msglen;
793  } else
794  break;
795  }
796 
797  data.erase(data.begin(), data.begin() + consumed);
798 
799  // If the client has closed the connection, shut down our end. If
800  // we have something to send back still, leave the write direction
801  // open. Otherwise close the shop for this client.
802  if (sz == 0)
803  sel_.setMask(p->socket, p->mask &= ~IORead);
804  }
805 
806  // Yes, please keep processing events for this socket.
807  unlock();
808  return false;
809 }
edm::ErrorSummaryEntry Error
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
std::ostream & logme()
Definition: DQMNet.cc:46
virtual Peer * getPeer(lat::Socket *s)=0
#define MESSAGE_SIZE_LIMIT
Definition: DQMNet.cc:25
#define SOCKET_READ_SIZE
Definition: DQMNet.cc:28
static void discard(Bucket *&b)
Definition: DQMNet.cc:58
assert(be >=bs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
Definition: DQMNet.cc:437
#define SOCKET_READ_GROWTH
Definition: DQMNet.cc:29
static const uint32_t MAX_PEER_WAITREQS
Definition: DQMNet.h:78
size_t IOSize
Definition: IOTypes.h:15
double b
Definition: hdecay.h:120
tuple msg
Definition: mps_check.py:286
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:70
bool debug_
Definition: DQMNet.h:340
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:84
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ operator=()

DQMNet& DQMNet::operator= ( const DQMNet & )
delete

◆ packQualityData()

void DQMNet::packQualityData ( std::string &  into,
const QReports &  qr 
)
static

Pack the quality results in `qr` into the string `into` for persistent storage, such as network transfer or archival.

Definition at line 154 of file DQMNet.cc.

References visDQMUpload::buf.

Referenced by DQMService::flushStandalone(), and dqm::impl::MonitorElement::packQualityData().

154  {
155  char buf[64];
156  std::ostringstream qrs;
157  QReports::const_iterator qi, qe;
158  for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
159  int pos = 0;
160  sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
161  qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
162  << '\0';
163  }
164  into = qrs.str();
165 }

◆ purgeDeadObjects()

virtual void DQMNet::purgeDeadObjects ( Peer p)
protectedpure virtual

◆ releaseFromWait() [1/2]

void DQMNet::releaseFromWait ( Bucket *  msg,
WaitObject &  w,
Object *  o 
)
protectedvirtual

Definition at line 376 of file DQMNet.cc.

References mps_check::msg, EcalTangentSkim_cfg::o, and w().

Referenced by run().

376  {
377  if (o)
378  sendObjectToPeer(msg, *o, true);
379  else {
380  uint32_t words[3];
381  words[0] = sizeof(words) + w.name.size();
382  words[1] = DQM_REPLY_NONE;
383  words[2] = w.name.size();
384 
385  msg->data.reserve(msg->data.size() + words[0]);
386  copydata(msg, &words[0], sizeof(words));
387  copydata(msg, &w.name[0], w.name.size());
388  }
389 }
T w() const
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
Definition: DQMNet.cc:394
tuple msg
Definition: mps_check.py:286
static const uint32_t DQM_REPLY_NONE
Definition: DQMNet.h:75
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53

◆ releaseFromWait() [2/2]

void DQMNet::releaseFromWait ( WaitList::iterator  i,
Object *  o 
)
private

Definition at line 128 of file DQMNet.cc.

References cms::cuda::assert(), mps_fire::i, mps_check::msg, DQMNet::Bucket::next, and EcalTangentSkim_cfg::o.

128  {
129  Bucket **msg = &i->peer->sendq;
130  while (*msg)
131  msg = &(*msg)->next;
132  *msg = new Bucket;
133  (*msg)->next = nullptr;
134 
135  releaseFromWait(*msg, *i, o);
136 
137  assert(i->peer->waiting > 0);
138  i->peer->waiting--;
139  waiting_.erase(i);
140 }
assert(be >=bs)
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:376
tuple msg
Definition: mps_check.py:286
WaitList waiting_
Definition: DQMNet.h:363

◆ releaseWaiters()

void DQMNet::releaseWaiters ( const std::string &  name,
Object *  o 
)
private

Definition at line 143 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, mps_fire::i, Skims_PA_cff::name, and EcalTangentSkim_cfg::o.

143  {
144  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
145  if (i->name == name)
146  releaseFromWait(i++, o);
147  else
148  ++i;
149 }
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:376
WaitList waiting_
Definition: DQMNet.h:363

◆ removePeer()

virtual void DQMNet::removePeer ( Peer p,
lat::Socket *  s 
)
protectedpure virtual

◆ requestObjectData()

void DQMNet::requestObjectData ( Peer *  p,
const char *  name,
size_t  len 
)
private

Queue an object request to the data server.

Definition at line 96 of file DQMNet.cc.

References mps_check::msg, Skims_PA_cff::name, DQMNet::Bucket::next, and AlCaHLTBitMon_ParallelJobs::p.

96  {
97  // Issue request to peer.
98  Bucket **msg = &p->sendq;
99  while (*msg)
100  msg = &(*msg)->next;
101  *msg = new Bucket;
102  (*msg)->next = nullptr;
103 
104  uint32_t words[3];
105  words[0] = sizeof(words) + len;
106  words[1] = DQM_MSG_GET_OBJECT;
107  words[2] = len;
108  copydata(*msg, words, sizeof(words));
109  copydata(*msg, name, len);
110 }
static const uint32_t DQM_MSG_GET_OBJECT
Definition: DQMNet.h:71
tuple msg
Definition: mps_check.py:286
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53

◆ run()

void DQMNet::run ( )

Run the actual I/O processing loop.

Definition at line 1087 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, copydata(), createPeer(), debug_, delay_, downstream_, DQM_MSG_LIST_OBJECTS, DQM_MSG_UPDATE_ME, DQM_PROP_STALE, MillePedeFileConverter_cfg::e, findObject(), flush_, dqm-mbProfile::format, mps_fire::i, edm::storage::IORead, edm::storage::IOUrgent, edm::storage::IOWrite, lock(), logme(), DQMNet::AutoPeer::next, submitPVValidationJobs::now, EcalTangentSkim_cfg::o, onPeerData(), AlCaHLTBitMon_ParallelJobs::p, releaseFromWait(), alignCSCRings::s, sel_, sendObjectListToPeers(), shouldStop(), SOCKET_BUF_SIZE, RecoSummaryTask_cfi::Time, unlock(), updatePeerMasks(), upstream_, waiting_, waitMax_, and waitStale_.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

1087  {
1088  Time now;
1089  Time nextFlush = 0;
1090  AutoPeer *automatic[2] = {&upstream_, &downstream_};
1091 
1092  // Perform I/O. Every once in a while flush updates to peers.
1093  while (!shouldStop()) {
1094  for (auto ap : automatic) {
1095  // If we need a server connection and don't have one yet,
1096  // initiate asynchronous connection creation. Swallow errors
1097  // in case the server won't talk to us.
1098  if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1099  ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1100  InetSocket *s = nullptr;
1101  try {
1102  InetAddress addr(ap->host.c_str(), ap->port);
1103  s = new InetSocket(SOCK_STREAM, 0, addr.family());
1104  s->setBlocking(false);
1105  s->connect(addr);
1106  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1107  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1108  } catch (Error &e) {
1109  auto *sys = dynamic_cast<SystemError *>(e.next());
1110  if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1111  // "In progress" just means the connection is in progress.
1112  // The connection is ready when the socket is writeable.
1113  // Anything else is a real problem.
1114  if (s)
1115  s->abort();
1116  delete s;
1117  s = nullptr;
1118  }
1119  }
1120 
1121  // Set up with the selector if we were successful. If this is
1122  // the upstream collector, queue a request for updates.
1123  if (s) {
1124  Peer *p = createPeer(s);
1125  ap->peer = p;
1126 
1127  InetAddress peeraddr = ((InetSocket *)s)->peername();
1128  InetAddress myaddr = ((InetSocket *)s)->sockname();
1129  p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
1130  p->mask = IORead | IOWrite | IOUrgent;
1131  p->update = ap->update;
1132  p->automatic = ap;
1133  p->socket = s;
1134  sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1135  if (ap == &upstream_) {
1136  uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1137  p->sendq = new Bucket;
1138  p->sendq->next = nullptr;
1139  copydata(p->sendq, words, sizeof(words));
1140  }
1141 
1142  // Report the new connection.
1143  if (debug_)
1144  logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1145  << std::endl;
1146  }
1147  }
1148  }
1149 
1150  // Pump events for a while.
1151  sel_.dispatch(delay_);
1152  now = Time::current();
1153  lock();
1154 
1155  // Check if flush is required. Flush only if one is needed.
1156  // Always sends the full object list, but only rarely.
1157  if (flush_ && now > nextFlush) {
1158  flush_ = false;
1159  nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1160  sendObjectListToPeers(true);
1161  }
1162 
1163  // Update the data server and peer selection masks. If we
1164  // have no more data to send and listening for writes, remove
1165  // the write mask. If we have something to write and aren't
1166  // listening for writes, start listening so we can send off
1167  // the data.
1168  updatePeerMasks();
1169 
1170  // Release peers that have been waiting for data for too long.
1171  Time waitold = now - waitMax_;
1172  Time waitstale = now - waitStale_;
1173  for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1174  Object *o = findObject(nullptr, i->name);
1175 
1176  // If we have (stale) object data, wait only up to stale limit.
1177  // Otherwise if we have no data at all, wait up to the max limit.
1178  if (i->time < waitold) {
1179  logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1180  << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1181  releaseFromWait(i++, o);
1182  } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1183  logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1184  << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1185  releaseFromWait(i++, o);
1186  }
1187 
1188  // Keep it for now.
1189  else
1190  ++i;
1191  }
1192 
1193  unlock();
1194  }
1195 }
edm::ErrorSummaryEntry Error
AutoPeer downstream_
Definition: DQMNet.h:362
static const uint32_t DQM_MSG_UPDATE_ME
Definition: DQMNet.h:69
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
std::ostream & logme()
Definition: DQMNet.cc:46
virtual void updatePeerMasks()=0
int delay_
Definition: DQMNet.h:368
virtual Peer * createPeer(lat::Socket *s)=0
AutoPeer upstream_
Definition: DQMNet.h:361
virtual bool shouldStop()
Definition: DQMNet.cc:372
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
Definition: DQMNet.cc:667
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Definition: DQMNet.cc:376
lat::Time next
Definition: DQMNet.h:141
static const uint32_t DQM_PROP_STALE
Definition: DQMNet.h:64
bool debug_
Definition: DQMNet.h:340
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
static const uint32_t DQM_MSG_LIST_OBJECTS
Definition: DQMNet.h:70
bool flush_
Definition: DQMNet.h:371
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
lat::TimeSpan waitMax_
Definition: DQMNet.h:370
lat::TimeSpan waitStale_
Definition: DQMNet.h:369
WaitList waiting_
Definition: DQMNet.h:363
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ sendLocalChanges()

void DQMNet::sendLocalChanges ( )

Definition at line 1199 of file DQMNet.cc.

References wakeup_.

Referenced by DQMService::flushStandalone(), and DQMImplNet< DQMNet::Object >::removePeer().

1199  {
1200  char byte = 0;
1201  wakeup_.sink()->write(&byte, 1);
1202 }
lat::Pipe wakeup_
Definition: DQMNet.h:358

◆ sendObjectListToPeer()

virtual void DQMNet::sendObjectListToPeer ( Bucket msg,
bool  all,
bool  clear 
)
protectedpure virtual

◆ sendObjectListToPeers()

virtual void DQMNet::sendObjectListToPeers ( bool  all)
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ sendObjectToPeer()

void DQMNet::sendObjectToPeer ( Bucket *  msg,
Object &  o,
bool  data 
)
protectedvirtual

Definition at line 394 of file DQMNet.cc.

References data, HLT_2024v12_cff::flags, mps_check::msg, and EcalTangentSkim_cfg::o.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

394  {
395  uint32_t flags = o.flags & ~DQM_PROP_DEAD;
396  DataBlob objdata;
397 
398  if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
399  objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
400  else if (data)
401  objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
402 
403  uint32_t words[9];
404  uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
405  uint32_t datalen = objdata.size();
406  uint32_t qlen = o.qdata.size();
407 
408  if (o.dirname.empty())
409  --namelen;
410 
411  words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
412  words[1] = DQM_REPLY_OBJECT;
413  words[2] = flags;
414  words[3] = (o.version >> 0) & 0xffffffff;
415  words[4] = (o.version >> 32) & 0xffffffff;
416  words[5] = o.tag;
417  words[6] = namelen;
418  words[7] = datalen;
419  words[8] = qlen;
420 
421  msg->data.reserve(msg->data.size() + words[0]);
422  copydata(msg, &words[0], 9 * sizeof(uint32_t));
423  if (namelen) {
424  copydata(msg, &(o.dirname)[0], o.dirname.size());
425  if (!o.dirname.empty())
426  copydata(msg, "/", 1);
427  copydata(msg, &o.objname[0], o.objname.size());
428  }
429  if (datalen)
430  copydata(msg, &objdata[0], datalen);
431  if (qlen)
432  copydata(msg, &o.qdata[0], qlen);
433 }
static const uint32_t DQM_PROP_TYPE_SCALAR
Definition: DQMNet.h:28
static const uint32_t DQM_PROP_DEAD
Definition: DQMNet.h:63
static const uint32_t DQM_REPLY_OBJECT
Definition: DQMNet.h:76
tuple msg
Definition: mps_check.py:286
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
static void copydata(Bucket *b, const void *data, size_t len)
Definition: DQMNet.cc:53
static const uint32_t DQM_PROP_TYPE_MASK
Definition: DQMNet.h:27
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:84

◆ setOrder()

static bool DQMNet::setOrder ( const CoreObject &  a,
const CoreObject &  b 
)
inlinestatic

Definition at line 166 of file DQMNet.h.

References a, and b.

Referenced by dqm::impl::MonitorElement::operator<().

166  {
167  if (a.run == b.run) {
168  if (a.lumi == b.lumi) {
169  if (a.streamId == b.streamId) {
170  if (a.moduleId == b.moduleId) {
171  if (a.dirname == b.dirname) {
172  return a.objname < b.objname;
173  }
174  return a.dirname < b.dirname;
175  }
176  return a.moduleId < b.moduleId;
177  }
178  return a.streamId < b.streamId;
179  }
180  return a.lumi < b.lumi;
181  }
182  return a.run < b.run;
183  }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121

◆ shouldStop()

bool DQMNet::shouldStop ( )
protectedvirtual

Definition at line 372 of file DQMNet.cc.

Referenced by run().

372 { return shutdown_; }
sig_atomic_t shutdown_
Definition: DQMNet.h:366

◆ shutdown()

void DQMNet::shutdown ( )

Stop the network layer and wait for it to finish.

Definition at line 1042 of file DQMNet.cc.

References communicate_, and shutdown_.

Referenced by DQMService::shutdown().

1042  {
1043  shutdown_ = 1;
1044  if (communicate_ != (pthread_t)-1)
1045  pthread_join(communicate_, nullptr);
1046 }
pthread_t communicate_
Definition: DQMNet.h:365
sig_atomic_t shutdown_
Definition: DQMNet.h:366

◆ staleObjectWaitLimit()

void DQMNet::staleObjectWaitLimit ( lat::TimeSpan  time)

Set the time limit for waiting for updates to stale objects. Once the limit has been exhausted, whatever data exists is returned. This applies only when some data has already been received; a different time limit is applied when no data payload has been received at all.

Definition at line 951 of file DQMNet.cc.

References hcalRecHitTable_cff::time, and waitStale_.

951 { waitStale_ = time; }
lat::TimeSpan waitStale_
Definition: DQMNet.h:369

◆ start()

void DQMNet::start ( )

Start running the network layer in a new thread. This is an exclusive alternative to the run() method, which runs the network layer in the caller's thread.

Definition at line 1076 of file DQMNet.cc.

References communicate(), communicate_, lock_, and logme().

Referenced by progressbar.ProgressBar::__next__(), Types.LuminosityBlockRange::cppID(), Types.EventRange::cppID(), and DQMService::DQMService().

1076  {
1077  if (communicate_ != (pthread_t)-1) {
1078  logme() << "ERROR: DQM networking thread has already been started\n";
1079  return;
1080  }
1081 
1082  pthread_mutex_init(&lock_, nullptr);
1083  pthread_create(&communicate_, nullptr, &communicate, this);
1084 }
pthread_t communicate_
Definition: DQMNet.h:365
std::ostream & logme()
Definition: DQMNet.cc:46
pthread_mutex_t lock_
Definition: DQMNet.h:341
static void * communicate(void *obj)
Definition: DQMNet.cc:1053

◆ startLocalServer() [1/2]

void DQMNet::startLocalServer ( int  port)

Start a server socket for accessing this DQM node remotely. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 956 of file DQMNet.cc.

References generateTowerEtThresholdLUT::addr, MillePedeFileConverter_cfg::e, Exception, edm::storage::IOAccept, logme(), onPeerConnect(), query::port, alignCSCRings::s, sel_, server_, and SOCKET_BUF_SIZE.

956  {
957  if (server_) {
958  logme() << "ERROR: DQM server was already started.\n";
959  return;
960  }
961 
962  try {
963  InetAddress addr("0.0.0.0", port);
964  auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
965  s->bind(addr);
966  s->listen(10);
967  s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
968  s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
969  s->setBlocking(false);
970  sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
971  } catch (Error &e) {
972  // FIXME: Do we need to do this when we throw an exception anyway?
973  // FIXME: Abort instead?
974  logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
975 
976  throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
977 
978  port << ": " << e.explain().c_str();
979  }
980 
981  logme() << "INFO: DQM server started at port " << port << std::endl;
982 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:46
int port
Definition: query.py:116
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:815
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
lat::Socket * server_
Definition: DQMNet.h:357
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ startLocalServer() [2/2]

void DQMNet::startLocalServer ( const char *  path)

Start a server socket for accessing this DQM node over a file system socket. Must be called before calling run() or start(). May throw an Exception if the server socket cannot be initialised.

Definition at line 987 of file DQMNet.cc.

References MillePedeFileConverter_cfg::e, Exception, edm::storage::IOAccept, logme(), onPeerConnect(), castor_dqm_sourceclient_file_cfg::path, sel_, server_, and SOCKET_BUF_SIZE.

987  {
988  if (server_) {
989  logme() << "ERROR: DQM server was already started.\n";
990  return;
991  }
992 
993  try {
994  server_ = new LocalServerSocket(path, 10);
995  server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
996  server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
997  server_->setBlocking(false);
998  sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
999  } catch (Error &e) {
1000  // FIXME: Do we need to do this when we throw an exception anyway?
1001  // FIXME: Abort instead?
1002  logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1003 
1004  throw cms::Exception("DQMNet::startLocalServer")
1005  << "Failed to start server at path " << path << ": " << e.explain().c_str();
1006  }
1007 
1008  logme() << "INFO: DQM server started at path " << path << std::endl;
1009 }
edm::ErrorSummaryEntry Error
std::ostream & logme()
Definition: DQMNet.cc:46
bool onPeerConnect(lat::IOSelectEvent *ev)
Definition: DQMNet.cc:815
#define SOCKET_BUF_SIZE
Definition: DQMNet.cc:26
lat::Socket * server_
Definition: DQMNet.h:357
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ unlock()

void DQMNet::unlock ( )

Release the lock on the DQM net layer.

Definition at line 1068 of file DQMNet.cc.

References communicate_, and lock_.

Referenced by DQMService::flushStandalone(), and run().

1068  {
1069  if (communicate_ != (pthread_t)-1)
1070  pthread_mutex_unlock(&lock_);
1071 }
pthread_t communicate_
Definition: DQMNet.h:365
pthread_mutex_t lock_
Definition: DQMNet.h:341

◆ unpackQualityData()

void DQMNet::unpackQualityData ( QReports &  qr,
uint32_t &  flags,
const char *  from 
)
static

Unpack the quality results from the string `from` into `qr`. Assumes the data was saved with packQualityData().

Definition at line 169 of file DQMNet.cc.

References MonitorElementData::QReport::QValue::algorithm, MonitorElementData::QReport::QValue::code, DQM_PROP_REPORT_ERROR, DQM_PROP_REPORT_OTHER, DQM_PROP_REPORT_WARN, dqm::qstatus::ERROR, HLT_2024v12_cff::flags, MonitorElementData::QReport::QValue::message, MonitorElementData::QReport::QValue::qtname, MonitorElementData::QReport::QValue::qtresult, dqm::qstatus::STATUS_OK, and dqm::qstatus::WARNING.

169  {
170  const char *qdata = from;
171 
172  // Count how many qresults there are.
173  size_t nqv = 0;
174  while (*qdata) {
175  ++nqv;
176  while (*qdata)
177  ++qdata;
178  ++qdata;
179  while (*qdata)
180  ++qdata;
181  ++qdata;
182  while (*qdata)
183  ++qdata;
184  ++qdata;
185  while (*qdata)
186  ++qdata;
187  ++qdata;
188  while (*qdata)
189  ++qdata;
190  ++qdata;
191  }
192 
193  // Now extract the qreports.
194  qdata = from;
195  qr.reserve(nqv);
196  while (*qdata) {
197  qr.emplace_back();
198  DQMNet::QValue &qv = qr.back();
199 
200  qv.code = atoi(qdata);
201  while (*qdata)
202  ++qdata;
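203  switch (qv.code) {
204  case dqm::qstatus::STATUS_OK:
205  break;
206  case dqm::qstatus::WARNING:
207  flags |= DQM_PROP_REPORT_WARN;
208  break;
209  case dqm::qstatus::ERROR:
210  flags |= DQM_PROP_REPORT_ERROR;
211  break;
212  default:
213  flags |= DQM_PROP_REPORT_OTHER;
214  break;
215  }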
216 
217  qv.qtresult = atof(++qdata);
218  while (*qdata)
219  ++qdata;
220 
221  qv.qtname = ++qdata;
222  while (*qdata)
223  ++qdata;
224 
225  qv.algorithm = ++qdata;
226  while (*qdata)
227  ++qdata;
228 
229  qv.message = ++qdata;
230  while (*qdata)
231  ++qdata;
232  ++qdata;
233  }
234 }
static const uint32_t DQM_PROP_REPORT_WARN
Definition: DQMNet.h:51
static const int WARNING
static const uint32_t DQM_PROP_REPORT_ERROR
Definition: DQMNet.h:50
static const uint32_t DQM_PROP_REPORT_OTHER
Definition: DQMNet.h:52
static const int STATUS_OK
static const int ERROR

◆ updateMask()

void DQMNet::updateMask ( Peer *  p)
protected

Update the selector mask for a peer based on its data queues. Close the connection if there is no reason to keep it open.

Definition at line 884 of file DQMNet.cc.

References cms::cuda::assert(), edm::storage::IOUrgent, edm::storage::IOWrite, fileCollector::logme(), and AlCaHLTBitMon_ParallelJobs::p.

Referenced by DQMImplNet< DQMNet::Object >::updatePeerMasks().

884  {
885  if (!p->socket)
886  return;
887 
888  // Listen to writes iff we have data to send.
889  unsigned oldmask = p->mask;
890  if (!p->sendq && (p->mask & IOWrite))
891  sel_.setMask(p->socket, p->mask &= ~IOWrite);
892 
893  if (p->sendq && !(p->mask & IOWrite))
894  sel_.setMask(p->socket, p->mask |= IOWrite);
895 
896  if (debug_ && oldmask != p->mask)
897  logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
898 
899  // If we have nothing more to send and are no longer listening
900  // for reads, close up the shop for this peer.
901  if (p->mask == IOUrgent && !p->waiting) {
902  assert(!p->sendq);
903  if (debug_)
904  logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
905  losePeer(nullptr, p, nullptr);
906  }
907 }
std::ostream & logme()
Definition: DQMNet.cc:46
assert(be >=bs)
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
Definition: DQMNet.cc:70
bool debug_
Definition: DQMNet.h:340
lat::IOSelector sel_
Definition: DQMNet.h:356

◆ updatePeerMasks()

virtual void DQMNet::updatePeerMasks ( )
protectedpure virtual

Implemented in DQMImplNet< ObjType >, and DQMImplNet< DQMNet::Object >.

Referenced by run().

◆ updateToCollector()

void DQMNet::updateToCollector ( const std::string &  host,
int  port 
)

Tell the network layer to connect to host and port and automatically send updates whenever local DQM data changes. Must be called before calling run() or start().

Definition at line 1014 of file DQMNet.cc.

References downstream_, query::host, DQMNet::AutoPeer::host, logme(), query::port, DQMNet::AutoPeer::port, and DQMNet::AutoPeer::update.

Referenced by DQMService::DQMService().

1014  {
1015  if (!downstream_.host.empty()) {
1016  logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1017  << std::endl;
1018  return;
1019  }
1020 
1021  downstream_.update = true;
1022  downstream_.host = host;
1023  downstream_.port = port;
1024 }
AutoPeer downstream_
Definition: DQMNet.h:362
string host
Definition: query.py:115
std::ostream & logme()
Definition: DQMNet.cc:46
int port
Definition: query.py:116
std::string host
Definition: DQMNet.h:142

◆ waitForData()

void DQMNet::waitForData ( Peer *  p,
const std::string &  name,
const std::string &  info,
Peer *  owner 
)
protected

Queue a request for an object and put a peer into the mode of waiting for object data to appear.

Definition at line 114 of file DQMNet.cc.

References info(), Skims_PA_cff::name, and AlCaHLTBitMon_ParallelJobs::p.

114  {
115  // FIXME: Should we automatically record which exact peer the waiter
116  // is expecting to deliver data so we know to release the waiter if
117  // the other peer vanishes? The current implementation stands a
118  // chance for the waiter to wait indefinitely -- although we do
119  // force terminate the wait after a while.
120  requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
121  WaitObject wo = {Time::current(), name, info, p};
122  waiting_.push_back(wo);
123  p->waiting++;
124 }
static const TGPicture * info(bool iBackgroundIsBlack)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
Definition: DQMNet.cc:96
WaitList waiting_
Definition: DQMNet.h:363

Member Data Documentation

◆ appname_

std::string DQMNet::appname_
private

Definition at line 353 of file DQMNet.h.

◆ communicate_

pthread_t DQMNet::communicate_
private

Definition at line 365 of file DQMNet.h.

Referenced by lock(), shutdown(), start(), and unlock().

◆ debug_

bool DQMNet::debug_
protected

Definition at line 340 of file DQMNet.h.

Referenced by debug(), run(), and DQMImplNet< DQMNet::Object >::sendObjectListToPeers().

◆ delay_

int DQMNet::delay_
private

Definition at line 368 of file DQMNet.h.

Referenced by delay(), and run().

◆ downstream_

AutoPeer DQMNet::downstream_
private

Definition at line 362 of file DQMNet.h.

Referenced by DQMNet(), run(), and updateToCollector().

◆ DQM_MSG_GET_OBJECT

const uint32_t DQMNet::DQM_MSG_GET_OBJECT = 3
static

Definition at line 71 of file DQMNet.h.

◆ DQM_MSG_HELLO

const uint32_t DQMNet::DQM_MSG_HELLO = 0
static

Definition at line 68 of file DQMNet.h.

◆ DQM_MSG_LIST_OBJECTS

const uint32_t DQMNet::DQM_MSG_LIST_OBJECTS = 2
static

Definition at line 70 of file DQMNet.h.

Referenced by run().

◆ DQM_MSG_UPDATE_ME

const uint32_t DQMNet::DQM_MSG_UPDATE_ME = 1
static

Definition at line 69 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_ACCUMULATE

const uint32_t DQMNet::DQM_PROP_ACCUMULATE = 0x00004000
static

Definition at line 57 of file DQMNet.h.

◆ DQM_PROP_DEAD

const uint32_t DQMNet::DQM_PROP_DEAD = 0x00080000
static

◆ DQM_PROP_EFFICIENCY_PLOT

const uint32_t DQMNet::DQM_PROP_EFFICIENCY_PLOT = 0x00200000
static

◆ DQM_PROP_HAS_REFERENCE

const uint32_t DQMNet::DQM_PROP_HAS_REFERENCE = 0x00001000
static

Definition at line 55 of file DQMNet.h.

◆ DQM_PROP_LUMI

const uint32_t DQMNet::DQM_PROP_LUMI = 0x00040000
static

◆ DQM_PROP_MARKTODELETE

const uint32_t DQMNet::DQM_PROP_MARKTODELETE = 0x01000000
static

Definition at line 66 of file DQMNet.h.

◆ DQM_PROP_NEW

const uint32_t DQMNet::DQM_PROP_NEW = 0x00010000
static

◆ DQM_PROP_RECEIVED

const uint32_t DQMNet::DQM_PROP_RECEIVED = 0x00020000
static

Definition at line 61 of file DQMNet.h.

◆ DQM_PROP_REPORT_ALARM

const uint32_t DQMNet::DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER)
static

Definition at line 53 of file DQMNet.h.

◆ DQM_PROP_REPORT_CLEAR

const uint32_t DQMNet::DQM_PROP_REPORT_CLEAR = 0x00000000
static

Definition at line 49 of file DQMNet.h.

◆ DQM_PROP_REPORT_ERROR

const uint32_t DQMNet::DQM_PROP_REPORT_ERROR = 0x00000100
static

◆ DQM_PROP_REPORT_MASK

const uint32_t DQMNet::DQM_PROP_REPORT_MASK = 0x00000f00
static

Definition at line 48 of file DQMNet.h.

◆ DQM_PROP_REPORT_OTHER

const uint32_t DQMNet::DQM_PROP_REPORT_OTHER = 0x00000400
static

◆ DQM_PROP_REPORT_WARN

const uint32_t DQMNet::DQM_PROP_REPORT_WARN = 0x00000200
static

◆ DQM_PROP_RESET

const uint32_t DQMNet::DQM_PROP_RESET = 0x00008000
static

Definition at line 58 of file DQMNet.h.

◆ DQM_PROP_STALE

const uint32_t DQMNet::DQM_PROP_STALE = 0x00100000
static

Definition at line 64 of file DQMNet.h.

Referenced by run().

◆ DQM_PROP_TAGGED

const uint32_t DQMNet::DQM_PROP_TAGGED = 0x00002000
static

Definition at line 56 of file DQMNet.h.

◆ DQM_PROP_TYPE_DATABLOB

const uint32_t DQMNet::DQM_PROP_TYPE_DATABLOB = 0x00000050
static

Definition at line 46 of file DQMNet.h.

◆ DQM_PROP_TYPE_INT

const uint32_t DQMNet::DQM_PROP_TYPE_INT = 0x00000001
static

Definition at line 30 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_INVALID

const uint32_t DQMNet::DQM_PROP_TYPE_INVALID = 0x00000000
static

Definition at line 29 of file DQMNet.h.

◆ DQM_PROP_TYPE_MASK

const uint32_t DQMNet::DQM_PROP_TYPE_MASK = 0x000000ff
static

◆ DQM_PROP_TYPE_REAL

const uint32_t DQMNet::DQM_PROP_TYPE_REAL = 0x00000002
static

Definition at line 31 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_SCALAR

const uint32_t DQMNet::DQM_PROP_TYPE_SCALAR = 0x0000000f
static

Definition at line 28 of file DQMNet.h.

◆ DQM_PROP_TYPE_STRING

const uint32_t DQMNet::DQM_PROP_TYPE_STRING = 0x00000003
static

Definition at line 32 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1D

const uint32_t DQMNet::DQM_PROP_TYPE_TH1D = 0x00000012
static

Definition at line 35 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1F

const uint32_t DQMNet::DQM_PROP_TYPE_TH1F = 0x00000010
static

◆ DQM_PROP_TYPE_TH1I

const uint32_t DQMNet::DQM_PROP_TYPE_TH1I = 0x00000013
static

Definition at line 36 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH1S

const uint32_t DQMNet::DQM_PROP_TYPE_TH1S = 0x00000011
static

Definition at line 34 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2D

const uint32_t DQMNet::DQM_PROP_TYPE_TH2D = 0x00000022
static

Definition at line 39 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2F

const uint32_t DQMNet::DQM_PROP_TYPE_TH2F = 0x00000020
static

◆ DQM_PROP_TYPE_TH2I

const uint32_t DQMNet::DQM_PROP_TYPE_TH2I = 0x00000023
static

Definition at line 40 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH2S

const uint32_t DQMNet::DQM_PROP_TYPE_TH2S = 0x00000021
static

Definition at line 38 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3D

const uint32_t DQMNet::DQM_PROP_TYPE_TH3D = 0x00000032
static

Definition at line 43 of file DQMNet.h.

◆ DQM_PROP_TYPE_TH3F

const uint32_t DQMNet::DQM_PROP_TYPE_TH3F = 0x00000030
static

Definition at line 41 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TH3S

const uint32_t DQMNet::DQM_PROP_TYPE_TH3S = 0x00000031
static

Definition at line 42 of file DQMNet.h.

◆ DQM_PROP_TYPE_TPROF

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF = 0x00000040
static

Definition at line 44 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_PROP_TYPE_TPROF2D

const uint32_t DQMNet::DQM_PROP_TYPE_TPROF2D = 0x00000041
static

Definition at line 45 of file DQMNet.h.

Referenced by dqmservices::DQMProtobufReader::load().

◆ DQM_REPLY_LIST_BEGIN

const uint32_t DQMNet::DQM_REPLY_LIST_BEGIN = 101
static

Definition at line 73 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_LIST_END

const uint32_t DQMNet::DQM_REPLY_LIST_END = 102
static

Definition at line 74 of file DQMNet.h.

Referenced by DQMImplNet< DQMNet::Object >::sendObjectListToPeer().

◆ DQM_REPLY_NONE

const uint32_t DQMNet::DQM_REPLY_NONE = 103
static

Definition at line 75 of file DQMNet.h.

◆ DQM_REPLY_OBJECT

const uint32_t DQMNet::DQM_REPLY_OBJECT = 104
static

Definition at line 76 of file DQMNet.h.

◆ flush_

bool DQMNet::flush_
private

Definition at line 371 of file DQMNet.h.

Referenced by run().

◆ lock_

pthread_mutex_t DQMNet::lock_
protected

Definition at line 341 of file DQMNet.h.

Referenced by lock(), start(), and unlock().

◆ MAX_PEER_WAITREQS

const uint32_t DQMNet::MAX_PEER_WAITREQS = 128
static

Definition at line 78 of file DQMNet.h.

◆ pid_

int DQMNet::pid_
private

Definition at line 354 of file DQMNet.h.

◆ sel_

lat::IOSelector DQMNet::sel_
private

Definition at line 356 of file DQMNet.h.

Referenced by DQMNet(), run(), and startLocalServer().

◆ server_

lat::Socket* DQMNet::server_
private

Definition at line 357 of file DQMNet.h.

Referenced by startLocalServer().

◆ shutdown_

sig_atomic_t DQMNet::shutdown_
private

Definition at line 366 of file DQMNet.h.

Referenced by shutdown().

◆ upstream_

AutoPeer DQMNet::upstream_
private

Definition at line 361 of file DQMNet.h.

Referenced by DQMNet(), listenToCollector(), and run().

◆ version_

lat::Time DQMNet::version_
private

Definition at line 359 of file DQMNet.h.

◆ waiting_

WaitList DQMNet::waiting_
private

Definition at line 363 of file DQMNet.h.

Referenced by run().

◆ waitMax_

lat::TimeSpan DQMNet::waitMax_
private

Definition at line 370 of file DQMNet.h.

Referenced by run().

◆ waitStale_

lat::TimeSpan DQMNet::waitStale_
private

Definition at line 369 of file DQMNet.h.

Referenced by run(), and staleObjectWaitLimit().

◆ wakeup_

lat::Pipe DQMNet::wakeup_
private

Definition at line 358 of file DQMNet.h.

Referenced by DQMNet(), and sendLocalChanges().