CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMService.cc
Go to the documentation of this file.
1 #if !WITHOUT_CMS_FRAMEWORK
8 # include "classlib/utils/Regexp.h"
9 # include "classlib/utils/Error.h"
10 # include <pthread.h>
11 # include <iostream>
12 # include <string>
13 # include <memory>
14 #include "TBufferFile.h"
15 
16 // -------------------------------------------------------------------
17 static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
18 
22 { pthread_mutex_lock(&s_mutex); }
23 
26 { pthread_mutex_unlock(&s_mutex); }
27 
29 static void
31 { pthread_mutex_lock(&s_mutex); }
32 
33 static void
35 { restrictDQMAccess(); }
36 
38 static void
40 { pthread_mutex_unlock(&s_mutex); }
41 
42 static void
44 { releaseDQMAccess(); }
45 
46 // -------------------------------------------------------------------
48  : store_(&*edm::Service<DQMStore>()),
49  net_(0),
50  filter_(0),
51  lastFlush_(0),
52  publishFrequency_(5.0)
53 {
62 
63  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
64  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
65  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
66  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
67  std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
68 
69  if (host != "" && port > 0)
70  {
71  net_ = new DQMBasicNet;
72  net_->debug(verbose);
73  net_->updateToCollector(host, port);
74  net_->start();
75  }
76 
77  if (! filter.empty())
78  {
79  try
80  {
81  filter_ = new lat::Regexp(filter);
82  if (! filter_->valid())
83  throw cms::Exception("DQMService")
84  << "Invalid 'filter' parameter value '" << filter << "':"
85  << " bad regular expression syntax at character "
86  << filter_->errorOffset() << ": " << filter_->errorMessage();
87  filter_->study();
88  }
89  catch (lat::Error &e)
90  {
91  throw cms::Exception("DQMService")
92  << "Invalid regular expression 'filter' parameter value '"
93  << filter << "': " << e.explain();
94  }
95  }
96 }
97 
99 {
100  shutdown();
101 }
102 
103 // Flush updates to the network layer at the end of each event. This
104 // is the only point at which the main application and the network
105 // layer interact outside initialisation and exit.
106 void
108 {
109  // Avoid sending updates excessively often.
111  double vtime = version * 1e-9;
112  if (vtime - lastFlush_ < publishFrequency_)
113  return;
114 
115  // OK, send an update.
116  if (net_)
117  {
119  std::set<std::string> seen;
120  std::string fullpath;
121 
122  // Lock the network layer so we can modify the data.
123  net_->lock();
124  bool updated = false;
125 
126  // Find updated contents and update the network cache.
127  DQMStore::MEMap::iterator i, e;
128  net_->reserveLocalSpace(store_->data_.size());
129  for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
130  {
131  const MonitorElement &me = *i;
132  fullpath.clear();
133  fullpath += *me.data_.dirname;
134  if (! me.data_.dirname->empty())
135  fullpath += '/';
136  fullpath += me.data_.objname;
137 
138  if (filter_ && filter_->search(fullpath) < 0)
139  continue;
140 
141  seen.insert(fullpath);
142  if (! me.wasUpdated())
143  continue;
144 
145  o.lastreq = 0;
146  o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
147  o.flags = me.data_.flags;
148  o.tag = me.data_.tag;
149  o.version = version;
150  o.dirname = me.data_.dirname;
151  o.objname = me.data_.objname;
152  assert(o.rawdata.empty());
153  assert(o.scalar.empty());
154  assert(o.qdata.empty());
155 
156  // Pack object and reference, scalar and quality data.
157  switch (me.kind())
158  {
162  me.packScalarData(o.scalar, "");
163  break;
164 
165  default:
166  {
167  TBufferFile buffer(TBufferFile::kWrite);
168  buffer.WriteObject(me.object_);
169  if (me.reference_)
170  buffer.WriteObject(me.reference_);
171  else
172  buffer.WriteObjectAny(0, 0);
173  o.rawdata.resize(buffer.Length());
174  memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
176  break;
177  }
178  }
179 
180  // Update.
181  net_->updateLocalObject(o);
182  DQMNet::DataBlob().swap(o.rawdata);
183  std::string().swap(o.scalar);
184  std::string().swap(o.qdata);
185  updated = true;
186  }
187 
188  // Find removed contents and clear the network cache.
189  if (net_->removeLocalExcept(seen))
190  updated = true;
191 
192  // Unlock the network layer.
193  net_->unlock();
194 
195  // Tell network to flush if we updated something.
196  if (updated)
197  net_->sendLocalChanges();
198  }
199 
200  store_->reset();
201  lastFlush_ = lat::Time::current().ns() * 1e-9;
202 }
203 
204 // Disengage the network service.
205 void
207 {
208  // If we have a network, let it go.
209  if (net_)
210  net_->shutdown();
211 }
212 
213 #endif // !WITHOUT_CMS_FRAMEWORK
T getUntrackedParameter(std::string const &, T const &) const
int i
Definition: DBlmapReader.cc:9
static void releaseDQMAccessM(const edm::ModuleDescription &)
Definition: DQMService.cc:43
QReports qreports
Definition: DQMNet.h:101
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void shutdown(void)
Definition: DQMService.cc:206
uint64_t version
Definition: DQMNet.h:98
double publishFrequency_
Definition: DQMService.h:27
DQMScope(void)
Definition: DQMService.cc:21
void watchPostModule(PostModule::slot_type const &iSlot)
uint32_t flags
Definition: DQMNet.h:96
void watchPreSourceConstruction(PreSourceConstruction::slot_type const &iSlot)
double lastFlush_
Definition: DQMService.h:26
void watchPostSourceConstruction(PostSourceConstruction::slot_type const &iSlot)
lat::Regexp * filter_
Definition: DQMService.h:25
uint32_t tag
Definition: DQMNet.h:97
const std::string * dirname
Definition: DQMNet.h:99
std::string qdata
Definition: DQMNet.h:110
int port
Definition: query.py:115
void watchPreModule(PreModule::slot_type const &iSlot)
void packScalarData(std::string &into, const char *prefix) const
convert scalar data into a string.
bool wasUpdated(void) const
true if ME was updated in last monitoring cycle
void watchPostProcessEvent(PostProcessEvent::slot_type const &iSlot)
static void releaseDQMAccess(void)
Release access to the DQM core.
Definition: DQMService.cc:39
static void restrictDQMAccessM(const edm::ModuleDescription &)
Definition: DQMService.cc:34
uint64_t lastreq
Definition: DQMNet.h:107
DQMStore * store_
Definition: DQMService.h:23
std::vector< unsigned char > DataBlob
Definition: DQMNet.h:78
void watchPreSource(PreSource::slot_type const &iSlot)
static size_t dqmhash(const void *key, size_t keylen)
Definition: DQMNet.h:198
static void restrictDQMAccess(void)
Restrict access to the DQM core.
Definition: DQMService.cc:30
Kind kind(void) const
Get the type of the monitor element.
Definition: mlp_gen.h:16
std::string objname
Definition: DQMNet.h:100
std::string scalar
Definition: DQMNet.h:109
DQMNet::CoreObject data_
static pthread_mutex_t s_mutex
Definition: DQMService.cc:17
string host
Definition: query.py:114
~DQMScope(void)
Release access lock to the DQM core.
Definition: DQMService.cc:25
tuple filter
USE THIS FOR SKIMMED TRACKS process.p = cms.Path(process.hltLevel1GTSeed*process.skimming*process.offlineBeamSpot*process.TrackRefitter2) OTHERWISE USE THIS.
Definition: align_tpl.py:86
unsigned long long uint64_t
Definition: Time.h:15
MEMap data_
Definition: DQMStore.h:423
uint64_t hash
Definition: DQMNet.h:106
static void packQualityData(std::string &into, const QReports &qr)
Definition: DQMNet.cc:172
DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
Definition: DQMService.cc:47
void flush(const edm::Event &, const edm::EventSetup &)
Definition: DQMService.cc:107
void watchPostSource(PostSource::slot_type const &iSlot)
~DQMService(void)
Definition: DQMService.cc:98
void reset(void)
Definition: DQMStore.cc:1524
DataBlob rawdata
Definition: DQMNet.h:108