CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
1 #include "DQMStreamerReader.h"
2 
12 
13 #include <cstdlib>
14 #include <filesystem>
15 #include <fstream>
16 #include <queue>
17 #include <algorithm>
18 #include <cctype>
19 
20 namespace dqmservices {
21  using namespace edm::streamer;
22 
25  fiterator_(pset),
26  minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
27  flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
28  flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
29  flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
30  hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")) {
32  reset_();
33  }
34 
36  // Sometimes(?) the destructor is called after the service registry has already been destroyed,
37  // and closeFile_ then throws a "no ServiceRegistry found" exception...
38  //
39  // Normally, this file should be closed before this destructor is called.
40  //closeFileImp_("destructor");
41  }
42 
44  // We have to load at least a single header,
45  // so the ProductRegistry gets initialized.
46  //
47  // This must happen here (inside the constructor),
48  // as ProductRegistry gets frozen after we initialize:
49  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
50 
51  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
52 
54 
55  // Fast-forward to the last open file.
56  if (flagSkipFirstLumis_) {
57  unsigned int l = fiterator_.lastLumiFound();
58  if (l > 1) {
59  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
60  }
61  }
62 
63  for (;;) {
64  bool next = prepareNextFile();
65 
66  // check for end of run
67  if (!next) {
68  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
69  return;
70  }
71 
72  // check if we have a file opened
73  if (file_.open()) {
74  // we are now initialised
75  break;
76  }
77 
78  // wait
79  fiterator_.delay();
80  }
81 
82  fiterator_.logFileAction("DQMStreamerReader initialised.");
83  }
84 
87  auto event = getEventMsg();
88  //file might be empty
89  if (not event)
90  return;
91  assert(event->isEventMetaData());
94  }
97 
98  std::string path = entry.get_data_path();
99 
100  file_.lumi_ = entry;
101  file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
102 
103  InitMsgView const* header = getHeaderMsg();
104  if (isFirstFile_) {
105  setupMetaData(*header, false);
106  }
107 
108  // dump the list of HLT trigger name from the header
109  // dumpInitHeader(header);
110 
111  // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
112  if (!acceptAllEvt_) {
113  std::vector<std::string> tnames;
114  header->hltTriggerNames(tnames);
115 
116  triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
117 
118  // check if any trigger path name requested matches with trigger name in the header file
119  setMatchTriggerSel(tnames);
120  }
121 
122  // our initialization
124 
125  if (flagDeleteDatFiles_) {
126  // unlink the file
127  unlink(path.c_str());
128  }
129  }
130 
132 
134  if (file_.open()) {
135  file_.streamFile_->closeStreamerFile();
136  file_.streamFile_ = nullptr;
137 
138  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
139  }
140  }
141 
143  if (isFirstFile_) {
144  //The file was already opened in the constructor
145  isFirstFile_ = false;
146  return;
147  }
148 
151  artificialFileBoundary_ = false;
152  return;
153  }
154  //Get header/init from reader
155  InitMsgView const* header = getHeaderMsg();
156  setupMetaData(*header, true);
157  }
158 
160  closeFileImp_("skipping to another file");
161 
163  std::string p = currentLumi.get_data_path();
164 
165  if (std::filesystem::exists(p)) {
166  try {
167  openFileImp_(currentLumi);
168  return true;
169  } catch (const cms::Exception& e) {
170  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
171  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
172 
173  closeFileImp_("data file corrupted");
174  return false;
175  }
176  } else {
177  /* dat file missing */
178  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
179  fiterator_.logLumiState(currentLumi, "error: data file missing");
180 
181  return false;
182  }
183  }
184 
186  InitMsgView const* header = file_.streamFile_->startMessage();
187 
188  if (header->code() != Header::INIT) { // INIT Msg
189  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
190  << "received wrong message type: expected INIT, got " << header->code() << "\n";
191  }
192 
193  return header;
194  }
195 
197  auto next = file_.streamFile_->next();
199  return nullptr;
200  }
201 
203  return nullptr;
204  }
205 
206  EventMsgView const* msg = file_.streamFile_->currentRecord();
207 
208  // if (msg != nullptr) dumpEventView(msg);
209  return msg;
210  }
211 
224 
225  for (;;) {
227 
228  if (edm::shutdown_flag.load()) {
229  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
230 
231  closeFileImp_("shutdown flag is set");
232  return false;
233  }
234 
235  // check for end of run file and force quit
236  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
237  closeFileImp_("forced end-of-run");
238  return false;
239  }
240 
241  // check for end of run and quit if everything has been processed.
242  // this is a clean exit
243  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
244  return false;
245  }
246 
247  // if this is end of run and no more files to process
248  // close it
250  (fiterator_.state() == State::EOR)) {
251  closeFileImp_("graceful end-of-run");
252  return false;
253  }
254 
255  // skip to the next file if we have no files opened yet
256  if (!file_.open()) {
257  if (fiterator_.lumiReady()) {
259  // we might need to open once more (if .dat is missing)
260  continue;
261  }
262  }
263 
264  // or if there is a next file and enough events have been processed.
267  // we might need to open once more (if .dat is missing)
268  continue;
269  }
270 
271  return true;
272  }
273  }
274 
280  EventMsgView const* eview = nullptr;
282 
283  // wait for the next event
284  for (;;) {
285  // edm::LogAbsolute("DQMStreamerReader")
286  // << "State loop.";
287  bool next = prepareNextFile();
288  if (!next)
289  return nullptr;
290 
291  // sleep
292  if (!file_.open()) {
293  // the reader does not exist
294  fiterator_.delay();
295  } else {
296  // our reader exists, try to read out an event
297  eview = getEventMsg();
298 
299  if (eview == nullptr) {
300  // read unsuccessful
301  // this means end of file, so close the file
302  closeFileImp_("eof");
303  } else {
304  //NOTE: at this point need to see if meta data checksum changed. If it did
305  // we need to issue a 'new File' transition
306  if (eview->isEventMetaData()) {
307  auto lastEventMetaData = presentEventMetaDataChecksum();
308  if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
309  deserializeEventMetaData(*eview);
311  return nullptr;
312  } else {
313  //skipping
314  eview = getEventMsg();
315  assert((eview == nullptr) or (not eview->isEventMetaData()));
316  if (eview == nullptr) {
317  closeFileImp_("eof");
318  continue;
319  }
320  }
321  }
322 
323  if (!acceptEvent(eview)) {
324  continue;
325  } else {
326  return eview;
327  }
328  }
329  }
330  }
331  return eview;
332  }
333 
338  try {
339  EventMsgView const* eview = prepareNextEvent();
340  if (eview == nullptr) {
341  if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
342  return Next::kFile;
343  }
344  return Next::kStop;
345  }
346 
347  deserializeEvent(*eview);
348  } catch (const cms::Exception& e) {
349  // try to recover from corrupted files/events
350  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
351  closeFileImp_("data file corrupted");
352 
353  // this is not optimal, but hopefully we won't catch this many times in a row
354  return checkNext();
355  }
356 
358 
359  return Next::kEvent;
360  }
361 
367  acceptAllEvt_ = false;
368  for (auto hltPath : hltSel_) {
369  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
370  hltPath.end());
371  if (hltPath == "*") {
372  acceptAllEvt_ = true;
373  break;
374  }
375  }
376  return acceptAllEvt_;
377  }
378 
382  bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
383  matchTriggerSel_ = false;
384  for (auto hltPath : hltSel_) {
385  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
386  hltPath.end());
387  auto const matches = edm::regexMatch(tnames, hltPath);
388  if (not matches.empty()) {
389  matchTriggerSel_ = true;
390  break;
391  }
392  }
393 
394  if (not matchTriggerSel_) {
395  edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
396  }
397 
398  return matchTriggerSel_;
399  }
400 
405  if (acceptAllEvt_)
406  return true;
407  if (!matchTriggerSel_)
408  return false;
409 
410  std::vector<unsigned char> hltTriggerBits_;
411  int hltTriggerCount_ = evtmsg->hltCount();
412  if (hltTriggerCount_ > 0) {
413  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
414  }
415  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
416 
417  return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
418  }
419 
420  void DQMStreamerReader::skip(int toSkip) {
421  try {
422  for (int i = 0; i != toSkip; ++i) {
423  EventMsgView const* evMsg = prepareNextEvent();
424 
425  if (evMsg == nullptr) {
426  return;
427  }
428  }
429  } catch (const cms::Exception& e) {
430  // try to recover from corrupted files/events
431  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
432  closeFileImp_("data file corrupted");
433  }
434  }
435 
438  desc.setComment("Reads events from streamer files.");
439 
440  desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
441 
442  desc.addUntracked<int>("minEventsPerLumi", 1)
443  ->setComment(
444  "Minimum number of events to process per lumisection, "
445  "before switching to a new input file. If the next file "
446  "does not yet exist, "
447  "the number of processed events will be bigger.");
448 
449  desc.addUntracked<bool>("skipFirstLumis", false)
450  ->setComment(
451  "Skip (and ignore the minEventsPerLumi parameter) for the files "
452  "which have been available at the begining of the processing. "
453  "If set to true, the reader will open last available file for "
454  "processing.");
455 
456  desc.addUntracked<bool>("deleteDatFiles", false)
457  ->setComment(
458  "Delete data files after they have been closed, in order to "
459  "save disk space.");
460 
461  desc.addUntracked<bool>("endOfRunKills", false)
462  ->setComment(
463  "Kill the processing as soon as the end-of-run file appears, even if "
464  "there are/will be unprocessed lumisections.");
465 
466  // desc.addUntracked<unsigned int>("skipEvents", 0U)
467  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
468  // "have been processed.");
469 
470  // This next parameter is read in the base class, but its default value
471  // depends on the derived class, so it is set here.
472  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
473 
477 
478  descriptions.add("source", desc);
479  }
480 
481 } // namespace dqmservices
482 
485 
unsigned int const minEventsPerLs_
void deserializeEventMetaData(EventMsgView const &eventView)
std::vector< std::string > const hltSel_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
bool acceptEvent(const edm::streamer::EventMsgView *)
volatile std::atomic< bool > shutdown_flag
edm::streamer::InitMsgView const * getHeaderMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static void fillDescription(ParameterSetDescription &description)
dqmservices::DQMStreamerReader DQMStreamerReader
std::shared_ptr< TriggerSelector > triggerSelector_
assert(be >=bs)
void hltTriggerBits(uint8 *put_here) const
bool setMatchTriggerSel(std::vector< std::string > const &tnames)
edm::streamer::EventMsgView const * getEventMsg()
#define DEFINE_FWK_INPUT_SOURCE(type)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void setupMetaData(edm::streamer::InitMsgView const &msg, bool subsequent)
void deserializeEvent(EventMsgView const &eventView)
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
void logFileAction(const std::string &msg, const std::string &fileName="") const
edm::streamer::EventMsgView const * prepareNextEvent()
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
uint32_t eventMetaDataChecksum(EventMsgView const &eventView) const
def load(fileName)
Definition: svgfig.py:547
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
void closeFileImp_(const std::string &reason)
Log< level::Warning, false > LogWarning
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
Definition: event.py:1
void skip(int toSkip) override