CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
9 
13 #include "DQMStreamerReader.h"
14 
15 #include <fstream>
16 #include <queue>
17 #include <cstdlib>
18 #include <boost/regex.hpp>
19 #include <boost/format.hpp>
20 #include <boost/range.hpp>
21 #include <boost/filesystem.hpp>
22 #include <boost/algorithm/string.hpp>
23 
25 
26 namespace dqmservices {
27 
29  edm::InputSourceDescription const& desc)
30  : StreamerInputSource(pset, desc),
31  fiterator_(pset),
32  streamReader_(),
33  eventSkipperByID_(edm::EventSkipperByID::create(pset).release()) {
34 
35  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
36  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
37  hltSel_ =
38  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
39 
40  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
41  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
42  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
43  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
44 
45  triggerSel();
46 
47  reset_();
48 }
49 
51 
53  // We have to load at least a single header,
54  // so the ProductRegistry gets initialized.
55  //
56  // This must happen here (inside the constructor),
57  // as ProductRegistry gets frozen after we initialize:
58  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
59 
61  "Waiting for the first lumi in order to initialize.");
62 
64 
65  // Fast-forward to the last open file.
66  if (flagSkipFirstLumis_) {
67  unsigned int l = fiterator_.lastLumiFound();
68  if (l > 1) {
70  }
71  }
72 
73  for (;;) {
74  bool next = prepareNextFile();
75 
76  // check for end of run
77  if (!next) {
79  "End of run reached before DQMStreamerReader was initialised.");
80  return;
81  }
82 
83  // check if we have a file openned
84  if (streamReader_.get() != nullptr) {
85  // we are now initialised
86  break;
87  }
88 
89  // wait
90  fiterator_.delay();
91  }
92 
93  fiterator_.logFileAction("DQMStreamerReader initialised.");
94 }
95 
96 void DQMStreamerReader::openFile_(std::string newStreamerFile_) {
98  edm::ParameterSet pset;
99 
100  streamReader_ = std::unique_ptr<edm::StreamerInputFile>(
101  new edm::StreamerInputFile(newStreamerFile_, eventSkipperByID_));
102 
103  InitMsgView const* header = getHeaderMsg();
104  deserializeAndMergeWithRegistry(*header, false);
105 
106  // dump the list of HLT trigger name from the header
107  // dumpInitHeader(header);
108 
109  // if specific trigger selection is requested, check if the requested triggers
110  // match with trigger paths in the header file
111  if (!acceptAllEvt_){
112  Strings tnames;
113  header->hltTriggerNames(tnames);
114 
115  pset.addParameter<Strings>("SelectEvents", hltSel_);
116  eventSelector_.reset(new TriggerSelector(pset, tnames));
117 
118  // check if any trigger path name requested matches with trigger name in the header file
119  matchTriggerSel(tnames);
120  }
121 
122  // our initialization
124 
125  if (flagDeleteDatFiles_) {
126  // unlink the file
127  unlink(newStreamerFile_.c_str());
128  }
129 }
130 
132  if (streamReader_.get() != nullptr) {
133  streamReader_->closeStreamerFile();
134  streamReader_ = nullptr;
135  }
136 }
137 
139  closeFile_();
140 
143  fiterator_.pop();
144 
145  if (boost::filesystem::exists(p)) {
146  openFile_(p);
147  return true;
148  } else {
149  /* dat file missing */
150  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
151 
152  return false;
153  }
154 }
155 
157  InitMsgView const* header = streamReader_->startMessage();
158 
159  if (header->code() != Header::INIT) { // INIT Msg
160  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
161  << "received wrong message type: expected INIT, got " << header->code()
162  << "\n";
163  }
164 
165  return header;
166 }
167 
169  if (!streamReader_->next()) {
170  return nullptr;
171  }
172 
173  EventMsgView const* msg = streamReader_->currentRecord();
174 
175  // if (msg != nullptr) dumpEventView(msg);
176  return msg;
177 }
178 
190  typedef DQMFileIterator::State State;
191 
192  for (;;) {
194 
195  // check for end of run file and force quit
196  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
197  closeFile_();
198  return false;
199  }
200 
201  // check for end of run and quit if everything has been processed.
202  // this clean exit
203  if ((streamReader_.get() == nullptr) && (!fiterator_.lumiReady()) &&
204  (fiterator_.state() == State::EOR)) {
205 
206  closeFile_();
207  return false;
208  }
209 
210  // if this is end of run and no more files to process
211  // close it
213  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
214 
215  closeFile_();
216  return false;
217  }
218 
219  // skip to the next file if we have no files openned yet
220  if (streamReader_.get() == nullptr) {
221  if (fiterator_.lumiReady()) {
222  openNextFile_();
223  // we might need to open once more (if .dat is missing)
224  continue;
225  }
226  }
227 
228  // or if there is a next file and enough eventshas been processed.
230  openNextFile_();
231  // we might need to open once more (if .dat is missing)
232  continue;
233  }
234 
235  return true;
236  }
237 }
238 
245 
246  EventMsgView const* eview = nullptr;
247  typedef DQMFileIterator::State State;
248 
249  // wait for the next event
250  for (;;) {
251  // edm::LogAbsolute("DQMStreamerReader")
252  // << "State loop.";
253  bool next = prepareNextFile();
254  if (!next) return nullptr;
255 
256  // sleep
257  if (streamReader_.get() == nullptr) {
258  // the reader does not exist
259  fiterator_.delay();
260  } else {
261  // our reader exists, try to read out an event
262  eview = getEventMsg();
263 
264  if (eview == nullptr) {
265  // read unsuccessful
266  // this means end of file, so close the file
267  closeFile_();
268  } else {
269  if (!acceptEvent(eview)) {
270  continue;
271  } else {
272  return eview;
273  }
274  }
275  }
276  }
277  return eview;
278 }
279 
284  EventMsgView const* eview = prepareNextEvent();
285  if (eview == nullptr) {
286  return false;
287  }
288 
289  // this is reachable only if eview is set
290  // and the file is openned
291  if (streamReader_->newHeader()) {
292  // A new file has been opened and we must compare Headers here !!
293  // Get header/init from reader
294  InitMsgView const* header = getHeaderMsg();
295  deserializeAndMergeWithRegistry(*header, true);
296  }
297 
299  deserializeEvent(*eview);
300 
301  if (mon_.isAvailable()) {
302  mon_->reportEvents(1);
303  }
304 
305  return true;
306 }
307 
312  acceptAllEvt_ = false;
313  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
314  i!=end; ++i){
315  std::string hltPath(*i);
316  boost::erase_all(hltPath, " \t");
317  if (hltPath == "*") acceptAllEvt_ = true;
318  }
319  return acceptAllEvt_;
320 }
321 
326  matchTriggerSel_ = false;
327  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
328  i!=end; ++i){
329  std::string hltPath(*i);
330  boost::erase_all(hltPath, " \t");
331  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
332  if (matches.empty()){
333  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
334  matchTriggerSel_ = false;
335  }else{
336  matchTriggerSel_ = true;
337  }
338  }
339  return matchTriggerSel_;
340 }
341 
346 
347  if (acceptAllEvt_) return true;
348  if (!matchTriggerSel_) return false;
349 
350  std::vector<unsigned char> hltTriggerBits_;
351  int hltTriggerCount_ = evtmsg->hltCount();
352  if (hltTriggerCount_ > 0) {
353  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
354  }
355  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
356 
357  if (eventSelector_->wantAll() ||
358  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
359  return true;
360  }else{
361  return false;
362  }
363 }
364 
365 void DQMStreamerReader::skip(int toSkip) {
366  for (int i = 0; i != toSkip; ++i) {
367  EventMsgView const* evMsg = prepareNextEvent();
368 
369  if (evMsg == nullptr) {
370  return;
371  }
372 
373  // If the event would have been skipped anyway, don't count it as a skipped
374  // event.
375  if (eventSkipperByID_ && eventSkipperByID_->skipIt(
376  evMsg->run(), evMsg->lumi(), evMsg->event())) {
377  --i;
378  }
379  }
380 }
381 
383  edm::ConfigurationDescriptions& descriptions) {
384 
386  desc.setComment("Reads events from streamer files.");
387 
388  desc.addUntracked<std::vector<std::string> >("SelectEvents")
389  ->setComment("HLT path to select events ");
390 
391  desc.addUntracked<int>("minEventsPerLumi", 1)->setComment(
392  "Minimum number of events to process per lumisection, "
393  "before switching to a new input file. If the next file "
394  "does not yet exist, "
395  "the number of processed events will be bigger.");
396 
397  desc.addUntracked<bool>("skipFirstLumis", false)->setComment(
398  "Skip (and ignore the minEventsPerLumi parameter) for the files "
399  "which have been available at the begining of the processing. "
400  "If set to true, the reader will open last available file for "
401  "processing.");
402 
403  desc.addUntracked<bool>("deleteDatFiles", false)->setComment(
404  "Delete data files after they have been closed, in order to "
405  "save disk space.");
406 
407  desc.addUntracked<bool>("endOfRunKills", false)->setComment(
408  "Kill the processing as soon as the end-of-run file appears, even if "
409  "there are/will be unprocessed lumisections.");
410 
411  // desc.addUntracked<unsigned int>("skipEvents", 0U)
412  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
413  // "have been processed.");
414 
415  // This next parameter is read in the base class, but its default value
416  // depends on the derived class, so it is set here.
417  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
418 
422 
423  descriptions.add("source", desc);
424 }
425 
426 } // end of namespace
427 
430 
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
uint32 lumi() const
Definition: EventMessage.cc:85
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
tuple lumi
Definition: fjr2json.py:35
std::shared_ptr< TriggerSelector > eventSelector_
EventMsgView const * getEventMsg()
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
dqmservices::DQMStreamerReader DQMStreamerReader
std::unique_ptr< edm::StreamerInputFile > streamReader_
uint32 run() const
Definition: EventMessage.cc:73
void advanceToLumi(unsigned int lumi)
std::string make_path_data(const LumiEntry &lumi)
void setComment(std::string const &value)
virtual void skip(int toSkip)
#define DEFINE_FWK_INPUT_SOURCE(type)
void openFile_(std::string filename)
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:142
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
#define end
Definition: vmac.h:37
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, boost::regex const &regexp)
Definition: RegexMatch.cc:30
uint32 event() const
Definition: EventMessage.cc:79
DQMStreamerReader(edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
std::vector< std::string > Strings
boost::shared_ptr< edm::EventSkipperByID > eventSkipperByID_
bool acceptEvent(const EventMsgView *)
InitMsgView const * getHeaderMsg()
edm::Service< DQMMonitoringService > mon_
uint32 hltCount() const
Definition: EventMessage.h:95
void add(std::string const &label, ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:70
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
EventMsgView const * prepareNextEvent()
SurfaceDeformation * create(int type, const std::vector< double > &params)
static void fillDescription(edm::ParameterSetDescription &d)