CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
9 
11 #include "DQMStreamerReader.h"
12 
13 #include <fstream>
14 #include <queue>
15 #include <cstdlib>
16 #include <boost/regex.hpp>
17 #include <boost/format.hpp>
18 #include <boost/range.hpp>
19 #include <boost/filesystem.hpp>
20 #include <boost/algorithm/string.hpp>
21 
23 
24 namespace dqmservices {
25 
27  : StreamerInputSource(pset, desc), fiterator_(pset) {
28  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
29  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
30  hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
31 
32  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
33  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
34  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
35  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
36 
37  triggerSel();
38 
39  reset_();
40  }
41 
43  // Sometimes(?) the destructor is called after the service registry has already been destructed,
44  // and closeFile_ then throws a "no ServiceRegistry found" exception...
45  //
46  // Normally, this file should be closed before this destructor is called.
47  //closeFileImp_("destructor");
48  }
49 
51  // We have to load at least a single header,
52  // so the ProductRegistry gets initialized.
53  //
54  // This must happen here (inside the constructor),
55  // as ProductRegistry gets frozen after we initialize:
56  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
57 
58  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
59 
61 
62  // Fast-forward to the last open file.
63  if (flagSkipFirstLumis_) {
64  unsigned int l = fiterator_.lastLumiFound();
65  if (l > 1) {
66  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
67  }
68  }
69 
70  for (;;) {
71  bool next = prepareNextFile();
72 
73  // check for end of run
74  if (!next) {
75  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
76  return;
77  }
78 
79  // check if we have a file opened
80  if (file_.open()) {
81  // we are now initialised
82  break;
83  }
84 
85  // wait
86  fiterator_.delay();
87  }
88 
89  fiterator_.logFileAction("DQMStreamerReader initialised.");
90  }
91 
95 
96  std::string path = entry.get_data_path();
97 
98  file_.lumi_ = entry;
100 
101  InitMsgView const* header = getHeaderMsg();
102  if (isFirstFile_) {
104  }
105 
106  // dump the list of HLT trigger names from the header
107  // dumpInitHeader(header);
108 
109  // if specific trigger selection is requested, check if the requested triggers
110  // match with trigger paths in the header file
111  if (!acceptAllEvt_) {
112  Strings tnames;
113  header->hltTriggerNames(tnames);
114 
115  pset.addParameter<Strings>("SelectEvents", hltSel_);
116  eventSelector_.reset(new TriggerSelector(pset, tnames));
117 
118  // check if any trigger path name requested matches with trigger name in the
119  // header file
120  matchTriggerSel(tnames);
121  }
122 
123  // our initialization
125 
126  if (flagDeleteDatFiles_) {
127  // unlink the file
128  unlink(path.c_str());
129  }
130  }
131 
133 
135  if (file_.open()) {
136  file_.streamFile_->closeStreamerFile();
137  file_.streamFile_ = nullptr;
138 
139  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
140  }
141  }
142 
144  if (isFirstFile_) {
145  //The file was already opened in the constructor
146  isFirstFile_ = false;
147  return;
148  }
149 
150  //Get header/init from reader
151  InitMsgView const* header = getHeaderMsg();
153  }
154 
156  closeFileImp_("skipping to another file");
157 
159  std::string p = currentLumi.get_data_path();
160 
161  if (boost::filesystem::exists(p)) {
162  try {
163  openFileImp_(currentLumi);
164  return true;
165  } catch (const cms::Exception& e) {
166  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
167  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
168 
169  closeFileImp_("data file corrupted");
170  return false;
171  }
172  } else {
173  /* dat file missing */
174  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
175  fiterator_.logLumiState(currentLumi, "error: data file missing");
176 
177  return false;
178  }
179  }
180 
182  InitMsgView const* header = file_.streamFile_->startMessage();
183 
184  if (header->code() != Header::INIT) { // INIT Msg
185  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
186  << "received wrong message type: expected INIT, got " << header->code() << "\n";
187  }
188 
189  return header;
190  }
191 
193  auto next = file_.streamFile_->next();
195  return nullptr;
196  }
197 
199  return nullptr;
200  }
201 
202  EventMsgView const* msg = file_.streamFile_->currentRecord();
203 
204  // if (msg != nullptr) dumpEventView(msg);
205  return msg;
206  }
207 
220 
221  for (;;) {
223 
224  if (edm::shutdown_flag.load()) {
225  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
226 
227  closeFileImp_("shutdown flag is set");
228  return false;
229  }
230 
231  // check for end of run file and force quit
232  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
233  closeFileImp_("forced end-of-run");
234  return false;
235  }
236 
237  // check for end of run and quit if everything has been processed.
238  // this is the clean exit
239  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
240  return false;
241  }
242 
243  // if this is end of run and no more files to process
244  // close it
246  (fiterator_.state() == State::EOR)) {
247  closeFileImp_("graceful end-of-run");
248  return false;
249  }
250 
251  // skip to the next file if we have no files opened yet
252  if (!file_.open()) {
253  if (fiterator_.lumiReady()) {
255  // we might need to open once more (if .dat is missing)
256  continue;
257  }
258  }
259 
260  // or if there is a next file and enough events have been processed.
263  // we might need to open once more (if .dat is missing)
264  continue;
265  }
266 
267  return true;
268  }
269  }
270 
276  EventMsgView const* eview = nullptr;
278 
279  // wait for the next event
280  for (;;) {
281  // edm::LogAbsolute("DQMStreamerReader")
282  // << "State loop.";
283  bool next = prepareNextFile();
284  if (!next)
285  return nullptr;
286 
287  // sleep
288  if (!file_.open()) {
289  // the reader does not exist
290  fiterator_.delay();
291  } else {
292  // our reader exists, try to read out an event
293  eview = getEventMsg();
294 
295  if (eview == nullptr) {
296  // read unsuccessful
297  // this means end of file, so close the file
298  closeFileImp_("eof");
299  } else {
300  if (!acceptEvent(eview)) {
301  continue;
302  } else {
303  return eview;
304  }
305  }
306  }
307  }
308  return eview;
309  }
310 
315  try {
316  EventMsgView const* eview = prepareNextEvent();
317  if (eview == nullptr) {
318  if (file_.streamFile_ and file_.streamFile_->newHeader()) {
319  return Next::kFile;
320  }
321  return Next::kStop;
322  }
323 
324  deserializeEvent(*eview);
325  } catch (const cms::Exception& e) {
326  // try to recover from corrupted files/events
327  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
328  closeFileImp_("data file corrupted");
329 
330  // this is not optimal, but hopefully we won't catch this many times in a row
331  return checkNext();
332  }
333 
335 
336  return Next::kEvent;
337  }
338 
344  acceptAllEvt_ = false;
345  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
347  boost::erase_all(hltPath, " \t");
348  if (hltPath == "*")
349  acceptAllEvt_ = true;
350  }
351  return acceptAllEvt_;
352  }
353 
358  matchTriggerSel_ = false;
359  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
361  boost::erase_all(hltPath, " \t");
362  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
363  if (!matches.empty()) {
364  matchTriggerSel_ = true;
365  }
366  }
367 
368  if (!matchTriggerSel_) {
369  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
370  }
371 
372  return matchTriggerSel_;
373  }
374 
379  if (acceptAllEvt_)
380  return true;
381  if (!matchTriggerSel_)
382  return false;
383 
384  std::vector<unsigned char> hltTriggerBits_;
385  int hltTriggerCount_ = evtmsg->hltCount();
386  if (hltTriggerCount_ > 0) {
387  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
388  }
389  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
390 
391  if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
392  return true;
393  } else {
394  return false;
395  }
396  }
397 
398  void DQMStreamerReader::skip(int toSkip) {
399  try {
400  for (int i = 0; i != toSkip; ++i) {
401  EventMsgView const* evMsg = prepareNextEvent();
402 
403  if (evMsg == nullptr) {
404  return;
405  }
406  }
407  } catch (const cms::Exception& e) {
408  // try to recover from corrupted files/events
409  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
410  closeFileImp_("data file corrupted");
411  }
412  }
413 
416  desc.setComment("Reads events from streamer files.");
417 
418  desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
419 
420  desc.addUntracked<int>("minEventsPerLumi", 1)
421  ->setComment(
422  "Minimum number of events to process per lumisection, "
423  "before switching to a new input file. If the next file "
424  "does not yet exist, "
425  "the number of processed events will be bigger.");
426 
427  desc.addUntracked<bool>("skipFirstLumis", false)
428  ->setComment(
429  "Skip (and ignore the minEventsPerLumi parameter) for the files "
430  "which have been available at the begining of the processing. "
431  "If set to true, the reader will open last available file for "
432  "processing.");
433 
434  desc.addUntracked<bool>("deleteDatFiles", false)
435  ->setComment(
436  "Delete data files after they have been closed, in order to "
437  "save disk space.");
438 
439  desc.addUntracked<bool>("endOfRunKills", false)
440  ->setComment(
441  "Kill the processing as soon as the end-of-run file appears, even if "
442  "there are/will be unprocessed lumisections.");
443 
444  // desc.addUntracked<unsigned int>("skipEvents", 0U)
445  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
446  // "have been processed.");
447 
448  // This next parameter is read in the base class, but its default value
449  // depends on the derived class, so it is set here.
450  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
451 
455 
456  descriptions.add("source", desc);
457  }
458 
459 } // namespace dqmservices
460 
463 
ConfigurationDescriptions.h
mps_fire.i
i
Definition: mps_fire.py:355
dqmservices::DQMStreamerReader::reset_
void reset_() override
Definition: DQMStreamerReader.cc:50
dqmservices::DQMFileIterator::logFileAction
void logFileAction(const std::string &msg, const std::string &fileName="") const
Definition: DQMFileIterator.cc:373
dqmservices::DQMStreamerReader::triggerSel
bool triggerSel()
Definition: DQMStreamerReader.cc:343
MessageLogger.h
dqmservices
Definition: DQMFileIterator.cc:18
dqmservices::DQMStreamerReader::acceptAllEvt_
bool acceptAllEvt_
Definition: DQMStreamerReader.h:59
hcaldqm::flag::State
State
Definition: Flag.h:13
edm::StreamerInputSource::deserializeEvent
void deserializeEvent(EventMsgView const &eventView)
Definition: StreamerInputSource.cc:185
dqmservices::DQMStreamerReader::acceptEvent
bool acceptEvent(const EventMsgView *)
Definition: DQMStreamerReader.cc:378
edm::regexMatch
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
mps_splice.entry
entry
Definition: mps_splice.py:68
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
dqmservices::DQMStreamerReader::openFileImp_
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
Definition: DQMStreamerReader.cc:92
dqmservices::DQMStreamerReader::OpenFile::streamFile_
std::unique_ptr< edm::StreamerInputFile > streamFile_
Definition: DQMStreamerReader.h:78
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
edm::StreamerInputSource::fillDescription
static void fillDescription(ParameterSetDescription &description)
Definition: StreamerInputSource.cc:502
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
dqmservices::DQMFileIterator::open
LumiEntry open()
Definition: DQMFileIterator.cc:123
dqmservices::DQMStreamerReader::closeFileImp_
void closeFileImp_(const std::string &reason)
Definition: DQMStreamerReader.cc:134
mps_check.msg
tuple msg
Definition: mps_check.py:285
dqmservices::DQMStreamerReader::genuineReadFile
void genuineReadFile() override
Definition: DQMStreamerReader.cc:143
dqmservices::DQMStreamerReader::isFirstFile_
bool isFirstFile_
Definition: DQMStreamerReader.h:61
dqmservices::DQMStreamerReader::DQMStreamerReader
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
Definition: DQMStreamerReader.cc:26
dqmservices::DQMStreamerReader::runInputDir_
std::string runInputDir_
Definition: DQMStreamerReader.h:64
dqmservices::DQMStreamerReader::openNextFileImp_
bool openNextFileImp_()
Definition: DQMStreamerReader.cc:155
dqmservices::DQMFileIterator::delay
void delay()
Definition: DQMFileIterator.cc:388
dqmservices::DQMFileIterator::LumiEntry
Definition: DQMFileIterator.h:20
dqmservices::DQMStreamerReader::minEventsPerLs_
unsigned int minEventsPerLs_
Definition: DQMStreamerReader.h:69
Header::INIT
Definition: MsgHeader.h:15
dqmservices::TriggerSelector
Definition: TriggerSelector.h:19
EventMsgView
Definition: EventMessage.h:72
dqmservices::DQMStreamerReader::flagEndOfRunKills_
bool flagEndOfRunKills_
Definition: DQMStreamerReader.h:72
end
#define end
Definition: vmac.h:39
EventSkipperByID.h
dqmservices::DQMFileIterator::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileIterator.cc:395
edm::StreamerInputSource::deserializeAndMergeWithRegistry
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
Definition: StreamerInputSource.cc:167
dqmservices::DQMStreamerReader::~DQMStreamerReader
~DQMStreamerReader() override
Definition: DQMStreamerReader.cc:42
dqmservices::DQMStreamerReader::checkNext
Next checkNext() override
Definition: DQMStreamerReader.cc:314
edm::StreamerInputFile
Definition: StreamerInputFile.h:19
EDMException.h
dqmservices::DQMStreamerReader::flagDeleteDatFiles_
bool flagDeleteDatFiles_
Definition: DQMStreamerReader.h:73
MakerMacros.h
EventMsgView::hltCount
uint32 hltCount() const
Definition: EventMessage.h:94
dqmservices::DQMStreamerReader::matchTriggerSel_
bool matchTriggerSel_
Definition: DQMStreamerReader.h:60
dqmservices::DQMStreamerReader::prepareNextFile
bool prepareNextFile()
Definition: DQMStreamerReader.cc:218
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
dqmservices::DQMStreamerReader::file_
struct dqmservices::DQMStreamerReader::OpenFile file_
DEFINE_FWK_INPUT_SOURCE
#define DEFINE_FWK_INPUT_SOURCE(type)
Definition: InputSourceMacros.h:8
EventMsgView::hltTriggerBits
void hltTriggerBits(uint8 *put_here) const
Definition: EventMessage.cc:110
svgfig.load
def load(fileName)
Definition: svgfig.py:547
ParameterSetDescription.h
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::LogWarning
Definition: MessageLogger.h:141
dqmservices::DQMStreamerReader::prepareNextEvent
EventMsgView const * prepareNextEvent()
Definition: DQMStreamerReader.cc:275
UnixSignalHandlers.h
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
edm::RawInputSource::Next::kFile
dqmservices::DQMFileIterator::LumiEntry::get_data_path
std::string get_data_path() const
Definition: DQMFileIterator.cc:44
edm::ParameterSet
Definition: ParameterSet.h:36
DumpTools.h
edm::ParameterSetDescription::setComment
void setComment(std::string const &value)
Definition: ParameterSetDescription.cc:33
dqmservices::DQMFileIterator::state
State state()
Definition: DQMFileIterator.cc:121
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::RawInputSource::Next::kStop
dqmservices::DQMStreamerReader::OpenFile::lumi_
DQMFileIterator::LumiEntry lumi_
Definition: DQMStreamerReader.h:79
dqmservices::DQMFileIterator::lumiReady
bool lumiReady()
Definition: DQMFileIterator.cc:129
dqmservices::DQMStreamerReader::processedEventPerLs_
unsigned int processedEventPerLs_
Definition: DQMStreamerReader.h:68
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:193
InputSourceMacros.h
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
dqmservices::DQMFileIterator::advanceToLumi
void advanceToLumi(unsigned int lumi, std::string reason)
Definition: DQMFileIterator.cc:147
dqmservices::DQMStreamerReader::flagSkipFirstLumis_
bool flagSkipFirstLumis_
Definition: DQMStreamerReader.h:71
edm::RawInputSource::Next
Next
Definition: RawInputSource.h:24
dqmservices::DQMStreamerReader::skip
void skip(int toSkip) override
Definition: DQMStreamerReader.cc:398
dqmservices::DQMStreamerReader::genuineCloseFile
void genuineCloseFile() override
Definition: DQMStreamerReader.cc:132
zMuMuMuonUserData.hltPath
hltPath
Definition: zMuMuMuonUserData.py:20
DQMStreamerReader.h
dqmservices::DQMStreamerReader::getHeaderMsg
InitMsgView const * getHeaderMsg()
Definition: DQMStreamerReader.cc:181
dqmservices::DQMStreamerReader::fiterator_
DQMFileIterator fiterator_
Definition: DQMStreamerReader.h:75
dqmservices::DQMStreamerReader::matchTriggerSel
bool matchTriggerSel(Strings const &tnames)
Definition: DQMStreamerReader.cc:357
Exception
Definition: hltDiff.cc:246
edm::EventSkipperByID::fillDescription
static void fillDescription(ParameterSetDescription &desc)
Definition: EventSkipperByID.cc:116
Exception.h
dqmservices::DQMFileIterator::update_state
void update_state()
Definition: DQMFileIterator.cc:314
patCandidatesForDimuonsSequences_cff.matches
matches
Definition: patCandidatesForDimuonsSequences_cff.py:131
dqmservices::DQMStreamerReader::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DQMStreamerReader.cc:414
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
ParameterSet.h
dqmservices::DQMStreamerReader::Strings
std::vector< std::string > Strings
Definition: DQMStreamerReader.h:32
dqmservices::DQMStreamerReader::getEventMsg
EventMsgView const * getEventMsg()
Definition: DQMStreamerReader.cc:192
edm::RawInputSource::Next::kEvent
DQMStreamerReader
dqmservices::DQMStreamerReader DQMStreamerReader
Definition: DQMStreamerReader.cc:464
dqmservices::DQMFileIterator::logLumiState
void logLumiState(const LumiEntry &lumi, const std::string &msg)
Definition: DQMFileIterator.cc:378
edm::errors::FileReadError
Definition: EDMException.h:50
RegexMatch.h
dqmservices::DQMFileIterator::lastLumiFound
unsigned int lastLumiFound()
Definition: DQMFileIterator.cc:139
dqmservices::DQMFileIterator::State
State
Definition: DQMFileIterator.h:50
dqmservices::DQMStreamerReader::runNumber_
unsigned int runNumber_
Definition: DQMStreamerReader.h:63
dqmservices::DQMStreamerReader
Definition: DQMStreamerReader.h:24
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
dqmservices::DQMStreamerReader::eventSelector_
std::shared_ptr< TriggerSelector > eventSelector_
Definition: DQMStreamerReader.h:86
dqmservices::DQMStreamerReader::OpenFile::open
bool open()
Definition: DQMStreamerReader.h:81
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
InitMsgView
Definition: InitMessage.h:61
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
dqmservices::DQMStreamerReader::hltSel_
Strings hltSel_
Definition: DQMStreamerReader.h:66