CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
9 
11 #include "DQMStreamerReader.h"
12 
13 #include <fstream>
14 #include <queue>
15 #include <cstdlib>
16 #include <boost/regex.hpp>
17 #include <boost/format.hpp>
18 #include <boost/range.hpp>
19 #include <boost/filesystem.hpp>
20 #include <boost/algorithm/string.hpp>
21 
23 
24 namespace dqmservices {
25 
27  edm::InputSourceDescription const& desc)
28  : StreamerInputSource(pset, desc), fiterator_(pset) {
29  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
30  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
31  hltSel_ =
32  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
33 
34  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
35  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
36  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
37  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
38 
39  triggerSel();
40 
41  reset_();
42 }
43 
45  // Sometimes(?) the destructor is called after the service registry has already
46  // been destructed, and closeFile_ throws a "no ServiceRegistry found" exception...
47  //
48  // Normally, this file should be closed before this destructor is called.
49  //closeFileImp_("destructor");
50 }
51 
53  // We have to load at least a single header,
54  // so the ProductRegistry gets initialized.
55  //
56  // This must happen here (inside the constructor),
57  // as ProductRegistry gets frozen after we initialize:
58  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
59 
61  "Waiting for the first lumi in order to initialize.");
62 
64 
65  // Fast-forward to the last open file.
66  if (flagSkipFirstLumis_) {
67  unsigned int l = fiterator_.lastLumiFound();
68  if (l > 1) {
69  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
70  }
71  }
72 
73  for (;;) {
74  bool next = prepareNextFile();
75 
76  // check for end of run
77  if (!next) {
79  "End of run reached before DQMStreamerReader was initialised.");
80  return;
81  }
82 
83  // check if we have a file opened
84  if (file_.open()) {
85  // we are now initialised
86  break;
87  }
88 
89  // wait
90  fiterator_.delay();
91  }
92 
93  fiterator_.logFileAction("DQMStreamerReader initialised.");
94 }
95 
99 
100  std::string path = entry.get_data_path();
101 
102  file_.lumi_ = entry;
103  file_.streamFile_.reset(new edm::StreamerInputFile(path));
104 
105  InitMsgView const* header = getHeaderMsg();
106  deserializeAndMergeWithRegistry(*header, false);
107 
108  // dump the list of HLT trigger name from the header
109  // dumpInitHeader(header);
110 
111  // if specific trigger selection is requested, check if the requested triggers
112  // match with trigger paths in the header file
113  if (!acceptAllEvt_) {
114  Strings tnames;
115  header->hltTriggerNames(tnames);
116 
117  pset.addParameter<Strings>("SelectEvents", hltSel_);
118  eventSelector_.reset(new TriggerSelector(pset, tnames));
119 
120  // check if any trigger path name requested matches with trigger name in the
121  // header file
122  matchTriggerSel(tnames);
123  }
124 
125  // our initialization
127 
128  if (flagDeleteDatFiles_) {
129  // unlink the file
130  unlink(path.c_str());
131  }
132 }
133 
135  if (file_.open()) {
136  file_.streamFile_->closeStreamerFile();
137  file_.streamFile_ = nullptr;
138 
139  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
140  }
141 }
142 
144  closeFileImp_("skipping to another file");
145 
147  std::string p = currentLumi.get_data_path();
148 
149  if (boost::filesystem::exists(p)) {
150  try {
151  openFileImp_(currentLumi);
152  return true;
153  } catch (const cms::Exception& e) {
154  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
155  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
156 
157  closeFileImp_("data file corrupted");
158  return false;
159  }
160  } else {
161  /* dat file missing */
162  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
163  fiterator_.logLumiState(currentLumi, "error: data file missing");
164 
165  return false;
166  }
167 }
168 
170  InitMsgView const* header = file_.streamFile_->startMessage();
171 
172  if (header->code() != Header::INIT) { // INIT Msg
174  "DQMStreamerReader::readHeader")
175  << "received wrong message type: expected INIT, got " << header->code()
176  << "\n";
177  }
178 
179  return header;
180 }
181 
183  if (!file_.streamFile_->next()) {
184  return nullptr;
185  }
186 
187  EventMsgView const* msg = file_.streamFile_->currentRecord();
188 
189  // if (msg != nullptr) dumpEventView(msg);
190  return msg;
191 }
192 
205 
206  for (;;) {
208 
209  if (edm::shutdown_flag.load()) {
210  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
211 
212  closeFileImp_("shutdown flag is set");
213  return false;
214  }
215 
216  // check for end of run file and force quit
217  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
218  closeFileImp_("forced end-of-run");
219  return false;
220  }
221 
222  // check for end of run and quit if everything has been processed.
223  // this is the clean exit
224  if ((!file_.open()) && (!fiterator_.lumiReady()) &&
225  (fiterator_.state() == State::EOR)) {
226  return false;
227  }
228 
229  // if this is end of run and no more files to process
230  // close it
232  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
233  closeFileImp_("graceful end-of-run");
234  return false;
235  }
236 
237  // skip to the next file if we have no files opened yet
238  if (!file_.open()) {
239  if (fiterator_.lumiReady()) {
241  // we might need to open once more (if .dat is missing)
242  continue;
243  }
244  }
245 
246  // or if there is a next file and enough events have been processed.
249  // we might need to open once more (if .dat is missing)
250  continue;
251  }
252 
253  return true;
254  }
255 }
256 
262  EventMsgView const* eview = nullptr;
264 
265  // wait for the next event
266  for (;;) {
267  // edm::LogAbsolute("DQMStreamerReader")
268  // << "State loop.";
269  bool next = prepareNextFile();
270  if (!next) return nullptr;
271 
272  // sleep
273  if (!file_.open()) {
274  // the reader does not exist
275  fiterator_.delay();
276  } else {
277  // our reader exists, try to read out an event
278  eview = getEventMsg();
279 
280  if (eview == nullptr) {
281  // read unsuccessful
282  // this means end of file, so close the file
283  closeFileImp_("eof");
284  } else {
285  if (!acceptEvent(eview)) {
286  continue;
287  } else {
288  return eview;
289  }
290  }
291  }
292  }
293  return eview;
294 }
295 
300  try {
301  EventMsgView const* eview = prepareNextEvent();
302  if (eview == nullptr) {
303  return false;
304  }
305 
306  // this is reachable only if eview is set
307  // and the file is opened
308  if (file_.streamFile_->newHeader()) {
309  // A new file has been opened and we must compare Headers here !!
310  // Get header/init from reader
311 
312  InitMsgView const* header = getHeaderMsg();
313  deserializeAndMergeWithRegistry(*header, true);
314  }
315 
316  deserializeEvent(*eview);
317  } catch (const cms::Exception& e) {
318  // try to recover from corrupted files/events
319  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
320  closeFileImp_("data file corrupted");
321 
322  // this is not optimal, but hopefully we won't catch this many times in a row
323  return checkNextEvent();
324  }
325 
327 
328  return true;
329 }
330 
336  acceptAllEvt_ = false;
337  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
338  ++i) {
340  boost::erase_all(hltPath, " \t");
341  if (hltPath == "*") acceptAllEvt_ = true;
342  }
343  return acceptAllEvt_;
344 }
345 
350  matchTriggerSel_ = false;
351  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
352  ++i) {
354  boost::erase_all(hltPath, " \t");
355  std::vector<Strings::const_iterator> matches =
356  edm::regexMatch(tnames, hltPath);
357  if (!matches.empty()) {
358  matchTriggerSel_ = true;
359  }
360  }
361 
362  if (!matchTriggerSel_) {
363  edm::LogWarning("Trigger selection does not match any trigger path!!!")
364  << std::endl;
365  }
366 
367  return matchTriggerSel_;
368 }
369 
374  if (acceptAllEvt_) return true;
375  if (!matchTriggerSel_) return false;
376 
377  std::vector<unsigned char> hltTriggerBits_;
378  int hltTriggerCount_ = evtmsg->hltCount();
379  if (hltTriggerCount_ > 0) {
380  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
381  }
382  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
383 
384  if (eventSelector_->wantAll() ||
385  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
386  return true;
387  } else {
388  return false;
389  }
390 }
391 
392 void DQMStreamerReader::skip(int toSkip) {
393  try {
394  for (int i = 0; i != toSkip; ++i) {
395  EventMsgView const* evMsg = prepareNextEvent();
396 
397  if (evMsg == nullptr) {
398  return;
399  }
400  }
401  } catch (const cms::Exception& e) {
402  // try to recover from corrupted files/events
403  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
404  closeFileImp_("data file corrupted");
405  }
406 }
407 
409  edm::ConfigurationDescriptions& descriptions) {
411  desc.setComment("Reads events from streamer files.");
412 
413  desc.addUntracked<std::vector<std::string> >("SelectEvents")
414  ->setComment("HLT path to select events ");
415 
416  desc.addUntracked<int>("minEventsPerLumi", 1)
417  ->setComment(
418  "Minimum number of events to process per lumisection, "
419  "before switching to a new input file. If the next file "
420  "does not yet exist, "
421  "the number of processed events will be bigger.");
422 
423  desc.addUntracked<bool>("skipFirstLumis", false)
424  ->setComment(
425  "Skip (and ignore the minEventsPerLumi parameter) for the files "
426  "which have been available at the begining of the processing. "
427  "If set to true, the reader will open last available file for "
428  "processing.");
429 
430  desc.addUntracked<bool>("deleteDatFiles", false)
431  ->setComment(
432  "Delete data files after they have been closed, in order to "
433  "save disk space.");
434 
435  desc.addUntracked<bool>("endOfRunKills", false)
436  ->setComment(
437  "Kill the processing as soon as the end-of-run file appears, even if "
438  "there are/will be unprocessed lumisections.");
439 
440  // desc.addUntracked<unsigned int>("skipEvents", 0U)
441  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
442  // "have been processed.");
443 
444  // This next parameter is read in the base class, but its default value
445  // depends on the derived class, so it is set here.
446  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
447 
451 
452  descriptions.add("source", desc);
453 }
454 
455 } // end of namespace
456 
459 
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
char const * what() const override
Definition: Exception.cc:103
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:141
#define DEFINE_FWK_INPUT_SOURCE(type)
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:125
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:39
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
def load(fileName)
Definition: svgfig.py:547
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:94
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:285
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:65
void closeFileImp_(const std::string &reason)
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
void skip(int toSkip) override