CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMService.cc
Go to the documentation of this file.
1 #if !WITHOUT_CMS_FRAMEWORK
8 # include "classlib/utils/Regexp.h"
9 # include "classlib/utils/Error.h"
10 # include <pthread.h>
11 # include <iostream>
12 # include <string>
13 # include <memory>
14 #include "TBufferFile.h"
15 
16 // -------------------------------------------------------------------
17 static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
18 
22 { pthread_mutex_lock(&s_mutex); }
23 
26 { pthread_mutex_unlock(&s_mutex); }
27 
29 static void
31 { pthread_mutex_lock(&s_mutex); }
32 
33 static void
35 { restrictDQMAccess(); }
36 
38 static void
40 { pthread_mutex_unlock(&s_mutex); }
41 
42 static void
44 { releaseDQMAccess(); }
45 
46 // -------------------------------------------------------------------
48  : store_(&*edm::Service<DQMStore>()),
49  net_(0),
50  filter_(0),
51  lastFlush_(0),
52  publishFrequency_(5.0)
53 {
62 
63  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
64  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
65  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
66  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
67  std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
68 
69  if (host != "" && port > 0)
70  {
71  net_ = new DQMBasicNet;
72  net_->debug(verbose);
73  net_->updateToCollector(host, port);
74  net_->start();
75  }
76 
77  if (! filter.empty())
78  {
79  try
80  {
81  filter_ = new lat::Regexp(filter);
82  if (! filter_->valid())
83  throw cms::Exception("DQMService")
84  << "Invalid 'filter' parameter value '" << filter << "':"
85  << " bad regular expression syntax at character "
86  << filter_->errorOffset() << ": " << filter_->errorMessage();
87  filter_->study();
88  }
89  catch (lat::Error &e)
90  {
91  throw cms::Exception("DQMService")
92  << "Invalid regular expression 'filter' parameter value '"
93  << filter << "': " << e.explain();
94  }
95  }
96 }
97 
99 {
100  shutdown();
101 }
102 
103 // Flush updates to the network layer at the end of each event. This
104 // is the only point at which the main application and the network
105 // layer interact outside initialisation and exit.
107 {
108  // Avoid sending updates excessively often.
110  double vtime = version * 1e-9;
111  if (vtime - lastFlush_ < publishFrequency_)
112  return;
113 
114  // OK, send an update.
115  if (net_)
116  {
118  std::set<std::string> seen;
119  std::string fullpath;
120 
121  // Lock the network layer so we can modify the data.
122  net_->lock();
123  bool updated = false;
124 
125  // Find updated contents and update the network cache.
126  DQMStore::MEMap::iterator i, e;
127  net_->reserveLocalSpace(store_->data_.size());
128  for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
129  {
130  const MonitorElement &me = *i;
131  fullpath.clear();
132  fullpath += *me.data_.dirname;
133  if (! me.data_.dirname->empty())
134  fullpath += '/';
135  fullpath += me.data_.objname;
136 
137  if (filter_ && filter_->search(fullpath) < 0)
138  continue;
139 
140  seen.insert(fullpath);
141  if (! me.wasUpdated())
142  continue;
143 
144  o.lastreq = 0;
145  o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
146  o.flags = me.data_.flags;
147  o.tag = me.data_.tag;
148  o.version = version;
149  o.dirname = me.data_.dirname;
150  o.objname = me.data_.objname;
151  assert(o.rawdata.empty());
152  assert(o.scalar.empty());
153  assert(o.qdata.empty());
154 
155  // Pack object and reference, scalar and quality data.
156  switch (me.kind())
157  {
161  me.packScalarData(o.scalar, "");
162  break;
163 
164  default:
165  {
166  TBufferFile buffer(TBufferFile::kWrite);
167  buffer.WriteObject(me.object_);
168  if (me.reference_)
169  buffer.WriteObject(me.reference_);
170  else
171  buffer.WriteObjectAny(0, 0);
172  o.rawdata.resize(buffer.Length());
173  memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
175  break;
176  }
177  }
178 
179  // Update.
180  net_->updateLocalObject(o);
181  DQMNet::DataBlob().swap(o.rawdata);
182  std::string().swap(o.scalar);
183  std::string().swap(o.qdata);
184  updated = true;
185  }
186 
187  // Find removed contents and clear the network cache.
188  if (net_->removeLocalExcept(seen))
189  updated = true;
190 
191  // Unlock the network layer.
192  net_->unlock();
193 
194  // Tell network to flush if we updated something.
195  if (updated)
196  net_->sendLocalChanges();
197  }
198 
199  store_->reset();
200  lastFlush_ = lat::Time::current().ns() * 1e-9;
201 
202 
203 }
204 void
206 {
207  // Call a function independent to the framework
208  flushStandalone();
209 }
210 
211 // Disengage the network service.
212 void
214 {
215  // If we have a network, let it go.
216  if (net_)
217  net_->shutdown();
218 }
219 
220 #endif // !WITHOUT_CMS_FRAMEWORK
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
static void releaseDQMAccessM(const edm::ModuleDescription &)
Definition: DQMService.cc:43
QReports qreports
Definition: DQMNet.h:102
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void shutdown(void)
Definition: DQMService.cc:213
uint64_t version
Definition: DQMNet.h:99
double publishFrequency_
Definition: DQMService.h:28
DQMScope(void)
Definition: DQMService.cc:21
void watchPostModule(PostModule::slot_type const &iSlot)
uint32_t flags
Definition: DQMNet.h:97
void watchPreSourceConstruction(PreSourceConstruction::slot_type const &iSlot)
double lastFlush_
Definition: DQMService.h:27
void watchPostSourceConstruction(PostSourceConstruction::slot_type const &iSlot)
lat::Regexp * filter_
Definition: DQMService.h:26
uint32_t tag
Definition: DQMNet.h:98
const std::string * dirname
Definition: DQMNet.h:100
std::string qdata
Definition: DQMNet.h:111
int port
Definition: query.py:115
void watchPreModule(PreModule::slot_type const &iSlot)
void packScalarData(std::string &into, const char *prefix) const
convert scalar data into a string.
bool wasUpdated(void) const
true if ME was updated in last monitoring cycle
void watchPostProcessEvent(PostProcessEvent::slot_type const &iSlot)
static void releaseDQMAccess(void)
Release access to the DQM core.
Definition: DQMService.cc:39
static void restrictDQMAccessM(const edm::ModuleDescription &)
Definition: DQMService.cc:34
uint64_t lastreq
Definition: DQMNet.h:108
DQMStore * store_
Definition: DQMService.h:24
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:79
void watchPreSource(PreSource::slot_type const &iSlot)
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:199
static void restrictDQMAccess(void)
Restrict access to the DQM core.
Definition: DQMService.cc:30
Kind kind(void) const
Get the type of the monitor element.
Definition: mlp_gen.h:16
void flushStandalone()
Definition: DQMService.cc:106
std::string objname
Definition: DQMNet.h:101
std::string scalar
Definition: DQMNet.h:110
DQMNet::CoreObject data_
static pthread_mutex_t s_mutex
Definition: DQMService.cc:17
string host
Definition: query.py:114
~DQMScope(void)
Release access lock to the DQM core.
Definition: DQMService.cc:25
unsigned long long uint64_t
Definition: Time.h:15
MEMap data_
Definition: DQMStore.h:457
uint64_t hash
Definition: DQMNet.h:107
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:177
DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition: DQMService.cc:47
void flush(const edm::Event &, const edm::EventSetup &)
Definition: DQMService.cc:205
void watchPostSource(PostSource::slot_type const &iSlot)
~DQMService(void)
Definition: DQMService.cc:98
void reset(void)
Definition: DQMStore.cc:1727
DataBlob rawdata
Definition: DQMNet.h:109