CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
1 #include "DQMStreamerReader.h"
2 
12 
13 #include <cstdlib>
14 #include <filesystem>
15 #include <fstream>
16 #include <queue>
17 #include <algorithm>
18 #include <cctype>
19 
20 namespace dqmservices {
21  using namespace edm::streamer;
22 
25  fiterator_(pset),
26  minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
27  flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
28  flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
29  flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
30  hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
31  unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
33  reset_();
34  }
35 
37  // Sometimes(?) the destructor called after service registry was already destructed
38  // and closeFile_ throws away no ServiceRegistry found exception...
39  //
40  // Normally, this file should be closed before this destructor is called.
41  //closeFileImp_("destructor");
42  }
43 
45  // We have to load at least a single header,
46  // so the ProductRegistry gets initialized.
47  //
48  // This must happen here (inside the constructor),
49  // as ProductRegistry gets frozen after we initialize:
50  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
51 
52  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
53 
55 
56  // Fast-forward to the last open file.
57  if (flagSkipFirstLumis_) {
58  unsigned int l = fiterator_.lastLumiFound();
59  if (l > 1) {
60  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
61  }
62  }
63 
64  for (;;) {
65  bool next = prepareNextFile();
66 
67  // check for end of run
68  if (!next) {
69  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
70  return;
71  }
72 
73  // check if we have a file openned
74  if (file_.open()) {
75  // we are now initialised
76  break;
77  }
78 
79  // wait
80  fiterator_.delay();
81  }
82 
83  fiterator_.logFileAction("DQMStreamerReader initialised.");
84  }
85 
88  auto event = getEventMsg();
89  //file might be empty
90  if (not event)
91  return;
92  assert(event->isEventMetaData());
95  }
98 
99  std::string path = entry.get_data_path();
100 
101  file_.lumi_ = entry;
102  file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
103 
104  InitMsgView const* header = getHeaderMsg();
105  if (isFirstFile_) {
106  setupMetaData(*header, false);
107  }
108 
109  // dump the list of HLT trigger name from the header
110  // dumpInitHeader(header);
111 
112  // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
113  if (!acceptAllEvt_) {
114  std::vector<std::string> tnames;
115  header->hltTriggerNames(tnames);
116 
117  triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
118 
119  // check if any trigger path name requested matches with trigger name in the header file
120  setMatchTriggerSel(tnames);
121  }
122 
123  // our initialization
125 
126  if (flagDeleteDatFiles_) {
127  // unlink the file
128  unlink(path.c_str());
129  }
130  }
131 
133 
135  if (file_.open()) {
136  file_.streamFile_->closeStreamerFile();
137  file_.streamFile_ = nullptr;
138 
139  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
140  }
141  }
142 
144  if (isFirstFile_) {
145  //The file was already opened in the constructor
146  isFirstFile_ = false;
147  return;
148  }
149 
152  artificialFileBoundary_ = false;
153  return;
154  }
155  //Get header/init from reader
156  InitMsgView const* header = getHeaderMsg();
157  setupMetaData(*header, true);
158  }
159 
161  closeFileImp_("skipping to another file");
162 
164  std::string p = currentLumi.get_data_path();
165 
166  if (std::filesystem::exists(p)) {
167  try {
168  openFileImp_(currentLumi);
169  return true;
170  } catch (const cms::Exception& e) {
171  if (unitTest_) {
172  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
173  << std::string("Can't deserialize registry data (in open file): ") + e.what()
174  << "\n error: data file corrupted";
175  }
176 
177  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
178  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
179 
180  closeFileImp_("data file corrupted");
181  return false;
182  }
183  } else {
184  /* dat file missing */
185  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
186  fiterator_.logLumiState(currentLumi, "error: data file missing");
187 
188  return false;
189  }
190  }
191 
193  InitMsgView const* header = file_.streamFile_->startMessage();
194 
195  if (header->code() != Header::INIT) { // INIT Msg
196  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
197  << "received wrong message type: expected INIT, got " << header->code() << "\n";
198  }
199 
200  return header;
201  }
202 
204  auto next = file_.streamFile_->next();
206  return nullptr;
207  }
208 
210  return nullptr;
211  }
212 
213  EventMsgView const* msg = file_.streamFile_->currentRecord();
214 
215  // if (msg != nullptr) dumpEventView(msg);
216  return msg;
217  }
218 
231 
232  for (;;) {
234 
235  if (edm::shutdown_flag.load()) {
236  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
237 
238  closeFileImp_("shutdown flag is set");
239  return false;
240  }
241 
242  // check for end of run file and force quit
243  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
244  closeFileImp_("forced end-of-run");
245  return false;
246  }
247 
248  // check for end of run and quit if everything has been processed.
249  // this clean exit
250  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
251  return false;
252  }
253 
254  // if this is end of run and no more files to process
255  // close it
257  (fiterator_.state() == State::EOR)) {
258  closeFileImp_("graceful end-of-run");
259  return false;
260  }
261 
262  // skip to the next file if we have no files openned yet
263  if (!file_.open()) {
264  if (fiterator_.lumiReady()) {
266  // we might need to open once more (if .dat is missing)
267  continue;
268  }
269  }
270 
271  // or if there is a next file and enough eventshas been processed.
274  // we might need to open once more (if .dat is missing)
275  continue;
276  }
277 
278  return true;
279  }
280  }
281 
287  EventMsgView const* eview = nullptr;
289 
290  // wait for the next event
291  for (;;) {
292  // edm::LogAbsolute("DQMStreamerReader")
293  // << "State loop.";
294  bool next = prepareNextFile();
295  if (!next)
296  return nullptr;
297 
298  // sleep
299  if (!file_.open()) {
300  // the reader does not exist
301  fiterator_.delay();
302  } else {
303  // our reader exists, try to read out an event
304  eview = getEventMsg();
305 
306  if (eview == nullptr) {
307  // read unsuccessful
308  // this means end of file, so close the file
309  closeFileImp_("eof");
310  } else {
311  //NOTE: at this point need to see if meta data checksum changed. If it did
312  // we need to issue a 'new File' transition
313  if (eview->isEventMetaData()) {
314  auto lastEventMetaData = presentEventMetaDataChecksum();
315  if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
316  deserializeEventMetaData(*eview);
318  return nullptr;
319  } else {
320  //skipping
321  eview = getEventMsg();
322  assert((eview == nullptr) or (not eview->isEventMetaData()));
323  if (eview == nullptr) {
324  closeFileImp_("eof");
325  continue;
326  }
327  }
328  }
329 
330  if (!acceptEvent(eview)) {
331  continue;
332  } else {
333  return eview;
334  }
335  }
336  }
337  }
338  return eview;
339  }
340 
345  try {
346  EventMsgView const* eview = prepareNextEvent();
347  if (eview == nullptr) {
348  if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
349  return Next::kFile;
350  }
351  return Next::kStop;
352  }
353 
354  deserializeEvent(*eview);
355  } catch (const cms::Exception& e) {
356  // try to recover from corrupted files/events
357  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
358  closeFileImp_("data file corrupted");
359 
360  // this is not optimal, but hopefully we won't catch this many times in a row
361  return checkNext();
362  }
363 
365 
366  return Next::kEvent;
367  }
368 
374  acceptAllEvt_ = false;
375  for (auto hltPath : hltSel_) {
376  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
377  hltPath.end());
378  if (hltPath == "*") {
379  acceptAllEvt_ = true;
380  break;
381  }
382  }
383  return acceptAllEvt_;
384  }
385 
389  bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
390  matchTriggerSel_ = false;
391  for (auto hltPath : hltSel_) {
392  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
393  hltPath.end());
394  auto const matches = edm::regexMatch(tnames, hltPath);
395  if (not matches.empty()) {
396  matchTriggerSel_ = true;
397  break;
398  }
399  }
400 
401  if (not matchTriggerSel_) {
402  edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
403  }
404 
405  return matchTriggerSel_;
406  }
407 
412  if (acceptAllEvt_)
413  return true;
414  if (!matchTriggerSel_)
415  return false;
416 
417  std::vector<unsigned char> hltTriggerBits_;
418  int hltTriggerCount_ = evtmsg->hltCount();
419  if (hltTriggerCount_ > 0) {
420  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
421  }
422  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
423 
424  return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
425  }
426 
427  void DQMStreamerReader::skip(int toSkip) {
428  try {
429  for (int i = 0; i != toSkip; ++i) {
430  EventMsgView const* evMsg = prepareNextEvent();
431 
432  if (evMsg == nullptr) {
433  return;
434  }
435  }
436  } catch (const cms::Exception& e) {
437  // try to recover from corrupted files/events
438  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
439  closeFileImp_("data file corrupted");
440  }
441  }
442 
445  desc.setComment("Reads events from streamer files.");
446 
447  desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
448 
449  desc.addUntracked<int>("minEventsPerLumi", 1)
450  ->setComment(
451  "Minimum number of events to process per lumisection, "
452  "before switching to a new input file. If the next file "
453  "does not yet exist, "
454  "the number of processed events will be bigger.");
455 
456  desc.addUntracked<bool>("skipFirstLumis", false)
457  ->setComment(
458  "Skip (and ignore the minEventsPerLumi parameter) for the files "
459  "which have been available at the begining of the processing. "
460  "If set to true, the reader will open last available file for "
461  "processing.");
462 
463  desc.addUntracked<bool>("deleteDatFiles", false)
464  ->setComment(
465  "Delete data files after they have been closed, in order to "
466  "save disk space.");
467 
468  desc.addUntracked<bool>("endOfRunKills", false)
469  ->setComment(
470  "Kill the processing as soon as the end-of-run file appears, even if "
471  "there are/will be unprocessed lumisections.");
472 
473  desc.addUntracked<bool>("unitTest", false)
474  ->setComment("Kill the processing if the input data cannot be deserialized");
475 
476  // desc.addUntracked<unsigned int>("skipEvents", 0U)
477  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
478  // "have been processed.");
479 
480  // This next parameter is read in the base class, but its default value
481  // depends on the derived class, so it is set here.
482  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
483 
487 
488  descriptions.add("source", desc);
489  }
490 
491 } // namespace dqmservices
492 
495 
unsigned int const minEventsPerLs_
void deserializeEventMetaData(EventMsgView const &eventView)
std::vector< std::string > const hltSel_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
bool acceptEvent(const edm::streamer::EventMsgView *)
volatile std::atomic< bool > shutdown_flag
edm::streamer::InitMsgView const * getHeaderMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static void fillDescription(ParameterSetDescription &description)
dqmservices::DQMStreamerReader DQMStreamerReader
std::shared_ptr< TriggerSelector > triggerSelector_
assert(be >=bs)
void hltTriggerBits(uint8 *put_here) const
bool setMatchTriggerSel(std::vector< std::string > const &tnames)
edm::streamer::EventMsgView const * getEventMsg()
#define DEFINE_FWK_INPUT_SOURCE(type)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void setupMetaData(edm::streamer::InitMsgView const &msg, bool subsequent)
void deserializeEvent(EventMsgView const &eventView)
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
void logFileAction(const std::string &msg, const std::string &fileName="") const
edm::streamer::EventMsgView const * prepareNextEvent()
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
uint32_t eventMetaDataChecksum(EventMsgView const &eventView) const
def load(fileName)
Definition: svgfig.py:547
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
void closeFileImp_(const std::string &reason)
Log< level::Warning, false > LogWarning
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
Definition: event.py:1
void skip(int toSkip) override