CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
InputSource.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
4 
26 
27 #include <cassert>
28 #include <fstream>
29 #include <iomanip>
30 
31 namespace edm {
32 
33  namespace {
// Returns the English ordinal suffix ("st", "nd", "rd" or "th") that goes
// with `count`, e.g. 1 -> "st", 22 -> "nd", 13 -> "th".
// The result refers to a function-local static string, so it remains valid
// after the call returns.
// Negative counts get the same suffix as their magnitude (-1 -> "st",
// -11 -> "th") rather than falling through to "rd".
std::string const& suffix(int count) {
  static std::string const st("st");
  static std::string const nd("nd");
  static std::string const rd("rd");
  static std::string const th("th");
  // Reduce to the last two digits first and negate afterwards: the remainder
  // is always in [-99, 99], so negating it cannot overflow even for INT_MIN.
  int lastTwoDigits = count % 100;
  if(lastTwoDigits < 0) {
    lastTwoDigits = -lastTwoDigits;
  }
  // *11, *12, and *13 use "th".
  if(lastTwoDigits >= 11 && lastTwoDigits <= 13) {
    return th;
  }
  // *1, *2, *3 use "st", "nd", "rd"; *0 and *4 - *9 use "th".
  switch(lastTwoDigits % 10) {
    case 1: return st;
    case 2: return nd;
    case 3: return rd;
    default: return th;
  }
}
46  template <typename T>
47  boost::shared_ptr<T> createSharedPtrToStatic(T* ptr) {
48  return boost::shared_ptr<T>(ptr, do_nothing_deleter());
49  }
50  }
51 
54  actReg_(desc.actReg_),
55  maxEvents_(desc.maxEvents_),
56  remainingEvents_(maxEvents_),
57  maxLumis_(desc.maxLumis_),
58  remainingLumis_(maxLumis_),
59  readCount_(0),
60  processingMode_(RunsLumisAndEvents),
61  moduleDescription_(desc.moduleDescription_),
62  productRegistry_(createSharedPtrToStatic<ProductRegistry>(desc.productRegistry_)),
63  processHistoryRegistry_(new ProcessHistoryRegistry),
64  branchIDListHelper_(desc.branchIDListHelper_),
65  primary_(pset.getParameter<std::string>("@module_label") == std::string("@main_input")),
66  processGUID_(primary_ ? createGlobalIdentifier() : std::string()),
67  time_(),
68  newRun_(true),
69  newLumi_(true),
70  eventCached_(false),
71  state_(IsInvalid),
72  runAuxiliary_(),
73  lumiAuxiliary_(),
74  statusFileName_(),
75  receiver_(),
76  numberOfEventsBeforeBigSkip_(0) {
77 
78  if(pset.getUntrackedParameter<bool>("writeStatusFile", false)) {
79  std::ostringstream statusfilename;
80  statusfilename << "source_" << getpid();
81  statusFileName_ = statusfilename.str();
82  }
83 
84  // Secondary input sources currently do not have a product registry.
85  if(primary_) {
86  assert(desc.productRegistry_ != 0);
87  }
88  std::string const defaultMode("RunsLumisAndEvents");
89  std::string const runMode("Runs");
90  std::string const runLumiMode("RunsAndLumis");
91 
92  // The default value provided as the second argument to the getUntrackedParameter function call
93  // is not used when the ParameterSet has been validated and the parameters are not optional
94  // in the description. As soon as all primary input sources and all modules with a secondary
95  // input sources have defined descriptions, the defaults in the getUntrackedParameterSet function
96  // calls can and should be deleted from the code.
97  std::string processingMode = pset.getUntrackedParameter<std::string>("processingMode", defaultMode);
98  if(processingMode == runMode) {
100  } else if(processingMode == runLumiMode) {
102  } else if(processingMode != defaultMode) {
104  << "InputSource::InputSource()\n"
105  << "The 'processingMode' parameter for sources has an illegal value '" << processingMode << "'\n"
106  << "Legal values are '" << defaultMode << "', '" << runLumiMode << "', or '" << runMode << "'.\n";
107  }
108  }
109 
111 
112  void
115  desc.setUnknown();
116  descriptions.addDefault(desc);
117  }
118 
119  void
121  }
122 
123 
// File-local name of this module base type; the function that follows in this
// file returns it by const reference.
static const std::string kBaseType = "Source";
125 
126  std::string const&
128  return kBaseType;
129  }
130 
131  void
133  std::string defaultString("RunsLumisAndEvents");
134  desc.addUntracked<std::string>("processingMode", defaultString)->setComment(
135  "'RunsLumisAndEvents': process runs, lumis, and events.\n"
136  "'RunsAndLumis': process runs and lumis (not events).\n"
137  "'Runs': process runs (not lumis or events).");
138  desc.addUntracked<bool>("writeStatusFile", false)->setComment("Write a status file. Intended for use by workflow management.");
139  }
140 
141  bool
143  if(eventLimitReached()) {
144  return false;
145  }
147  receiver_->receive();
148  unsigned long toSkip = receiver_->numberToSkip();
149  if(0 != toSkip) {
150  skipEvents(toSkip);
152  }
153  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
155  return false;
156  }
157  }
158  return true;
159  }
160 
161  // This next function is to guarantee that "runs only" mode does not return events or lumis,
162  // and that "runs and lumis only" mode does not return events.
163  // For input sources that are not random access (e.g. you need to read through the events
164  // to get to the lumis and runs), this is all that is involved to implement these modes.
165  // For input sources where events or lumis can be skipped, getNextItemType() should
166  // implement the skipping internally, so that the performance gain is realized.
167  // If this is done for a source, the 'if' blocks in this function will never be entered
168  // for that source.
171  ItemType itemType = callWithTryCatchAndPrint<ItemType>( [this](){ return getNextItemType(); }, "Calling InputSource::getNextItemType" );
172 
173  if(itemType == IsEvent && processingMode() != RunsLumisAndEvents) {
174  skipEvents(1);
175  return nextItemType_();
176  }
177  if(itemType == IsLumi && processingMode() == Runs) {
178  // QQQ skipLuminosityBlock_();
179  return nextItemType_();
180  }
181  return itemType;
182  }
183 
187  if(eventLimitReached()) {
188  // If the maximum event limit has been reached, stop.
189  state_ = IsStop;
190  } else if(lumiLimitReached()) {
191  // If the maximum lumi limit has been reached, stop
192  // when reaching a new file, run, or lumi.
193  if(oldState == IsInvalid || oldState == IsFile || oldState == IsRun || processingMode() != RunsLumisAndEvents) {
194  state_ = IsStop;
195  } else {
196  ItemType newState = nextItemType_();
197  if(newState == IsEvent) {
198  assert (processingMode() == RunsLumisAndEvents);
199  state_ = IsEvent;
200  } else {
201  state_ = IsStop;
202  }
203  }
204  } else {
205  ItemType newState = nextItemType_();
206  if(newState == IsStop) {
207  state_ = IsStop;
208  } else if(newState == IsSynchronize) {
210  } else if(newState == IsFile || oldState == IsInvalid) {
211  state_ = IsFile;
212  } else if(newState == IsRun || oldState == IsFile) {
214  state_ = IsRun;
215  } else if(newState == IsLumi || oldState == IsRun) {
216  assert (processingMode() != Runs);
218  state_ = IsLumi;
219  } else {
220  assert (processingMode() == RunsLumisAndEvents);
221  state_ = IsEvent;
222  }
223  }
224  if(state_ == IsStop) {
225  lumiAuxiliary_.reset();
226  runAuxiliary_.reset();
227  }
228  return state_;
229  }
230 
231  boost::shared_ptr<LuminosityBlockAuxiliary>
233  return callWithTryCatchAndPrint<boost::shared_ptr<LuminosityBlockAuxiliary> >( [this](){ return readLuminosityBlockAuxiliary_(); },
234  "Calling InputSource::readLuminosityBlockAuxiliary_" );
235  }
236 
237  boost::shared_ptr<RunAuxiliary>
239  return callWithTryCatchAndPrint<boost::shared_ptr<RunAuxiliary> >( [this](){ return readRunAuxiliary_(); },
240  "Calling InputSource::readRunAuxiliary_" );
241  }
242 
243  void
245  this->beginJob();
246  }
247 
248  void
250  endJob();
251  }
252 
256  }
257 
260  return nullptr;
261  }
262 
263  void
265  if(!typeLabelList().empty()) {
267  }
268  }
269 
270  // Return a dummy file block.
271  std::unique_ptr<FileBlock>
273  assert(state_ == IsFile);
274  assert(!limitReached());
275  return callWithTryCatchAndPrint<std::unique_ptr<FileBlock> >( [this](){ return readFile_(); },
276  "Calling InputSource::readFile_" );
277  }
278 
279  void
280  InputSource::closeFile(FileBlock* fb, bool cleaningUpAfterException) {
281  if(fb != nullptr) fb->close();
282  callWithTryCatchAndPrint<void>( [this](){ closeFile_(); },
283  "Calling InputSource::closeFile_",
284  cleaningUpAfterException );
285  return;
286  }
287 
288  // Return a dummy file block.
289  // This function must be overridden for any input source that reads a file
290  // containing Products.
291  std::unique_ptr<FileBlock>
293  return std::unique_ptr<FileBlock>(new FileBlock);
294  }
295 
296  void
297  InputSource::readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender) {
298  RunSourceSentry sentry(*this);
299  callWithTryCatchAndPrint<void>( [this,&runPrincipal](){ readRun_(runPrincipal); }, "Calling InputSource::readRun_" );
300  }
301 
302  void
304  RunSourceSentry sentry(*this);
305  callWithTryCatchAndPrint<void>( [this,&rp](){ readRun_(rp); }, "Calling InputSource::readRun_" );
306  }
307 
308  void
310  LumiSourceSentry sentry(*this);
311  callWithTryCatchAndPrint<void>( [this,&lumiPrincipal](){ readLuminosityBlock_(lumiPrincipal); }, "Calling InputSource::readLuminosityBlock_" );
312  if(remainingLumis_ > 0) {
313  --remainingLumis_;
314  }
315  }
316 
317  void
319  LumiSourceSentry sentry(*this);
320  callWithTryCatchAndPrint<void>( [this,&lbp](){ readLuminosityBlock_(lbp); }, "Calling InputSource::readLuminosityBlock_" );
321  if(remainingLumis_ > 0) {
322  --remainingLumis_;
323  }
324  }
325 
326  void
328  // Note: For the moment, we do not support saving and restoring the state of the
329  // random number generator if random numbers are generated during processing of runs
330  // (e.g. beginRun(), endRun())
332  }
333 
334  void
337  }
338 
339  void
341  assert(state_ == IsEvent);
342  assert(!eventLimitReached());
343  {
344  // block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
345  EventSourceSentry sentry(*this, streamContext);
346 
347  callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
348  if(receiver_) {
350  }
351  }
352 
354  ++readCount_;
355  setTimestamp(ep.time());
356  issueReports(ep.id());
357  }
358 
359  bool
360  InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
361  bool result = false;
362 
363  if (not limitReached()) {
364  // the Pre/PostSourceEvent signals should be generated only if the event is actually found.
365  // this should be taken care of by an EventSourceSentry in the implementation of readIt()
366 
367  //result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
368  result = readIt(eventID, ep, streamContext);
369 
370  if (result) {
372  ++readCount_;
373  issueReports(ep.id());
374  }
375  }
376  return result;
377  }
378 
379  void
381  callWithTryCatchAndPrint<void>( [this,&offset](){ skip(offset); }, "Calling InputSource::skip" );
382  }
383 
384  bool
385  InputSource::goToEvent(EventID const& eventID) {
386  return callWithTryCatchAndPrint<bool>( [this,&eventID](){ return goToEvent_(eventID); }, "Calling InputSource::goToEvent_" );
387  }
388 
389  void
391  state_ = IsInvalid;
393  setNewRun();
394  setNewLumi();
396  callWithTryCatchAndPrint<void>( [this](){ rewind_(); }, "Calling InputSource::rewind_" );
397  if(receiver_) {
398  unsigned int numberToSkip = receiver_->numberToSkip();
399  skip(numberToSkip);
400  decreaseRemainingEventsBy(numberToSkip);
401  }
402  }
403 
404  void
406  if(isInfoEnabled()) {
407  LogVerbatim("FwkReport") << "Begin processing the " << readCount_
408  << suffix(readCount_) << " record. Run " << eventID.run()
409  << ", Event " << eventID.event()
410  << ", LumiSection " << eventID.luminosityBlock()
411  << " at " << std::setprecision(3) << TimeOfDay();
412  }
413  if(!statusFileName_.empty()) {
414  std::ofstream statusFile(statusFileName_.c_str());
415  statusFile << eventID << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
416  statusFile.close();
417  }
418 
419  // At some point we may want to initiate checkpointing here
420  }
421 
422  bool
425  << "InputSource::readIt()\n"
426  << "Random access is not implemented for this type of Input Source\n"
427  << "Contact a Framework Developer\n";
428  }
429 
430  void
433  << "InputSource::setRun()\n"
434  << "Run number cannot be modified for this type of Input Source\n"
435  << "Contact a Framework Developer\n";
436  }
437 
438  void
441  << "InputSource::setLumi()\n"
442  << "Luminosity Block ID cannot be modified for this type of Input Source\n"
443  << "Contact a Framework Developer\n";
444  }
445 
446  void
449  << "InputSource::skip()\n"
450  << "Forking and random access are not implemented for this type of Input Source\n"
451  << "Contact a Framework Developer\n";
452  }
453 
454  bool
457  << "InputSource::goToEvent_()\n"
458  << "Random access is not implemented for this type of Input Source\n"
459  << "Contact a Framework Developer\n";
460  return true;
461  }
462 
463  void
466  << "InputSource::rewind()\n"
467  << "Forking and random access are not implemented for this type of Input Source\n"
468  << "Contact a Framework Developer\n";
469  }
470 
471  void
473  if(-1 == remainingEvents_) {
474  return;
475  }
476  if(iSkipped < remainingEvents_) {
477  remainingEvents_ -= iSkipped;
478  } else {
479  remainingEvents_ = 0;
480  }
481  }
482 
483  void
485  Run run(rp, moduleDescription(), nullptr);
486  callWithTryCatchAndPrint<void>( [this,&run](){ beginRun(run); }, "Calling InputSource::beginRun" );
487  run.commit_();
488  }
489 
490  void
491  InputSource::doEndRun(RunPrincipal& rp, bool cleaningUpAfterException, ProcessContext const* processContext) {
492  rp.setEndTime(time_);
493  rp.setComplete();
494  Run run(rp, moduleDescription(), nullptr);
495  callWithTryCatchAndPrint<void>( [this,&run](){ endRun(run); }, "Calling InputSource::endRun", cleaningUpAfterException );
496  run.commit_();
497  }
498 
499  void
501  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
502  callWithTryCatchAndPrint<void>( [this,&lb](){ beginLuminosityBlock(lb); }, "Calling InputSource::beginLuminosityBlock" );
503  lb.commit_();
504  }
505 
506  void
507  InputSource::doEndLumi(LuminosityBlockPrincipal& lbp, bool cleaningUpAfterException, ProcessContext const* processContext) {
508  lbp.setEndTime(time_);
509  lbp.setComplete();
510  LuminosityBlock lb(lbp, moduleDescription(), nullptr);
511  callWithTryCatchAndPrint<void>( [this,&lb](){ endLuminosityBlock(lb); }, "Calling InputSource::endLuminosityBlock", cleaningUpAfterException );
512  lb.commit_();
513  }
514 
515  void
517  callWithTryCatchAndPrint<void>( [this](){ preForkReleaseResources(); }, "Calling InputSource::preForkReleaseResources" );
518  }
519 
520  void
521  InputSource::doPostForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
522  callWithTryCatchAndPrint<void>( [this, &iReceiver](){ postForkReacquireResources(iReceiver); },
523  "Calling InputSource::postForkReacquireResources" );
524  }
525 
526  bool
528  return callWithTryCatchAndPrint<bool>( [this](){ return randomAccess_(); },
529  "Calling InputSource::randomAccess_" );
530  }
531 
534  return callWithTryCatchAndPrint<ProcessingController::ForwardState>( [this](){ return forwardState_(); },
535  "Calling InputSource::forwardState_" );
536  }
537 
540  return callWithTryCatchAndPrint<ProcessingController::ReverseState>( [this](){ return reverseState_(); },
541  "Calling InputSource::reverseState__" );
542  }
543 
544  void
546 
547  void
549 
550  void
552 
553  void
555 
556  void
558 
559  void
561 
562  void
564 
565  void
566  InputSource::postForkReacquireResources(boost::shared_ptr<multicore::MessageReceiverForSource> iReceiver) {
567  receiver_ = iReceiver;
568  receiver_->receive();
569  numberOfEventsBeforeBigSkip_ = receiver_->numberOfConsecutiveIndices();
570  rewind();
571  }
572 
573  bool
575  return false;
576  }
577 
581  }
582 
586  }
587 
588  ProcessHistoryID const&
590  assert(runAuxiliary());
591  return processHistoryRegistry_->reducedProcessHistoryID(runAuxiliary()->processHistoryID());
592  }
593 
596  assert(runAuxiliary());
597  return runAuxiliary()->run();
598  }
599 
602  assert(luminosityBlockAuxiliary());
603  return luminosityBlockAuxiliary()->luminosityBlock();
604  }
605 
606  InputSource::SourceSentry::SourceSentry(Sig& pre, Sig& post) : post_(post) {
607  pre();
608  }
609 
611  post_();
612  }
613 
615  source_(source),
616  sc_(sc)
617  {
618  source.actReg()->preSourceSignal_(sc_.streamID());
619  }
620 
622  source_.actReg()->postSourceSignal_(sc_.streamID());
623  }
624 
626  sentry_(source.actReg()->preSourceLumiSignal_, source.actReg()->postSourceLumiSignal_) {
627  }
628 
630  sentry_(source.actReg()->preSourceRunSignal_, source.actReg()->postSourceRunSignal_) {
631  }
632 
634  std::string const& lfn,
635  bool usedFallback) :
636  post_(source.actReg()->postOpenFileSignal_),
637  lfn_(lfn),
638  usedFallback_(usedFallback) {
639  source.actReg()->preOpenFileSignal_(lfn, usedFallback);
640  }
641 
643  post_(lfn_, usedFallback_);
644  }
645 
647  std::string const& lfn,
648  bool usedFallback) :
649  post_(source.actReg()->postCloseFileSignal_),
650  lfn_(lfn),
651  usedFallback_(usedFallback) {
652  source.actReg()->preCloseFileSignal_(lfn, usedFallback);
653  }
654 
656  post_(lfn_, usedFallback_);
657  }
658 }
RunNumber_t run() const
Definition: EventID.h:42
virtual ~InputSource()
Destructor.
Definition: InputSource.cc:110
EventNumber_t event() const
Definition: EventID.h:44
ProcessHistoryRegistry const & processHistoryRegistry() const
Const accessor for process history registry.
Definition: InputSource.h:171
T getUntrackedParameter(std::string const &, T const &) const
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:431
virtual void endRun(Run &)
Definition: InputSource.cc:554
virtual void preForkReleaseResources()
Definition: InputSource.cc:563
void issueReports(EventID const &eventID)
issue an event report
Definition: InputSource.cc:405
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:244
boost::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:256
virtual void closeFile_()
Definition: InputSource.h:401
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:472
Timestamp time_
Definition: InputSource.h:435
static std::string const source("source")
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:484
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
RunSourceSentry(InputSource const &source)
Definition: InputSource.cc:629
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:303
std::string statusFileName_
Definition: InputSource.h:442
void registerProducts()
Register any produced products.
Definition: InputSource.cc:264
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:439
EventID const & id() const
virtual void rewind_()
Definition: InputSource.cc:464
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:253
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:345
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:595
boost::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:440
void fillRunPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
Definition: RunPrincipal.cc:22
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:43
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
virtual void beginLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:545
virtual void postForkReacquireResources(boost::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:566
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:280
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:589
bool limitReached() const
Definition: InputSource.h:391
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
Timestamp const & time() const
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:455
void close()
Definition: FileBlock.h:114
FileCloseSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:646
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:533
TypeLabelList & typeLabelList()
used by the fwk to register the list of products of this module
unsigned int numberOfEventsBeforeBigSkip_
Definition: InputSource.h:446
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:614
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:579
void fillLuminosityBlockPrincipal(ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
int remainingEvents() const
Definition: InputSource.h:194
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:539
void addDefault(ParameterSetDescription const &psetDescription)
bool lumiLimitReached() const
Definition: InputSource.h:390
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:297
virtual void beginJob()
Definition: InputSource.cc:557
ProcessingMode processingMode_
Definition: InputSource.h:428
void setEndTime(Timestamp const &time)
void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:500
boost::shared_ptr< edm::multicore::MessageReceiverForSource > receiver_
Definition: InputSource.h:445
virtual ItemType getNextItemType()=0
tuple result
Definition: query.py:137
static const std::string & baseType()
Definition: InputSource.cc:127
void commit_()
Definition: Run.cc:86
void doEndLumi(LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of lumi block.
Definition: InputSource.cc:507
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:423
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:385
void doPreForkReleaseResources()
Called by the framework before forking the process.
Definition: InputSource.cc:516
SourceSentry(Sig &pre, Sig &post)
Definition: InputSource.cc:606
virtual SharedResourcesAcquirer * resourceSharedWithDelayedReader_() const
Definition: InputSource.cc:259
#define end
Definition: vmac.h:37
virtual boost::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
unsigned int offset(bool)
boost::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:238
void resetEventCached()
Definition: InputSource.h:382
StreamID const & streamID() const
Definition: StreamContext.h:57
void doPostForkReacquireResources(boost::shared_ptr< multicore::MessageReceiverForSource >)
Definition: InputSource.cc:521
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:601
bool eventLimitReached() const
Definition: InputSource.h:389
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:318
void skipEvents(int offset)
Definition: InputSource.cc:380
virtual void beginRun(Run &)
Definition: InputSource.cc:551
boost::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:441
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:327
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:292
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:113
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:340
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:272
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
virtual bool randomAccess_() const
Definition: InputSource.cc:574
boost::shared_ptr< RunPrincipal > const runPrincipal() const
static const std::string kBaseType("EDAnalyzer")
bool isInfoEnabled()
ItemType nextItemType_()
Definition: InputSource.cc:170
#define begin
Definition: vmac.h:30
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:584
virtual boost::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
std::unique_ptr< ProcessHistoryRegistry > processHistoryRegistry_
Definition: InputSource.h:431
bool randomAccess() const
Definition: InputSource.cc:527
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:132
bool const primary_
Definition: InputSource.h:433
void rewind()
Begin again at the first event.
Definition: InputSource.cc:390
int remainingLuminosityBlocks() const
Definition: InputSource.h:202
ModuleDescription const & moduleDescription() const
Accessor for 'module' description.
Definition: InputSource.h:205
unsigned int RunNumber_t
Definition: EventRange.h:32
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:335
volatile std::atomic< bool > shutdown_flag false
boost::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxiliary.
Definition: InputSource.cc:232
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:309
FileOpenSentry(InputSource const &source, std::string const &lfn, bool usedFallback)
Definition: InputSource.cc:633
static void addToRegistry(TypeLabelList::const_iterator const &iBegin, TypeLabelList::const_iterator const &iEnd, ModuleDescription const &iDesc, ProductRegistry &iReg, bool iIsListener=false)
void doEndRun(RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
Called by framework at end of run.
Definition: InputSource.cc:491
long double T
LumiSourceSentry(InputSource const &source)
Definition: InputSource.cc:625
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:185
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:52
virtual void endJob()
Definition: InputSource.cc:560
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:120
SharedResourcesAcquirer * resourceSharedWithDelayedReader() const
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:254
virtual void skip(int offset)
Definition: InputSource.cc:447
std::string createGlobalIdentifier()
Definition: Run.h:41
virtual void endLuminosityBlock(LuminosityBlock &)
Definition: InputSource.cc:548