CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
12 
16 #include "DQMStreamerReader.h"
17 
18 #include <fstream>
19 #include <queue>
20 #include <cstdlib>
21 #include <boost/regex.hpp>
22 #include <boost/format.hpp>
23 #include <boost/range.hpp>
24 #include <boost/filesystem.hpp>
25 #include "boost/algorithm/string.hpp"
26 
28 
29 namespace edm {
30 
32  InputSourceDescription const& desc)
33  : StreamerInputSource(pset, desc),
34  fiterator_(pset, DQMFileIterator::JS_DATA),
35  streamReader_(),
36  eventSkipperByID_(EventSkipperByID::create(pset).release()) {
37 
38  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
39  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
40  hltSel_ =
41  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
42 
43  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
44  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
45  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
46  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
47 
48  triggerSel();
49 
50  reset_();
51 }
52 
54 
56  // We have to load at least a single header,
57  // so the ProductRegistry gets initialized.
58  //
59  // This must happen here (inside the constructor),
60  // as ProductRegistry gets frozen after we initialize:
61  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
62 
64  "Waiting for the first lumi in order to initialize.");
65 
67 
68  // Fast-forward to the last open file.
69  if (flagSkipFirstLumis_) {
70  unsigned int l = fiterator_.lastLumiFound();
71  if (l > 1) {
73  }
74  }
75 
76  for (;;) {
77  bool next = prepareNextFile();
78 
79  // check for end of run
80  if (!next) {
82  "End of run reached before DQMStreamerReader was initialised.");
83  return;
84  }
85 
86  // check if we have a file opened
87  if (streamReader_.get() != nullptr) {
88  // we are now initialised
89  break;
90  }
91 
92  // wait
93  fiterator_.delay();
94  }
95 
96  fiterator_.logFileAction("DQMStreamerReader initialised.");
97 }
98 
// Opens the streamer file `newStreamerFile_` and makes it the current input.
//
// Steps visible in this listing:
//  - replaces streamReader_ with a new StreamerInputFile built from the path
//    and eventSkipperByID_;
//  - fetches the header message through getHeaderMsg() and hands it to
//    deserializeAndMergeWithRegistry() with subsequent=false;
//  - unless acceptAllEvt_ is set, rebuilds eventSelector_ from hltSel_ and the
//    trigger names stored in the header, then calls matchTriggerSel();
//  - if flagDeleteDatFiles_ is set, unlinks the file.
//
// NOTE(review): the embedded source line numbers skip 100 and 126, so
// statements of the real function may be absent from this listing.
99 void DQMStreamerReader::openFile_(std::string newStreamerFile_) {
  // Local parameter set; only filled in the trigger-selection branch below.
101  edm::ParameterSet pset;
102 
  // Assigning to the unique_ptr releases whatever reader was held before.
103  streamReader_ = std::unique_ptr<StreamerInputFile>(
104  new StreamerInputFile(newStreamerFile_, eventSkipperByID_));
105 
106  InitMsgView const* header = getHeaderMsg();
107  deserializeAndMergeWithRegistry(*header, false);
108 
109  // dump the list of HLT trigger name from the header
110  // dumpInitHeader(header);
111 
112  // if specific trigger selection is requested, check if the requested triggers
113  // match with trigger paths in the header file
114  if (!acceptAllEvt_){
115  Strings tnames;
116  header->hltTriggerNames(tnames);
117 
  // The selector is configured from a parameter set whose only entry is
  // "SelectEvents", set to the configured hltSel_ list.
118  pset.addParameter<Strings>("SelectEvents", hltSel_);
119  eventSelector_.reset(new TriggerSelector(pset, tnames));
120 
121  // check if any trigger path name requested matches with trigger name in the header file
  // The return value is ignored here; matchTriggerSel() also stores its result
  // in matchTriggerSel_.
122  matchTriggerSel(tnames);
123  }
124 
125  // our initialization
  // NOTE(review): the statement introduced by the comment above (embedded line
  // 126) is not present in this listing.
127 
128  if (flagDeleteDatFiles_) {
129  // unlink the file
  // The file is unlinked right after being opened, while streamReader_ still
  // refers to it. NOTE(review): presumably relies on the open file staying
  // readable after unlink (POSIX behaviour) - confirm. The return value of
  // unlink() is not checked.
130  unlink(newStreamerFile_.c_str());
131  }
132 }
133 
135  if (streamReader_.get() != nullptr) {
136  streamReader_->closeStreamerFile();
137  streamReader_ = nullptr;
138  }
139 }
140 
142  closeFile_();
143 
146  fiterator_.pop();
147 
148  if (boost::filesystem::exists(p)) {
149  openFile_(p);
150  return true;
151  } else {
152  /* dat file missing */
153  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
154 
155  return false;
156  }
157 }
158 
160  InitMsgView const* header = streamReader_->startMessage();
161 
162  if (header->code() != Header::INIT) { // INIT Msg
163  throw Exception(errors::FileReadError, "DQMStreamerReader::readHeader")
164  << "received wrong message type: expected INIT, got " << header->code()
165  << "\n";
166  }
167 
168  return header;
169 }
170 
172  if (!streamReader_->next()) {
173  return nullptr;
174  }
175 
176  EventMsgView const* msg = streamReader_->currentRecord();
177 
178  // if (msg != nullptr) dumpEventView(msg);
179  return msg;
180 }
181 
193  typedef DQMFileIterator::State State;
194 
195  for (;;) {
197 
198  // check for end of run file and force quit
199  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
200  closeFile_();
201  return false;
202  }
203 
204  // check for end of run and quit if everything has been processed.
205  // this clean exit
206  if ((streamReader_.get() == nullptr) && (!fiterator_.lumiReady()) &&
207  (fiterator_.state() == State::EOR)) {
208 
209  closeFile_();
210  return false;
211  }
212 
213  // if this is end of run and no more files to process
214  // close it
216  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
217 
218  closeFile_();
219  return false;
220  }
221 
222  // skip to the next file if we have no files opened yet
223  if (streamReader_.get() == nullptr) {
224  if (fiterator_.lumiReady()) {
225  openNextFile_();
226  // we might need to open once more (if .dat is missing)
227  continue;
228  }
229  }
230 
231  // or if there is a next file and enough events have been processed.
233  openNextFile_();
234  // we might need to open once more (if .dat is missing)
235  continue;
236  }
237 
238  return true;
239  }
240 }
241 
248 
249  EventMsgView const* eview = nullptr;
250  typedef DQMFileIterator::State State;
251 
252  // wait for the next event
253  for (;;) {
254  // edm::LogAbsolute("DQMStreamerReader")
255  // << "State loop.";
256  bool next = prepareNextFile();
257  if (!next) return nullptr;
258 
259  // sleep
260  if (streamReader_.get() == nullptr) {
261  // the reader does not exist
262  fiterator_.delay();
263  } else {
264  // our reader exists, try to read out an event
265  eview = getEventMsg();
266 
267  if (eview == nullptr) {
268  // read unsuccessful
269  // this means end of file, so close the file
270  closeFile_();
271  } else {
272  if (!acceptEvent(eview)) {
273  continue;
274  } else {
275  return eview;
276  }
277  }
278  }
279  }
280  return eview;
281 }
282 
287  EventMsgView const* eview = prepareNextEvent();
288  if (eview == nullptr) {
289  return false;
290  }
291 
292  // this is reachable only if eview is set
293  // and the file is opened
294  if (streamReader_->newHeader()) {
295  // A new file has been opened and we must compare Headers here !!
296  // Get header/init from reader
297  InitMsgView const* header = getHeaderMsg();
298  deserializeAndMergeWithRegistry(*header, true);
299  }
300 
302  deserializeEvent(*eview);
303 
304  return true;
305 }
306 
311  acceptAllEvt_ = false;
312  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
313  i!=end; ++i){
314  std::string hltPath(*i);
315  boost::erase_all(hltPath, " \t");
316  if (hltPath == "*") acceptAllEvt_ = true;
317  }
318  return acceptAllEvt_;
319 }
320 
325  matchTriggerSel_ = false;
326  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end());
327  i!=end; ++i){
328  std::string hltPath(*i);
329  boost::erase_all(hltPath, " \t");
330  std::vector<Strings::const_iterator> matches = regexMatch(tnames, hltPath);
331  if (matches.empty()){
332  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
333  matchTriggerSel_ = false;
334  }else{
335  matchTriggerSel_ = true;
336  }
337  }
338  return matchTriggerSel_;
339 }
340 
345 
346  if (acceptAllEvt_) return true;
347  if (!matchTriggerSel_) return false;
348 
349  std::vector<unsigned char> hltTriggerBits_;
350  int hltTriggerCount_ = evtmsg->hltCount();
351  if (hltTriggerCount_ > 0) {
352  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
353  }
354  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
355 
356  if (eventSelector_->wantAll() ||
357  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
358  return true;
359  }else{
360  return false;
361  }
362 }
363 
// Advances the input past up to `toSkip` events by repeatedly calling
// prepareNextEvent().
//
// Stops early, without error, as soon as prepareNextEvent() returns nullptr.
// Events that eventSkipperByID_ reports as skipped are consumed but do not
// count towards `toSkip`.
//
// NOTE(review): the loop condition is `i != toSkip` with i counting up from 0,
// so a negative `toSkip` is not reached by normal counting and the loop then
// ends only when prepareNextEvent() returns nullptr - confirm callers never
// pass a negative value.
364 void DQMStreamerReader::skip(int toSkip) {
365  for (int i = 0; i != toSkip; ++i) {
366  EventMsgView const* evMsg = prepareNextEvent();
367 
368  if (evMsg == nullptr) {
  // No further event is available; give up on the remaining skips.
369  return;
370  }
371 
372  // If the event would have been skipped anyway, don't count it as a skipped
373  // event.
  // eventSkipperByID_ is tested for null before use; the --i below cancels the
  // ++i of the loop header for such an event.
374  if (eventSkipperByID_ && eventSkipperByID_->skipIt(
375  evMsg->run(), evMsg->lumi(), evMsg->event())) {
376  --i;
377  }
378  }
379 }
380 
382  ConfigurationDescriptions& descriptions) {
384  desc.setComment("Reads events from streamer files.");
385 
386  desc.addUntracked<std::vector<std::string> >("SelectEvents")
387  ->setComment("HLT path to select events ");
388 
389  desc.addUntracked<int>("minEventsPerLumi", 1)->setComment(
390  "Minimum number of events to process per lumisection, "
391  "before switching to a new input file. If the next file "
392  "does not yet exist, "
393  "the number of processed events will be bigger.");
394 
395  desc.addUntracked<bool>("skipFirstLumis", false)->setComment(
396  "Skip (and ignore the minEventsPerLumi parameter) for the files "
397  "which have been available at the begining of the processing. "
398  "If set to true, the reader will open last available file for "
399  "processing.");
400 
401  desc.addUntracked<bool>("deleteDatFiles", false)->setComment(
402  "Delete data files after they have been closed, in order to "
403  "save disk space.");
404 
405  desc.addUntracked<bool>("endOfRunKills", false)->setComment(
406  "Kill the processing as soon as the end-of-run file appears, even if "
407  "there are/will be unprocessed lumisections.");
408 
409  // desc.addUntracked<unsigned int>("skipEvents", 0U)
410  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
411  // "have been processed.");
412 
413  // This next parameter is read in the base class, but its default value
414  // depends on the derived class, so it is set here.
415  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
416 
420 
421  descriptions.add("source", desc);
422 }
423 }
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
bool acceptEvent(const EventMsgView *)
uint32 lumi() const
Definition: EventMessage.cc:85
std::string make_path_data(const LumiEntry &lumi)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
tuple lumi
Definition: fjr2json.py:35
DQMStreamerReader(ParameterSet const &pset, InputSourceDescription const &desc)
void advanceToLumi(unsigned int lumi)
std::vector< std::string > Strings
uint32 run() const
Definition: EventMessage.cc:73
bool matchTriggerSel(Strings const &tnames)
void setComment(std::string const &value)
void openFile_(std::string filename)
InitMsgView const * getHeaderMsg()
TriggerSelectorPtr eventSelector_
EventMsgView const * prepareNextEvent()
const LumiEntry & front()
std::unique_ptr< StreamerInputFile > streamReader_
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:142
void logFileAction(const std::string &msg, const std::string &fileName="") const
void deserializeEvent(EventMsgView const &eventView)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
static void fillDescription(ParameterSetDescription &d)
#define end
Definition: vmac.h:37
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, boost::regex const &regexp)
Definition: RegexMatch.cc:30
uint32 event() const
Definition: EventMessage.cc:79
unsigned int lastLumiFound()
unsigned int processedEventPerLs_
uint32 hltCount() const
Definition: EventMessage.h:95
void add(std::string const &label, ParameterSetDescription const &psetDescription)
EventMsgView const * getEventMsg()
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:70
DQMFileIterator fiterator_
virtual void skip(int toSkip)
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
SurfaceDeformation * create(int type, const std::vector< double > &params)
boost::shared_ptr< EventSkipperByID > eventSkipperByID_