CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMStreamerReader.cc
Go to the documentation of this file.
12 
15 
16 #include "DQMStreamerReader.h"
17 
18 #include <fstream>
19 #include <queue>
20 #include <cstdlib>
21 #include <boost/regex.hpp>
22 #include <boost/format.hpp>
23 #include <boost/range.hpp>
24 #include <boost/filesystem.hpp>
25 
27 
28 namespace edm {
29 
31  InputSourceDescription const& desc)
32  : StreamerInputSource(pset, desc),
33  fiterator_(pset, DQMFileIterator::JS_DATA),
34  streamReader_(),
35  eventSkipperByID_(EventSkipperByID::create(pset).release()) {
36 
37  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
38  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
39  hltSel_ =
40  pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
41 
42  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
43  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
44  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
45  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
46 
47  reset_();
48 }
49 
51 
53  // We have to load at least a single header,
54  // so the ProductRegistry gets initialized.
55  //
56  // This must happen here (inside the constructor),
57  // as ProductRegistry gets frozen after we initialize:
58  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
59 
61  "Waiting for the first lumi in order to initialize.");
62 
64 
65  // Fast-forward to the last open file.
66  if (flagSkipFirstLumis_) {
67  unsigned int l = fiterator_.lastLumiFound();
68  if (l > 1) {
70  }
71  }
72 
73  for (;;) {
74  bool next = prepareNextFile();
75 
76  // check for end of run
77  if (!next) {
79  "End of run reached before DQMStreamerReader was initialised.");
80  return;
81  }
82 
83  // check if we have a file opened
84  if (streamReader_.get() != nullptr) {
85  // we are now initialised
86  break;
87  }
88 
89  // wait
90  fiterator_.delay();
91  }
92 
93  fiterator_.logFileAction("DQMStreamerReader initialised.");
94 }
95 
// Opens the streamer file `newStreamerFile_` as the current input and sets up
// event selection for it.
//
// Steps visible in this function:
//  - streamReader_ is replaced by a new StreamerInputFile constructed from the
//    file name and eventSkipperByID_;
//  - the header is fetched with getHeaderMsg() and passed to
//    deserializeAndMergeWithRegistry();
//  - eventSelector_ is rebuilt from the header's HLT trigger names and the
//    configured "SelectEvents" list (hltSel_);
//  - when flagDeleteDatFiles_ is set, the file is unlinked before returning.
//
// newStreamerFile_: path of the streamer file to open (taken by value).
96 void DQMStreamerReader::openFile_(std::string newStreamerFile_) {
 // Scratch parameter set; in this function it only carries "SelectEvents"
 // into the TriggerSelector constructor below.
98  edm::ParameterSet pset;
99 

 // Assigning a fresh unique_ptr destroys whatever reader was held before.
100  streamReader_ = std::unique_ptr<StreamerInputFile>(
101  new StreamerInputFile(newStreamerFile_, eventSkipperByID_));
102 

 // The header is dereferenced without a null check, so getHeaderMsg() is
 // relied upon to return a valid pointer here.
 // NOTE(review): `false` is presumably the `subsequent` flag of
 // deserializeAndMergeWithRegistry (i.e. not a follow-up header) - confirm.
103  InitMsgView const* header = getHeaderMsg();
104  deserializeAndMergeWithRegistry(*header, false);
105 
106  // dump the list of HLT trigger name from the header
107  // dumpInitHeader(header);
108 

 // Collect the HLT trigger names stored in this file's header.
109  Strings tnames;
110  header->hltTriggerNames(tnames);
111 

 // Build the selector from the configured paths (hltSel_) and the trigger
 // names of this particular file; any previous selector is discarded.
112  pset.addParameter<Strings>("SelectEvents", hltSel_);
113  eventSelector_.reset(new TriggerSelector(pset, tnames));
114 
115  // our initialization
117 
118  if (flagDeleteDatFiles_) {
119  // unlink the file
 // NOTE(review): this runs right after opening, while streamReader_ still
 // refers to the file; presumably the reader keeps the file open so the
 // data stays readable after the unlink - confirm. The return value of
 // unlink() is not checked, so a failed delete goes unnoticed.
120  unlink(newStreamerFile_.c_str());
121  }
122 }
123 
125  if (streamReader_.get() != nullptr) {
126  streamReader_->closeStreamerFile();
127  streamReader_ = nullptr;
128  }
129 }
130 
132  closeFile_();
133 
136  fiterator_.pop();
137 
138  if (boost::filesystem::exists(p)) {
139  openFile_(p);
140  return true;
141  } else {
142  /* dat file missing */
143  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
144 
145  return false;
146  }
147 }
148 
150  InitMsgView const* header = streamReader_->startMessage();
151 
152  if (header->code() != Header::INIT) { // INIT Msg
153  throw Exception(errors::FileReadError, "DQMStreamerReader::readHeader")
154  << "received wrong message type: expected INIT, got " << header->code()
155  << "\n";
156  }
157 
158  return header;
159 }
160 
162  if (!streamReader_->next()) {
163  return nullptr;
164  }
165 
166  EventMsgView const* msg = streamReader_->currentRecord();
167 
168  // if (msg != nullptr) dumpEventView(msg);
169  return msg;
170 }
171 
183  typedef DQMFileIterator::State State;
184 
185  for (;;) {
187 
188  // check for end of run file and force quit
189  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
190  closeFile_();
191  return false;
192  }
193 
194  // check for end of run and quit if everything has been processed.
195  // this is the clean exit
196  if ((streamReader_.get() == nullptr) && (!fiterator_.lumiReady()) &&
197  (fiterator_.state() == State::EOR)) {
198 
199  closeFile_();
200  return false;
201  }
202 
203  // if this is end of run and no more files to process
204  // close it
206  (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
207 
208  closeFile_();
209  return false;
210  }
211 
212  // skip to the next file if we have no files opened yet
213  if (streamReader_.get() == nullptr) {
214  if (fiterator_.lumiReady()) {
215  openNextFile_();
216  // we might need to open once more (if .dat is missing)
217  continue;
218  }
219  }
220 
221  // or if there is a next file and enough events have been processed.
223  openNextFile_();
224  // we might need to open once more (if .dat is missing)
225  continue;
226  }
227 
228  return true;
229  }
230 }
231 
238 
239  EventMsgView const* eview = nullptr;
240  typedef DQMFileIterator::State State;
241 
242  // wait for the next event
243  for (;;) {
244  // edm::LogAbsolute("DQMStreamerReader")
245  // << "State loop.";
246  bool next = prepareNextFile();
247  if (!next) return nullptr;
248 
249  // sleep
250  if (streamReader_.get() == nullptr) {
251  // the reader does not exist
252  fiterator_.delay();
253  } else {
254  // our reader exists, try to read out an event
255  eview = getEventMsg();
256 
257  if (eview == nullptr) {
258  // read unsuccessful
259  // this means end of file, so close the file
260  closeFile_();
261  } else {
262  if (!acceptEvent(eview)) {
263  continue;
264  } else {
265  return eview;
266  }
267  }
268  }
269  }
270  return eview;
271 }
272 
277  EventMsgView const* eview = prepareNextEvent();
278  if (eview == nullptr) {
279  return false;
280  }
281 
282  // this is reachable only if eview is set
283  // and the file is opened
284  if (streamReader_->newHeader()) {
285  // A new file has been opened and we must compare Headers here !!
286  // Get header/init from reader
287  InitMsgView const* header = getHeaderMsg();
288  deserializeAndMergeWithRegistry(*header, true);
289  }
290 
292  deserializeEvent(*eview);
293 
294  return true;
295 }
296 
298 
299  std::vector<unsigned char> hltTriggerBits_;
300  int hltTriggerCount_ = evtmsg->hltCount();
301  if (hltTriggerCount_ > 0) {
302  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
303  }
304  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
305 
306  if (eventSelector_->wantAll() ||
307  eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
308  return true;
309  } else {
310  return false;
311  }
312 }
313 
314 void DQMStreamerReader::skip(int toSkip) {
315  for (int i = 0; i != toSkip; ++i) {
316  EventMsgView const* evMsg = prepareNextEvent();
317 
318  if (evMsg == nullptr) {
319  return;
320  }
321 
322  // If the event would have been skipped anyway, don't count it as a skipped
323  // event.
324  if (eventSkipperByID_ && eventSkipperByID_->skipIt(
325  evMsg->run(), evMsg->lumi(), evMsg->event())) {
326  --i;
327  }
328  }
329 }
330 
332  ConfigurationDescriptions& descriptions) {
334  desc.setComment("Reads events from streamer files.");
335 
336  desc.addUntracked<std::vector<std::string> >("SelectEvents")
337  ->setComment("HLT path to select events ");
338 
339  desc.addUntracked<int>("minEventsPerLumi", 1)->setComment(
340  "Minimum number of events to process per lumisection, "
341  "before switching to a new input file. If the next file "
342  "does not yet exist, "
343  "the number of processed events will be bigger.");
344 
345  desc.addUntracked<bool>("skipFirstLumis", false)->setComment(
346  "Skip (and ignore the minEventsPerLumi parameter) for the files "
347  "which have been available at the begining of the processing. "
348  "If set to true, the reader will open last available file for "
349  "processing.");
350 
351  desc.addUntracked<bool>("deleteDatFiles", false)->setComment(
352  "Delete data files after they have been closed, in order to "
353  "save disk space.");
354 
355  desc.addUntracked<bool>("endOfRunKills", false)->setComment(
356  "Kill the processing as soon as the end-of-run file appears, even if "
357  "there are/will be unprocessed lumisections.");
358 
359  // desc.addUntracked<unsigned int>("skipEvents", 0U)
360  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
361  // "have been processed.");
362 
363  // This next parameter is read in the base class, but its default value
364  // depends on the derived class, so it is set here.
365  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
366 
370 
371  descriptions.add("source", desc);
372 }
373 }
T getUntrackedParameter(std::string const &, T const &) const
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
bool acceptEvent(const EventMsgView *)
uint32 lumi() const
Definition: EventMessage.cc:85
std::string make_path_data(const LumiEntry &lumi)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void hltTriggerBits(uint8 *put_here) const
tuple lumi
Definition: fjr2json.py:35
DQMStreamerReader(ParameterSet const &pset, InputSourceDescription const &desc)
void advanceToLumi(unsigned int lumi)
std::vector< std::string > Strings
uint32 run() const
Definition: EventMessage.cc:73
void setComment(std::string const &value)
void openFile_(std::string filename)
InitMsgView const * getHeaderMsg()
TriggerSelectorPtr eventSelector_
EventMsgView const * prepareNextEvent()
const LumiEntry & front()
std::unique_ptr< StreamerInputFile > streamReader_
void addParameter(std::string const &name, T const &value)
Definition: ParameterSet.h:142
void logFileAction(const std::string &msg, const std::string &fileName="") const
void deserializeEvent(EventMsgView const &eventView)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
static void fillDescription(ParameterSetDescription &d)
uint32 event() const
Definition: EventMessage.cc:79
unsigned int lastLumiFound()
unsigned int processedEventPerLs_
uint32 hltCount() const
Definition: EventMessage.h:95
void add(std::string const &label, ParameterSetDescription const &psetDescription)
EventMsgView const * getEventMsg()
static void fillDescription(ParameterSetDescription &desc)
uint32 code() const
Definition: InitMessage.h:70
DQMFileIterator fiterator_
virtual void skip(int toSkip)
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
SurfaceDeformation * create(int type, const std::vector< double > &params)
boost::shared_ptr< EventSkipperByID > eventSkipperByID_