CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
9 
13 #include "DQMStreamerReader.h"
14 
15 #include <fstream>
16 #include <queue>
17 #include <cstdlib>
18 #include <boost/regex.hpp>
19 #include <boost/format.hpp>
20 #include <boost/range.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/algorithm/string.hpp>
23 
25 
26 namespace dqmservices {
27 
29  edm::InputSourceDescription const& desc)
30  : StreamerInputSource(pset, desc), fiterator_(pset) {
31  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
32  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
33  hltSel_ =
34  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
35 
36  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
37  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
38  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
39  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
40 
41  triggerSel();
42 
43  reset_();
44 }
45 
47  // Sometimes(?) the destructor is called after the service registry has already been destructed,
48  // and closeFile_ then throws a "no ServiceRegistry found" exception...
49  //
50  // Normally, this file should be closed before this destructor is called.
51  //closeFile_("destructor");
52 }
53 
55  // We have to load at least a single header,
56  // so the ProductRegistry gets initialized.
57  //
58  // This must happen here (inside the constructor),
59  // as ProductRegistry gets frozen after we initialize:
60  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
61 
63  "Waiting for the first lumi in order to initialize.");
64 
66 
67  // Fast-forward to the last open file.
68  if (flagSkipFirstLumis_) {
69  unsigned int l = fiterator_.lastLumiFound();
70  if (l > 1) {
71  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
72  }
73  }
74 
75  for (;;) {
76  bool next = prepareNextFile();
77 
78  // check for end of run
79  if (!next) {
81  "End of run reached before DQMStreamerReader was initialised.");
82  return;
83  }
84 
85  // check if we have a file opened
86  if (file_.open()) {
87  // we are now initialised
88  break;
89  }
90 
91  // wait
92  fiterator_.delay();
93  }
94 
95  fiterator_.logFileAction("DQMStreamerReader initialised.");
96 }
97 
101 
102  std::string path = entry.get_data_path();
103 
104  file_.lumi_ = entry;
105  file_.streamFile_.reset(new edm::StreamerInputFile(path));
106 
107  InitMsgView const* header = getHeaderMsg();
108  deserializeAndMergeWithRegistry(*header, false);
109 
110  // dump the list of HLT trigger name from the header
111  // dumpInitHeader(header);
112 
113  // if specific trigger selection is requested, check if the requested triggers
114  // match with trigger paths in the header file
115  if (!acceptAllEvt_) {
116  Strings tnames;
117  header->hltTriggerNames(tnames);
118 
119  pset.addParameter<Strings>("SelectEvents", hltSel_);
120  eventSelector_.reset(new TriggerSelector(pset, tnames));
121 
122  // check if any trigger path name requested matches with trigger name in the
123  // header file
124  matchTriggerSel(tnames);
125  }
126 
127  // our initialization
129 
130  if (flagDeleteDatFiles_) {
131  // unlink the file
132  unlink(path.c_str());
133  }
134 }
135 
137  if (file_.open()) {
138  file_.streamFile_->closeStreamerFile();
139  file_.streamFile_ = nullptr;
140 
141  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
142  }
143 }
144 
146  closeFile_("skipping to another file");
147 
149  std::string p = currentLumi.get_data_path();
150 
151  if (boost::filesystem::exists(p)) {
152  try {
153  openFile_(currentLumi);
154  return true;
155  } catch (const cms::Exception& e) {
156  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
157  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
158 
159  closeFile_("data file corrupted");
160  return false;
161  }
162  } else {
163  /* dat file missing */
164  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
165  fiterator_.logLumiState(currentLumi, "error: data file missing");
166 
167  return false;
168  }
169 }
170 
172  InitMsgView const* header = file_.streamFile_->startMessage();
173 
174  if (header->code() != Header::INIT) { // INIT Msg
176  "DQMStreamerReader::readHeader")
177  << "received wrong message type: expected INIT, got " << header->code()
178  << "\n";
179  }
180 
181  return header;
182 }
183 
185  if (!file_.streamFile_->next()) {
186  return nullptr;
187  }
188 
189  EventMsgView const* msg = file_.streamFile_->currentRecord();
190 
191  // if (msg != nullptr) dumpEventView(msg);
192  return msg;
193 }
194 
207 
208  for (;;) {
210 
211  // check for end of run file and force quit
212  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
213  closeFile_("forced end-of-run");
214  return false;
215  }
216 
217  // check for end of run and quit if everything has been processed.
218  // this is the clean exit
219  if ((!file_.open()) && (!fiterator_.lumiReady()) &&
220  (fiterator_.state() == State::EOR)) {
221  return false;
222  }
223 
224  // if this is end of run and no more files to process
225  // close it
227  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
228  closeFile_("graceful end-of-run");
229  return false;
230  }
231 
232  // skip to the next file if we have no files opened yet
233  if (!file_.open()) {
234  if (fiterator_.lumiReady()) {
235  openNextFile_();
236  // we might need to open once more (if .dat is missing)
237  continue;
238  }
239  }
240 
241  // or if there is a next file and enough events have been processed.
243  openNextFile_();
244  // we might need to open once more (if .dat is missing)
245  continue;
246  }
247 
248  return true;
249  }
250 }
251 
257  EventMsgView const* eview = nullptr;
259 
260  // wait for the next event
261  for (;;) {
262  // edm::LogAbsolute("DQMStreamerReader")
263  // << "State loop.";
264  bool next = prepareNextFile();
265  if (!next) return nullptr;
266 
267  // sleep
268  if (!file_.open()) {
269  // the reader does not exist
270  fiterator_.delay();
271  } else {
272  // our reader exists, try to read out an event
273  eview = getEventMsg();
274 
275  if (eview == nullptr) {
276  // read unsuccessful
277  // this means end of file, so close the file
278  closeFile_("eof");
279  } else {
280  if (!acceptEvent(eview)) {
281  continue;
282  } else {
283  return eview;
284  }
285  }
286  }
287  }
288  return eview;
289 }
290 
295  try {
296  EventMsgView const* eview = prepareNextEvent();
297  if (eview == nullptr) {
298  return false;
299  }
300 
301  // this is reachable only if eview is set
302  // and the file is opened
303  if (file_.streamFile_->newHeader()) {
304  // A new file has been opened and we must compare Headers here !!
305  // Get header/init from reader
306 
307  InitMsgView const* header = getHeaderMsg();
308  deserializeAndMergeWithRegistry(*header, true);
309  }
310 
311  deserializeEvent(*eview);
312  } catch (const cms::Exception& e) {
313  // try to recover from corrupted files/events
314  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
315  closeFile_("data file corrupted");
316 
317  // this is not optimal, but hopefully we won't catch this many times in a row
318  return checkNextEvent();
319  }
320 
322 
323  return true;
324 }
325 
331  acceptAllEvt_ = false;
332  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
333  ++i) {
334  std::string hltPath(*i);
335  boost::erase_all(hltPath, " \t");
336  if (hltPath == "*") acceptAllEvt_ = true;
337  }
338  return acceptAllEvt_;
339 }
340 
345  matchTriggerSel_ = false;
346  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
347  ++i) {
348  std::string hltPath(*i);
349  boost::erase_all(hltPath, " \t");
350  std::vector<Strings::const_iterator> matches =
351  edm::regexMatch(tnames, hltPath);
352  if (!matches.empty()) {
353  matchTriggerSel_ = true;
354  }
355  }
356 
357  if (!matchTriggerSel_) {
358  edm::LogWarning("Trigger selection does not match any trigger path!!!")
359  << std::endl;
360  }
361 
362  return matchTriggerSel_;
363 }
364 
369  if (acceptAllEvt_) return true;
370  if (!matchTriggerSel_) return false;
371 
372  std::vector<unsigned char> hltTriggerBits_;
373  int hltTriggerCount_ = evtmsg->hltCount();
374  if (hltTriggerCount_ > 0) {
375  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
376  }
377  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
378 
379  if (eventSelector_->wantAll() ||
380  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
381  return true;
382  } else {
383  return false;
384  }
385 }
386 
387 void DQMStreamerReader::skip(int toSkip) {
388  try {
389  for (int i = 0; i != toSkip; ++i) {
390  EventMsgView const* evMsg = prepareNextEvent();
391 
392  if (evMsg == nullptr) {
393  return;
394  }
395  }
396  } catch (const cms::Exception& e) {
397  // try to recover from corrupted files/events
398  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
399  closeFile_("data file corrupted");
400  }
401 }
402 
404  edm::ConfigurationDescriptions& descriptions) {
406  desc.setComment("Reads events from streamer files.");
407 
408  desc.addUntracked<std::vector<std::string> >("SelectEvents")
409  ->setComment("HLT path to select events ");
410 
411  desc.addUntracked<int>("minEventsPerLumi", 1)
412  ->setComment(
413  "Minimum number of events to process per lumisection, "
414  "before switching to a new input file. If the next file "
415  "does not yet exist, "
416  "the number of processed events will be bigger.");
417 
418  desc.addUntracked<bool>("skipFirstLumis", false)
419  ->setComment(
420  "Skip (and ignore the minEventsPerLumi parameter) for the files "
421  "which have been available at the begining of the processing. "
422  "If set to true, the reader will open last available file for "
423  "processing.");
424 
425  desc.addUntracked<bool>("deleteDatFiles", false)
426  ->setComment(
427  "Delete data files after they have been closed, in order to "
428  "save disk space.");
429 
430  desc.addUntracked<bool>("endOfRunKills", false)
431  ->setComment(
432  "Kill the processing as soon as the end-of-run file appears, even if "
433  "there are/will be unprocessed lumisections.");
434 
435  // desc.addUntracked<unsigned int>("skipEvents", 0U)
436  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
437  // "have been processed.");
438 
439  // This next parameter is read in the base class, but its default value
440  // depends on the derived class, so it is set here.
441  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
442 
446 
447  descriptions.add("source", desc);
448 }
449 
450 } // end of namespace
451 
454 
virtual char const * what() const
Definition: Exception.cc:141
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
void openFile_(const DQMFileIterator::LumiEntry &entry)
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:146
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
virtual void skip(int toSkip)
#define DEFINE_FWK_INPUT_SOURCE(type)
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:144
void deserializeEvent(EventMsgView const &eventView)
virtual void closeFile_() overridefinal
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:37
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:97
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:30
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:72
list entry
Definition: mps_splice.py:62
State
Definition: hltDiff.cc:286
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_