CMS 3D CMS Logo

DQMService.cc
Go to the documentation of this file.
7 #include "classlib/utils/Error.h"
8 #include <mutex>
9 #include <iostream>
10 #include <string>
11 #include <memory>
12 #include "TBufferFile.h"
13 
14 // -------------------------------------------------------------------
// File-scope recursive mutex; `static` keeps it private to this translation unit.
// NOTE(review): none of the code visible in this listing locks it. Presumably it is
// the "access lock to the DQM core" taken/released by DQMScope (listing lines 19-22,
// not shown here) — confirm against the full file.
15 static std::recursive_mutex s_mutex;
16 
20 
23 
24 // -------------------------------------------------------------------
// DQMService constructor (initializer list and body; the signature line is absent
// from this listing). Binds to the DQMStore service, reads the collector settings
// from `pset`, and creates/starts the network layer only when a collector is configured.
// Initial state: no network layer (net_ == nullptr), lastFlush_ == 0, publish period 5.0.
26  : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
29 
// Untracked parameters and their defaults: collectorHost = "", collectorPort = 9090,
// verbose = false.
30  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
31  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
32  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
// The current member value (5.0 from the initializer list) doubles as the default.
// flushStandalone() compares this against a time difference expressed in seconds.
33  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
34 
// With the default (empty) collectorHost no network layer is created and net_ stays
// nullptr; flushStandalone() and shutdown() both check net_ before using it.
35  if (!host.empty() && port > 0) {
// NOTE(review): raw owning pointer; no matching delete is visible in this listing —
// confirm where (or whether) net_ is released.
36  net_ = new DQMBasicNet;
37  net_->debug(verbose);
// NOTE(review): listing line 38 is missing here; presumably the
// net_->updateToCollector(host, port) call, since `host`/`port` are otherwise only
// used in the condition above — confirm against the full file.
39  net_->start();
40  }
41 }
42 
44 
45 // Flush updates to the network layer at the end of each event. This
46 // is the only point at which the main application and the network
47 // layer interact outside initialisation and exit.
// Body of flushStandalone() (the signature line is absent from this listing).
// Publishes the current DQMStore contents to the network layer, rate-limited so that
// at most one update is sent per publishFrequency_ seconds.
// NOTE(review): this listing also omits source lines 57, 59, 85-87, 102 and 118, so
// the declaration of `o`, the `case` labels of the switch and two calls are not visible.
49  // Avoid sending updates excessively often.
// The nanosecond timestamp doubles as the version stamp written to every object below.
50  uint64_t version = lat::Time::current().ns();
// Nanoseconds -> seconds, to match the units of lastFlush_ and publishFrequency_.
51  double vtime = version * 1e-9;
52  if (vtime - lastFlush_ < publishFrequency_)
53  return;
54 
55  // OK, send an update.
// net_ is nullptr when no collector was configured; in that case only lastFlush_
// (at the bottom) is updated.
56  if (net_) {
// Full names of every element currently in the store, updated or not; handed to
// removeLocalExcept() after the loop.
58  std::set<std::string> seen;
60 
61  // Lock the network layer so we can modify the data.
// NOTE(review): lock()/unlock() are paired by hand; an exception thrown in between
// would skip the unlock() call.
62  net_->lock();
63  bool updated = false;
64 
// An empty path is passed to getAllContents(); presumably this selects the whole
// store — confirm against DQMStore.
65  auto mes = store_->getAllContents("");
66  for (MonitorElement *me : mes) {
67  auto fullpath = me->getFullname();
68  seen.insert(fullpath);
// Unchanged elements are remembered in `seen` but not re-packed.
69  if (!me->wasUpdated())
70  continue;
71 
// `o` is declared on a line missing from this listing (presumably a DQMNet::Object,
// cf. updateLocalObject(Object &o)) and is reused across iterations.
72  o.lastreq = 0;
73  o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
74  o.flags = me->data_.flags;
75  o.version = version;
// Drops the last character of dirname — presumably a trailing '/'; confirm.
76  o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
77  o.objname = me->data_.objname;
// The payload buffers must be empty on entry; the swaps at the end of the loop body
// restore that state after each packed element.
78  assert(o.rawdata.empty());
79  assert(o.scalar.empty());
80  assert(o.qdata.empty());
81 
82  // Pack object and reference, scalar and quality data.
83 
// NOTE(review): the `case` labels (listing lines 85-87) are missing; the first branch
// packs scalar data only, so presumably it handles the scalar element kinds — confirm.
84  switch (me->kind()) {
88  me->packScalarData(o.scalar, "");
89  break;
// Every other kind: serialize the histogram from getTH1() with ROOT's TBufferFile
// and copy the bytes into o.rawdata, then pack the quality reports into o.qdata.
90  default: {
91  TBufferFile buffer(TBufferFile::kWrite);
92  buffer.WriteObject(me->getTH1());
93  // placeholder for (no longer supported) reference
94  buffer.WriteObjectAny(nullptr, nullptr);
95  o.rawdata.resize(buffer.Length());
96  memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
97  DQMNet::packQualityData(o.qdata, me->data_.qreports);
98  break;
99  }
100  }
101 
// NOTE(review): listing line 102 is missing here; presumably net_->updateLocalObject(o),
// which would hand the packed object to the network layer — confirm.
// Swapping with empty temporaries leaves the three buffers empty for the next iteration.
103  DQMNet::DataBlob().swap(o.rawdata);
104  std::string().swap(o.scalar);
105  std::string().swap(o.qdata);
106  updated = true;
107  }
108 
109  // Find removed contents and clear the network cache.
110  if (net_->removeLocalExcept(seen))
111  updated = true;
112 
113  // Unlock the network layer.
114  net_->unlock();
115 
116  // Tell network to flush if we updated something.
// NOTE(review): the statement controlled by this `if` (listing line 118) is missing;
// presumably net_->sendLocalChanges() — confirm.
117  if (updated)
119  }
120 
// Record the completion time (a fresh clock reading, not `vtime`) in seconds; this
// runs whether or not a network layer exists.
121  lastFlush_ = lat::Time::current().ns() * 1e-9;
122 }
// Body of flush(edm::StreamContext const &) (the signature line is absent from this
// listing). The visible body does nothing but forward to flushStandalone().
124  // Delegate to flushStandalone(), which is independent of the framework.
125  flushStandalone();
126 }
127 
128 // Disengage the network service.
// Body of DQMService::shutdown() (the signature line is absent from this listing).
// net_ is non-null only when the constructor created a network layer.
// NOTE(review): net_ is neither deleted nor reset to nullptr here — confirm ownership.
130  // If we have a network, let it go.
131  if (net_)
132  net_->shutdown();
133 }
static std::recursive_mutex s_mutex
Definition: DQMService.cc:15
DQMBasicNet * net_
Definition: DQMService.h:27
string host
Definition: query.py:115
void lock()
Acquire a lock on the DQM net layer.
Definition: DQMNet.cc:1066
double publishFrequency_
Definition: DQMService.h:29
bool verbose
void watchPostEvent(PostEvent::slot_type const &iSlot)
void shutdown()
Definition: DQMService.cc:129
double lastFlush_
Definition: DQMService.h:28
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
~DQMScope()
Release access lock to the DQM core.
Definition: DQMService.cc:22
int port
Definition: query.py:116
void shutdown()
Stop the network layer and wait for it to finish.
Definition: DQMNet.cc:1046
assert(be >=bs)
void debug(bool doit)
Definition: DQMNet.cc:945
DQMStore * store_
Definition: DQMService.h:26
void start()
Definition: DQMNet.cc:1080
virtual std::vector< dqm::harvesting::MonitorElement * > getAllContents(std::string const &path) const
Definition: DQMStore.cc:641
void flush(edm::StreamContext const &sc)
Definition: DQMService.cc:123
void sendLocalChanges()
Definition: DQMNet.cc:1203
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:196
void flushStandalone()
Definition: DQMService.cc:48
unsigned long long uint64_t
Definition: Time.h:13
bool removeLocalExcept(const std::set< std::string > &known)
Definition: DQMNet.cc:1241
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:158
DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition: DQMService.cc:25
HLT enums.
void unlock()
Release the lock on the DQM net layer.
Definition: DQMNet.cc:1072
void updateLocalObject(Object &o)
Definition: DQMNet.cc:1220
void updateToCollector(const std::string &host, int port)
Definition: DQMNet.cc:1018
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:85