CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
9 
11 #include "DQMStreamerReader.h"
12 
13 #include <cstdlib>
14 #include <filesystem>
15 #include <fstream>
16 #include <memory>
17 #include <queue>
18 
19 #include <boost/algorithm/string.hpp>
20 #include <boost/format.hpp>
21 #include <boost/range.hpp>
22 #include <boost/regex.hpp>
23 #include <boost/algorithm/string.hpp>
24 
26 
27 namespace dqmservices {
28 
30  : StreamerInputSource(pset, desc), fiterator_(pset) {
31  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
32  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
33  hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
34 
35  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
36  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
37  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
38  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
39 
40  triggerSel();
41 
42  reset_();
43  }
44 
46  // Sometimes(?) the destructor called after service registry was already destructed
47  // and closeFile_ throws away no ServiceRegistry found exception...
48  //
49  // Normally, this file should be closed before this destructor is called.
50  //closeFileImp_("destructor");
51  }
52 
54  // We have to load at least a single header,
55  // so the ProductRegistry gets initialized.
56  //
57  // This must happen here (inside the constructor),
58  // as ProductRegistry gets frozen after we initialize:
59  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
60 
61  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
62 
64 
65  // Fast-forward to the last open file.
66  if (flagSkipFirstLumis_) {
67  unsigned int l = fiterator_.lastLumiFound();
68  if (l > 1) {
69  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
70  }
71  }
72 
73  for (;;) {
74  bool next = prepareNextFile();
75 
76  // check for end of run
77  if (!next) {
78  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
79  return;
80  }
81 
82  // check if we have a file openned
83  if (file_.open()) {
84  // we are now initialised
85  break;
86  }
87 
88  // wait
89  fiterator_.delay();
90  }
91 
92  fiterator_.logFileAction("DQMStreamerReader initialised.");
93  }
94 
98 
99  std::string path = entry.get_data_path();
100 
101  file_.lumi_ = entry;
102  file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
103 
104  InitMsgView const* header = getHeaderMsg();
105  if (isFirstFile_) {
107  }
108 
109  // dump the list of HLT trigger name from the header
110  // dumpInitHeader(header);
111 
112  // if specific trigger selection is requested, check if the requested triggers
113  // match with trigger paths in the header file
114  if (!acceptAllEvt_) {
115  Strings tnames;
116  header->hltTriggerNames(tnames);
117 
118  pset.addParameter<Strings>("SelectEvents", hltSel_);
119  eventSelector_.reset(new TriggerSelector(pset, tnames));
120 
121  // check if any trigger path name requested matches with trigger name in the
122  // header file
123  matchTriggerSel(tnames);
124  }
125 
126  // our initialization
128 
129  if (flagDeleteDatFiles_) {
130  // unlink the file
131  unlink(path.c_str());
132  }
133  }
134 
136 
138  if (file_.open()) {
139  file_.streamFile_->closeStreamerFile();
140  file_.streamFile_ = nullptr;
141 
142  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
143  }
144  }
145 
147  if (isFirstFile_) {
148  //The file was already opened in the constructor
149  isFirstFile_ = false;
150  return;
151  }
152 
153  //Get header/init from reader
154  InitMsgView const* header = getHeaderMsg();
156  }
157 
159  closeFileImp_("skipping to another file");
160 
162  std::string p = currentLumi.get_data_path();
163 
164  if (std::filesystem::exists(p)) {
165  try {
166  openFileImp_(currentLumi);
167  return true;
168  } catch (const cms::Exception& e) {
169  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
170  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
171 
172  closeFileImp_("data file corrupted");
173  return false;
174  }
175  } else {
176  /* dat file missing */
177  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
178  fiterator_.logLumiState(currentLumi, "error: data file missing");
179 
180  return false;
181  }
182  }
183 
185  InitMsgView const* header = file_.streamFile_->startMessage();
186 
187  if (header->code() != Header::INIT) { // INIT Msg
188  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
189  << "received wrong message type: expected INIT, got " << header->code() << "\n";
190  }
191 
192  return header;
193  }
194 
196  auto next = file_.streamFile_->next();
198  return nullptr;
199  }
200 
202  return nullptr;
203  }
204 
205  EventMsgView const* msg = file_.streamFile_->currentRecord();
206 
207  // if (msg != nullptr) dumpEventView(msg);
208  return msg;
209  }
210 
223 
224  for (;;) {
226 
227  if (edm::shutdown_flag.load()) {
228  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
229 
230  closeFileImp_("shutdown flag is set");
231  return false;
232  }
233 
234  // check for end of run file and force quit
235  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
236  closeFileImp_("forced end-of-run");
237  return false;
238  }
239 
240  // check for end of run and quit if everything has been processed.
241  // this clean exit
242  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
243  return false;
244  }
245 
246  // if this is end of run and no more files to process
247  // close it
249  (fiterator_.state() == State::EOR)) {
250  closeFileImp_("graceful end-of-run");
251  return false;
252  }
253 
254  // skip to the next file if we have no files openned yet
255  if (!file_.open()) {
256  if (fiterator_.lumiReady()) {
258  // we might need to open once more (if .dat is missing)
259  continue;
260  }
261  }
262 
263  // or if there is a next file and enough eventshas been processed.
266  // we might need to open once more (if .dat is missing)
267  continue;
268  }
269 
270  return true;
271  }
272  }
273 
279  EventMsgView const* eview = nullptr;
281 
282  // wait for the next event
283  for (;;) {
284  // edm::LogAbsolute("DQMStreamerReader")
285  // << "State loop.";
286  bool next = prepareNextFile();
287  if (!next)
288  return nullptr;
289 
290  // sleep
291  if (!file_.open()) {
292  // the reader does not exist
293  fiterator_.delay();
294  } else {
295  // our reader exists, try to read out an event
296  eview = getEventMsg();
297 
298  if (eview == nullptr) {
299  // read unsuccessful
300  // this means end of file, so close the file
301  closeFileImp_("eof");
302  } else {
303  if (!acceptEvent(eview)) {
304  continue;
305  } else {
306  return eview;
307  }
308  }
309  }
310  }
311  return eview;
312  }
313 
318  try {
319  EventMsgView const* eview = prepareNextEvent();
320  if (eview == nullptr) {
321  if (file_.streamFile_ and file_.streamFile_->newHeader()) {
322  return Next::kFile;
323  }
324  return Next::kStop;
325  }
326 
327  deserializeEvent(*eview);
328  } catch (const cms::Exception& e) {
329  // try to recover from corrupted files/events
330  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
331  closeFileImp_("data file corrupted");
332 
333  // this is not optimal, but hopefully we won't catch this many times in a row
334  return checkNext();
335  }
336 
338 
339  return Next::kEvent;
340  }
341 
347  acceptAllEvt_ = false;
348  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
350  boost::erase_all(hltPath, " \t");
351  if (hltPath == "*")
352  acceptAllEvt_ = true;
353  }
354  return acceptAllEvt_;
355  }
356 
361  matchTriggerSel_ = false;
362  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
364  boost::erase_all(hltPath, " \t");
365  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
366  if (!matches.empty()) {
367  matchTriggerSel_ = true;
368  }
369  }
370 
371  if (!matchTriggerSel_) {
372  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
373  }
374 
375  return matchTriggerSel_;
376  }
377 
382  if (acceptAllEvt_)
383  return true;
384  if (!matchTriggerSel_)
385  return false;
386 
387  std::vector<unsigned char> hltTriggerBits_;
388  int hltTriggerCount_ = evtmsg->hltCount();
389  if (hltTriggerCount_ > 0) {
390  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
391  }
392  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
393 
394  if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
395  return true;
396  } else {
397  return false;
398  }
399  }
400 
401  void DQMStreamerReader::skip(int toSkip) {
402  try {
403  for (int i = 0; i != toSkip; ++i) {
404  EventMsgView const* evMsg = prepareNextEvent();
405 
406  if (evMsg == nullptr) {
407  return;
408  }
409  }
410  } catch (const cms::Exception& e) {
411  // try to recover from corrupted files/events
412  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
413  closeFileImp_("data file corrupted");
414  }
415  }
416 
419  desc.setComment("Reads events from streamer files.");
420 
421  desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
422 
423  desc.addUntracked<int>("minEventsPerLumi", 1)
424  ->setComment(
425  "Minimum number of events to process per lumisection, "
426  "before switching to a new input file. If the next file "
427  "does not yet exist, "
428  "the number of processed events will be bigger.");
429 
430  desc.addUntracked<bool>("skipFirstLumis", false)
431  ->setComment(
432  "Skip (and ignore the minEventsPerLumi parameter) for the files "
433  "which have been available at the begining of the processing. "
434  "If set to true, the reader will open last available file for "
435  "processing.");
436 
437  desc.addUntracked<bool>("deleteDatFiles", false)
438  ->setComment(
439  "Delete data files after they have been closed, in order to "
440  "save disk space.");
441 
442  desc.addUntracked<bool>("endOfRunKills", false)
443  ->setComment(
444  "Kill the processing as soon as the end-of-run file appears, even if "
445  "there are/will be unprocessed lumisections.");
446 
447  // desc.addUntracked<unsigned int>("skipEvents", 0U)
448  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
449  // "have been processed.");
450 
451  // This next parameter is read in the base class, but its default value
452  // depends on the derived class, so it is set here.
453  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
454 
458 
459  descriptions.add("source", desc);
460  }
461 
462 } // namespace dqmservices
463 
466 
ConfigurationDescriptions.h
mps_fire.i
i
Definition: mps_fire.py:428
dqmservices::DQMStreamerReader::reset_
void reset_() override
Definition: DQMStreamerReader.cc:53
dqmservices::DQMFileIterator::logFileAction
void logFileAction(const std::string &msg, const std::string &fileName="") const
Definition: DQMFileIterator.cc:376
dqmservices::DQMStreamerReader::triggerSel
bool triggerSel()
Definition: DQMStreamerReader.cc:346
MessageLogger.h
dqmservices
Definition: DQMFileIterator.cc:19
dqmservices::DQMStreamerReader::acceptAllEvt_
bool acceptAllEvt_
Definition: DQMStreamerReader.h:59
hcaldqm::flag::State
State
Definition: Flag.h:13
edm::StreamerInputSource::deserializeEvent
void deserializeEvent(EventMsgView const &eventView)
Definition: StreamerInputSource.cc:185
dqmservices::DQMStreamerReader::acceptEvent
bool acceptEvent(const EventMsgView *)
Definition: DQMStreamerReader.cc:381
edm::regexMatch
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
mps_splice.entry
entry
Definition: mps_splice.py:68
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
dqmservices::DQMStreamerReader::openFileImp_
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
Definition: DQMStreamerReader.cc:95
dqmservices::DQMStreamerReader::OpenFile::streamFile_
std::unique_ptr< edm::StreamerInputFile > streamFile_
Definition: DQMStreamerReader.h:78
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
edm::StreamerInputSource::fillDescription
static void fillDescription(ParameterSetDescription &description)
Definition: StreamerInputSource.cc:513
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
dqmservices::DQMFileIterator::open
LumiEntry open()
Definition: DQMFileIterator.cc:124
dqmservices::DQMStreamerReader::closeFileImp_
void closeFileImp_(const std::string &reason)
Definition: DQMStreamerReader.cc:137
mps_check.msg
tuple msg
Definition: mps_check.py:285
dqmservices::DQMStreamerReader::genuineReadFile
void genuineReadFile() override
Definition: DQMStreamerReader.cc:146
dqmservices::DQMStreamerReader::isFirstFile_
bool isFirstFile_
Definition: DQMStreamerReader.h:61
dqmservices::DQMStreamerReader::DQMStreamerReader
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
Definition: DQMStreamerReader.cc:29
dqmservices::DQMStreamerReader::runInputDir_
std::string runInputDir_
Definition: DQMStreamerReader.h:64
dqmservices::DQMStreamerReader::openNextFileImp_
bool openNextFileImp_()
Definition: DQMStreamerReader.cc:158
dqmservices::DQMFileIterator::delay
void delay()
Definition: DQMFileIterator.cc:391
dqmservices::DQMFileIterator::LumiEntry
Definition: DQMFileIterator.h:20
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
dqmservices::DQMStreamerReader::minEventsPerLs_
unsigned int minEventsPerLs_
Definition: DQMStreamerReader.h:69
Header::INIT
Definition: MsgHeader.h:15
dqmservices::TriggerSelector
Definition: TriggerSelector.h:19
EventMsgView
Definition: EventMessage.h:72
dqmservices::DQMStreamerReader::flagEndOfRunKills_
bool flagEndOfRunKills_
Definition: DQMStreamerReader.h:72
EventSkipperByID.h
dqmservices::DQMFileIterator::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileIterator.cc:398
edm::StreamerInputSource::deserializeAndMergeWithRegistry
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
Definition: StreamerInputSource.cc:167
dqmservices::DQMStreamerReader::~DQMStreamerReader
~DQMStreamerReader() override
Definition: DQMStreamerReader.cc:45
dqmservices::DQMStreamerReader::checkNext
Next checkNext() override
Definition: DQMStreamerReader.cc:317
EDMException.h
dqmservices::DQMStreamerReader::flagDeleteDatFiles_
bool flagDeleteDatFiles_
Definition: DQMStreamerReader.h:73
MakerMacros.h
EventMsgView::hltCount
uint32 hltCount() const
Definition: EventMessage.h:94
dqmservices::DQMStreamerReader::matchTriggerSel_
bool matchTriggerSel_
Definition: DQMStreamerReader.h:60
dqmservices::DQMStreamerReader::prepareNextFile
bool prepareNextFile()
Definition: DQMStreamerReader.cc:221
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
dqmservices::DQMStreamerReader::file_
struct dqmservices::DQMStreamerReader::OpenFile file_
DEFINE_FWK_INPUT_SOURCE
#define DEFINE_FWK_INPUT_SOURCE(type)
Definition: InputSourceMacros.h:8
mps_fire.end
end
Definition: mps_fire.py:242
EventMsgView::hltTriggerBits
void hltTriggerBits(uint8 *put_here) const
Definition: EventMessage.cc:110
svgfig.load
def load(fileName)
Definition: svgfig.py:547
ParameterSetDescription.h
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
dqmservices::DQMStreamerReader::prepareNextEvent
EventMsgView const * prepareNextEvent()
Definition: DQMStreamerReader.cc:278
UnixSignalHandlers.h
edm::RawInputSource::Next::kFile
dqmservices::DQMFileIterator::LumiEntry::get_data_path
std::string get_data_path() const
Definition: DQMFileIterator.cc:45
edm::ParameterSet
Definition: ParameterSet.h:47
DumpTools.h
dqmservices::DQMFileIterator::state
State state()
Definition: DQMFileIterator.cc:122
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::RawInputSource::Next::kStop
dqmservices::DQMStreamerReader::OpenFile::lumi_
DQMFileIterator::LumiEntry lumi_
Definition: DQMStreamerReader.h:79
dqmservices::DQMFileIterator::lumiReady
bool lumiReady()
Definition: DQMFileIterator.cc:130
dqmservices::DQMStreamerReader::processedEventPerLs_
unsigned int processedEventPerLs_
Definition: DQMStreamerReader.h:68
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
InputSourceMacros.h
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
dqmservices::DQMFileIterator::advanceToLumi
void advanceToLumi(unsigned int lumi, std::string reason)
Definition: DQMFileIterator.cc:148
dqmservices::DQMStreamerReader::flagSkipFirstLumis_
bool flagSkipFirstLumis_
Definition: DQMStreamerReader.h:71
edm::RawInputSource::Next
Next
Definition: RawInputSource.h:24
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
dqmservices::DQMStreamerReader::skip
void skip(int toSkip) override
Definition: DQMStreamerReader.cc:401
dqmservices::DQMStreamerReader::genuineCloseFile
void genuineCloseFile() override
Definition: DQMStreamerReader.cc:135
zMuMuMuonUserData.hltPath
hltPath
Definition: zMuMuMuonUserData.py:20
DQMStreamerReader.h
dqmservices::DQMStreamerReader::getHeaderMsg
InitMsgView const * getHeaderMsg()
Definition: DQMStreamerReader.cc:184
dqmservices::DQMStreamerReader::fiterator_
DQMFileIterator fiterator_
Definition: DQMStreamerReader.h:75
dqmservices::DQMStreamerReader::matchTriggerSel
bool matchTriggerSel(Strings const &tnames)
Definition: DQMStreamerReader.cc:360
Exception
Definition: hltDiff.cc:246
edm::EventSkipperByID::fillDescription
static void fillDescription(ParameterSetDescription &desc)
Definition: EventSkipperByID.cc:116
Exception.h
dqmservices::DQMFileIterator::update_state
void update_state()
Definition: DQMFileIterator.cc:317
patCandidatesForDimuonsSequences_cff.matches
matches
Definition: patCandidatesForDimuonsSequences_cff.py:131
dqmservices::DQMStreamerReader::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DQMStreamerReader.cc:417
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
ParameterSet.h
dqmservices::DQMStreamerReader::Strings
std::vector< std::string > Strings
Definition: DQMStreamerReader.h:32
dqmservices::DQMStreamerReader::getEventMsg
EventMsgView const * getEventMsg()
Definition: DQMStreamerReader.cc:195
edm::RawInputSource::Next::kEvent
DQMStreamerReader
dqmservices::DQMStreamerReader DQMStreamerReader
Definition: DQMStreamerReader.cc:467
dqmservices::DQMFileIterator::logLumiState
void logLumiState(const LumiEntry &lumi, const std::string &msg)
Definition: DQMFileIterator.cc:381
edm::errors::FileReadError
Definition: EDMException.h:50
RegexMatch.h
dqmservices::DQMFileIterator::lastLumiFound
unsigned int lastLumiFound()
Definition: DQMFileIterator.cc:140
dqmservices::DQMFileIterator::State
State
Definition: DQMFileIterator.h:50
dqmservices::DQMStreamerReader::runNumber_
unsigned int runNumber_
Definition: DQMStreamerReader.h:63
dqmservices::DQMStreamerReader
Definition: DQMStreamerReader.h:24
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
dqmservices::DQMStreamerReader::eventSelector_
std::shared_ptr< TriggerSelector > eventSelector_
Definition: DQMStreamerReader.h:86
dqmservices::DQMStreamerReader::OpenFile::open
bool open()
Definition: DQMStreamerReader.h:81
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
InitMsgView
Definition: InitMessage.h:61
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
dqmservices::DQMStreamerReader::hltSel_
Strings hltSel_
Definition: DQMStreamerReader.h:66