CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
9 
13 #include "DQMStreamerReader.h"
14 
15 #include <fstream>
16 #include <queue>
17 #include <cstdlib>
18 #include <boost/regex.hpp>
19 #include <boost/format.hpp>
20 #include <boost/range.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/algorithm/string.hpp>
23 
25 
26 namespace dqmservices {
27 
29  edm::InputSourceDescription const& desc)
30  : StreamerInputSource(pset, desc), fiterator_(pset) {
31  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
32  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
33  hltSel_ =
34  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
35 
36  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
37  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
38  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
39  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
40 
41  triggerSel();
42 
43  reset_();
44 }
45 
47 
49  // We have to load at least a single header,
50  // so the ProductRegistry gets initialized.
51  //
52  // This must happen here (inside the constructor),
53  // as ProductRegistry gets frozen after we initialize:
54  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
55 
57  "Waiting for the first lumi in order to initialize.");
58 
60 
61  // Fast-forward to the last open file.
62  if (flagSkipFirstLumis_) {
63  unsigned int l = fiterator_.lastLumiFound();
64  if (l > 1) {
65  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
66  }
67  }
68 
69  for (;;) {
70  bool next = prepareNextFile();
71 
72  // check for end of run
73  if (!next) {
75  "End of run reached before DQMStreamerReader was initialised.");
76  return;
77  }
78 
79  // check if we have a file opened
80  if (file_.open()) {
81  // we are now initialised
82  break;
83  }
84 
85  // wait
86  fiterator_.delay();
87  }
88 
89  fiterator_.logFileAction("DQMStreamerReader initialised.");
90 }
91 
94  edm::ParameterSet pset;
95 
97 
98  file_.lumi_ = entry;
99  file_.streamFile_.reset(new edm::StreamerInputFile(path));
100 
101  InitMsgView const* header = getHeaderMsg();
102  deserializeAndMergeWithRegistry(*header, false);
103 
104  // dump the list of HLT trigger name from the header
105  // dumpInitHeader(header);
106 
107  // if specific trigger selection is requested, check if the requested triggers
108  // match with trigger paths in the header file
109  if (!acceptAllEvt_) {
110  Strings tnames;
111  header->hltTriggerNames(tnames);
112 
113  pset.addParameter<Strings>("SelectEvents", hltSel_);
114  eventSelector_.reset(new TriggerSelector(pset, tnames));
115 
116  // check if any trigger path name requested matches with trigger name in the
117  // header file
118  matchTriggerSel(tnames);
119  }
120 
121  // our initialization
123 
124  if (flagDeleteDatFiles_) {
125  // unlink the file
126  unlink(path.c_str());
127  }
128 }
129 
131  if (file_.open()) {
132  file_.streamFile_->closeStreamerFile();
133  file_.streamFile_ = nullptr;
134 
135  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
136  }
137 }
138 
140  closeFile_("skipping to another file");
141 
143  std::string p = fiterator_.make_path(currentLumi.datafn);
144 
145  if (boost::filesystem::exists(p)) {
146  openFile_(currentLumi);
147  return true;
148  } else {
149  /* dat file missing */
150  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
151  fiterator_.logLumiState(currentLumi, "error: data file missing");
152 
153  return false;
154  }
155 }
156 
158  InitMsgView const* header = file_.streamFile_->startMessage();
159 
160  if (header->code() != Header::INIT) { // INIT Msg
162  "DQMStreamerReader::readHeader")
163  << "received wrong message type: expected INIT, got " << header->code()
164  << "\n";
165  }
166 
167  return header;
168 }
169 
171  if (!file_.streamFile_->next()) {
172  return nullptr;
173  }
174 
175  EventMsgView const* msg = file_.streamFile_->currentRecord();
176 
177  // if (msg != nullptr) dumpEventView(msg);
178  return msg;
179 }
180 
193 
194  for (;;) {
196 
197  // check for end of run file and force quit
198  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
199  closeFile_("forced end-of-run");
200  return false;
201  }
202 
203  // check for end of run and quit if everything has been processed.
204  // this is the clean exit
205  if ((!file_.open()) && (!fiterator_.lumiReady()) &&
206  (fiterator_.state() == State::EOR)) {
207  return false;
208  }
209 
210  // if this is end of run and no more files to process
211  // close it
213  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
214  closeFile_("graceful end-of-run");
215  return false;
216  }
217 
218  // skip to the next file if we have no files opened yet
219  if (!file_.open()) {
220  if (fiterator_.lumiReady()) {
221  openNextFile_();
222  // we might need to open once more (if .dat is missing)
223  continue;
224  }
225  }
226 
227  // or if there is a next file and enough events have been processed.
229  openNextFile_();
230  // we might need to open once more (if .dat is missing)
231  continue;
232  }
233 
234  return true;
235  }
236 }
237 
243  EventMsgView const* eview = nullptr;
245 
246  // wait for the next event
247  for (;;) {
248  // edm::LogAbsolute("DQMStreamerReader")
249  // << "State loop.";
250  bool next = prepareNextFile();
251  if (!next) return nullptr;
252 
253  // sleep
254  if (!file_.open()) {
255  // the reader does not exist
256  fiterator_.delay();
257  } else {
258  // our reader exists, try to read out an event
259  eview = getEventMsg();
260 
261  if (eview == nullptr) {
262  // read unsuccessful
263  // this means end of file, so close the file
264  closeFile_("eof");
265  } else {
266  if (!acceptEvent(eview)) {
267  continue;
268  } else {
269  return eview;
270  }
271  }
272  }
273  }
274  return eview;
275 }
276 
281  EventMsgView const* eview = prepareNextEvent();
282  if (eview == nullptr) {
283  return false;
284  }
285 
286  // this is reachable only if eview is set
287  // and the file is opened
288  if (file_.streamFile_->newHeader()) {
289  // A new file has been opened and we must compare Headers here !!
290  // Get header/init from reader
291  InitMsgView const* header = getHeaderMsg();
292  deserializeAndMergeWithRegistry(*header, true);
293  }
294 
296  deserializeEvent(*eview);
297 
298  return true;
299 }
300 
306  acceptAllEvt_ = false;
307  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
308  ++i) {
309  std::string hltPath(*i);
310  boost::erase_all(hltPath, " \t");
311  if (hltPath == "*") acceptAllEvt_ = true;
312  }
313  return acceptAllEvt_;
314 }
315 
320  matchTriggerSel_ = false;
321  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end;
322  ++i) {
323  std::string hltPath(*i);
324  boost::erase_all(hltPath, " \t");
325  std::vector<Strings::const_iterator> matches =
326  edm::regexMatch(tnames, hltPath);
327  if (!matches.empty()) {
328  matchTriggerSel_ = true;
329  }
330  }
331 
332  if (!matchTriggerSel_) {
333  edm::LogWarning("Trigger selection does not match any trigger path!!!")
334  << std::endl;
335  }
336 
337  return matchTriggerSel_;
338 }
339 
344  if (acceptAllEvt_) return true;
345  if (!matchTriggerSel_) return false;
346 
347  std::vector<unsigned char> hltTriggerBits_;
348  int hltTriggerCount_ = evtmsg->hltCount();
349  if (hltTriggerCount_ > 0) {
350  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
351  }
352  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
353 
354  if (eventSelector_->wantAll() ||
355  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
356  return true;
357  } else {
358  return false;
359  }
360 }
361 
362 void DQMStreamerReader::skip(int toSkip) {
363  for (int i = 0; i != toSkip; ++i) {
364  EventMsgView const* evMsg = prepareNextEvent();
365 
366  if (evMsg == nullptr) {
367  return;
368  }
369  }
370 }
371 
373  edm::ConfigurationDescriptions& descriptions) {
375  desc.setComment("Reads events from streamer files.");
376 
377  desc.addUntracked<std::vector<std::string> >("SelectEvents")
378  ->setComment("HLT path to select events ");
379 
380  desc.addUntracked<int>("minEventsPerLumi", 1)
381  ->setComment(
382  "Minimum number of events to process per lumisection, "
383  "before switching to a new input file. If the next file "
384  "does not yet exist, "
385  "the number of processed events will be bigger.");
386 
387  desc.addUntracked<bool>("skipFirstLumis", false)
388  ->setComment(
389  "Skip (and ignore the minEventsPerLumi parameter) for the files "
390  "which have been available at the begining of the processing. "
391  "If set to true, the reader will open last available file for "
392  "processing.");
393 
394  desc.addUntracked<bool>("deleteDatFiles", false)
395  ->setComment(
396  "Delete data files after they have been closed, in order to "
397  "save disk space.");
398 
399  desc.addUntracked<bool>("endOfRunKills", false)
400  ->setComment(
401  "Kill the processing as soon as the end-of-run file appears, even if "
402  "there are/will be unprocessed lumisections.");
403 
404  // desc.addUntracked<unsigned int>("skipEvents", 0U)
405  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
406  // "have been processed.");
407 
408  // This next parameter is read in the base class, but its default value
409  // depends on the derived class, so it is set here.
410  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
411 
415 
416  descriptions.add("source", desc);
417 }
418 
419 } // end of namespace
420 
423 
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
std::string make_path(const std::string &fn)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
void openFile_(const DQMFileIterator::LumiEntry &entry)
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:146
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
virtual void skip(int toSkip)
#define DEFINE_FWK_INPUT_SOURCE(type)
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:144
void deserializeEvent(EventMsgView const &eventView)
virtual void closeFile_() overridefinal
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:37
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, boost::regex const &regexp)
Definition: RegexMatch.cc:30
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:97
void add(std::string const &label, ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:72
State
Definition: hltDiff.cc:314
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_