CMS 3D CMS Logo

InputSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
4 
22 
23 #include <cassert>
24 #include <fstream>
25 #include <iomanip>
26 
27 namespace edm {
28 
29  namespace {
// Returns the English ordinal suffix ("st", "nd", "rd" or "th") that goes
// with count, e.g. 1 -> "st", 12 -> "th", 23 -> "rd".
// The result refers to a function-local static, so the reference stays valid
// after the call returns. Negative counts get no special treatment.
std::string const& suffix(int count) {
  // Slot 0 holds "th"; slots 1-3 hold the suffix for a final digit of 1, 2, 3.
  static std::string const kSuffixes[] = {"th", "st", "nd", "rd"};

  int const ones = count % 10;
  int const tens = count % 100 - ones;

  // A final digit of 0 or 4-9 takes "th", and so do *11, *12 and *13
  // (recognised by the tens part being exactly 10).
  if (ones == 0 || ones >= 4 || tens == 10) {
    return kSuffixes[0];
  }

  switch (ones) {
    case 1:
      return kSuffixes[1];
    case 2:
      return kSuffixes[2];
    default:
      return kSuffixes[3];
  }
}
44  } // namespace
45 
47  : actReg_(desc.actReg_),
48  maxEvents_(desc.maxEvents_),
49  remainingEvents_(maxEvents_),
50  maxLumis_(desc.maxLumis_),
51  remainingLumis_(maxLumis_),
52  readCount_(0),
53  maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
54  processingMode_(RunsLumisAndEvents),
55  moduleDescription_(desc.moduleDescription_),
56  productRegistry_(desc.productRegistry_),
57  processHistoryRegistry_(new ProcessHistoryRegistry),
58  branchIDListHelper_(desc.branchIDListHelper_),
59  thinnedAssociationsHelper_(desc.thinnedAssociationsHelper_),
60  processGUID_(createGlobalIdentifier()),
61  time_(),
62  newRun_(true),
63  newLumi_(true),
64  eventCached_(false),
65  state_(IsInvalid),
66  runAuxiliary_(),
67  lumiAuxiliary_(),
68  statusFileName_(),
69  numberOfEventsBeforeBigSkip_(0) {
70  if (pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
71  std::ostringstream statusfilename;
72  statusfilename << "source_" << getpid();
73  statusFileName_ = statusfilename.str();
74  }
75  if (maxSecondsUntilRampdown_ > 0) {
77  }
78 
79  std::string const defaultMode("RunsLumisAndEvents");
80  std::string const runMode("Runs");
81  std::string const runLumiMode("RunsAndLumis");
82 
83  // The default value provided as the second argument to the getUntrackedParameter function call
84  // is not used when the ParameterSet has been validated and the parameters are not optional
85  // in the description. As soon as all primary input sources and all modules with a secondary
86  // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
87  // calls can and should be deleted from the code.
88  std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
89  if (processingMode == runMode) {
91  } else if (processingMode == runLumiMode) {
93  } else if (processingMode != defaultMode) {
95  << "InputSource::InputSource()\n"
96  << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
97  << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
98  }
99  }
100 
102 
105  desc.setUnknown();
106  descriptions.addDefault(desc);
107  }
108 
110 
// File-local label naming the base type of input sources.
// NOTE(review): presumably this is what InputSource::baseType() returns; its
// body is not visible in this listing - confirm.
static const std::string kBaseType = "Source";
112 
114 
116  std::string defaultString("RunsLumisAndEvents");
117  desc.addUntracked<std::string>("processingMode", defaultString)
118  ->setComment(
119  "'RunsLumisAndEvents': process runs, lumis, and events.\n"
120  "'RunsAndLumis': process runs and lumis (not events).\n"
121  "'Runs': process runs (not lumis or events).");
122  desc.addUntracked<bool>("writeStatusFile", false)
123  ->setComment("Write a status file. Intended for use by workflow management.");
124  }
125 
126  // This next function is to guarantee that "runs only" mode does not return events or lumis,
127  // and that "runs and lumis only" mode does not return events.
128  // For input sources that are not random access (e.g. you need to read through the events
129  // to get to the lumis and runs), this is all that is involved to implement these modes.
130  // For input sources where events or lumis can be skipped, getNextItemType() should
131  // implement the skipping internally, so that the performance gain is realized.
132  // If this is done for a source, the 'if' blocks in this function will never be entered
133  // for that source.
135  ItemType itemType = callWithTryCatchAndPrint<ItemType>([this]() { return getNextItemType(); },
136  "Calling InputSource::getNextItemType");
137 
138  if (itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
139  skipEvents(1);
140  return nextItemType_();
141  }
142  if (itemType == IsLumi && processingMode() == Runs) {
143  // QQQ skipLuminosityBlock_();
144  return nextItemType_();
145  }
146  return itemType;
147  }
148 
150  ItemType oldState = state_;
151  if (eventLimitReached()) {
152  // If the maximum event limit has been reached, stop.
153  state_ = IsStop;
154  } else if (lumiLimitReached()) {
155  // If the maximum lumi limit has been reached, stop
156  // when reaching a new file, run, or lumi.
157  if (oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
158  state_ = IsStop;
159  } else {
160  ItemType newState = nextItemType_();
161  if (newState == IsEvent) {
162  assert(processingMode() == RunsLumisAndEvents);
163  state_ = IsEvent;
164  } else {
165  state_ = IsStop;
166  }
167  }
168  } else {
169  ItemType newState = nextItemType_();
170  if (newState == IsStop) {
171  state_ = IsStop;
172  } else if (newState == IsSynchronize) {
174  } else if (newState == IsFile || oldState == IsInvalid) {
175  state_ = IsFile;
176  } else if (newState == IsRun || oldState == IsFile) {
178  state_ = IsRun;
179  } else if (newState == IsLumi || oldState == IsRun) {
180  assert(processingMode() != Runs);
182  state_ = IsLumi;
183  } else {
184  assert(processingMode() == RunsLumisAndEvents);
185  state_ = IsEvent;
186  }
187  }
188  if (state_ == IsStop) {
189  lumiAuxiliary_.reset();
190  runAuxiliary_.reset();
191  }
192  return state_;
193  }
194 
195  std::shared_ptr<LuminosityBlockAuxiliary> InputSource::readLuminosityBlockAuxiliary() {
196  return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >(
197  [this]() { return readLuminosityBlockAuxiliary_(); }, "Calling InputSource::readLuminosityBlockAuxiliary_");
198  }
199 
200  std::shared_ptr<RunAuxiliary> InputSource::readRunAuxiliary() {
201  return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >([this]() { return readRunAuxiliary_(); },
202  "Calling InputSource::readRunAuxiliary_");
203  }
204 
205  void InputSource::doBeginJob() { this->beginJob(); }
206 
208 
209  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader() {
211  }
212 
213  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> InputSource::resourceSharedWithDelayedReader_() {
214  return std::pair<SharedResourcesAcquirer*, std::recursive_mutex*>(nullptr, nullptr);
215  }
216 
218 
219  // Return a dummy file block.
220  std::unique_ptr<FileBlock> InputSource::readFile() {
221  assert(state_ == IsFile);
222  assert(!limitReached());
223  return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >([this]() { return readFile_(); },
224  "Calling InputSource::readFile_");
225  }
226 
227  void InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
228  if (fb != nullptr)
229  fb->close();
230  callWithTryCatchAndPrint<void>(
231  [this]() { closeFile_(); }, "Calling InputSource::closeFile_", cleaningUpAfterException);
232  return;
233  }
234 
235  // Return a dummy file block.
236  // This function must be overridden for any input source that reads a file
237  // containing Products.
238  std::unique_ptr<FileBlock> InputSource::readFile_() { return std::make_unique<FileBlock>(); }
239 
241  RunSourceSentry sentry(*this, runPrincipal.index());
242  callWithTryCatchAndPrint<void>([this, &runPrincipal]() { readRun_(runPrincipal); },
243  "Calling InputSource::readRun_");
244  }
245 
247  RunSourceSentry sentry(*this, rp.index());
248  callWithTryCatchAndPrint<void>([this, &rp]() { readRun_(rp); }, "Calling InputSource::readRun_");
249  }
250 
252  LumiSourceSentry sentry(*this, lumiPrincipal.index());
253  callWithTryCatchAndPrint<void>([this, &lumiPrincipal]() { readLuminosityBlock_(lumiPrincipal); },
254  "Calling InputSource::readLuminosityBlock_");
255  if (remainingLumis_ > 0) {
256  --remainingLumis_;
257  }
258  }
259 
261  LumiSourceSentry sentry(*this, lbp.index());
262  callWithTryCatchAndPrint<void>([this, &lbp]() { readLuminosityBlock_(lbp); },
263  "Calling InputSource::readLuminosityBlock_");
264  if (remainingLumis_ > 0) {
265  --remainingLumis_;
266  }
267  }
268 
269  void InputSource::readRun_(RunPrincipal& runPrincipal) {
270  // Note: For the moment, we do not support saving and restoring the state of the
271  // random number generator if random numbers are generated during processing of runs
272  // (e.g. beginRun(), endRun())
274  }
275 
278  }
279 
281  assert(state_ == IsEvent);
282  assert(!eventLimitReached());
283  {
284  // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
285  EventSourceSentry sentry(*this, streamContext);
286 
287  callWithTryCatchAndPrint<void>([this, &ep]() { readEvent_(ep); }, "Calling InputSource::readEvent_");
288  }
289 
290  if (remainingEvents_ > 0)
292  ++readCount_;
293  setTimestamp(ep.time());
294  issueReports(ep.id(), ep.streamID());
295  }
296 
297  bool InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
298  bool result = false;
299 
300  if (not limitReached()) {
301  // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
302  // this should be taken care of by an EventSourceSentry in the implementation of readIt()
303 
304  //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
305  result = readIt(eventID, ep, streamContext);
306 
307  if (result) {
308  if (remainingEvents_ > 0)
310  ++readCount_;
311  issueReports(ep.id(), ep.streamID());
312  }
313  }
314  return result;
315  }
316 
318  callWithTryCatchAndPrint<void>([this, &offset]() { skip(offset); }, "Calling InputSource::skip");
319  }
320 
321  bool InputSource::goToEvent(EventID const& eventID) {
322  return callWithTryCatchAndPrint<bool>([this, &eventID]() { return goToEvent_(eventID); },
323  "Calling InputSource::goToEvent_");
324  }
325 
327  state_ = IsInvalid;
329  setNewRun();
330  setNewLumi();
332  callWithTryCatchAndPrint<void>([this]() { rewind_(); }, "Calling InputSource::rewind_");
333  }
334 
335  void InputSource::issueReports(EventID const& eventID, StreamID streamID) {
336  if (isInfoEnabled()) {
337  LogVerbatim("FwkReport") << "Begin processing the " << readCount_ << suffix(readCount_) << " record. Run "
338  << eventID.run() << ", Event " << eventID.event() << ", LumiSection "
339  << eventID.luminosityBlock() << " on stream " << streamID.value() << " at "
340  << std::setprecision(3) << TimeOfDay();
341  }
342  if (!statusFileName_.empty()) {
343  std::ofstream statusFile(statusFileName_.c_str());
344  statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
345  statusFile.close();
346  }
347 
348  // At some point we may want to initiate checkpointing here
349  }
350 
352  throw Exception(errors::LogicError) << "InputSource::readIt()\n"
353  << "Random access is not implemented for this type of Input Source\n"
354  << "Contact a Framework Developer\n";
355  }
356 
358  throw Exception(errors::LogicError) << "InputSource::setRun()\n"
359  << "Run number cannot be modified for this type of Input Source\n"
360  << "Contact a Framework Developer\n";
361  }
362 
364  throw Exception(errors::LogicError) << "InputSource::setLumi()\n"
365  << "Luminosity Block ID cannot be modified for this type of Input Source\n"
366  << "Contact a Framework Developer\n";
367  }
368 
369  void InputSource::skip(int) {
370  throw Exception(errors::LogicError) << "InputSource::skip()\n"
371  << "Random access are not implemented for this type of Input Source\n"
372  << "Contact a Framework Developer\n";
373  }
374 
376  throw Exception(errors::LogicError) << "InputSource::goToEvent_()\n"
377  << "Random access is not implemented for this type of Input Source\n"
378  << "Contact a Framework Developer\n";
379  return true;
380  }
381 
383  throw Exception(errors::LogicError) << "InputSource::rewind()\n"
384  << "Random access are not implemented for this type of Input Source\n"
385  << "Contact a Framework Developer\n";
386  }
387 
389  if (-1 == remainingEvents_) {
390  return;
391  }
392  if (iSkipped < remainingEvents_) {
393  remainingEvents_ -= iSkipped;
394  } else {
395  remainingEvents_ = 0;
396  }
397  }
398 
400 
402 
404  return callWithTryCatchAndPrint<bool>([this]() { return randomAccess_(); }, "Calling InputSource::randomAccess_");
405  }
406 
408  return callWithTryCatchAndPrint<ProcessingController::ForwardState>([this]() { return forwardState_(); },
409  "Calling InputSource::forwardState_");
410  }
411 
413  return callWithTryCatchAndPrint<ProcessingController::ReverseState>([this]() { return reverseState_(); },
414  "Calling InputSource::reverseState__");
415  }
416 
418 
420 
421  bool InputSource::randomAccess_() const { return false; }
422 
425  }
426 
429  }
430 
432  assert(runAuxiliary());
433  return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
434  }
435 
437  assert(runAuxiliary());
438  return runAuxiliary()->run();
439  }
440 
442  assert(luminosityBlockAuxiliary());
443  return luminosityBlockAuxiliary()->luminosityBlock();
444  }
445 
447  : source_(source), sc_(sc) {
448  source.actReg()->preSourceSignal_(sc_.streamID());
449  }
450 
452 
454  : source_(source), index_(index) {
455  source_.actReg()->preSourceLumiSignal_(index_);
456  }
457 
459 
461  : source_(source), index_(index) {
462  source_.actReg()->preSourceRunSignal_(index_);
463  }
464 
466 
468  : post_(source.actReg()->postOpenFileSignal_), lfn_(lfn), usedFallback_(usedFallback) {
469  source.actReg()->preOpenFileSignal_(lfn, usedFallback);
470  }
471 
473 
475  : post_(source.actReg()->postCloseFileSignal_), lfn_(lfn), usedFallback_(usedFallback) {
476  source.actReg()->preCloseFileSignal_(lfn, usedFallback);
477  }
478 
480 } // namespace edm
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
T getUntrackedParameter(std::string const &, T const &) const
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:357
std::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:425
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:205
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
std::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:426
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:388
virtual void closeFile_()
Definition: InputSource.h:391
static std::string const source("source")
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:399
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader()
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:209
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:246
std::string statusFileName_
Definition: InputSource.h:427
virtual void registerProducts()
Register any produced products.
Definition: InputSource.cc:217
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:363
EventID const & id() const
virtual void rewind_()
Definition: InputSource.cc:382
std::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:200
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:233
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:324
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
LuminosityBlockIndex index() const
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
Definition: RunPrincipal.cc:26
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
unsigned int LuminosityBlockNumber_t
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:227
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:431
bool limitReached() const
Definition: InputSource.h:381
Timestamp const & time() const
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:375
void close()
Definition: FileBlock.h:117
std::chrono::time_point< std::chrono::steady_clock > processingStart_
Definition: InputSource.h:412
FileCloseSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:474
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:407
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:446
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:423
virtual std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_()
Definition: InputSource.cc:213
void fillLuminosityBlockPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:412
void addDefault(ParameterSetDescription const &psetDescription)
bool lumiLimitReached() const
Definition: InputSource.h:367
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:240
virtual void beginJob()
Definition: InputSource.cc:417
ProcessingMode processingMode_
Definition: InputSource.h:413
int maxSecondsUntilRampdown_
Definition: InputSource.h:411
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxiliary.
Definition: InputSource.cc:195
virtual void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:401
virtual ItemType getNextItemType()=0
static const std::string & baseType()
Definition: InputSource.cc:113
void issueReports(EventID const &eventID, StreamID streamID)
issue an event report
Definition: InputSource.cc:335
StreamID streamID() const
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:351
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:321
runMode
define run mode.
void resetEventCached()
Definition: InputSource.h:359
StreamID const & streamID() const
Definition: StreamContext.h:54
virtual ~InputSource() noexcept(false)
Destructor.
Definition: InputSource.cc:101
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
#define noexcept
bool eventLimitReached() const
Definition: InputSource.h:366
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:260
void skipEvents(int offset)
Definition: InputSource.cc:317
unsigned int value() const
Definition: StreamID.h:42
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:269
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:238
LumiSourceSentry(InputSource const &source, LuminosityBlockIndex id)
Definition: InputSource.cc:453
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:103
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:280
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:220
RunIndex index() const
Definition: RunPrincipal.h:56
virtual bool randomAccess_() const
Definition: InputSource.cc:421
static const std::string kBaseType("EDAnalyzer")
bool isInfoEnabled()
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
ItemType nextItemType_()
Definition: InputSource.cc:134
HLT enums.
std::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:236
edm::propagate_const< std::unique_ptr< ProcessHistoryRegistry > > processHistoryRegistry_
Definition: InputSource.h:416
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:427
bool randomAccess() const
Definition: InputSource.cc:403
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:115
void rewind()
Begin again at the first event.
Definition: InputSource.cc:326
unsigned int RunNumber_t
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:276
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:251
FileOpenSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:467
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:149
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:46
RunSourceSentry(InputSource const &source, RunIndex id)
Definition: InputSource.cc:460
virtual void endJob()
Definition: InputSource.cc:419
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:109
virtual void skip(int offset)
Definition: InputSource.cc:369
std::string createGlobalIdentifier()