CMS 3D CMS Logo

DQMService.cc
Go to the documentation of this file.
7 #include <mutex>
8 #include <iostream>
9 #include <string>
10 #include <memory>
11 #include "TBufferFile.h"
12 
13 // -------------------------------------------------------------------
14 static std::recursive_mutex s_mutex;
15 
19 
22 
23 // -------------------------------------------------------------------
25  : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
28 
29  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
30  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
31  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
32  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
33 
34  if (!host.empty() && port > 0) {
35  net_ = new DQMBasicNet;
36  net_->debug(verbose);
38  net_->start();
39  }
40 }
41 
43 
44 // Flush updates to the network layer at the end of each event. This
45 // is the only point at which the main application and the network
46 // layer interact outside initialisation and exit.
48  // Avoid sending updates excessively often.
49  uint64_t version = lat::Time::current().ns();
50  double vtime = version * 1e-9;
51  if (vtime - lastFlush_ < publishFrequency_)
52  return;
53 
54  // OK, send an update.
55  if (net_) {
57  std::set<std::string> seen;
59 
60  // Lock the network layer so we can modify the data.
61  net_->lock();
62  bool updated = false;
63 
64  auto mes = store_->getAllContents("");
65  for (MonitorElement *me : mes) {
66  auto fullpath = me->getFullname();
67  seen.insert(fullpath);
68  if (!me->wasUpdated())
69  continue;
70 
71  o.lastreq = 0;
72  o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
73  o.flags = me->data_.flags;
74  o.version = version;
75  o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
76  o.objname = me->data_.objname;
77  assert(o.rawdata.empty());
78  assert(o.scalar.empty());
79  assert(o.qdata.empty());
80 
81  // Pack object and reference, scalar and quality data.
82 
83  switch (me->kind()) {
87  me->packScalarData(o.scalar, "");
88  break;
89  default: {
90  TBufferFile buffer(TBufferFile::kWrite);
91  buffer.WriteObject(me->getTH1());
92  // placeholder for (no longer supported) reference
93  buffer.WriteObjectAny(nullptr, nullptr);
94  o.rawdata.resize(buffer.Length());
95  memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
96  DQMNet::packQualityData(o.qdata, me->data_.qreports);
97  break;
98  }
99  }
100 
102  DQMNet::DataBlob().swap(o.rawdata);
103  std::string().swap(o.scalar);
104  std::string().swap(o.qdata);
105  updated = true;
106  }
107 
108  // Find removed contents and clear the network cache.
109  if (net_->removeLocalExcept(seen))
110  updated = true;
111 
112  // Unlock the network layer.
113  net_->unlock();
114 
115  // Tell network to flush if we updated something.
116  if (updated)
118  }
119 
120  lastFlush_ = lat::Time::current().ns() * 1e-9;
121 }
123  // Call a function independent to the framework
124  flushStandalone();
125 }
126 
127 // Disengage the network service.
129  // If we have a network, let it go.
130  if (net_)
131  net_->shutdown();
132 }
static std::recursive_mutex s_mutex
Definition: DQMService.cc:14
DQMBasicNet * net_
Definition: DQMService.h:27
string host
Definition: query.py:115
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1062
double publishFrequency_
Definition: DQMService.h:29
bool verbose
void watchPostEvent(PostEvent::slot_type const &iSlot)
void shutdown()
Definition: DQMService.cc:128
double lastFlush_
Definition: DQMService.h:28
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
~DQMScope()
Release access lock to the DQM core.
Definition: DQMService.cc:21
int port
Definition: query.py:116
void shutdown()
Stop the network layer and wait it to finish.
Definition: DQMNet.cc:1042
assert(be >=bs)
void debug(bool doit)
Definition: DQMNet.cc:941
DQMStore * store_
Definition: DQMService.h:26
void start()
Definition: DQMNet.cc:1076
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:641
void flush(edm::StreamContext const &sc)
Definition: DQMService.cc:122
void sendLocalChanges()
Definition: DQMNet.cc:1199
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:195
void flushStandalone()
Definition: DQMService.cc:47
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1237
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:154
DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition: DQMService.cc:24
HLT enums.
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1068
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1216
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1014
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:84