CMS 3D CMS Logo

InputSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
4 
22 
23 #include <cassert>
24 #include <fstream>
25 #include <iomanip>
26 
27 namespace edm {
28 
29  namespace {
30  std::string const& suffix(int count) {
31  static std::string const st("st");
32  static std::string const nd("nd");
33  static std::string const rd("rd");
34  static std::string const th("th");
35  // *0, *4 - *9 use "th".
36  int lastDigit = count % 10;
37  if(lastDigit >= 4 || lastDigit == 0) return th;
38  // *11, *12, or *13 use "th".
39  if(count % 100 - lastDigit == 10) return th;
40  return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
41  }
42  }
43 
45  actReg_(desc.actReg_),
46  maxEvents_(desc.maxEvents_),
47  remainingEvents_(maxEvents_),
48  maxLumis_(desc.maxLumis_),
49  remainingLumis_(maxLumis_),
50  readCount_(0),
51  maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
52  processingMode_(RunsLumisAndEvents),
53  moduleDescription_(desc.moduleDescription_),
54  productRegistry_(desc.productRegistry_),
55  processHistoryRegistry_(new ProcessHistoryRegistry),
56  branchIDListHelper_(desc.branchIDListHelper_),
57  thinnedAssociationsHelper_(desc.thinnedAssociationsHelper_),
58  processGUID_(createGlobalIdentifier()),
59  time_(),
60  newRun_(true),
61  newLumi_(true),
62  eventCached_(false),
63  state_(IsInvalid),
64  runAuxiliary_(),
65  lumiAuxiliary_(),
66  statusFileName_(),
67  numberOfEventsBeforeBigSkip_(0) {
68 
69  if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
70  std::ostringstream statusfilename;
71  statusfilename << "source_" << getpid();
72  statusFileName_ = statusfilename.str();
73  }
74  if (maxSecondsUntilRampdown_ > 0) {
76  }
77 
78  std::string const defaultMode("RunsLumisAndEvents");
79  std::string const runMode("Runs");
80  std::string const runLumiMode("RunsAndLumis");
81 
82  // The default value provided as the second argument to the getUntrackedParameter function call
83  // is not used when the ParameterSet has been validated and the parameters are not optional
84  // in the description. As soon as all primary input sources and all modules with secondary
85  // input sources have defined descriptions, the defaults in the getUntrackedParameter function
86  // calls can and should be deleted from the code.
87  std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
88  if(processingMode == runMode) {
90  } else if(processingMode == runLumiMode) {
92  } else if(processingMode != defaultMode) {
94  << "InputSource::InputSource()\n"
95  << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
96  << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
97  }
98  }
99 
101 
102  void
105  desc.setUnknown();
106  descriptions.addDefault(desc);
107  }
108 
109  void
111  }
112 
113 
114  static std::string const kBaseType("Source");
115 
116  std::string const&
118  return kBaseType;
119  }
120 
121  void
123  std::string defaultString("RunsLumisAndEvents");
124  desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
125  "'RunsLumisAndEvents': process runs, lumis, and events.\n"
126  "'RunsAndLumis': process runs and lumis (not events).\n"
127  "'Runs': process runs (not lumis or events).");
128  desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
129  }
130 
131  // This next function is to guarantee that "runs only" mode does not return events or lumis,
132  // and that "runs and lumis only" mode does not return events.
133  // For input sources that are not random access (e.g. you need to read through the events
134  // to get to the lumis and runs), this is all that is involved to implement these modes.
135  // For input sources where events or lumis can be skipped, getNextItemType() should
136  // implement the skipping internally, so that the performance gain is realized.
137  // If this is done for a source, the 'if' blocks in this function will never be entered
138  // for that source.
141  ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
142 
143  if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
144  skipEvents(1);
145  return nextItemType_();
146  }
147  if(itemType == IsLumi && processingMode() == Runs) {
148  // QQQ skipLuminosityBlock_();
149  return nextItemType_();
150  }
151  return itemType;
152  }
153 
156  ItemType oldState = state_;
157  if(eventLimitReached()) {
158  // If the maximum event limit has been reached, stop.
159  state_ = IsStop;
160  } else if(lumiLimitReached()) {
161  // If the maximum lumi limit has been reached, stop
162  // when reaching a new file, run, or lumi.
163  if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
164  state_ = IsStop;
165  } else {
166  ItemType newState = nextItemType_();
167  if(newState == IsEvent) {
168  assert (processingMode() == RunsLumisAndEvents);
169  state_ = IsEvent;
170  } else {
171  state_ = IsStop;
172  }
173  }
174  } else {
175  ItemType newState = nextItemType_();
176  if(newState == IsStop) {
177  state_ = IsStop;
178  } else if(newState == IsSynchronize) {
180  } else if(newState == IsFile || oldState == IsInvalid) {
181  state_ = IsFile;
182  } else if(newState == IsRun || oldState == IsFile) {
184  state_ = IsRun;
185  } else if(newState == IsLumi || oldState == IsRun) {
186  assert (processingMode() != Runs);
188  state_ = IsLumi;
189  } else {
190  assert (processingMode() == RunsLumisAndEvents);
191  state_ = IsEvent;
192  }
193  }
194  if(state_ == IsStop) {
195  lumiAuxiliary_.reset();
196  runAuxiliary_.reset();
197  }
198  return state_;
199  }
200 
201  std::shared_ptr<LuminosityBlockAuxiliary>
203  return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
204  "Calling InputSource::readLuminosityBlockAuxiliary_" );
205  }
206 
207  std::shared_ptr<RunAuxiliary>
209  return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
210  "Calling InputSource::readRunAuxiliary_" );
211  }
212 
213  void
215  this->beginJob();
216  }
217 
218  void
220  endJob();
221  }
222 
223  std::pair<SharedResourcesAcquirer*,std::recursive_mutex*>
226  }
227 
228  std::pair<SharedResourcesAcquirer*,std::recursive_mutex*>
230  return std::pair<SharedResourcesAcquirer*,std::recursive_mutex*>(nullptr,nullptr);
231  }
232 
233  void
235  }
236 
237  // Read the next file by delegating to readFile_(); requires state_ == IsFile and that no limit has been reached.
238  std::unique_ptr<FileBlock>
240  assert(state_ == IsFile);
241  assert(!limitReached());
242  return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >( [this](){ return readFile_(); },
243  "Calling InputSource::readFile_" );
244  }
245 
246  void
247  InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
248  if(fb != nullptr) fb->close();
249  callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
250  "Calling InputSource::closeFile_",
251  cleaningUpAfterException );
252  return;
253  }
254 
255  // Return a dummy file block.
256  // This function must be overridden for any input source that reads a file
257  // containing Products.
258  std::unique_ptr<FileBlock>
260  return std::make_unique<FileBlock>();
261  }
262 
263  void
265  RunSourceSentry sentry(*this, runPrincipal.index());
266  callWithTryCatchAndPrint<void>( [this,&runPrincipal](){ readRun_(runPrincipal); }, "Calling InputSource::readRun_" );
267  }
268 
269  void
271  RunSourceSentry sentry(*this, rp.index());
272  callWithTryCatchAndPrint<void>( [this,&rp](){ readRun_(rp); }, "Calling InputSource::readRun_" );
273  }
274 
275  void
277  LumiSourceSentry sentry(*this, lumiPrincipal.index());
278  callWithTryCatchAndPrint<void>( [this,&lumiPrincipal](){ readLuminosityBlock_(lumiPrincipal); }, "Calling InputSource::readLuminosityBlock_" );
279  if(remainingLumis_ > 0) {
280  --remainingLumis_;
281  }
282  }
283 
284  void
286  LumiSourceSentry sentry(*this, lbp.index());
287  callWithTryCatchAndPrint<void>( [this,&lbp](){ readLuminosityBlock_(lbp); }, "Calling InputSource::readLuminosityBlock_" );
288  if(remainingLumis_ > 0) {
289  --remainingLumis_;
290  }
291  }
292 
293  void
295  // Note: For the moment, we do not support saving and restoring the state of the
296  // random number generator if random numbers are generated during processing of runs
297  // (e.g. beginRun(), endRun())
299  }
300 
301  void
304  }
305 
306  void
308  assert(state_ == IsEvent);
309  assert(!eventLimitReached());
310  {
311  // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
312  EventSourceSentry sentry(*this, streamContext);
313 
314  callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
315  }
316 
318  ++readCount_;
319  setTimestamp(ep.time());
320  issueReports(ep.id(), ep.streamID());
321  }
322 
// Reads the event identified by eventID into ep.
// Nothing is read when limitReached() is true. Otherwise the work is done by
// readIt(eventID, ep, streamContext); when that returns true, readCount_ is
// incremented and issueReports() is called for the event.
// Returns the value returned by readIt(), or false when a limit was reached.
323  bool
324  InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
325  bool result = false;
326 
327  if (not limitReached()) {
328  // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
329  // this should be taken care of by an EventSourceSentry in the implementation of readIt()
330 
331  //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
332  result = readIt(eventID, ep, streamContext);
333 
334  if (result) {
// NOTE(review): the listing numbering jumps from 334 to 336 here, so one
// statement of the original file is not shown in this extract - confirm
// against the original before relying on this body.
336  ++readCount_;
337  issueReports(ep.id(), ep.streamID());
338  }
339  }
340  return result;
341  }
342 
343  void
345  callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
346  }
347 
348  bool
349  InputSource::goToEvent(EventID const& eventID) {
350  return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
351  }
352 
353  void
355  state_ = IsInvalid;
357  setNewRun();
358  setNewLumi();
360  callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
361  }
362 
363  void
364  InputSource::issueReports(EventID const& eventID, StreamID streamID) {
365  if(isInfoEnabled()) {
366  LogVerbatim("FwkReport") << "Begin processing the " << readCount_
367  << suffix(readCount_) << " record. Run " << eventID.run()
368  << ", Event " << eventID.event()
369  << ", LumiSection " << eventID.luminosityBlock()
370  << " on stream "<<streamID.value()
371  << " at " << std::setprecision(3) << TimeOfDay();
372  }
373  if(!statusFileName_.empty()) {
374  std::ofstream statusFile(statusFileName_.c_str());
375  statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
376  statusFile.close();
377  }
378 
379  // At some point we may want to initiate checkpointing here
380  }
381 
382  bool
385  << "InputSource::readIt()\n"
386  << "Random access is not implemented for this type of Input Source\n"
387  << "Contact a Framework Developer\n";
388  }
389 
390  void
393  << "InputSource::setRun()\n"
394  << "Run number cannot be modified for this type of Input Source\n"
395  << "Contact a Framework Developer\n";
396  }
397 
398  void
401  << "InputSource::setLumi()\n"
402  << "Luminosity Block ID cannot be modified for this type of Input Source\n"
403  << "Contact a Framework Developer\n";
404  }
405 
406  void
409  << "InputSource::skip()\n"
410  << "Random access are not implemented for this type of Input Source\n"
411  << "Contact a Framework Developer\n";
412  }
413 
414  bool
417  << "InputSource::goToEvent_()\n"
418  << "Random access is not implemented for this type of Input Source\n"
419  << "Contact a Framework Developer\n";
420  return true;
421  }
422 
423  void
426  << "InputSource::rewind()\n"
427  << "Random access are not implemented for this type of Input Source\n"
428  << "Contact a Framework Developer\n";
429  }
430 
431  void
433  if(-1 == remainingEvents_) {
434  return;
435  }
436  if(iSkipped < remainingEvents_) {
437  remainingEvents_ -= iSkipped;
438  } else {
439  remainingEvents_ = 0;
440  }
441  }
442 
443  void
445  }
446 
447  void
449  }
450 
451  bool
453  return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
454  "Calling InputSource::randomAccess_" );
455  }
456 
459  return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
460  "Calling InputSource::forwardState_" );
461  }
462 
465  return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
466  "Calling InputSource::reverseState__" );
467  }
468 
469  void
471 
472  void
474 
475  bool
477  return false;
478  }
479 
483  }
484 
488  }
489 
490  ProcessHistoryID const&
492  assert(runAuxiliary());
493  return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
494  }
495 
498  assert(runAuxiliary());
499  return runAuxiliary()->run();
500  }
501 
504  assert(luminosityBlockAuxiliary());
505  return luminosityBlockAuxiliary()->luminosityBlock();
506  }
507 
509  source_(source),
510  sc_(sc)
511  {
512  source.actReg()->preSourceSignal_(sc_.streamID());
513  }
514 
516  source_.actReg()->postSourceSignal_(sc_.streamID());
517  }
518 
520  source_(source),
521  index_(index)
522  {
523  source_.actReg()->preSourceLumiSignal_(index_);
524  }
525 
527  source_.actReg()->postSourceLumiSignal_(index_);
528  }
529 
531  source_(source),
532  index_(index)
533  {
534  source_.actReg()->preSourceRunSignal_(index_);
535  }
536 
538  source_.actReg()->postSourceRunSignal_(index_);
539  }
540 
542  std::string const& lfn,
543  bool usedFallback) :
544  post_(source.actReg()->postOpenFileSignal_),
545  lfn_(lfn),
546  usedFallback_(usedFallback) {
547  source.actReg()->preOpenFileSignal_(lfn, usedFallback);
548  }
549 
552  }
553 
555  std::string const& lfn,
556  bool usedFallback) :
557  post_(source.actReg()->postCloseFileSignal_),
558  lfn_(lfn),
559  usedFallback_(usedFallback) {
560  source.actReg()->preCloseFileSignal_(lfn, usedFallback);
561  }
562 
565  }
566 }
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
T getUntrackedParameter(std::string const &, T const &) const
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:391
std::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:428
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:214
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
std::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:429
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:432
virtual void closeFile_()
Definition: InputSource.h:393
static std::string const source("source")
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:444
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader()
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:224
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:270
std::string statusFileName_
Definition: InputSource.h:430
virtual void registerProducts()
Register any produced products.
Definition: InputSource.cc:234
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:399
EventID const & id() const
virtual void rewind_()
Definition: InputSource.cc:424
#define noexcept
std::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:208
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:240
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:332
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:497
LuminosityBlockIndex index() const
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
Definition: RunPrincipal.cc:28
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
unsigned int LuminosityBlockNumber_t
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:247
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:491
bool limitReached() const
Definition: InputSource.h:383
Timestamp const & time() const
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:415
void close()
Definition: FileBlock.h:114
std::chrono::time_point< std::chrono::steady_clock > processingStart_
Definition: InputSource.h:415
FileCloseSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:554
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:458
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:508
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:481
virtual std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_()
Definition: InputSource.cc:229
void fillLuminosityBlockPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:464
void addDefault(ParameterSetDescription const &psetDescription)
bool lumiLimitReached() const
Definition: InputSource.h:375
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:264
virtual void beginJob()
Definition: InputSource.cc:470
ProcessingMode processingMode_
Definition: InputSource.h:416
int maxSecondsUntilRampdown_
Definition: InputSource.h:414
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxiliary.
Definition: InputSource.cc:202
virtual void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:448
virtual ItemType getNextItemType()=0
static const std::string & baseType()
Definition: InputSource.cc:117
void issueReports(EventID const &eventID, StreamID streamID)
issue an event report
Definition: InputSource.cc:364
StreamID streamID() const
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:383
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:349
runMode
define run mode.
void resetEventCached()
Definition: InputSource.h:367
StreamID const & streamID() const
Definition: StreamContext.h:57
virtual ~InputSource() noexcept(false)
Destructor.
Definition: InputSource.cc:100
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:503
bool eventLimitReached() const
Definition: InputSource.h:374
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:285
void skipEvents(int offset)
Definition: InputSource.cc:344
unsigned int value() const
Definition: StreamID.h:46
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:294
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:259
LumiSourceSentry(InputSource const &source, LuminosityBlockIndex id)
Definition: InputSource.cc:519
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:103
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:246
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:307
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:239
RunIndex index() const
Definition: RunPrincipal.h:57
virtual bool randomAccess_() const
Definition: InputSource.cc:476
static const std::string kBaseType("EDAnalyzer")
bool isInfoEnabled()
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
ItemType nextItemType_()
Definition: InputSource.cc:140
HLT enums.
std::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:243
edm::propagate_const< std::unique_ptr< ProcessHistoryRegistry > > processHistoryRegistry_
Definition: InputSource.h:419
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:486
bool randomAccess() const
Definition: InputSource.cc:452
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:122
void rewind()
Begin again at the first event.
Definition: InputSource.cc:354
unsigned int RunNumber_t
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:302
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:276
FileOpenSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:541
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:249
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:155
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:44
RunSourceSentry(InputSource const &source, RunIndex id)
Definition: InputSource.cc:530
virtual void endJob()
Definition: InputSource.cc:473
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:110
virtual void skip(int offset)
Definition: InputSource.cc:407
std::string createGlobalIdentifier()