CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
InputSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
4 
26 
27 #include <cassert>
28 #include <fstream>
29 #include <iomanip>
30 
31 namespace edm {
32 
33  namespace {
34  std::string const& suffix(int count) {
35  static std::string const st("st");
36  static std::string const nd("nd");
37  static std::string const rd("rd");
38  static std::string const th("th");
39  // *0, *4 - *9 use "th".
40  int lastDigit = count % 10;
41  if(lastDigit >= 4 || lastDigit == 0) return th;
42  // *11, *12, or *13 use "th".
43  if(count % 100 - lastDigit == 10) return th;
44  return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
45  }
46  template <typename T>
47  std::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
48  return std::shared_ptr<T>(ptr, do_nothing_deleter());
49  }
50  }
51 
54  actReg_(desc.actReg_),
55  maxEvents_(desc.maxEvents_),
56  remainingEvents_(maxEvents_),
57  maxLumis_(desc.maxLumis_),
58  remainingLumis_(maxLumis_),
59  readCount_(0),
60  maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
61  processingMode_(RunsLumisAndEvents),
62  moduleDescription_(desc.moduleDescription_),
63  productRegistry_(createSharedPtrToStatic<ProductRegistry>(desc.productRegistry_)),
64  processHistoryRegistry_(new ProcessHistoryRegistry),
65  branchIDListHelper_(desc.branchIDListHelper_),
66  primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
67  processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
68  time_(),
69  newRun_(true),
70  newLumi_(true),
71  eventCached_(false),
72  state_(IsInvalid),
73  runAuxiliary_(),
74  lumiAuxiliary_(),
75  statusFileName_(),
76  receiver_(),
77  numberOfEventsBeforeBigSkip_(0) {
78 
79  if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
80  std::ostringstream statusfilename;
81  statusfilename << "source_" << getpid();
82  statusFileName_ = statusfilename.str();
83  }
84  if (maxSecondsUntilRampdown_ > 0) {
86  }
87 
88  // Secondary input sources currently do not have a product registry.
89  if(primary_) {
90  assert(desc.productRegistry_ != 0);
91  }
92  std::string const defaultMode("RunsLumisAndEvents");
93  std::string const runMode("Runs");
94  std::string const runLumiMode("RunsAndLumis");
95 
96  // The default value provided as the second argument to the getUntrackedParameter function call
97  // is not used when the ParameterSet has been validated and the parameters are not optional
98  // in the description. As soon as all primary input sources and all modules with a secondary
99  // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
100  // calls can and should be deleted from the code.
101  std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
102  if(processingMode == runMode) {
104  } else if(processingMode == runLumiMode) {
106  } else if(processingMode != defaultMode) {
108  << "InputSource::InputSource()\n"
109  << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
110  << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
111  }
112  }
113 
115 
116  void
119  desc.setUnknown();
120  descriptions.addDefault(desc);
121  }
122 
123  void
125  }
126 
127 
128  static std::string const kBaseType("Source");
129 
130  std::string const&
132  return kBaseType;
133  }
134 
135  void
137  std::string defaultString("RunsLumisAndEvents");
138  desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
139  "'RunsLumisAndEvents': process runs, lumis, and events.\n"
140  "'RunsAndLumis': process runs and lumis (not events).\n"
141  "'Runs': process runs (not lumis or events).");
142  desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
143  }
144 
145  bool
147  if(eventLimitReached()) {
148  return false;
149  }
151  receiver_->receive();
152  unsigned long toSkip = receiver_->numberToSkip();
153  if(0 != toSkip) {
154  skipEvents(toSkip);
156  }
157  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
159  return false;
160  }
161  }
162  return true;
163  }
164 
165  // This next function is to guarantee that "runs only" mode does not return events or lumis,
166  // and that "runs and lumis only" mode does not return events.
167  // For input sources that are not random access (e.g. you need to read through the events
168  // to get to the lumis and runs), this is all that is involved to implement these modes.
169  // For input sources where events or lumis can be skipped, getNextItemType() should
170  // implement the skipping internally, so that the performance gain is realized.
171  // If this is done for a source, the 'if' blocks in this function will never be entered
172  // for that source.
175  ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
176 
177  if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
178  skipEvents(1);
179  return nextItemType_();
180  }
181  if(itemType == IsLumi && processingMode() == Runs) {
182  // QQQ skipLuminosityBlock_();
183  return nextItemType_();
184  }
185  return itemType;
186  }
187 
191  if(eventLimitReached()) {
192  // If the maximum event limit has been reached, stop.
193  state_ = IsStop;
194  } else if(lumiLimitReached()) {
195  // If the maximum lumi limit has been reached, stop
196  // when reaching a new file, run, or lumi.
197  if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
198  state_ = IsStop;
199  } else {
200  ItemType newState = nextItemType_();
201  if(newState == IsEvent) {
202  assert (processingMode() == RunsLumisAndEvents);
203  state_ = IsEvent;
204  } else {
205  state_ = IsStop;
206  }
207  }
208  } else {
209  ItemType newState = nextItemType_();
210  if(newState == IsStop) {
211  state_ = IsStop;
212  } else if(newState == IsSynchronize) {
214  } else if(newState == IsFile || oldState == IsInvalid) {
215  state_ = IsFile;
216  } else if(newState == IsRun || oldState == IsFile) {
218  state_ = IsRun;
219  } else if(newState == IsLumi || oldState == IsRun) {
220  assert (processingMode() != Runs);
222  state_ = IsLumi;
223  } else {
224  assert (processingMode() == RunsLumisAndEvents);
225  state_ = IsEvent;
226  }
227  }
228  if(state_ == IsStop) {
229  lumiAuxiliary_.reset();
230  runAuxiliary_.reset();
231  }
232  return state_;
233  }
234 
235  std::shared_ptr<LuminosityBlockAuxiliary>
237  return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
238  "Calling InputSource::readLuminosityBlockAuxiliary_" );
239  }
240 
241  std::shared_ptr<RunAuxiliary>
243  return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
244  "Calling InputSource::readRunAuxiliary_" );
245  }
246 
247  void
249  this->beginJob();
250  }
251 
252  void
254  endJob();
255  }
256 
260  }
261 
264  return nullptr;
265  }
266 
267  void
269  if(!typeLabelList().empty()) {
271  }
272  }
273 
274  // Return a dummy file block.
275  std::unique_ptr<FileBlock>
277  assert(state_ == IsFile);
278  assert(!limitReached());
279  return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >( [this](){ return readFile_(); },
280  "Calling InputSource::readFile_" );
281  }
282 
283  void
284  InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
285  if(fb != nullptr) fb->close();
286  callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
287  "Calling InputSource::closeFile_",
288  cleaningUpAfterException );
289  return;
290  }
291 
292  // Return a dummy file block.
293  // This function must be overridden for any input source that reads a file
294  // containing Products.
295  std::unique_ptr<FileBlock>
297  return std::unique_ptr<FileBlock>(new FileBlock);
298  }
299 
300  void
301  InputSource::readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender) {
302  RunSourceSentry sentry(*this);
303  callWithTryCatchAndPrint<void>( [this,&runPrincipal](){ readRun_(runPrincipal); }, "Calling InputSource::readRun_" );
304  }
305 
306  void
308  RunSourceSentry sentry(*this);
309  callWithTryCatchAndPrint<void>( [this,&rp](){ readRun_(rp); }, "Calling InputSource::readRun_" );
310  }
311 
312  void
314  LumiSourceSentry sentry(*this);
315  callWithTryCatchAndPrint<void>( [this,&lumiPrincipal](){ readLuminosityBlock_(lumiPrincipal); }, "Calling InputSource::readLuminosityBlock_" );
316  if(remainingLumis_ > 0) {
317  --remainingLumis_;
318  }
319  }
320 
321  void
323  LumiSourceSentry sentry(*this);
324  callWithTryCatchAndPrint<void>( [this,&lbp](){ readLuminosityBlock_(lbp); }, "Calling InputSource::readLuminosityBlock_" );
325  if(remainingLumis_ > 0) {
326  --remainingLumis_;
327  }
328  }
329 
330  void
332  // Note: For the moment, we do not support saving and restoring the state of the
333  // random number generator if random numbers are generated during processing of runs
334  // (e.g. beginRun(), endRun())
336  }
337 
338  void
341  }
342 
343  void
345  assert(state_ == IsEvent);
346  assert(!eventLimitReached());
347  {
348  // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
349  EventSourceSentry sentry(*this, streamContext);
350 
351  callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
352  if(receiver_) {
354  }
355  }
356 
358  ++readCount_;
359  setTimestamp(ep.time());
360  issueReports(ep.id());
361  }
362 
363  bool
364  InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
365  bool result = false;
366 
367  if (not limitReached()) {
368  // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
369  // this should be taken care of by an EventSourceSentry in the implementaion of readIt()
370 
371  //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
372  result = readIt(eventID, ep, streamContext);
373 
374  if (result) {
376  ++readCount_;
377  issueReports(ep.id());
378  }
379  }
380  return result;
381  }
382 
383  void
385  callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
386  }
387 
388  bool
389  InputSource::goToEvent(EventID const& eventID) {
390  return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
391  }
392 
393  void
395  state_ = IsInvalid;
397  setNewRun();
398  setNewLumi();
400  callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
401  if(receiver_) {
402  unsigned int numberToSkip = receiver_->numberToSkip();
403  skip(numberToSkip);
404  decreaseRemainingEventsBy(numberToSkip);
405  }
406  }
407 
408  void
410  if(isInfoEnabled()) {
411  LogVerbatim("FwkReport") << "Begin processing the " << readCount_
412  << suffix(readCount_) << " record. Run " << eventID.run()
413  << ", Event " << eventID.event()
414  << ", LumiSection " << eventID.luminosityBlock()
415  << " at " << std::setprecision(3) << TimeOfDay();
416  }
417  if(!statusFileName_.empty()) {
418  std::ofstream statusFile(statusFileName_.c_str());
419  statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
420  statusFile.close();
421  }
422 
423  // At some point we may want to initiate checkpointing here
424  }
425 
426  bool
429  << "InputSource::readIt()\n"
430  << "Random access is not implemented for this type of Input Source\n"
431  << "Contact a Framework Developer\n";
432  }
433 
434  void
437  << "InputSource::setRun()\n"
438  << "Run number cannot be modified for this type of Input Source\n"
439  << "Contact a Framework Developer\n";
440  }
441 
442  void
445  << "InputSource::setLumi()\n"
446  << "Luminosity Block ID cannot be modified for this type of Input Source\n"
447  << "Contact a Framework Developer\n";
448  }
449 
450  void
453  << "InputSource::skip()\n"
454  << "Forking and random access are not implemented for this type of Input Source\n"
455  << "Contact a Framework Developer\n";
456  }
457 
458  bool
461  << "InputSource::goToEvent_()\n"
462  << "Random access is not implemented for this type of Input Source\n"
463  << "Contact a Framework Developer\n";
464  return true;
465  }
466 
467  void
470  << "InputSource::rewind()\n"
471  << "Forking and random access are not implemented for this type of Input Source\n"
472  << "Contact a Framework Developer\n";
473  }
474 
475  void
477  if(-1 == remainingEvents_) {
478  return;
479  }
480  if(iSkipped < remainingEvents_) {
481  remainingEvents_ -= iSkipped;
482  } else {
483  remainingEvents_ = 0;
484  }
485  }
486 
487  void
489  Run run(rp, moduleDescription(), nullptr);
490  callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
491  run.commit_();
492  }
493 
494  void
495  InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException, ProcessContext const* processContext) {
496  rp.setEndTime(time_);
497  rp.setComplete();
498  Run run(rp, moduleDescription(), nullptr);
499  callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
500  run.commit_();
501  }
502 
503  void
505  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
506  callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
507  lb.commit_();
508  }
509 
510  void
511  InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException, ProcessContext const* processContext) {
512  lbp.setEndTime(time_);
513  lbp.setComplete();
514  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
515  callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
516  lb.commit_();
517  }
518 
519  void
521  callWithTryCatchAndPrint<void>( [this](){ preForkReleaseResources(); }, "Calling InputSource::preForkReleaseResources" );
522  }
523 
524  void
525  InputSource::doPostForkReacquireResources(std::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
526  callWithTryCatchAndPrint<void>( [this, &iReceiver](){ postForkReacquireResources(iReceiver); },
527  "Calling InputSource::postForkReacquireResources" );
528  }
529 
530  bool
532  return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
533  "Calling InputSource::randomAccess_" );
534  }
535 
538  return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
539  "Calling InputSource::forwardState_" );
540  }
541 
544  return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
545  "Calling InputSource::reverseState__" );
546  }
547 
548  void
550 
551  void
553 
554  void
556 
557  void
559 
560  void
562 
563  void
565 
566  void
568 
569  void
570  InputSource::postForkReacquireResources(std::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
571  receiver_ = iReceiver;
572  receiver_->receive();
573  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
574  rewind();
575  }
576 
577  bool
579  return false;
580  }
581 
585  }
586 
590  }
591 
592  ProcessHistoryID const&
594  assert(runAuxiliary());
595  return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
596  }
597 
600  assert(runAuxiliary());
601  return runAuxiliary()->run();
602  }
603 
606  assert(luminosityBlockAuxiliary());
607  return luminosityBlockAuxiliary()->luminosityBlock();
608  }
609 
610  InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
611  pre();
612  }
613 
615  post_();
616  }
617 
619  source_(source),
620  sc_(sc)
621  {
622  source.actReg()->preSourceSignal_(sc_.streamID());
623  }
624 
626  source_.actReg()->postSourceSignal_(sc_.streamID());
627  }
628 
630  sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
631  }
632 
634  sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
635  }
636 
638  std::string const& lfn,
639  bool usedFallback) :
640  post_(source.actReg()->postOpenFileSignal_),
641  lfn_(lfn),
642  usedFallback_(usedFallback) {
643  source.actReg()->preOpenFileSignal_(lfn, usedFallback);
644  }
645 
647  post_(lfn_, usedFallback_);
648  }
649 
651  std::string const& lfn,
652  bool usedFallback) :
653  post_(source.actReg()->postCloseFileSignal_),
654  lfn_(lfn),
655  usedFallback_(usedFallback) {
656  source.actReg()->preCloseFileSignal_(lfn, usedFallback);
657  }
658 
660  post_(lfn_, usedFallback_);
661  }
662 }
RunNumber_t run() const
Definition: EventID.h:39
virtual ~InputSource()
Destructor.
Definition: InputSource.cc:114
EventNumber_t event() const
Definition: EventID.h:41
ProcessHistoryRegistry const & processHistoryRegistry() const
Const accessor for process history registry.
Definition: InputSource.h:171
T getUntrackedParameter(std::string const &, T const &) const
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:435
virtual void endRun(Run &)
Definition: InputSource.cc:558
virtual void preForkReleaseResources()
Definition: InputSource.cc:567
void issueReports(EventID const &eventID)
issue an event report
Definition: InputSource.cc:409
std::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:449
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:248
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
std::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:450
virtual void closeFile_()
Definition: InputSource.h:408
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:476
Timestamp time_
Definition: InputSource.h:444
static std::string const source("source")
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:488
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:253
RunSourceSentry(InputSource const &source)
Definition: InputSource.cc:633
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:307
std::string statusFileName_
Definition: InputSource.h:451
void doPostForkReacquireResources(std::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:525
void registerProducts()
Register any produced products.
Definition: InputSource.cc:268
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:443
EventID const & id() const
virtual void rewind_()
Definition: InputSource.cc:468
std::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:242
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:253
virtual void postForkReacquireResources(std::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:570
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:345
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:599
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
Definition: RunPrincipal.cc:21
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
unsigned int LuminosityBlockNumber_t
virtual void beginLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:549
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:284
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:593
bool limitReached() const
Definition: InputSource.h:398
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
Timestamp const & time() const
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:459
void close()
Definition: FileBlock.h:114
std::chrono::time_point< std::chrono::steady_clock > processingStart_
Definition: InputSource.h:436
FileCloseSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:650
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:537
TypeLabelList & typeLabelList()
used by the fwk to register the list of products of this module
unsigned int numberOfEventsBeforeBigSkip_
Definition: InputSource.h:455
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:618
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:583
void fillLuminosityBlockPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
int remainingEvents() const
Definition: InputSource.h:194
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:543
void addDefault(ParameterSetDescription const &psetDescription)
bool lumiLimitReached() const
Definition: InputSource.h:390
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:301
virtual void beginJob()
Definition: InputSource.cc:561
ProcessingMode processingMode_
Definition: InputSource.h:437
int maxSecondsUntilRampdown_
Definition: InputSource.h:435
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxilary.
Definition: InputSource.cc:236
void setEndTime(Timestamp const &time)
void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:504
virtual ItemType getNextItemType()=0
tuple result
Definition: query.py:137
static const std::string & baseType()
Definition: InputSource.cc:131
void commit_()
Definition: Run.cc:83
void doEndLumi(LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of lumi block.
Definition: InputSource.cc:511
std::shared_ptr< RunPrincipal > const runPrincipal() const
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:427
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:389
void doPreForkReleaseResources()
Called by the framework before forking the process.
Definition: InputSource.cc:520
SourceSentry(Sig &pre, Sig &post)
Definition: InputSource.cc:610
virtual SharedResourcesAcquirer * resourceSharedWithDelayedReader_() const
Definition: InputSource.cc:263
std::shared_ptr< edm::multicore::MessageReceiverForSource > receiver_
Definition: InputSource.h:454
#define end
Definition: vmac.h:37
unsigned int offset(bool)
void resetEventCached()
Definition: InputSource.h:382
StreamID const & streamID() const
Definition: StreamContext.h:57
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:605
bool eventLimitReached() const
Definition: InputSource.h:389
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:322
void skipEvents(int offset)
Definition: InputSource.cc:384
virtual void beginRun(Run &)
Definition: InputSource.cc:555
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:331
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:296
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:117
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:344
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:276
virtual bool randomAccess_() const
Definition: InputSource.cc:578
static const std::string kBaseType("EDAnalyzer")
bool isInfoEnabled()
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
ItemType nextItemType_()
Definition: InputSource.cc:174
#define begin
Definition: vmac.h:30
std::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:256
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:588
std::unique_ptr< ProcessHistoryRegistry > processHistoryRegistry_
Definition: InputSource.h:440
bool randomAccess() const
Definition: InputSource.cc:531
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:136
bool const primary_
Definition: InputSource.h:442
void rewind()
Begin again at the first event.
Definition: InputSource.cc:394
int remainingLuminosityBlocks() const
Definition: InputSource.h:202
ModuleDescription const & moduleDescription() const
Accessor for &#39;module&#39; description.
Definition: InputSource.h:205
unsigned int RunNumber_t
Definition: EventRange.h:32
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:339
volatile std::atomic< bool > shutdown_flag false
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:313
FileOpenSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:637
static void addToRegistry(TypeLabelList::const_iterator const &iBegin, TypeLabelList::const_iterator const &iEnd, ModuleDescription const &iDesc, ProductRegistry &iReg, bool iIsListener=false)
void doEndRun(RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of run.
Definition: InputSource.cc:495
long double T
LumiSourceSentry(InputSource const &source)
Definition: InputSource.cc:629
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:189
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:52
virtual void endJob()
Definition: InputSource.cc:564
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:124
SharedResourcesAcquirer * resourceSharedWithDelayedReader() const
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:258
virtual void skip(int offset)
Definition: InputSource.cc:451
std::string createGlobalIdentifier()
Definition: Run.h:41
virtual void endLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:552