CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
9 
11 #include "DQMStreamerReader.h"
12 
13 #include <cstdlib>
14 #include <filesystem>
15 #include <fstream>
16 #include <memory>
17 #include <queue>
18 #include <algorithm>
19 #include <cctype>
20 
22 
23 namespace dqmservices {
24 
26  : StreamerInputSource(pset, desc), fiterator_(pset) {
27  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
28  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
29  hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
30 
31  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
32  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
33  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
34  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
35 
36  triggerSel();
37 
38  reset_();
39  }
40 
42  // Sometimes(?) the destructor is called after the service registry was already destructed,
43  // and closeFile_ then throws a "no ServiceRegistry found" exception...
44  //
45  // Normally, this file should be closed before this destructor is called.
46  //closeFileImp_("destructor");
47  }
48 
50  // We have to load at least a single header,
51  // so the ProductRegistry gets initialized.
52  //
53  // This must happen here (inside the constructor),
54  // as ProductRegistry gets frozen after we initialize:
55  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
56 
57  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
58 
60 
61  // Fast-forward to the last open file.
62  if (flagSkipFirstLumis_) {
63  unsigned int l = fiterator_.lastLumiFound();
64  if (l > 1) {
65  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
66  }
67  }
68 
69  for (;;) {
70  bool next = prepareNextFile();
71 
72  // check for end of run
73  if (!next) {
74  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
75  return;
76  }
77 
78  // check if we have a file opened
79  if (file_.open()) {
80  // we are now initialised
81  break;
82  }
83 
84  // wait
85  fiterator_.delay();
86  }
87 
88  fiterator_.logFileAction("DQMStreamerReader initialised.");
89  }
90 
94 
95  std::string path = entry.get_data_path();
96 
97  file_.lumi_ = entry;
98  file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
99 
100  InitMsgView const* header = getHeaderMsg();
101  if (isFirstFile_) {
103  }
104 
105  // dump the list of HLT trigger name from the header
106  // dumpInitHeader(header);
107 
108  // if specific trigger selection is requested, check if the requested triggers
109  // match with trigger paths in the header file
110  if (!acceptAllEvt_) {
111  Strings tnames;
112  header->hltTriggerNames(tnames);
113 
114  pset.addParameter<Strings>("SelectEvents", hltSel_);
115  eventSelector_.reset(new TriggerSelector(pset, tnames));
116 
117  // check if any trigger path name requested matches with trigger name in the
118  // header file
119  matchTriggerSel(tnames);
120  }
121 
122  // our initialization
124 
125  if (flagDeleteDatFiles_) {
126  // unlink the file
127  unlink(path.c_str());
128  }
129  }
130 
132 
134  if (file_.open()) {
135  file_.streamFile_->closeStreamerFile();
136  file_.streamFile_ = nullptr;
137 
138  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
139  }
140  }
141 
143  if (isFirstFile_) {
144  //The file was already opened in the constructor
145  isFirstFile_ = false;
146  return;
147  }
148 
149  //Get header/init from reader
150  InitMsgView const* header = getHeaderMsg();
152  }
153 
155  closeFileImp_("skipping to another file");
156 
158  std::string p = currentLumi.get_data_path();
159 
160  if (std::filesystem::exists(p)) {
161  try {
162  openFileImp_(currentLumi);
163  return true;
164  } catch (const cms::Exception& e) {
165  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
166  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
167 
168  closeFileImp_("data file corrupted");
169  return false;
170  }
171  } else {
172  /* dat file missing */
173  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
174  fiterator_.logLumiState(currentLumi, "error: data file missing");
175 
176  return false;
177  }
178  }
179 
181  InitMsgView const* header = file_.streamFile_->startMessage();
182 
183  if (header->code() != Header::INIT) { // INIT Msg
184  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
185  << "received wrong message type: expected INIT, got " << header->code() << "\n";
186  }
187 
188  return header;
189  }
190 
192  auto next = file_.streamFile_->next();
194  return nullptr;
195  }
196 
198  return nullptr;
199  }
200 
201  EventMsgView const* msg = file_.streamFile_->currentRecord();
202 
203  // if (msg != nullptr) dumpEventView(msg);
204  return msg;
205  }
206 
219 
220  for (;;) {
222 
223  if (edm::shutdown_flag.load()) {
224  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
225 
226  closeFileImp_("shutdown flag is set");
227  return false;
228  }
229 
230  // check for end of run file and force quit
231  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
232  closeFileImp_("forced end-of-run");
233  return false;
234  }
235 
236  // check for end of run and quit if everything has been processed.
237  // this is the clean exit
238  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
239  return false;
240  }
241 
242  // if this is end of run and no more files to process
243  // close it
245  (fiterator_.state() == State::EOR)) {
246  closeFileImp_("graceful end-of-run");
247  return false;
248  }
249 
250  // skip to the next file if we have no files opened yet
251  if (!file_.open()) {
252  if (fiterator_.lumiReady()) {
254  // we might need to open once more (if .dat is missing)
255  continue;
256  }
257  }
258 
259  // or if there is a next file and enough events have been processed.
262  // we might need to open once more (if .dat is missing)
263  continue;
264  }
265 
266  return true;
267  }
268  }
269 
275  EventMsgView const* eview = nullptr;
277 
278  // wait for the next event
279  for (;;) {
280  // edm::LogAbsolute("DQMStreamerReader")
281  // << "State loop.";
282  bool next = prepareNextFile();
283  if (!next)
284  return nullptr;
285 
286  // sleep
287  if (!file_.open()) {
288  // the reader does not exist
289  fiterator_.delay();
290  } else {
291  // our reader exists, try to read out an event
292  eview = getEventMsg();
293 
294  if (eview == nullptr) {
295  // read unsuccessful
296  // this means end of file, so close the file
297  closeFileImp_("eof");
298  } else {
299  if (!acceptEvent(eview)) {
300  continue;
301  } else {
302  return eview;
303  }
304  }
305  }
306  }
307  return eview;
308  }
309 
314  try {
315  EventMsgView const* eview = prepareNextEvent();
316  if (eview == nullptr) {
317  if (file_.streamFile_ and file_.streamFile_->newHeader()) {
318  return Next::kFile;
319  }
320  return Next::kStop;
321  }
322 
323  deserializeEvent(*eview);
324  } catch (const cms::Exception& e) {
325  // try to recover from corrupted files/events
326  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
327  closeFileImp_("data file corrupted");
328 
329  // this is not optimal, but hopefully we won't catch this many times in a row
330  return checkNext();
331  }
332 
334 
335  return Next::kEvent;
336  }
337 
343  acceptAllEvt_ = false;
344  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
346  hltPath.erase(
347  std::remove_if(
348  hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
349  hltPath.end());
350  if (hltPath == "*")
351  acceptAllEvt_ = true;
352  }
353  return acceptAllEvt_;
354  }
355 
360  matchTriggerSel_ = false;
361  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
363  hltPath.erase(
364  std::remove_if(
365  hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
366  hltPath.end());
367  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
368  if (!matches.empty()) {
369  matchTriggerSel_ = true;
370  }
371  }
372 
373  if (!matchTriggerSel_) {
374  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
375  }
376 
377  return matchTriggerSel_;
378  }
379 
384  if (acceptAllEvt_)
385  return true;
386  if (!matchTriggerSel_)
387  return false;
388 
389  std::vector<unsigned char> hltTriggerBits_;
390  int hltTriggerCount_ = evtmsg->hltCount();
391  if (hltTriggerCount_ > 0) {
392  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
393  }
394  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
395 
396  if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
397  return true;
398  } else {
399  return false;
400  }
401  }
402 
403  void DQMStreamerReader::skip(int toSkip) {
404  try {
405  for (int i = 0; i != toSkip; ++i) {
406  EventMsgView const* evMsg = prepareNextEvent();
407 
408  if (evMsg == nullptr) {
409  return;
410  }
411  }
412  } catch (const cms::Exception& e) {
413  // try to recover from corrupted files/events
414  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
415  closeFileImp_("data file corrupted");
416  }
417  }
418 
421  desc.setComment("Reads events from streamer files.");
422 
423  desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
424 
425  desc.addUntracked<int>("minEventsPerLumi", 1)
426  ->setComment(
427  "Minimum number of events to process per lumisection, "
428  "before switching to a new input file. If the next file "
429  "does not yet exist, "
430  "the number of processed events will be bigger.");
431 
432  desc.addUntracked<bool>("skipFirstLumis", false)
433  ->setComment(
434  "Skip (and ignore the minEventsPerLumi parameter) for the files "
435  "which have been available at the begining of the processing. "
436  "If set to true, the reader will open last available file for "
437  "processing.");
438 
439  desc.addUntracked<bool>("deleteDatFiles", false)
440  ->setComment(
441  "Delete data files after they have been closed, in order to "
442  "save disk space.");
443 
444  desc.addUntracked<bool>("endOfRunKills", false)
445  ->setComment(
446  "Kill the processing as soon as the end-of-run file appears, even if "
447  "there are/will be unprocessed lumisections.");
448 
449  // desc.addUntracked<unsigned int>("skipEvents", 0U)
450  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
451  // "have been processed.");
452 
453  // This next parameter is read in the base class, but its default value
454  // depends on the derived class, so it is set here.
455  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
456 
460 
461  descriptions.add("source", desc);
462  }
463 
464 } // namespace dqmservices
465 
468 
static void fillDescription(ParameterSetDescription &description)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< TriggerSelector > eventSelector_
InitMsgView const * getHeaderMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
EventMsgView const * getEventMsg()
std::unique_ptr< edm::StreamerInputFile > streamFile_
#define DEFINE_FWK_INPUT_SOURCE(type)
void deserializeEvent(EventMsgView const &eventView)
uint32 hltCount() const
Definition: EventMessage.h:94
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
void logFileAction(const std::string &msg, const std::string &fileName="") const
EventMsgView const * prepareNextEvent()
bool acceptEvent(const EventMsgView *)
def load(fileName)
Definition: svgfig.py:547
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:285
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
void closeFileImp_(const std::string &reason)
Log< level::Warning, false > LogWarning
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
void skip(int toSkip) override