CMS 3D CMS Logo

DQMService.cc
Go to the documentation of this file.
1 #if !WITHOUT_CMS_FRAMEWORK
8 # include "classlib/utils/Regexp.h"
9 # include "classlib/utils/Error.h"
10 # include <mutex>
11 # include <iostream>
12 # include <string>
13 # include <memory>
14 #include "TBufferFile.h"
15 
16 // -------------------------------------------------------------------
// File-local recursive mutex. The two one-line bodies below lock and unlock it.
17 static std::recursive_mutex s_mutex;
18 
// Body of DQMScope::DQMScope(void) per the listing index; the signature
// (listing line 21) is not shown here. Acquires s_mutex.
22 { s_mutex.lock(); }
23 
// Body of DQMScope::~DQMScope(void) per the listing index; the signature
// (listing line 25) is not shown here. Releases the lock taken by the constructor.
26 { s_mutex.unlock(); }
27 
28 // -------------------------------------------------------------------
// Constructor. Per the listing index the signature is
// DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar);
// that line (listing line 29) is not shown here.
//
// Reads the untracked parameters "collectorHost" (default ""), "collectorPort"
// (default 9090), "verbose" (default false) and "publishFrequency" (default is
// the 5.0 set in the initializer list). When a host is given and the port is
// positive, a DQMBasicNet is created, pointed at the collector and started.
// When `filter` is non-empty it is compiled into a lat::Regexp; an invalid
// expression or a lat::Error is reported as a cms::Exception("DQMService").
//
// NOTE(review): listing lines 36-37 and 43 are missing from this view.
// Presumably 36-37 register flush() with `ar` and 43 reads the "filter"
// parameter into the local `filter` used below -- confirm against the real file.
// NOTE(review): net_ and filter_ are allocated with raw `new`, and no matching
// `delete` is visible anywhere in this listing. On the throw paths below,
// filter_ is left allocated -- confirm ownership in the header.
30  : store_(&*edm::Service<DQMStore>()),
31  net_(nullptr),
32  filter_(nullptr),
33  lastFlush_(0),
34  publishFrequency_(5.0)
35 {
38 
39  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
40  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
41  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
42  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
44 
  // Networking is optional: without a host (or with a non-positive port)
  // net_ stays null and flushStandalone() skips all publishing work.
45  if (host != "" && port > 0)
46  {
47  net_ = new DQMBasicNet;
48  net_->debug(verbose);
49  net_->updateToCollector(host, port);
50  net_->start();
51  }
52 
  // Optional path filter: only compiled when a non-empty pattern was supplied.
53  if (! filter.empty())
54  {
55  try
56  {
57  filter_ = new lat::Regexp(filter);
58  if (! filter_->valid())
59  throw cms::Exception("DQMService")
60  << "Invalid 'filter' parameter value '" << filter << "':"
61  << " bad regular expression syntax at character "
62  << filter_->errorOffset() << ": " << filter_->errorMessage();
63  filter_->study();
64  }
  // Translate the regexp library's own error type into the framework exception.
65  catch (lat::Error &e)
66  {
67  throw cms::Exception("DQMService")
68  << "Invalid regular expression 'filter' parameter value '"
69  << filter << "': " << e.explain();
70  }
71  }
72 }
73 
// Body of DQMService::~DQMService(void) per the listing index; the signature
// (listing line 74) is not shown here. It only calls shutdown().
// NOTE(review): net_ and filter_ are not deleted here -- confirm whether that
// is intentional.
75 {
76  shutdown();
77 }
78 
// DQMService::flushStandalone() per the listing index; the signature
// (listing line 82) is not shown here.
//
// Summary of the visible logic:
//  1. Return early if fewer than publishFrequency_ seconds have elapsed since
//     lastFlush_. Despite its name, publishFrequency_ is used as a minimum
//     interval in seconds, not a rate.
//  2. If a network layer exists, walk every MonitorElement in the store, skip
//     those rejected by the filter, and push each one reported as updated into
//     the network cache. Then let the network drop everything not seen and ask
//     it to send if anything changed.
//  3. Reset the store and record the flush time. This happens with or without
//     a network layer.
//
// NOTE(review): listing lines 93 and 134-136 are missing from this view.
// Line 93 presumably declares `o` (a DQMNet object record) and 134-136 are
// presumably the scalar-kind `case` labels -- confirm against the real file.
79 // Flush updates to the network layer at the end of each event. This
80 // is the only point at which the main application and the network
81 // layer interact outside initialisation and exit.
83 {
84  // Avoid sending updates excessively often.
  // `version` is the current time in nanoseconds. It is also stamped on every
  // object published in this pass (see o.version below).
85  uint64_t version = lat::Time::current().ns();
86  double vtime = version * 1e-9;
87  if (vtime - lastFlush_ < publishFrequency_)
88  return;
89 
90  // OK, send an update.
91  if (net_)
92  {
  // `seen` collects the full path of every element that passes the filter,
  // whether or not it was updated; it is handed to removeLocalExcept() below.
94  std::set<std::string> seen;
95  std::string fullpath;
96 
97  // Lock the network layer so we can modify the data.
  // NOTE(review): lock()/unlock() are paired by hand. If anything between
  // them throws, unlock() is never reached.
98  net_->lock();
99  bool updated = false;
100 
101  // Find updated contents and update the network cache.
102  DQMStore::MEMap::iterator i, e;
103  net_->reserveLocalSpace(store_->data_.size());
104  for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
105  {
106  const MonitorElement &me = *i;
  // Build "<dirname>/<objname>", omitting the slash for an empty dirname.
  // fullpath is reused across iterations to avoid reallocating.
107  fullpath.clear();
108  fullpath += *me.data_.dirname;
109  if (! me.data_.dirname->empty())
110  fullpath += '/';
111  fullpath += me.data_.objname;
112 
  // A negative search() result is treated as "no match": skip the element.
113  if (filter_ && filter_->search(fullpath) < 0)
114  continue;
115 
  // Record the path before the wasUpdated() check so unchanged elements
  // still count as present.
116  seen.insert(fullpath);
117  if (! me.wasUpdated())
118  continue;
119 
  // Fill the outgoing record from the element's core data.
120  o.lastreq = 0;
121  o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
122  o.flags = me.data_.flags;
123  o.tag = me.data_.tag;
124  o.version = version;
125  o.dirname = me.data_.dirname;
126  o.objname = me.data_.objname;
  // The payload fields must be empty here; they are cleared again after
  // each updateLocalObject() call below.
127  assert(o.rawdata.empty());
128  assert(o.scalar.empty());
129  assert(o.qdata.empty());
130 
131  // Pack object and reference, scalar and quality data.
132  switch (me.kind())
133  {
137  me.packScalarData(o.scalar, "");
138  break;
139 
140  default:
141  {
  // Serialise the object, then either the reference or a null entry,
  // into one ROOT write buffer and copy the bytes into o.rawdata.
  // Presumably the null entry lets the reader always expect two
  // objects -- confirm against the receiving side.
142  TBufferFile buffer(TBufferFile::kWrite);
143  buffer.WriteObject(me.object_);
144  if (me.reference_)
145  buffer.WriteObject(me.reference_);
146  else
147  buffer.WriteObjectAny(nullptr, nullptr);
148  o.rawdata.resize(buffer.Length());
149  memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
151  break;
152  }
153  }
154 
155  // Update.
156  net_->updateLocalObject(o);
  // Swap each payload field with an empty temporary. This leaves the field
  // empty for the next iteration and hands its storage to the temporary,
  // which is destroyed at the end of the statement.
157  DQMNet::DataBlob().swap(o.rawdata);
158  std::string().swap(o.scalar);
159  std::string().swap(o.qdata);
160  updated = true;
161  }
162 
163  // Find removed contents and clear the network cache.
164  if (net_->removeLocalExcept(seen))
165  updated = true;
166 
167  // Unlock the network layer.
168  net_->unlock();
169 
170  // Tell network to flush if we updated something.
171  if (updated)
172  net_->sendLocalChanges();
173  }
174 
  // These two statements run whether or not a network layer exists, but are
  // skipped by the early return at the top.
175  store_->reset();
176  lastFlush_ = lat::Time::current().ns() * 1e-9;
177 
178 
179 }
// DQMService::flush(edm::StreamContext const &sc) per the listing index; the
// signature (listing line 181) is not shown here. The visible body only
// forwards to flushStandalone().
180 void
182 {
183  // Call a function independent of the framework
184  flushStandalone();
185 }
186 
// DQMService::shutdown(void) per the listing index; the signature
// (listing line 189) is not shown here. Does nothing when no network layer
// was created. The pointer itself is neither deleted nor reset here.
187 // Disengage the network service.
188 void
190 {
191  // If we have a network, let it go.
192  if (net_)
193  net_->shutdown();
194 }
195 
196 #endif // !WITHOUT_CMS_FRAMEWORK
T getUntrackedParameter(std::string const &, T const &) const
host
Definition: query.py:114
QReports qreports
Definition: DQMNet.h:108
static std::recursive_mutex s_mutex
Definition: DQMService.cc:17
void shutdown(void)
Definition: DQMService.cc:189
uint64_t version
Definition: DQMNet.h:101
double publishFrequency_
Definition: DQMService.h:28
DQMScope(void)
Definition: DQMService.cc:21
void watchPostEvent(PostEvent::slot_type const &iSlot)
uint32_t flags
Definition: DQMNet.h:99
double lastFlush_
Definition: DQMService.h:27
void watchPostStreamEndLumi(PostStreamEndLumi::slot_type const &iSlot)
lat::Regexp * filter_
Definition: DQMService.h:26
uint32_t tag
Definition: DQMNet.h:100
const std::string * dirname
Definition: DQMNet.h:106
#define nullptr
std::string qdata
Definition: DQMNet.h:117
port
Definition: query.py:115
void packScalarData(std::string &into, const char *prefix) const
convert scalar data into a string.
bool wasUpdated(void) const
true if ME was updated in last monitoring cycle
uint64_t lastreq
Definition: DQMNet.h:114
DQMStore * store_
Definition: DQMService.h:24
void flush(edm::StreamContext const &sc)
Definition: DQMService.cc:181
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:81
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:217
Kind kind(void) const
Get the type of the monitor element.
Definition: mlp_gen.h:16
void flushStandalone()
Definition: DQMService.cc:82
std::string objname
Definition: DQMNet.h:107
std::string scalar
Definition: DQMNet.h:116
DQMNet::CoreObject data_
~DQMScope(void)
Release access lock to the DQM core.
Definition: DQMService.cc:25
unsigned long long uint64_t
Definition: Time.h:15
MEMap data_
Definition: DQMStore.h:714
uint64_t hash
Definition: DQMNet.h:113
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition: DQMService.cc:29
HLT enums.
~DQMService(void)
Definition: DQMService.cc:74
void reset(void)
Definition: DQMStore.cc:2105
DataBlob rawdata
Definition: DQMNet.h:115