CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
9 
11 #include "DQMStreamerReader.h"
12 
13 #include <fstream>
14 #include <queue>
15 #include <cstdlib>
16 #include <boost/regex.hpp>
17 #include <boost/format.hpp>
18 #include <boost/range.hpp>
19 #include <boost/filesystem.hpp>
20 #include <boost/algorithm/string.hpp>
21 
23 
24 namespace dqmservices {
25 
27  : StreamerInputSource(pset, desc), fiterator_(pset) {
// Read the untracked configuration. SelectEvents, minEventsPerLumi,
// skipFirstLumis, endOfRunKills and deleteDatFiles are declared (with their
// defaults) in fillDescriptions().
28  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
29  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
30  hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
31 
32  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
33  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
34  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
35  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
36 
// Sets acceptAllEvt_ if any SelectEvents entry is "*".
37  triggerSel();
38 
// NOTE(review): presumably waits for and opens the first data file so the
// product registry is initialised before it gets frozen — confirm in reset_().
39  reset_();
40  }
41 
43  // The file is deliberately NOT closed here. According to the original note,
44  // the destructor can sometimes run after the service registry has already been
45  // destroyed, and closing the file then throws a "no ServiceRegistry found" exception.
46  // Normally the file has already been closed before this destructor is called.
47  //closeFileImp_("destructor");
48  }
49 
51  // We have to load at least a single header,
52  // so the ProductRegistry gets initialized.
53  //
54  // This must happen here (inside the constructor),
55  // as ProductRegistry gets frozen after we initialize:
56  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
57 
58  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
59 
61 
62  // Fast-forward to the last open file.
63  if (flagSkipFirstLumis_) {
64  unsigned int l = fiterator_.lastLumiFound();
65  if (l > 1) {
66  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
67  }
68  }
69 
70  for (;;) {
71  bool next = prepareNextFile();
72 
73  // check for end of run
74  if (!next) {
75  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
76  return;
77  }
78 
79  // check if we have a file openned
80  if (file_.open()) {
81  // we are now initialised
82  break;
83  }
84 
85  // wait
86  fiterator_.delay();
87  }
88 
89  fiterator_.logFileAction("DQMStreamerReader initialised.");
90  }
91 
95 
96  std::string path = entry.get_data_path();
97 
98  file_.lumi_ = entry;
99  file_.streamFile_.reset(new edm::StreamerInputFile(path));
100 
101  InitMsgView const* header = getHeaderMsg();
102  deserializeAndMergeWithRegistry(*header, false);
103 
104  // dump the list of HLT trigger name from the header
105  // dumpInitHeader(header);
106 
107  // if specific trigger selection is requested, check if the requested triggers
108  // match with trigger paths in the header file
109  if (!acceptAllEvt_) {
110  Strings tnames;
111  header->hltTriggerNames(tnames);
112 
113  pset.addParameter<Strings>("SelectEvents", hltSel_);
114  eventSelector_.reset(new TriggerSelector(pset, tnames));
115 
116  // check if any trigger path name requested matches with trigger name in the
117  // header file
118  matchTriggerSel(tnames);
119  }
120 
121  // our initialization
123 
124  if (flagDeleteDatFiles_) {
125  // unlink the file
126  unlink(path.c_str());
127  }
128  }
129 
131  if (file_.open()) {
132  file_.streamFile_->closeStreamerFile();
133  file_.streamFile_ = nullptr;
134 
135  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
136  }
137  }
138 
140  closeFileImp_("skipping to another file");
141 
143  std::string p = currentLumi.get_data_path();
144 
145  if (boost::filesystem::exists(p)) {
146  try {
147  openFileImp_(currentLumi);
148  return true;
149  } catch (const cms::Exception& e) {
150  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
151  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
152 
153  closeFileImp_("data file corrupted");
154  return false;
155  }
156  } else {
157  /* dat file missing */
158  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
159  fiterator_.logLumiState(currentLumi, "error: data file missing");
160 
161  return false;
162  }
163  }
164 
166  InitMsgView const* header = file_.streamFile_->startMessage();
167 
168  if (header->code() != Header::INIT) { // INIT Msg
169  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
170  << "received wrong message type: expected INIT, got " << header->code() << "\n";
171  }
172 
173  return header;
174  }
175 
177  if (!file_.streamFile_->next()) {
178  return nullptr;
179  }
180 
181  EventMsgView const* msg = file_.streamFile_->currentRecord();
182 
183  // if (msg != nullptr) dumpEventView(msg);
184  return msg;
185  }
186 
199 
200  for (;;) {
202 
203  if (edm::shutdown_flag.load()) {
204  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
205 
206  closeFileImp_("shutdown flag is set");
207  return false;
208  }
209 
210  // check for end of run file and force quit
211  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
212  closeFileImp_("forced end-of-run");
213  return false;
214  }
215 
216  // check for end of run and quit if everything has been processed.
217  // this clean exit
218  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
219  return false;
220  }
221 
222  // if this is end of run and no more files to process
223  // close it
225  (fiterator_.state() == State::EOR)) {
226  closeFileImp_("graceful end-of-run");
227  return false;
228  }
229 
230  // skip to the next file if we have no files openned yet
231  if (!file_.open()) {
232  if (fiterator_.lumiReady()) {
234  // we might need to open once more (if .dat is missing)
235  continue;
236  }
237  }
238 
239  // or if there is a next file and enough eventshas been processed.
242  // we might need to open once more (if .dat is missing)
243  continue;
244  }
245 
246  return true;
247  }
248  }
249 
255  EventMsgView const* eview = nullptr;
257 
258  // wait for the next event
259  for (;;) {
260  // edm::LogAbsolute("DQMStreamerReader")
261  // << "State loop.";
262  bool next = prepareNextFile();
263  if (!next)
264  return nullptr;
265 
266  // sleep
267  if (!file_.open()) {
268  // the reader does not exist
269  fiterator_.delay();
270  } else {
271  // our reader exists, try to read out an event
272  eview = getEventMsg();
273 
274  if (eview == nullptr) {
275  // read unsuccessful
276  // this means end of file, so close the file
277  closeFileImp_("eof");
278  } else {
279  if (!acceptEvent(eview)) {
280  continue;
281  } else {
282  return eview;
283  }
284  }
285  }
286  }
287  return eview;
288  }
289 
294  try {
295  EventMsgView const* eview = prepareNextEvent();
296  if (eview == nullptr) {
297  return false;
298  }
299 
300  // this is reachable only if eview is set
301  // and the file is openned
302  if (file_.streamFile_->newHeader()) {
303  // A new file has been opened and we must compare Headers here !!
304  // Get header/init from reader
305 
306  InitMsgView const* header = getHeaderMsg();
307  deserializeAndMergeWithRegistry(*header, true);
308  }
309 
310  deserializeEvent(*eview);
311  } catch (const cms::Exception& e) {
312  // try to recover from corrupted files/events
313  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
314  closeFileImp_("data file corrupted");
315 
316  // this is not optimal, but hopefully we won't catch this many times in a row
317  return checkNextEvent();
318  }
319 
321 
322  return true;
323  }
324 
// Sets (and returns) acceptAllEvt_: true when any SelectEvents entry equals "*".
330  acceptAllEvt_ = false;
331  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
// NOTE(review): boost::erase_all(s, " \t") removes every occurrence of the
// two-character sequence space+tab, not every space and every tab. If the
// intent was to strip all whitespace, this does not do it — confirm intent.
333  boost::erase_all(hltPath, " \t");
334  if (hltPath == "*")
335  acceptAllEvt_ = true;
336  }
337  return acceptAllEvt_;
338  }
339 
// Sets (and returns) matchTriggerSel_: true when at least one SelectEvents
// entry matches one of the trigger names in tnames via edm::regexMatch.
// Logs a warning when nothing matches.
344  matchTriggerSel_ = false;
345  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
// NOTE(review): as in triggerSel(), this only removes the exact sequence
// space+tab, not all whitespace — confirm intent.
347  boost::erase_all(hltPath, " \t");
348  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
349  if (!matches.empty()) {
350  matchTriggerSel_ = true;
351  }
352  }
353 
354  if (!matchTriggerSel_) {
// NOTE(review): edm::LogWarning's constructor argument is, as far as I know,
// the message category, so this text presumably becomes the category with an
// empty message body — confirm against the MessageLogger API.
355  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
356  }
357 
358  return matchTriggerSel_;
359  }
360 
365  if (acceptAllEvt_)
366  return true;
367  if (!matchTriggerSel_)
368  return false;
369 
370  std::vector<unsigned char> hltTriggerBits_;
371  int hltTriggerCount_ = evtmsg->hltCount();
372  if (hltTriggerCount_ > 0) {
373  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
374  }
375  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
376 
377  if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
378  return true;
379  } else {
380  return false;
381  }
382  }
383 
384  void DQMStreamerReader::skip(int toSkip) {
385  try {
386  for (int i = 0; i != toSkip; ++i) {
387  EventMsgView const* evMsg = prepareNextEvent();
388 
389  if (evMsg == nullptr) {
390  return;
391  }
392  }
393  } catch (const cms::Exception& e) {
394  // try to recover from corrupted files/events
395  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
396  closeFileImp_("data file corrupted");
397  }
398  }
399 
402  desc.setComment("Reads events from streamer files.");
403 
404  desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
405 
406  desc.addUntracked<int>("minEventsPerLumi", 1)
407  ->setComment(
408  "Minimum number of events to process per lumisection, "
409  "before switching to a new input file. If the next file "
410  "does not yet exist, "
411  "the number of processed events will be bigger.");
412 
413  desc.addUntracked<bool>("skipFirstLumis", false)
414  ->setComment(
415  "Skip (and ignore the minEventsPerLumi parameter) for the files "
416  "which have been available at the begining of the processing. "
417  "If set to true, the reader will open last available file for "
418  "processing.");
419 
420  desc.addUntracked<bool>("deleteDatFiles", false)
421  ->setComment(
422  "Delete data files after they have been closed, in order to "
423  "save disk space.");
424 
425  desc.addUntracked<bool>("endOfRunKills", false)
426  ->setComment(
427  "Kill the processing as soon as the end-of-run file appears, even if "
428  "there are/will be unprocessed lumisections.");
429 
430  // desc.addUntracked<unsigned int>("skipEvents", 0U)
431  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
432  // "have been processed.");
433 
434  // This next parameter is read in the base class, but its default value
435  // depends on the derived class, so it is set here.
436  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
437 
441 
442  descriptions.add("source", desc);
443  }
444 
445 } // namespace dqmservices
446 
449 
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
char const * what() const override
Definition: Exception.cc:103
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:141
#define DEFINE_FWK_INPUT_SOURCE(type)
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:124
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:39
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
def load(fileName)
Definition: svgfig.py:547
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:94
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:285
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:65
void closeFileImp_(const std::string &reason)
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
void skip(int toSkip) override