CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
9 
13 #include "DQMStreamerReader.h"
14 
15 #include <fstream>
16 #include <queue>
17 #include <cstdlib>
18 #include <boost/regex.hpp>
19 #include <boost/format.hpp>
20 #include <boost/range.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/algorithm/string.hpp>
23 
25 
26 namespace dqmservices {
27 
29  edm::InputSourceDescription const& desc)
30  : StreamerInputSource(pset, desc),
31  fiterator_(pset) {
32 
33  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
34  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
35  hltSel_ =
36  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
37 
38  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
39  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
40  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
41  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
42 
43  triggerSel();
44 
45  reset_();
46 }
47 
49 
51  // We have to load at least a single header,
52  // so the ProductRegistry gets initialized.
53  //
54  // This must happen here (inside the constructor),
55  // as ProductRegistry gets frozen after we initialize:
56  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
57 
59  "Waiting for the first lumi in order to initialize.");
60 
62 
63  // Fast-forward to the last open file.
64  if (flagSkipFirstLumis_) {
65  unsigned int l = fiterator_.lastLumiFound();
66  if (l > 1) {
67  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
68  }
69  }
70 
71  for (;;) {
72  bool next = prepareNextFile();
73 
74  // check for end of run
75  if (!next) {
77  "End of run reached before DQMStreamerReader was initialised.");
78  return;
79  }
80 
81  // check if we have a file openned
82  if (file_.open()) {
83  // we are now initialised
84  break;
85  }
86 
87  // wait
88  fiterator_.delay();
89  }
90 
91  fiterator_.logFileAction("DQMStreamerReader initialised.");
92 }
93 
96  edm::ParameterSet pset;
97 
99 
100  file_.lumi_ = entry;
101  file_.streamFile_.reset(new edm::StreamerInputFile(path));
102 
103  InitMsgView const* header = getHeaderMsg();
104  deserializeAndMergeWithRegistry(*header, false);
105 
106  // dump the list of HLT trigger name from the header
107  // dumpInitHeader(header);
108 
109  // if specific trigger selection is requested, check if the requested triggers
110  // match with trigger paths in the header file
111  if (!acceptAllEvt_){
112  Strings tnames;
113  header->hltTriggerNames(tnames);
114 
115  pset.addParameter<Strings>("SelectEvents", hltSel_);
116  eventSelector_.reset(new TriggerSelector(pset, tnames));
117 
118  // check if any trigger path name requested matches with trigger name in the header file
119  matchTriggerSel(tnames);
120  }
121 
122  // our initialization
124 
125  if (flagDeleteDatFiles_) {
126  // unlink the file
127  unlink(path.c_str());
128  }
129 }
130 
132  if (file_.open()) {
133  file_.streamFile_->closeStreamerFile();
134  file_.streamFile_ = nullptr;
135 
136  fiterator_.logLumiState(file_.lumi_, "close: " + reason);
137  }
138 }
139 
141  closeFile_("skipping to another file");
142 
144  std::string p = fiterator_.make_path(currentLumi.datafn);
145 
146  if (boost::filesystem::exists(p)) {
147  openFile_(currentLumi);
148  return true;
149  } else {
150  /* dat file missing */
151  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
152  fiterator_.logLumiState(currentLumi, "error: data file missing");
153 
154  return false;
155  }
156 }
157 
159  InitMsgView const* header = file_.streamFile_->startMessage();
160 
161  if (header->code() != Header::INIT) { // INIT Msg
162  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
163  << "received wrong message type: expected INIT, got " << header->code()
164  << "\n";
165  }
166 
167  return header;
168 }
169 
171  if (!file_.streamFile_->next()) {
172  return nullptr;
173  }
174 
175  EventMsgView const* msg = file_.streamFile_->currentRecord();
176 
177  // if (msg != nullptr) dumpEventView(msg);
178  return msg;
179 }
180 
192  typedef DQMFileIterator::State State;
193 
194  for (;;) {
196 
197  // check for end of run file and force quit
198  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
199  closeFile_("forced end-of-run");
200  return false;
201  }
202 
203  // check for end of run and quit if everything has been processed.
204  // this clean exit
205  if ((!file_.open()) && (!fiterator_.lumiReady()) &&
206  (fiterator_.state() == State::EOR)) {
207 
208  return false;
209  }
210 
211  // if this is end of run and no more files to process
212  // close it
214  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
215 
216  closeFile_("graceful end-of-run");
217  return false;
218  }
219 
220  // skip to the next file if we have no files openned yet
221  if (!file_.open()) {
222  if (fiterator_.lumiReady()) {
223  openNextFile_();
224  // we might need to open once more (if .dat is missing)
225  continue;
226  }
227  }
228 
229  // or if there is a next file and enough eventshas been processed.
231  openNextFile_();
232  // we might need to open once more (if .dat is missing)
233  continue;
234  }
235 
236  return true;
237  }
238 }
239 
245  EventMsgView const* eview = nullptr;
246  typedef DQMFileIterator::State State;
247 
248  // wait for the next event
249  for (;;) {
250  // edm::LogAbsolute("DQMStreamerReader")
251  // << "State loop.";
252  bool next = prepareNextFile();
253  if (!next) return nullptr;
254 
255  // sleep
256  if (!file_.open()) {
257  // the reader does not exist
258  fiterator_.delay();
259  } else {
260  // our reader exists, try to read out an event
261  eview = getEventMsg();
262 
263  if (eview == nullptr) {
264  // read unsuccessful
265  // this means end of file, so close the file
266  closeFile_("eof");
267  } else {
268  if (!acceptEvent(eview)) {
269  continue;
270  } else {
271  return eview;
272  }
273  }
274  }
275  }
276  return eview;
277 }
278 
283  EventMsgView const* eview = prepareNextEvent();
284  if (eview == nullptr) {
285  return false;
286  }
287 
288  // this is reachable only if eview is set
289  // and the file is openned
290  if (file_.streamFile_->newHeader()) {
291  // A new file has been opened and we must compare Headers here !!
292  // Get header/init from reader
293  InitMsgView const* header = getHeaderMsg();
294  deserializeAndMergeWithRegistry(*header, true);
295  }
296 
298  deserializeEvent(*eview);
299 
300  return true;
301 }
302 
307  acceptAllEvt_ = false;
308  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
309  i!=end; ++i){
310  std::string hltPath(*i);
311  boost::erase_all(hltPath, " \t");
312  if (hltPath == "*") acceptAllEvt_ = true;
313  }
314  return acceptAllEvt_;
315 }
316 
321  matchTriggerSel_ = false;
322  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
323  i!=end; ++i){
324  std::string hltPath(*i);
325  boost::erase_all(hltPath, " \t");
326  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
327  if (matches.empty()){
328  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
329  matchTriggerSel_ = false;
330  }else{
331  matchTriggerSel_ = true;
332  }
333  }
334  return matchTriggerSel_;
335 }
336 
341 
342  if (acceptAllEvt_) return true;
343  if (!matchTriggerSel_) return false;
344 
345  std::vector<unsigned char> hltTriggerBits_;
346  int hltTriggerCount_ = evtmsg->hltCount();
347  if (hltTriggerCount_ > 0) {
348  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
349  }
350  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
351 
352  if (eventSelector_->wantAll() ||
353  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
354  return true;
355  }else{
356  return false;
357  }
358 }
359 
360 void DQMStreamerReader::skip(int toSkip) {
361  for (int i = 0; i != toSkip; ++i) {
362  EventMsgView const* evMsg = prepareNextEvent();
363 
364  if (evMsg == nullptr) {
365  return;
366  }
367  }
368 }
369 
371  edm::ConfigurationDescriptions& descriptions) {
372 
374  desc.setComment("Reads events from streamer files.");
375 
376  desc.addUntracked<std::vector<std::string> >("SelectEvents")
377  ->setComment("HLT path to select events ");
378 
379  desc.addUntracked<int>("minEventsPerLumi", 1)->setComment(
380  "Minimum number of events to process per lumisection, "
381  "before switching to a new input file. If the next file "
382  "does not yet exist, "
383  "the number of processed events will be bigger.");
384 
385  desc.addUntracked<bool>("skipFirstLumis", false)->setComment(
386  "Skip (and ignore the minEventsPerLumi parameter) for the files "
387  "which have been available at the begining of the processing. "
388  "If set to true, the reader will open last available file for "
389  "processing.");
390 
391  desc.addUntracked<bool>("deleteDatFiles", false)->setComment(
392  "Delete data files after they have been closed, in order to "
393  "save disk space.");
394 
395  desc.addUntracked<bool>("endOfRunKills", false)->setComment(
396  "Kill the processing as soon as the end-of-run file appears, even if "
397  "there are/will be unprocessed lumisections.");
398 
399  // desc.addUntracked<unsigned int>("skipEvents", 0U)
400  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
401  // "have been processed.");
402 
403  // This next parameter is read in the base class, but its default value
404  // depends on the derived class, so it is set here.
405  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
406 
410 
411  descriptions.add("source", desc);
412 }
413 
414 } // end of namespace
415 
418 
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
virtual void closeFile_()
Definition: InputSource.h:401
std::string make_path(const std::string &fn)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
void openFile_(const DQMFileIterator::LumiEntry &entry)
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:146
void setComment(std::string const &value)
std::unique_ptr< edm::StreamerInputFile > streamFile_
virtual void skip(int toSkip)
tuple path
else: Piece not in the list, fine.
#define DEFINE_FWK_INPUT_SOURCE(type)
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:142
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:37
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, boost::regex const &regexp)
Definition: RegexMatch.cc:30
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
bool acceptEvent(const EventMsgView *)
InitMsgView const * getHeaderMsg()
uint32 hltCount() const
Definition: EventMessage.h:95
void add(std::string const &label, ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:70
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
void advanceToLumi(unsigned int lumi, std::string reason)
EventMsgView const * prepareNextEvent()
static void fillDescription(edm::ParameterSetDescription &d)
struct dqmservices::DQMStreamerReader::OpenFile file_