CMS 3D CMS Logo

DQMStreamerReader.cc
Go to the documentation of this file.
10 
14 #include "DQMStreamerReader.h"
15 
16 #include <fstream>
17 #include <queue>
18 #include <cstdlib>
19 #include <boost/regex.hpp>
20 #include <boost/format.hpp>
21 #include <boost/range.hpp>
22 #include <boost/filesystem.hpp>
23 #include <boost/algorithm/string.hpp>
24 
26 
27 namespace dqmservices {
28 
30  edm::InputSourceDescription const& desc)
31  : StreamerInputSource(pset, desc), fiterator_(pset) {
32  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
33  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
34  hltSel_ =
35  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
36 
37  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
38  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
39  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
40  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
41 
42  triggerSel();
43 
44  reset_();
45 }
46 
48  // Sometimes(?) the destructor is called after the service registry has already
49  // been destructed, and closeFile_ then throws a "no ServiceRegistry found" exception...
50  //
51  // Normally, this file should be closed before this destructor is called.
52  //closeFile_("destructor");
53 }
54 
56  // We have to load at least a single header,
57  // so the ProductRegistry gets initialized.
58  //
59  // This must happen here (inside the constructor),
60  // as ProductRegistry gets frozen after we initialize:
61  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
62 
64  "Waiting for the first lumi in order to initialize.");
65 
67 
68  // Fast-forward to the last open file.
69  if (flagSkipFirstLumis_) {
70  unsigned int l = fiterator_.lastLumiFound();
71  if (l > 1) {
72  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
73  }
74  }
75 
76  for (;;) {
77  bool next = prepareNextFile();
78 
79  // check for end of run
80  if (!next) {
82  "End of run reached before DQMStreamerReader was initialised.");
83  return;
84  }
85 
86  // check if we have a file opened
87  if (file_.open()) {
88  // we are now initialised
89  break;
90  }
91 
92  // wait
93  fiterator_.delay();
94  }
95 
96  fiterator_.logFileAction("DQMStreamerReader initialised.");
97 }
98 
102 
103  std::string path = entry.get_data_path();
104 
105  file_.lumi_ = entry;
106  file_.streamFile_.reset(new edm::StreamerInputFile(path));
107 
108  InitMsgView const* header = getHeaderMsg();
109  deserializeAndMergeWithRegistry(*header, false);
110 
111  // dump the list of HLT trigger name from the header
112  // dumpInitHeader(header);
113 
114  // if specific trigger selection is requested, check if the requested triggers
115  // match with trigger paths in the header file
116  if (!acceptAllEvt_) {
117  Strings tnames;
118  header->hltTriggerNames(tnames);
119 
120  pset.addParameter<Strings>("SelectEvents", hltSel_);
121  eventSelector_.reset(new TriggerSelector(pset, tnames));
122 
123  // check if any trigger path name requested matches with trigger name in the
124  // header file
125  matchTriggerSel(tnames);
126  }
127 
128  // our initialization
130 
131  if (flagDeleteDatFiles_) {
132  // unlink the file
133  unlink(path.c_str());
134  }
135 }
136 
138  if (file_.open()) {
139  file_.streamFile_->closeStreamerFile();
140  file_.streamFile_ = nullptr;
141 
142  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
143  }
144 }
145 
147  closeFile_("skipping to another file");
148 
150  std::string p = currentLumi.get_data_path();
151 
152  if (boost::filesystem::exists(p)) {
153  try {
154  openFile_(currentLumi);
155  return true;
156  } catch (const cms::Exception& e) {
157  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
158  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
159 
160  closeFile_("data file corrupted");
161  return false;
162  }
163  } else {
164  /* dat file missing */
165  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
166  fiterator_.logLumiState(currentLumi, "error: data file missing");
167 
168  return false;
169  }
170 }
171 
173  InitMsgView const* header = file_.streamFile_->startMessage();
174 
175  if (header->code() != Header::INIT) { // INIT Msg
177  "DQMStreamerReader::readHeader")
178  << "received wrong message type: expected INIT, got " << header->code()
179  << "\n";
180  }
181 
182  return header;
183 }
184 
186  if (!file_.streamFile_->next()) {
187  return nullptr;
188  }
189 
190  EventMsgView const* msg = file_.streamFile_->currentRecord();
191 
192  // if (msg != nullptr) dumpEventView(msg);
193  return msg;
194 }
195 
208 
209  for (;;) {
211 
212  if (edm::shutdown_flag.load()) {
213  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
214 
215  closeFile_("shutdown flag is set");
216  return false;
217  }
218 
219  // check for end of run file and force quit
220  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
221  closeFile_("forced end-of-run");
222  return false;
223  }
224 
225  // check for end of run and quit if everything has been processed.
226  // this is the clean exit
227  if ((!file_.open()) && (!fiterator_.lumiReady()) &&
228  (fiterator_.state() == State::EOR)) {
229  return false;
230  }
231 
232  // if this is end of run and no more files to process
233  // close it
235  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
236  closeFile_("graceful end-of-run");
237  return false;
238  }
239 
240  // skip to the next file if we have no files opened yet
241  if (!file_.open()) {
242  if (fiterator_.lumiReady()) {
243  openNextFile_();
244  // we might need to open once more (if .dat is missing)
245  continue;
246  }
247  }
248 
249  // or if there is a next file and enough events have been processed.
251  openNextFile_();
252  // we might need to open once more (if .dat is missing)
253  continue;
254  }
255 
256  return true;
257  }
258 }
259 
265  EventMsgView const* eview = nullptr;
267 
268  // wait for the next event
269  for (;;) {
270  // edm::LogAbsolute("DQMStreamerReader")
271  // << "State loop.";
272  bool next = prepareNextFile();
273  if (!next) return nullptr;
274 
275  // sleep
276  if (!file_.open()) {
277  // the reader does not exist
278  fiterator_.delay();
279  } else {
280  // our reader exists, try to read out an event
281  eview = getEventMsg();
282 
283  if (eview == nullptr) {
284  // read unsuccessful
285  // this means end of file, so close the file
286  closeFile_("eof");
287  } else {
288  if (!acceptEvent(eview)) {
289  continue;
290  } else {
291  return eview;
292  }
293  }
294  }
295  }
296  return eview;
297 }
298 
303  try {
304  EventMsgView const* eview = prepareNextEvent();
305  if (eview == nullptr) {
306  return false;
307  }
308 
309  // this is reachable only if eview is set
310  // and the file is opened
311  if (file_.streamFile_->newHeader()) {
312  // A new file has been opened and we must compare Headers here !!
313  // Get header/init from reader
314 
315  InitMsgView const* header = getHeaderMsg();
316  deserializeAndMergeWithRegistry(*header, true);
317  }
318 
319  deserializeEvent(*eview);
320  } catch (const cms::Exception& e) {
321  // try to recover from corrupted files/events
322  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
323  closeFile_("data file corrupted");
324 
325  // this is not optimal, but hopefully we won't catch this many times in a row
326  return checkNextEvent();
327  }
328 
330 
331  return true;
332 }
333 
339  acceptAllEvt_ = false;
340  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
341  ++i) {
343  boost::erase_all(hltPath, " \t");
344  if (hltPath == "*") acceptAllEvt_ = true;
345  }
346  return acceptAllEvt_;
347 }
348 
353  matchTriggerSel_ = false;
354  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
355  ++i) {
357  boost::erase_all(hltPath, " \t");
358  std::vector<Strings::const_iterator> matches =
359  edm::regexMatch(tnames, hltPath);
360  if (!matches.empty()) {
361  matchTriggerSel_ = true;
362  }
363  }
364 
365  if (!matchTriggerSel_) {
366  edm::LogWarning("Trigger selection does not match any trigger path!!!")
367  << std::endl;
368  }
369 
370  return matchTriggerSel_;
371 }
372 
377  if (acceptAllEvt_) return true;
378  if (!matchTriggerSel_) return false;
379 
380  std::vector<unsigned char> hltTriggerBits_;
381  int hltTriggerCount_ = evtmsg->hltCount();
382  if (hltTriggerCount_ > 0) {
383  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
384  }
385  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
386 
387  if (eventSelector_->wantAll() ||
388  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
389  return true;
390  } else {
391  return false;
392  }
393 }
394 
395 void DQMStreamerReader::skip(int toSkip) {
396  try {
397  for (int i = 0; i != toSkip; ++i) {
398  EventMsgView const* evMsg = prepareNextEvent();
399 
400  if (evMsg == nullptr) {
401  return;
402  }
403  }
404  } catch (const cms::Exception& e) {
405  // try to recover from corrupted files/events
406  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
407  closeFile_("data file corrupted");
408  }
409 }
410 
412  edm::ConfigurationDescriptions& descriptions) {
414  desc.setComment("Reads events from streamer files.");
415 
416  desc.addUntracked<std::vector<std::string> >("SelectEvents")
417  ->setComment("HLT path to select events ");
418 
419  desc.addUntracked<int>("minEventsPerLumi", 1)
420  ->setComment(
421  "Minimum number of events to process per lumisection, "
422  "before switching to a new input file. If the next file "
423  "does not yet exist, "
424  "the number of processed events will be bigger.");
425 
426  desc.addUntracked<bool>("skipFirstLumis", false)
427  ->setComment(
428  "Skip (and ignore the minEventsPerLumi parameter) for the files "
429  "which have been available at the begining of the processing. "
430  "If set to true, the reader will open last available file for "
431  "processing.");
432 
433  desc.addUntracked<bool>("deleteDatFiles", false)
434  ->setComment(
435  "Delete data files after they have been closed, in order to "
436  "save disk space.");
437 
438  desc.addUntracked<bool>("endOfRunKills", false)
439  ->setComment(
440  "Kill the processing as soon as the end-of-run file appears, even if "
441  "there are/will be unprocessed lumisections.");
442 
443  // desc.addUntracked<unsigned int>("skipEvents", 0U)
444  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
445  // "have been processed.");
446 
447  // This next parameter is read in the base class, but its default value
448  // depends on the derived class, so it is set here.
449  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
450 
454 
455  descriptions.add("source", desc);
456 }
457 
458 } // end of namespace
459 
462 
virtual char const * what() const
Definition: Exception.cc:141
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void openFile_(const DQMFileIterator::LumiEntry &entry)
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:146
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
virtual void skip(int toSkip)
#define DEFINE_FWK_INPUT_SOURCE(type)
virtual void closeFile_() override final
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:144
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:37
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
def load(fileName)
Definition: svgfig.py:546
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:97
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:30
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:72
State
Definition: hltDiff.cc:287
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_