CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
1 #include "DQMStreamerReader.h"
2 
12 
13 #include <cstdlib>
14 #include <filesystem>
15 #include <fstream>
16 #include <queue>
17 #include <algorithm>
18 #include <cctype>
19 
20 namespace dqmservices {
21 
23  : StreamerInputSource(pset, desc),
24  fiterator_(pset),
25  minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
26  flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
27  flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
28  flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
29  hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")) {
31  reset_();
32  }
33 
35  // Sometimes(?) the destructor is called after the service registry was already destructed,
36  // and closeFile_ then throws a "no ServiceRegistry found" exception...
37  //
38  // Normally, this file should be closed before this destructor is called.
39  //closeFileImp_("destructor");
40  }
41 
43  // We have to load at least a single header,
44  // so the ProductRegistry gets initialized.
45  //
46  // This must happen here (inside the constructor),
47  // as ProductRegistry gets frozen after we initialize:
48  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
49 
50  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
51 
53 
54  // Fast-forward to the last open file.
55  if (flagSkipFirstLumis_) {
56  unsigned int l = fiterator_.lastLumiFound();
57  if (l > 1) {
58  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
59  }
60  }
61 
62  for (;;) {
63  bool next = prepareNextFile();
64 
65  // check for end of run
66  if (!next) {
67  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
68  return;
69  }
70 
71  // check if we have a file opened
72  if (file_.open()) {
73  // we are now initialised
74  break;
75  }
76 
77  // wait
78  fiterator_.delay();
79  }
80 
81  fiterator_.logFileAction("DQMStreamerReader initialised.");
82  }
83 
86 
87  std::string path = entry.get_data_path();
88 
89  file_.lumi_ = entry;
90  file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
91 
92  InitMsgView const* header = getHeaderMsg();
93  if (isFirstFile_) {
95  }
96 
97  // dump the list of HLT trigger name from the header
98  // dumpInitHeader(header);
99 
100  // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
101  if (!acceptAllEvt_) {
102  std::vector<std::string> tnames;
103  header->hltTriggerNames(tnames);
104 
105  triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
106 
107  // check if any trigger path name requested matches with trigger name in the header file
108  setMatchTriggerSel(tnames);
109  }
110 
111  // our initialization
113 
114  if (flagDeleteDatFiles_) {
115  // unlink the file
116  unlink(path.c_str());
117  }
118  }
119 
121 
123  if (file_.open()) {
124  file_.streamFile_->closeStreamerFile();
125  file_.streamFile_ = nullptr;
126 
127  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
128  }
129  }
130 
132  if (isFirstFile_) {
133  //The file was already opened in the constructor
134  isFirstFile_ = false;
135  return;
136  }
137 
138  //Get header/init from reader
139  InitMsgView const* header = getHeaderMsg();
141  }
142 
144  closeFileImp_("skipping to another file");
145 
147  std::string p = currentLumi.get_data_path();
148 
149  if (std::filesystem::exists(p)) {
150  try {
151  openFileImp_(currentLumi);
152  return true;
153  } catch (const cms::Exception& e) {
154  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
155  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
156 
157  closeFileImp_("data file corrupted");
158  return false;
159  }
160  } else {
161  /* dat file missing */
162  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
163  fiterator_.logLumiState(currentLumi, "error: data file missing");
164 
165  return false;
166  }
167  }
168 
170  InitMsgView const* header = file_.streamFile_->startMessage();
171 
172  if (header->code() != Header::INIT) { // INIT Msg
173  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
174  << "received wrong message type: expected INIT, got " << header->code() << "\n";
175  }
176 
177  return header;
178  }
179 
181  auto next = file_.streamFile_->next();
183  return nullptr;
184  }
185 
187  return nullptr;
188  }
189 
190  EventMsgView const* msg = file_.streamFile_->currentRecord();
191 
192  // if (msg != nullptr) dumpEventView(msg);
193  return msg;
194  }
195 
208 
209  for (;;) {
211 
212  if (edm::shutdown_flag.load()) {
213  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
214 
215  closeFileImp_("shutdown flag is set");
216  return false;
217  }
218 
219  // check for end of run file and force quit
220  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
221  closeFileImp_("forced end-of-run");
222  return false;
223  }
224 
225  // check for end of run and quit if everything has been processed.
226  // this is the clean exit
227  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
228  return false;
229  }
230 
231  // if this is end of run and no more files to process
232  // close it
234  (fiterator_.state() == State::EOR)) {
235  closeFileImp_("graceful end-of-run");
236  return false;
237  }
238 
239  // skip to the next file if we have no files opened yet
240  if (!file_.open()) {
241  if (fiterator_.lumiReady()) {
243  // we might need to open once more (if .dat is missing)
244  continue;
245  }
246  }
247 
248  // or if there is a next file and enough events have been processed.
251  // we might need to open once more (if .dat is missing)
252  continue;
253  }
254 
255  return true;
256  }
257  }
258 
264  EventMsgView const* eview = nullptr;
266 
267  // wait for the next event
268  for (;;) {
269  // edm::LogAbsolute("DQMStreamerReader")
270  // << "State loop.";
271  bool next = prepareNextFile();
272  if (!next)
273  return nullptr;
274 
275  // sleep
276  if (!file_.open()) {
277  // the reader does not exist
278  fiterator_.delay();
279  } else {
280  // our reader exists, try to read out an event
281  eview = getEventMsg();
282 
283  if (eview == nullptr) {
284  // read unsuccessful
285  // this means end of file, so close the file
286  closeFileImp_("eof");
287  } else {
288  if (!acceptEvent(eview)) {
289  continue;
290  } else {
291  return eview;
292  }
293  }
294  }
295  }
296  return eview;
297  }
298 
303  try {
304  EventMsgView const* eview = prepareNextEvent();
305  if (eview == nullptr) {
306  if (file_.streamFile_ and file_.streamFile_->newHeader()) {
307  return Next::kFile;
308  }
309  return Next::kStop;
310  }
311 
312  deserializeEvent(*eview);
313  } catch (const cms::Exception& e) {
314  // try to recover from corrupted files/events
315  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
316  closeFileImp_("data file corrupted");
317 
318  // this is not optimal, but hopefully we won't catch this many times in a row
319  return checkNext();
320  }
321 
323 
324  return Next::kEvent;
325  }
326 
332  acceptAllEvt_ = false;
333  for (auto hltPath : hltSel_) {
334  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
335  hltPath.end());
336  if (hltPath == "*") {
337  acceptAllEvt_ = true;
338  break;
339  }
340  }
341  return acceptAllEvt_;
342  }
343 
347  bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
348  matchTriggerSel_ = false;
349  for (auto hltPath : hltSel_) {
350  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
351  hltPath.end());
352  auto const matches = edm::regexMatch(tnames, hltPath);
353  if (not matches.empty()) {
354  matchTriggerSel_ = true;
355  break;
356  }
357  }
358 
359  if (not matchTriggerSel_) {
360  edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
361  }
362 
363  return matchTriggerSel_;
364  }
365 
370  if (acceptAllEvt_)
371  return true;
372  if (!matchTriggerSel_)
373  return false;
374 
375  std::vector<unsigned char> hltTriggerBits_;
376  int hltTriggerCount_ = evtmsg->hltCount();
377  if (hltTriggerCount_ > 0) {
378  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
379  }
380  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
381 
382  return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
383  }
384 
385  void DQMStreamerReader::skip(int toSkip) {
386  try {
387  for (int i = 0; i != toSkip; ++i) {
388  EventMsgView const* evMsg = prepareNextEvent();
389 
390  if (evMsg == nullptr) {
391  return;
392  }
393  }
394  } catch (const cms::Exception& e) {
395  // try to recover from corrupted files/events
396  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
397  closeFileImp_("data file corrupted");
398  }
399  }
400 
403  desc.setComment("Reads events from streamer files.");
404 
405  desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
406 
407  desc.addUntracked<int>("minEventsPerLumi", 1)
408  ->setComment(
409  "Minimum number of events to process per lumisection, "
410  "before switching to a new input file. If the next file "
411  "does not yet exist, "
412  "the number of processed events will be bigger.");
413 
414  desc.addUntracked<bool>("skipFirstLumis", false)
415  ->setComment(
416  "Skip (and ignore the minEventsPerLumi parameter) for the files "
417  "which have been available at the begining of the processing. "
418  "If set to true, the reader will open last available file for "
419  "processing.");
420 
421  desc.addUntracked<bool>("deleteDatFiles", false)
422  ->setComment(
423  "Delete data files after they have been closed, in order to "
424  "save disk space.");
425 
426  desc.addUntracked<bool>("endOfRunKills", false)
427  ->setComment(
428  "Kill the processing as soon as the end-of-run file appears, even if "
429  "there are/will be unprocessed lumisections.");
430 
431  // desc.addUntracked<unsigned int>("skipEvents", 0U)
432  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
433  // "have been processed.");
434 
435  // This next parameter is read in the base class, but its default value
436  // depends on the derived class, so it is set here.
437  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
438 
442 
443  descriptions.add("source", desc);
444  }
445 
446 } // namespace dqmservices
447 
450 
static void fillDescription(ParameterSetDescription &description)
unsigned int const minEventsPerLs_
void hltTriggerBits(uint8 *put_here) const
std::vector< std::string > const hltSel_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
volatile std::atomic< bool > shutdown_flag
InitMsgView const * getHeaderMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
std::shared_ptr< TriggerSelector > triggerSelector_
bool setMatchTriggerSel(std::vector< std::string > const &tnames)
EventMsgView const * getEventMsg()
std::unique_ptr< edm::StreamerInputFile > streamFile_
#define DEFINE_FWK_INPUT_SOURCE(type)
void deserializeEvent(EventMsgView const &eventView)
uint32 hltCount() const
Definition: EventMessage.h:94
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
void logFileAction(const std::string &msg, const std::string &fileName="") const
EventMsgView const * prepareNextEvent()
bool acceptEvent(const EventMsgView *)
def load(fileName)
Definition: svgfig.py:547
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
static void fillDescription(ParameterSetDescription &desc)
void closeFileImp_(const std::string &reason)
Log< level::Warning, false > LogWarning
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_
void skip(int toSkip) override