CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
InputSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
4 
26 
27 #include <cassert>
28 #include <fstream>
29 #include <iomanip>
30 
31 namespace edm {
32 
33  namespace {
// Return the English ordinal suffix ("st", "nd", "rd" or "th") that goes
// with `count`, e.g. 1 -> "st", 2 -> "nd", 3 -> "rd", 11 -> "th", 21 -> "st".
// Negative values receive the suffix of their magnitude (-1 -> "st").
// The returned reference points at a function-local static string.
std::string const& suffix(int count) {
  static std::string const st("st");
  static std::string const nd("nd");
  static std::string const rd("rd");
  static std::string const th("th");
  // Only the last two digits matter. Reduce modulo 100 before negating so
  // that the negation cannot overflow, even for the most negative int.
  int lastTwoDigits = count % 100;
  if(lastTwoDigits < 0) {
    lastTwoDigits = -lastTwoDigits;
  }
  int const lastDigit = lastTwoDigits % 10;
  // *0, *4 - *9 use "th".
  if(lastDigit >= 4 || lastDigit == 0) return th;
  // *11, *12, or *13 use "th".
  if(lastTwoDigits - lastDigit == 10) return th;
  return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
}
46  template <typename T>
47  std::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
48  return std::shared_ptr<T>(ptr, do_nothing_deleter());
49  }
50  }
51 
54  actReg_(desc.actReg_),
55  maxEvents_(desc.maxEvents_),
56  remainingEvents_(maxEvents_),
57  maxLumis_(desc.maxLumis_),
58  remainingLumis_(maxLumis_),
59  readCount_(0),
60  maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
61  processingMode_(RunsLumisAndEvents),
62  moduleDescription_(desc.moduleDescription_),
63  productRegistry_(createSharedPtrToStatic<ProductRegistry>(desc.productRegistry_)),
64  processHistoryRegistry_(new ProcessHistoryRegistry),
65  branchIDListHelper_(desc.branchIDListHelper_),
66  thinnedAssociationsHelper_(desc.thinnedAssociationsHelper_),
67  primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
68  processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
69  time_(),
70  newRun_(true),
71  newLumi_(true),
72  eventCached_(false),
73  state_(IsInvalid),
74  runAuxiliary_(),
75  lumiAuxiliary_(),
76  statusFileName_(),
77  receiver_(),
78  numberOfEventsBeforeBigSkip_(0) {
79 
80  if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
81  std::ostringstream statusfilename;
82  statusfilename << "source_" << getpid();
83  statusFileName_ = statusfilename.str();
84  }
85  if (maxSecondsUntilRampdown_ > 0) {
87  }
88 
89  // Secondary input sources currently do not have a product registry.
90  if(primary_) {
91  assert(desc.productRegistry_ != 0);
92  }
93  std::string const defaultMode("RunsLumisAndEvents");
94  std::string const runMode("Runs");
95  std::string const runLumiMode("RunsAndLumis");
96 
97  // The default value provided as the second argument to the getUntrackedParameter function call
98  // is not used when the ParameterSet has been validated and the parameters are not optional
99  // in the description. As soon as all primary input sources and all modules with secondary
100  // input sources have defined descriptions, the defaults in the getUntrackedParameter function
101  // calls can and should be deleted from the code.
102  std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
103  if(processingMode == runMode) {
105  } else if(processingMode == runLumiMode) {
107  } else if(processingMode != defaultMode) {
109  << "InputSource::InputSource()\n"
110  << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
111  << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
112  }
113  }
114 
116 
117  void
120  desc.setUnknown();
121  descriptions.addDefault(desc);
122  }
123 
124  void
126  }
127 
128 
129  static std::string const kBaseType("Source");
130 
131  std::string const&
133  return kBaseType;
134  }
135 
136  void
138  std::string defaultString("RunsLumisAndEvents");
139  desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
140  "'RunsLumisAndEvents': process runs, lumis, and events.\n"
141  "'RunsAndLumis': process runs and lumis (not events).\n"
142  "'Runs': process runs (not lumis or events).");
143  desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
144  }
145 
146  bool
148  if(eventLimitReached()) {
149  return false;
150  }
152  receiver_->receive();
153  unsigned long toSkip = receiver_->numberToSkip();
154  if(0 != toSkip) {
155  skipEvents(toSkip);
157  }
158  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
160  return false;
161  }
162  }
163  return true;
164  }
165 
166  // This next function is to guarantee that "runs only" mode does not return events or lumis,
167  // and that "runs and lumis only" mode does not return events.
168  // For input sources that are not random access (e.g. you need to read through the events
169  // to get to the lumis and runs), this is all that is involved to implement these modes.
170  // For input sources where events or lumis can be skipped, getNextItemType() should
171  // implement the skipping internally, so that the performance gain is realized.
172  // If this is done for a source, the 'if' blocks in this function will never be entered
173  // for that source.
176  ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
177 
178  if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
179  skipEvents(1);
180  return nextItemType_();
181  }
182  if(itemType == IsLumi && processingMode() == Runs) {
183  // QQQ skipLuminosityBlock_();
184  return nextItemType_();
185  }
186  return itemType;
187  }
188 
192  if(eventLimitReached()) {
193  // If the maximum event limit has been reached, stop.
194  state_ = IsStop;
195  } else if(lumiLimitReached()) {
196  // If the maximum lumi limit has been reached, stop
197  // when reaching a new file, run, or lumi.
198  if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
199  state_ = IsStop;
200  } else {
201  ItemType newState = nextItemType_();
202  if(newState == IsEvent) {
203  assert (processingMode() == RunsLumisAndEvents);
204  state_ = IsEvent;
205  } else {
206  state_ = IsStop;
207  }
208  }
209  } else {
210  ItemType newState = nextItemType_();
211  if(newState == IsStop) {
212  state_ = IsStop;
213  } else if(newState == IsSynchronize) {
215  } else if(newState == IsFile || oldState == IsInvalid) {
216  state_ = IsFile;
217  } else if(newState == IsRun || oldState == IsFile) {
219  state_ = IsRun;
220  } else if(newState == IsLumi || oldState == IsRun) {
221  assert (processingMode() != Runs);
223  state_ = IsLumi;
224  } else {
225  assert (processingMode() == RunsLumisAndEvents);
226  state_ = IsEvent;
227  }
228  }
229  if(state_ == IsStop) {
230  lumiAuxiliary_.reset();
231  runAuxiliary_.reset();
232  }
233  return state_;
234  }
235 
236  std::shared_ptr<LuminosityBlockAuxiliary>
238  return callWithTryCatchAndPrint<std::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
239  "Calling InputSource::readLuminosityBlockAuxiliary_" );
240  }
241 
242  std::shared_ptr<RunAuxiliary>
244  return callWithTryCatchAndPrint<std::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
245  "Calling InputSource::readRunAuxiliary_" );
246  }
247 
248  void
250  this->beginJob();
251  }
252 
253  void
255  endJob();
256  }
257 
261  }
262 
265  return nullptr;
266  }
267 
268  void
270  if(!typeLabelList().empty()) {
272  }
273  }
274 
275  // Read the next file by delegating to readFile_().
276  std::unique_ptr<FileBlock>
278  assert(state_ == IsFile);
279  assert(!limitReached());
280  return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >( [this](){ return readFile_(); },
281  "Calling InputSource::readFile_" );
282  }
283 
284  void
285  InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
286  if(fb != nullptr) fb->close();
287  callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
288  "Calling InputSource::closeFile_",
289  cleaningUpAfterException );
290  return;
291  }
292 
293  // Return a dummy file block.
294  // This function must be overridden for any input source that reads a file
295  // containing Products.
296  std::unique_ptr<FileBlock>
298  return std::unique_ptr<FileBlock>(new FileBlock);
299  }
300 
301  void
302  InputSource::readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender) {
303  RunSourceSentry sentry(*this);
304  callWithTryCatchAndPrint<void>( [this,&runPrincipal](){ readRun_(runPrincipal); }, "Calling InputSource::readRun_" );
305  }
306 
307  void
309  RunSourceSentry sentry(*this);
310  callWithTryCatchAndPrint<void>( [this,&rp](){ readRun_(rp); }, "Calling InputSource::readRun_" );
311  }
312 
313  void
315  LumiSourceSentry sentry(*this);
316  callWithTryCatchAndPrint<void>( [this,&lumiPrincipal](){ readLuminosityBlock_(lumiPrincipal); }, "Calling InputSource::readLuminosityBlock_" );
317  if(remainingLumis_ > 0) {
318  --remainingLumis_;
319  }
320  }
321 
322  void
324  LumiSourceSentry sentry(*this);
325  callWithTryCatchAndPrint<void>( [this,&lbp](){ readLuminosityBlock_(lbp); }, "Calling InputSource::readLuminosityBlock_" );
326  if(remainingLumis_ > 0) {
327  --remainingLumis_;
328  }
329  }
330 
331  void
333  // Note: For the moment, we do not support saving and restoring the state of the
334  // random number generator if random numbers are generated during processing of runs
335  // (e.g. beginRun(), endRun())
337  }
338 
339  void
342  }
343 
344  void
346  assert(state_ == IsEvent);
347  assert(!eventLimitReached());
348  {
349  // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
350  EventSourceSentry sentry(*this, streamContext);
351 
352  callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
353  if(receiver_) {
355  }
356  }
357 
359  ++readCount_;
360  setTimestamp(ep.time());
361  issueReports(ep.id());
362  }
363 
364  bool
365  InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
366  bool result = false;
367 
368  if (not limitReached()) {
369  // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
370  // this should be taken care of by an EventSourceSentry in the implementation of readIt()
371 
372  //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
373  result = readIt(eventID, ep, streamContext);
374 
375  if (result) {
377  ++readCount_;
378  issueReports(ep.id());
379  }
380  }
381  return result;
382  }
383 
384  void
386  callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
387  }
388 
389  bool
390  InputSource::goToEvent(EventID const& eventID) {
391  return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
392  }
393 
394  void
396  state_ = IsInvalid;
398  setNewRun();
399  setNewLumi();
401  callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
402  if(receiver_) {
403  unsigned int numberToSkip = receiver_->numberToSkip();
404  skip(numberToSkip);
405  decreaseRemainingEventsBy(numberToSkip);
406  }
407  }
408 
409  void
411  if(isInfoEnabled()) {
412  LogVerbatim("FwkReport") << "Begin processing the " << readCount_
413  << suffix(readCount_) << " record. Run " << eventID.run()
414  << ", Event " << eventID.event()
415  << ", LumiSection " << eventID.luminosityBlock()
416  << " at " << std::setprecision(3) << TimeOfDay();
417  }
418  if(!statusFileName_.empty()) {
419  std::ofstream statusFile(statusFileName_.c_str());
420  statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
421  statusFile.close();
422  }
423 
424  // At some point we may want to initiate checkpointing here
425  }
426 
427  bool
430  << "InputSource::readIt()\n"
431  << "Random access is not implemented for this type of Input Source\n"
432  << "Contact a Framework Developer\n";
433  }
434 
435  void
438  << "InputSource::setRun()\n"
439  << "Run number cannot be modified for this type of Input Source\n"
440  << "Contact a Framework Developer\n";
441  }
442 
443  void
446  << "InputSource::setLumi()\n"
447  << "Luminosity Block ID cannot be modified for this type of Input Source\n"
448  << "Contact a Framework Developer\n";
449  }
450 
451  void
454  << "InputSource::skip()\n"
455  << "Forking and random access are not implemented for this type of Input Source\n"
456  << "Contact a Framework Developer\n";
457  }
458 
459  bool
462  << "InputSource::goToEvent_()\n"
463  << "Random access is not implemented for this type of Input Source\n"
464  << "Contact a Framework Developer\n";
465  return true;
466  }
467 
468  void
471  << "InputSource::rewind()\n"
472  << "Forking and random access are not implemented for this type of Input Source\n"
473  << "Contact a Framework Developer\n";
474  }
475 
476  void
478  if(-1 == remainingEvents_) {
479  return;
480  }
481  if(iSkipped < remainingEvents_) {
482  remainingEvents_ -= iSkipped;
483  } else {
484  remainingEvents_ = 0;
485  }
486  }
487 
488  void
490  Run run(rp, moduleDescription(), nullptr);
491  callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
492  run.commit_();
493  }
494 
495  void
496  InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException, ProcessContext const* processContext) {
497  rp.setEndTime(time_);
498  rp.setComplete();
499  Run run(rp, moduleDescription(), nullptr);
500  callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
501  run.commit_();
502  }
503 
504  void
506  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
507  callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
508  lb.commit_();
509  }
510 
511  void
512  InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException, ProcessContext const* processContext) {
513  lbp.setEndTime(time_);
514  lbp.setComplete();
515  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
516  callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
517  lb.commit_();
518  }
519 
520  void
522  callWithTryCatchAndPrint<void>( [this](){ preForkReleaseResources(); }, "Calling InputSource::preForkReleaseResources" );
523  }
524 
525  void
526  InputSource::doPostForkReacquireResources(std::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
527  callWithTryCatchAndPrint<void>( [this, &iReceiver](){ postForkReacquireResources(iReceiver); },
528  "Calling InputSource::postForkReacquireResources" );
529  }
530 
531  bool
533  return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
534  "Calling InputSource::randomAccess_" );
535  }
536 
539  return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
540  "Calling InputSource::forwardState_" );
541  }
542 
545  return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
546  "Calling InputSource::reverseState__" );
547  }
548 
549  void
551 
552  void
554 
555  void
557 
558  void
560 
561  void
563 
564  void
566 
567  void
569 
570  void
571  InputSource::postForkReacquireResources(std::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
572  receiver_ = iReceiver;
573  receiver_->receive();
574  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
575  rewind();
576  }
577 
578  bool
580  return false;
581  }
582 
586  }
587 
591  }
592 
593  ProcessHistoryID const&
595  assert(runAuxiliary());
596  return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
597  }
598 
601  assert(runAuxiliary());
602  return runAuxiliary()->run();
603  }
604 
607  assert(luminosityBlockAuxiliary());
608  return luminosityBlockAuxiliary()->luminosityBlock();
609  }
610 
611  InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
612  pre();
613  }
614 
616  post_();
617  }
618 
620  source_(source),
621  sc_(sc)
622  {
623  source.actReg()->preSourceSignal_(sc_.streamID());
624  }
625 
627  source_.actReg()->postSourceSignal_(sc_.streamID());
628  }
629 
631  sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
632  }
633 
635  sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
636  }
637 
639  std::string const& lfn,
640  bool usedFallback) :
641  post_(source.actReg()->postOpenFileSignal_),
642  lfn_(lfn),
643  usedFallback_(usedFallback) {
644  source.actReg()->preOpenFileSignal_(lfn, usedFallback);
645  }
646 
648  post_(lfn_, usedFallback_);
649  }
650 
652  std::string const& lfn,
653  bool usedFallback) :
654  post_(source.actReg()->postCloseFileSignal_),
655  lfn_(lfn),
656  usedFallback_(usedFallback) {
657  source.actReg()->preCloseFileSignal_(lfn, usedFallback);
658  }
659 
661  post_(lfn_, usedFallback_);
662  }
663 }
RunNumber_t run() const
Definition: EventID.h:39
virtual ~InputSource()
Destructor.
Definition: InputSource.cc:115
EventNumber_t event() const
Definition: EventID.h:41
ProcessHistoryRegistry const & processHistoryRegistry() const
Const accessor for process history registry.
Definition: InputSource.h:172
T getUntrackedParameter(std::string const &, T const &) const
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:436
virtual void endRun(Run &)
Definition: InputSource.cc:559
virtual void preForkReleaseResources()
Definition: InputSource.cc:568
void issueReports(EventID const &eventID)
issue an event report
Definition: InputSource.cc:410
std::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:454
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:249
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
std::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:455
virtual void closeFile_()
Definition: InputSource.h:412
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:477
Timestamp time_
Definition: InputSource.h:449
static std::string const source("source")
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:489
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:254
RunSourceSentry(InputSource const &source)
Definition: InputSource.cc:634
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:308
std::string statusFileName_
Definition: InputSource.h:456
void doPostForkReacquireResources(std::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:526
void registerProducts()
Register any produced products.
Definition: InputSource.cc:269
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:444
EventID const & id() const
virtual void rewind_()
Definition: InputSource.cc:469
std::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:243
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:257
virtual void postForkReacquireResources(std::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:571
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:349
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
Definition: RunPrincipal.cc:21
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
unsigned int LuminosityBlockNumber_t
virtual void beginLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:550
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:285
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:594
bool limitReached() const
Definition: InputSource.h:402
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
Timestamp const & time() const
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:460
void close()
Definition: FileBlock.h:114
std::chrono::time_point< std::chrono::steady_clock > processingStart_
Definition: InputSource.h:440
FileCloseSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:651
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:538
TypeLabelList & typeLabelList()
used by the fwk to register the list of products of this module
unsigned int numberOfEventsBeforeBigSkip_
Definition: InputSource.h:460
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:619
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:584
void fillLuminosityBlockPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
int remainingEvents() const
Definition: InputSource.h:198
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:544
void addDefault(ParameterSetDescription const &psetDescription)
bool lumiLimitReached() const
Definition: InputSource.h:394
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:302
virtual void beginJob()
Definition: InputSource.cc:562
ProcessingMode processingMode_
Definition: InputSource.h:441
int maxSecondsUntilRampdown_
Definition: InputSource.h:439
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxiliary.
Definition: InputSource.cc:237
void setEndTime(Timestamp const &time)
void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:505
virtual ItemType getNextItemType()=0
tuple result
Definition: query.py:137
static const std::string & baseType()
Definition: InputSource.cc:132
void commit_()
Definition: Run.cc:83
void doEndLumi(LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of lumi block.
Definition: InputSource.cc:512
std::shared_ptr< RunPrincipal > const runPrincipal() const
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:428
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:390
void doPreForkReleaseResources()
Called by the framework before forking the process.
Definition: InputSource.cc:521
SourceSentry(Sig &pre, Sig &post)
Definition: InputSource.cc:611
virtual SharedResourcesAcquirer * resourceSharedWithDelayedReader_() const
Definition: InputSource.cc:264
std::shared_ptr< edm::multicore::MessageReceiverForSource > receiver_
Definition: InputSource.h:459
#define end
Definition: vmac.h:37
unsigned int offset(bool)
void resetEventCached()
Definition: InputSource.h:386
StreamID const & streamID() const
Definition: StreamContext.h:57
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
bool eventLimitReached() const
Definition: InputSource.h:393
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:323
void skipEvents(int offset)
Definition: InputSource.cc:385
virtual void beginRun(Run &)
Definition: InputSource.cc:556
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:332
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:297
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:118
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:345
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:277
virtual bool randomAccess_() const
Definition: InputSource.cc:579
static const std::string kBaseType("EDAnalyzer")
bool isInfoEnabled()
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
ItemType nextItemType_()
Definition: InputSource.cc:175
#define begin
Definition: vmac.h:30
std::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:260
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:589
std::unique_ptr< ProcessHistoryRegistry > processHistoryRegistry_
Definition: InputSource.h:444
bool randomAccess() const
Definition: InputSource.cc:532
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:137
bool const primary_
Definition: InputSource.h:447
void rewind()
Begin again at the first event.
Definition: InputSource.cc:395
int remainingLuminosityBlocks() const
Definition: InputSource.h:206
ModuleDescription const & moduleDescription() const
Accessor for 'module' description.
Definition: InputSource.h:209
unsigned int RunNumber_t
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:340
volatile std::atomic< bool > shutdown_flag false
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:314
FileOpenSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:638
static void addToRegistry(TypeLabelList::const_iterator const &iBegin, TypeLabelList::const_iterator const &iEnd, ModuleDescription const &iDesc, ProductRegistry &iReg, bool iIsListener=false)
void doEndRun(RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of run.
Definition: InputSource.cc:496
long double T
LumiSourceSentry(InputSource const &source)
Definition: InputSource.cc:630
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:190
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:52
virtual void endJob()
Definition: InputSource.cc:565
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:125
SharedResourcesAcquirer * resourceSharedWithDelayedReader() const
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:259
virtual void skip(int offset)
Definition: InputSource.cc:452
std::string createGlobalIdentifier()
Definition: Run.h:41
virtual void endLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:553