CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
38 
40 
48 
53 
56 
68 
69 #include "MessageForSource.h"
70 #include "MessageForParent.h"
72 
73 #include "boost/range/adaptor/reversed.hpp"
74 
75 #include <cassert>
76 #include <exception>
77 #include <iomanip>
78 #include <iostream>
79 #include <utility>
80 #include <sstream>
81 
82 #include <sys/ipc.h>
83 #include <sys/msg.h>
84 
85 #include "tbb/task.h"
86 
87 //Used for CPU affinity
88 #ifndef __APPLE__
89 #include <sched.h>
90 #endif
91 
92 namespace {
93  //Sentry class to only send a signal if an
94  // exception occurs. An exception is identified
95  // by the destructor being called without first
96  // calling completedSuccessfully().
97  class SendSourceTerminationSignalIfException {
98  public:
99  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
100  ~SendSourceTerminationSignalIfException() {
101  if (reg_) {
102  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
103  }
104  }
105  void completedSuccessfully() { reg_ = nullptr; }
106 
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109  };
110 
111 } // namespace
112 
113 namespace edm {
114 
115  // ---------------------------------------------------------------
// Builds the input source described by the "@main_input" parameter set.
//
// Steps visible here: validate that parameter set against the descriptions
// filled by `filler`, register it, build a ModuleDescription labelled "source",
// then construct the source through InputSourceFactory and connect its
// pre/post event-read signals to the ones in `areg`.
// preSourceConstructionSignal_ is emitted before construction and
// postSourceConstructionSignal_ afterwards, also when construction throws.
//
// A cms::Exception caught during validation or construction gets a context
// line naming the source type and is rethrown.
//
// NOTE(review): the listing numbers skip 126, 134 and 168, so this copy seems
// to have lost three lines: the statement that the two `<<` message lines
// belong to, the initializer of `filler`, and one InputSourceDescription
// argument. Confirm against the real file before relying on this text.
116  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
117  CommonParams const& common,
118  std::shared_ptr<ProductRegistry> preg,
119  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
120  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
121  std::shared_ptr<ActivityRegistry> areg,
122  std::shared_ptr<ProcessConfiguration const> processConfiguration,
123  PreallocationConfiguration const& allocations) {
124  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
125  if (main_input == nullptr) {
127  << "There must be exactly one source in the configuration.\n"
128  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
129  }
130 
131  std::string modtype(main_input->getParameter<std::string>("@module_type"));
132 
133  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
135  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
136  filler->fill(descriptions);
137 
// Validation failures are reported with the source type added as context.
138  try {
139  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
140  } catch (cms::Exception& iException) {
141  std::ostringstream ost;
142  ost << "Validating configuration of input source of type " << modtype;
143  iException.addContext(ost.str());
144  throw;
145  }
146 
147  main_input->registerIt();
148 
149  // Fill in "ModuleDescription", in case the input source produces
150  // any EDProducts, which would be registered in the ProductRegistry.
151  // Also fill in the process history item for this process.
152  // There is no module label for the unnamed input source, so
153  // just use "source".
154  // Only the tracked parameters belong in the process configuration.
155  ModuleDescription md(main_input->id(),
156  main_input->getParameter<std::string>("@module_type"),
157  "source",
158  processConfiguration.get(),
159  ModuleDescription::getUniqueID());
160 
161  InputSourceDescription isdesc(md,
162  preg,
163  branchIDListHelper,
164  thinnedAssociationsHelper,
165  areg,
166  common.maxEventsInput_,
167  common.maxLumisInput_,
169  allocations);
170 
171  areg->preSourceConstructionSignal_(md);
172  std::unique_ptr<InputSource> input;
173  try {
174  //even if we have an exception, send the signal
// The shared_ptr owns no object; only its deleter matters, and it runs when
// `sentry` goes out of scope at the end of this try block.
175  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
176  convertException::wrap([&]() {
177  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
178  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
179  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
180  });
181  } catch (cms::Exception& iException) {
182  std::ostringstream ost;
183  ost << "Constructing input source of type " << modtype;
184  iException.addContext(ost.str());
185  throw;
186  }
187  return input;
188  }
189 
190  // ---------------------------------------------------------------
// Creates the looper named in the "@all_loopers" parameter, if there is one.
//
// Returns a null pointer when the list is empty. Otherwise exactly one name is
// expected (enforced with assert); its parameter set is registered and the
// looper is created through eventsetup::LooperFactory.
//
// NOTE(review): listing number 192 is absent, so one parameter line appears to
// be missing from this copy - presumably the one declaring `cp`, which is used
// below but not declared in the visible signature. Confirm against the real file.
191  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
193  ParameterSet& params) {
194  std::shared_ptr<EDLooperBase> vLooper;
195 
196  std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");
197 
198  if (loopers.empty()) {
199  return vLooper;
200  }
201 
202  assert(1 == loopers.size());
203 
// NOTE(review): the result of getPSetForUpdate is dereferenced without a null
// check here, whereas makeInput tests the same call against nullptr.
204  for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
205  ++itName) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
207  providerPSet->registerIt();
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
// Constructs the processor from a parameter set plus an existing service token.
//
// The parameter set is moved into a new ProcessDesc, the default and forced
// services are added to it, and the remaining setup is delegated to init().
//
// NOTE(review): listing number 216 is absent; `iLegacy`, which is passed to
// init() below, is presumably the parameter declared on that missing line.
// NOTE(review): createAcquirerForSourceDelayedReader() is called twice, once
// for `.first` and once for `.second`; verify that both calls refer to the
// same underlying resources.
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
215  ServiceToken const& iToken,
217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  espController_(new eventsetup::EventSetupsController),
225  esp_(),
226  act_table_(),
227  processConfiguration_(),
228  schedule_(),
229  subProcesses_(),
230  historyAppender_(new HistoryAppender),
231  fb_(),
232  looper_(),
233  deferredExceptionPtrIsSet_(false),
234  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
235  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
239  fileModeNoMerge_(false),
240  exceptionMessageFiles_(),
241  exceptionMessageRuns_(),
242  exceptionMessageLumis_(false),
243  forceLooperToEnd_(false),
244  looperBeginJobRun_(false),
245  forceESCacheClearOnNewRun_(false),
246  eventSetupDataToExcludeFromPrefetching_() {
247  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
248  processDesc->addServices(defaultServices, forcedServices);
249  init(processDesc, iToken, iLegacy);
250  }
251 
// Constructs the processor from a parameter set only (no service token
// parameter). The parameter set is moved into a new ProcessDesc and the
// default and forced services are added to it.
//
// NOTE(review): the listing numbers jump repeatedly (257, 263, 266, 269,
// 273-283 and 286 are absent), so part of the initializer list and at least
// one body statement are missing from this copy. Compare with the first
// constructor, which ends by calling init().
252  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
253  std::vector<std::string> const& defaultServices,
254  std::vector<std::string> const& forcedServices)
255  : actReg_(),
256  preg_(),
258  serviceToken_(),
259  input_(),
260  espController_(new eventsetup::EventSetupsController),
261  esp_(),
262  act_table_(),
264  schedule_(),
265  subProcesses_(),
267  fb_(),
268  looper_(),
270  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
271  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
272  principalCache_(),
284  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
285  processDesc->addServices(defaultServices, forcedServices);
287  }
288 
// Constructs the processor from an already built ProcessDesc and a service
// token, then delegates the remaining setup to init().
//
// NOTE(review): listing number 291 is absent; `legacy`, which is passed to
// init() below, is presumably declared on that missing line. Further numbers
// (294, 300, 303, 306, 310-320) are also absent, so part of the initializer
// list is missing from this copy.
289  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
290  ServiceToken const& token,
292  : actReg_(),
293  preg_(),
295  serviceToken_(),
296  input_(),
297  espController_(new eventsetup::EventSetupsController),
298  esp_(),
299  act_table_(),
301  schedule_(),
302  subProcesses_(),
304  fb_(),
305  looper_(),
307  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
308  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
309  principalCache_(),
321  init(processDesc, token, legacy);
322  }
323 
// Shared initialization used by the constructors.
//
// In the order visible here: reads and checks the top-level options
// (fileMode, thread/stream/run/lumi concurrency), initializes the services,
// the event setup provider and the optional looper, sizes the per-stream and
// per-lumi containers, creates the input source and the schedule, copies the
// shared helpers into data members, creates event and luminosity-block
// principals, and finally builds the subprocesses.
//
// Throws Exception(errors::Configuration) for an unknown fileMode or when
// numberOfConcurrentRuns is not 1.
//
// NOTE(review): many listing numbers are skipped in this copy (for example
// 326, 330, 333, 400, 408, 411, 449), so statements are missing. In
// particular `iLegacy`, `items` and `handler` are used below without a visible
// declaration, and several calls have lost arguments. Confirm against the
// real file before relying on this text.
324  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
325  ServiceToken const& iToken,
327  //std::cerr << processDesc->dump() << std::endl;
328 
329  // register the empty parentage vector, once and for all
331 
332  // register the empty parameter set, once and for all.
334 
335  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
336 
337  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
338  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
339  bool const hasSubProcesses = !subProcessVParameterSet.empty();
340 
341  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
342  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
343  // set in here if the parameters were not explicitly set.
344  validateTopLevelParameterSets(parameterSet.get());
345 
346  // Now set some parameters specific to the main process.
347  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
348  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
// Only the two spellings tested below are accepted; anything else is a configuration error.
349  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
350  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
351  << fileMode << ".\n"
352  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
353  } else {
354  fileModeNoMerge_ = (fileMode == "NOMERGE");
355  }
356  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
357 
358  //threading
359  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
360 
361  // Even if numberOfThreads was set to zero in the Python configuration, the code
362  // in cmsRun.cpp should have reset it to something else.
363  assert(nThreads != 0);
364 
// A stream count of zero means "use as many streams as threads".
365  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
366  if (nStreams == 0) {
367  nStreams = nThreads;
368  }
369  if (nThreads > 1 or nStreams > 1) {
370  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
371  }
372  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
373  if (nConcurrentRuns != 1) {
374  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
375  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
376  }
// A concurrent-lumi count of zero falls back to nConcurrentRuns, which is 1 at this point.
377  unsigned int nConcurrentLumis =
378  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
379  if (nConcurrentLumis == 0) {
380  nConcurrentLumis = nConcurrentRuns;
381  }
382 
383  //Check that relationships between threading parameters make sense
384  /*
385  if(nThreads<nStreams) {
386  //bad
387  }
388  if(nConcurrentRuns>nStreams) {
389  //bad
390  }
391  if(nConcurrentRuns>nConcurrentLumis) {
392  //bad
393  }
394  */
395  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
396 
397  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
398 
399  // Now do general initialization
401 
402  //initialize the services
403  auto& serviceSets = processDesc->getServicesPSets();
404  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
405  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
406 
407  //make the services available
409 
410  if (nStreams > 1) {
412  handler->willBeUsingThreads();
413  }
414 
415  // initialize miscellaneous items
416  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
417 
418  // initialize the event setup provider
419  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
420  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
421 
422  // initialize the looper, if any
423  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
424  if (looper_) {
425  looper_->setActionTable(items.act_table_.get());
426  looper_->attachTo(*items.actReg_);
427 
428  //For now loopers make us run only 1 transition at a time
429  nStreams = 1;
430  nConcurrentLumis = 1;
431  nConcurrentRuns = 1;
432  }
433  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
434 
// From here on the (possibly looper-adjusted) counts are final and are used to size the queues.
435  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
436 
437  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
438  streamQueues_.resize(nStreams);
439  streamLumiStatus_.resize(nStreams);
440 
441  // initialize the input source
442  input_ = makeInput(*parameterSet,
443  *common,
444  items.preg(),
445  items.branchIDListHelper(),
446  items.thinnedAssociationsHelper(),
447  items.actReg_,
448  items.processConfiguration(),
450 
451  // initialize the Schedule
452  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
453 
454  // set the data members
455  act_table_ = std::move(items.act_table_);
456  actReg_ = items.actReg_;
457  preg_ = items.preg();
459  branchIDListHelper_ = items.branchIDListHelper();
460  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
461  processConfiguration_ = items.processConfiguration();
463  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
464 
465  FDEBUG(2) << parameterSet << std::endl;
466 
// One event principal is created per stream.
468  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
469  // Reusable event principal
470  auto ep = std::make_shared<EventPrincipal>(preg(),
474  historyAppender_.get(),
475  index);
477  }
478 
// One luminosity-block principal is created per concurrent luminosity block.
479  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
480  auto lp =
481  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
483  }
484 
485  // fill the subprocesses, if there are any
486  subProcesses_.reserve(subProcessVParameterSet.size());
487  for (auto& subProcessPSet : subProcessVParameterSet) {
488  subProcesses_.emplace_back(subProcessPSet,
489  *parameterSet,
490  preg(),
495  *actReg_,
496  token,
499  &processContext_);
500  }
501  }
502 
504  // Make the services available while everything is being deleted.
506  ServiceRegistry::Operate op(token);
507 
508  // manually destroy all these thing that may need the services around
509  // propagate_const<T> has no reset() function
510  espController_ = nullptr;
511  esp_ = nullptr;
512  schedule_ = nullptr;
513  input_ = nullptr;
514  looper_ = nullptr;
515  actReg_ = nullptr;
516 
519  }
520 
522  if (beginJobCalled_)
523  return;
524  beginJobCalled_ = true;
525  bk::beginJob();
526 
527  // StateSentry toerror(this); // should we add this ?
528  //make the services available
530 
535  actReg_->preallocateSignal_(bounds);
536  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
538 
539  //NOTE: this may throw
541  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
542 
545  }
546  //NOTE: This implementation assumes 'Job' means one call
547  // the EventProcessor::run
548  // If it really means once per 'application' then this code will
549  // have to be changed.
550  // Also have to deal with case where have 'run' then new Module
551  // added and do 'run'
552  // again. In that case the newly added Module needs its 'beginJob'
553  // to be called.
554 
555  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
556  // For now we delay calling beginOfJob until first beginOfRun
557  //if(looper_) {
558  // looper_->beginOfJob(es);
559  //}
560  try {
561  convertException::wrap([&]() { input_->doBeginJob(); });
562  } catch (cms::Exception& ex) {
563  ex.addContext("Calling beginJob for the source");
564  throw;
565  }
566  espController_->finishConfiguration();
567  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
568  // toerror.succeeded(); // should we add this?
569  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
570  actReg_->postBeginJobSignal_();
571 
572  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
573  schedule_->beginStream(i);
574  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
575  }
576  }
577 
579  // Collects exceptions, so we don't throw before all operations are performed.
581  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
582 
583  //make the services available
585 
586  //NOTE: this really should go elsewhere in the future
587  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
588  c.call([this, i]() { this->schedule_->endStream(i); });
589  for (auto& subProcess : subProcesses_) {
590  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
591  }
592  }
593  auto actReg = actReg_.get();
594  c.call([actReg]() { actReg->preEndJobSignal_(); });
595  schedule_->endJob(c);
596  for (auto& subProcess : subProcesses_) {
597  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
598  }
599  c.call(std::bind(&InputSource::doEndJob, input_.get()));
600  if (looper_) {
601  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
602  }
603  c.call([actReg]() { actReg->postEndJobSignal_(); });
604  if (c.hasThrown()) {
605  c.rethrow();
606  }
607  }
608 
610 
611  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
612  return schedule_->getAllModuleDescriptions();
613  }
614 
615  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
616 
617  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
618 
619  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
620 
621  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
622 
623  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
624 
625  void EventProcessor::getTriggerReport(TriggerReport& rep) const { schedule_->getTriggerReport(rep); }
626 
627  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
628 
629  namespace {
630 #include "TransitionProcessors.icc"
631  }
632 
634  bool returnValue = false;
635 
636  // Look for a shutdown signal
637  if (shutdown_flag.load(std::memory_order_acquire)) {
638  returnValue = true;
639  returnCode = epSignal;
640  }
641  return returnValue;
642  }
643 
645  if (deferredExceptionPtrIsSet_.load()) {
647  return InputSource::IsStop;
648  }
649 
650  SendSourceTerminationSignalIfException sentry(actReg_.get());
651  InputSource::ItemType itemType;
652  //For now, do nothing with InputSource::IsSynchronize
653  do {
654  itemType = input_->nextItemType();
655  } while (itemType == InputSource::IsSynchronize);
656 
657  lastSourceTransition_ = itemType;
658  sentry.completedSuccessfully();
659 
661 
662  if (checkForAsyncStopRequest(returnCode)) {
663  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
665  }
666 
667  return lastSourceTransition_;
668  }
669 
670  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
671  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
672  }
673 
675 
679  {
680  beginJob(); //make sure this was called
681 
682  // make the services available
684 
686  try {
687  FilesProcessor fp(fileModeNoMerge_);
688 
689  convertException::wrap([&]() {
690  bool firstTime = true;
691  do {
692  if (not firstTime) {
694  rewindInput();
695  } else {
696  firstTime = false;
697  }
698  startingNewLoop();
699 
700  auto trans = fp.processFiles(*this);
701 
702  fp.normalEnd();
703 
704  if (deferredExceptionPtrIsSet_.load()) {
705  std::rethrow_exception(deferredExceptionPtr_);
706  }
707  if (trans != InputSource::IsStop) {
708  //problem with the source
709  doErrorStuff();
710 
711  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
712  }
713  } while (not endOfLoop());
714  }); // convertException::wrap
715 
716  } // Try block
717  catch (cms::Exception& e) {
719  std::string message(
720  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
721  e.addAdditionalInfo(message);
722  if (e.alreadyPrinted()) {
723  LogAbsolute("Additional Exceptions") << message;
724  }
725  }
726  if (!exceptionMessageRuns_.empty()) {
728  if (e.alreadyPrinted()) {
729  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
730  }
731  }
732  if (!exceptionMessageFiles_.empty()) {
734  if (e.alreadyPrinted()) {
735  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
736  }
737  }
738  throw;
739  }
740  }
741 
742  return returnCode;
743  }
744 
746  FDEBUG(1) << " \treadFile\n";
747  size_t size = preg_->size();
748  SendSourceTerminationSignalIfException sentry(actReg_.get());
749 
751 
752  fb_ = input_->readFile();
753  if (size < preg_->size()) {
755  }
758  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
759  }
760  sentry.completedSuccessfully();
761  }
762 
763  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
764  if (fb_.get() != nullptr) {
765  SendSourceTerminationSignalIfException sentry(actReg_.get());
766  input_->closeFile(fb_.get(), cleaningUpAfterException);
767  sentry.completedSuccessfully();
768  }
769  FDEBUG(1) << "\tcloseInputFile\n";
770  }
771 
773  if (fb_.get() != nullptr) {
774  schedule_->openOutputFiles(*fb_);
775  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
776  }
777  FDEBUG(1) << "\topenOutputFiles\n";
778  }
779 
781  if (fb_.get() != nullptr) {
782  schedule_->closeOutputFiles();
783  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
784  }
785  FDEBUG(1) << "\tcloseOutputFiles\n";
786  }
787 
790  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
791  if (fb_.get() != nullptr) {
792  schedule_->respondToOpenInputFile(*fb_);
793  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
794  }
795  FDEBUG(1) << "\trespondToOpenInputFile\n";
796  }
797 
799  if (fb_.get() != nullptr) {
800  schedule_->respondToCloseInputFile(*fb_);
801  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
802  }
803  FDEBUG(1) << "\trespondToCloseInputFile\n";
804  }
805 
807  shouldWeStop_ = false;
808  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
809  // until after we've called beginOfJob
810  if (looper_ && looperBeginJobRun_) {
811  looper_->doStartingNewLoop();
812  }
813  FDEBUG(1) << "\tstartingNewLoop\n";
814  }
815 
817  if (looper_) {
818  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
819  looper_->setModuleChanger(&changer);
820  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
821  looper_->setModuleChanger(nullptr);
823  return true;
824  else
825  return false;
826  }
827  FDEBUG(1) << "\tendOfLoop\n";
828  return true;
829  }
830 
832  input_->repeat();
833  input_->rewind();
834  FDEBUG(1) << "\trewind\n";
835  }
836 
838  looper_->prepareForNextLoop(esp_.get());
839  FDEBUG(1) << "\tprepareForNextLoop\n";
840  }
841 
843  FDEBUG(1) << "\tshouldWeCloseOutput\n";
844  if (!subProcesses_.empty()) {
845  for (auto const& subProcess : subProcesses_) {
846  if (subProcess.shouldWeCloseOutput()) {
847  return true;
848  }
849  }
850  return false;
851  }
852  return schedule_->shouldWeCloseOutput();
853  }
854 
856  FDEBUG(1) << "\tdoErrorStuff\n";
857  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
858  << "and went to the error state\n"
859  << "Will attempt to terminate processing normally\n"
860  << "(IF using the looper the next loop will be attempted)\n"
861  << "This likely indicates a bug in an input module or corrupted input or both\n";
862  }
863 
866  bool& globalBeginSucceeded,
867  bool& eventSetupForInstanceSucceeded) {
868  globalBeginSucceeded = false;
869  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
870  {
871  SendSourceTerminationSignalIfException sentry(actReg_.get());
872 
873  input_->doBeginRun(runPrincipal, &processContext_);
874  sentry.completedSuccessfully();
875  }
876 
877  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
879  espController_->forceCacheClear();
880  }
881  {
882  SendSourceTerminationSignalIfException sentry(actReg_.get());
883  espController_->eventSetupForInstance(ts);
884  eventSetupForInstanceSucceeded = true;
885  sentry.completedSuccessfully();
886  }
887  auto const& es = esp_->eventSetupImpl();
888  if (looper_ && looperBeginJobRun_ == false) {
889  looper_->copyInfo(ScheduleInfo(schedule_.get()));
890  looper_->beginOfJob(es);
891  looperBeginJobRun_ = true;
892  looper_->doStartingNewLoop();
893  }
894  {
896  auto globalWaitTask = make_empty_waiting_task();
897  globalWaitTask->increment_ref_count();
898  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
899  *schedule_,
900  runPrincipal,
901  ts,
902  es,
903  nullptr,
905  subProcesses_);
906  globalWaitTask->wait_for_all();
907  if (globalWaitTask->exceptionPtr() != nullptr) {
908  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
909  }
910  }
911  globalBeginSucceeded = true;
912  FDEBUG(1) << "\tbeginRun " << run << "\n";
913  if (looper_) {
914  looper_->doBeginRun(runPrincipal, es, &processContext_);
915  }
916  {
917  //To wait, the ref count has to be 1+#streams
918  auto streamLoopWaitTask = make_empty_waiting_task();
919  streamLoopWaitTask->increment_ref_count();
920 
922 
923  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
924  *schedule_,
926  runPrincipal,
927  ts,
928  es,
929  nullptr,
931  subProcesses_);
932 
933  streamLoopWaitTask->wait_for_all();
934  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
935  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
936  }
937  }
938  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
939  if (looper_) {
940  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
941  }
942  }
943 
946  bool globalBeginSucceeded,
947  bool cleaningUpAfterException,
948  bool eventSetupForInstanceSucceeded) {
949  if (eventSetupForInstanceSucceeded) {
950  //If we skip empty runs, this would be called conditionally
951  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
952 
953  if (globalBeginSucceeded) {
955  t->increment_ref_count();
956  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
957  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
958  mergeableRunProductMetadata->preWriteRun();
959  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
960  t->wait_for_all();
961  mergeableRunProductMetadata->postWriteRun();
962  if (t->exceptionPtr()) {
963  std::rethrow_exception(*t->exceptionPtr());
964  }
965  }
966  }
967  deleteRunFromCache(phid, run);
968  }
969 
972  bool globalBeginSucceeded,
973  bool cleaningUpAfterException) {
974  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
975  runPrincipal.setEndTime(input_->timestamp());
976 
977  IOVSyncValue ts(
979  runPrincipal.endTime());
980  {
981  SendSourceTerminationSignalIfException sentry(actReg_.get());
982  espController_->eventSetupForInstance(ts);
983  sentry.completedSuccessfully();
984  }
985  auto const& es = esp_->eventSetupImpl();
986  if (globalBeginSucceeded) {
987  //To wait, the ref count has to be 1+#streams
988  auto streamLoopWaitTask = make_empty_waiting_task();
989  streamLoopWaitTask->increment_ref_count();
990 
992 
993  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
994  *schedule_,
996  runPrincipal,
997  ts,
998  es,
999  nullptr,
1000  serviceToken_,
1001  subProcesses_,
1002  cleaningUpAfterException);
1003 
1004  streamLoopWaitTask->wait_for_all();
1005  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1006  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1007  }
1008  }
1009  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1010  if (looper_) {
1011  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1012  }
1013  {
1014  auto globalWaitTask = make_empty_waiting_task();
1015  globalWaitTask->increment_ref_count();
1016 
1018  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1019  *schedule_,
1020  runPrincipal,
1021  ts,
1022  es,
1023  nullptr,
1024  serviceToken_,
1025  subProcesses_,
1026  cleaningUpAfterException);
1027  globalWaitTask->wait_for_all();
1028  if (globalWaitTask->exceptionPtr() != nullptr) {
1029  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1030  }
1031  }
1032  FDEBUG(1) << "\tendRun " << run << "\n";
1033  if (looper_) {
1034  looper_->doEndRun(runPrincipal, es, &processContext_);
1035  }
1036  }
1037 
// Runs luminosity-block processing and blocks until the asynchronous work is done.
//
// When streamLumiActive_ is greater than zero the work is resumed with
// continueLumiAsync; otherwise a new luminosity block is started with
// beginLumiAsync, using an IOVSyncValue built from the input source's current
// run, luminosity block and lumi begin time. iRunResource is handed on to
// beginLumiAsync.
//
// Any exception recorded on the waiting task is rethrown here. Returns
// lastTransitionType().
//
// NOTE(review): listing number 1043 is absent, so one statement at the start
// of the first branch appears to be missing from this copy.
1038  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1039  auto waitTask = make_empty_waiting_task();
// The extra reference is taken so that wait_for_all() below can block on this task.
1040  waitTask->increment_ref_count();
1041 
1042  if (streamLumiActive_ > 0) {
1044  // Continue after opening a new input file
1045  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1046  } else {
1047  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1048  input_->luminosityBlockAuxiliary()->beginTime()),
1049  iRunResource,
1050  WaitingTaskHolder{waitTask.get()});
1051  }
1052  waitTask->wait_for_all();
1053 
1054  if (waitTask->exceptionPtr() != nullptr) {
1055  std::rethrow_exception(*(waitTask->exceptionPtr()));
1056  }
1057  return lastTransitionType();
1058  }
1059 
1061  std::shared_ptr<void> const& iRunResource,
1062  edm::WaitingTaskHolder iHolder) {
1063  if (iHolder.taskHasFailed()) {
1064  return;
1065  }
1066 
1067  // We must be careful with the status object here and in code this function calls. IF we want
1068  // endRun to be called, then we must call resetResources before the things waiting on
1069  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1070  // endRun to be called much later than it should be, because status is holding iRunResource).
1071  // Note that this must be done explicitly. Relying on the destructor does not work well
1072  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1073  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1074  // destroyed inside this function and lumiWork.
1075  auto status =
1076  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1077 
1078  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1079  if (iHolder.taskHasFailed()) {
1080  status->resetResources();
1081  return;
1082  }
1083 
1084  status->setResumer(std::move(iResumer));
1085 
1086  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1087  //make the services available
1089 
1090  try {
1092 
1093  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1094  {
1095  SendSourceTerminationSignalIfException sentry(actReg_.get());
1096 
1097  input_->doBeginLumi(lumiPrincipal, &processContext_);
1098  sentry.completedSuccessfully();
1099  }
1100 
1102  if (rng.isAvailable()) {
1103  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1104  rng->preBeginLumi(lb);
1105  }
1106 
1107  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1108 
1109  //Task to start the stream beginLumis
1110  auto beginStreamsTask = make_waiting_task(
1111  tbb::task::allocate_root(), [this, holder = iHolder, status, ts](std::exception_ptr const* iPtr) mutable {
1112  if (iPtr) {
1113  status->resetResources();
1114  holder.doneWaiting(*iPtr);
1115  } else {
1116  status->globalBeginDidSucceed();
1117  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1118 
1119  if (looper_) {
1120  try {
1121  //make the services available
1122  ServiceRegistry::Operate operateLooper(serviceToken_);
1123  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1124  } catch (...) {
1125  status->resetResources();
1126  holder.doneWaiting(std::current_exception());
1127  return;
1128  }
1129  }
1131 
1132  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1133  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1134  streamQueues_[i].pause();
1135 
1136  auto eventTask = edm::make_waiting_task(
1137  tbb::task::allocate_root(),
1138  [this, i, h = holder](std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1139  if (exceptionFromBeginStreamLumi) {
1141  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1143  } else {
1145  }
1146  });
1147  auto& event = principalCache_.eventPrincipal(i);
1150  auto lp = status->lumiPrincipal();
1151  event.setLuminosityBlockPrincipal(lp.get());
1152  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1153  *schedule_,
1154  i,
1155  *lp,
1156  ts,
1157  es,
1158  &status->eventSetupImpls(),
1159  serviceToken_,
1160  subProcesses_);
1161  });
1162  }
1163  }
1164  }); // beginStreamTask
1165 
1166  //task to start the global begin lumi
1167  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1168 
1169  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1170  {
1172  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1173  *schedule_,
1174  lumiPrincipal,
1175  ts,
1176  es,
1177  &status->eventSetupImpls(),
1178  serviceToken_,
1179  subProcesses_);
1180  }
1181  } catch (...) {
1182  status->resetResources();
1183  iHolder.doneWaiting(std::current_exception());
1184  }
1185  }); // task in sourceResourcesAcquirer
1186  }; // end lumiWork
1187 
1188  auto queueLumiWorkTask = make_waiting_task(
1189  tbb::task::allocate_root(),
1190  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1191  if (iPtr) {
1192  iHolder.doneWaiting(*iPtr);
1193  }
1194  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1195  });
1196 
1197  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1198  // We only get here inside this block if there is an EventSetup
1199  // module not able to handle concurrent IOVs (usually an ESSource)
1200  // and the new sync value is outside the current IOV of that module.
1201 
1202  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1203 
1204  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1205  try {
1206  SendSourceTerminationSignalIfException sentry(actReg_.get());
1207  // Pass in iSync to let the EventSetup system know which run and lumi
1208  // need to be processed and prepare IOVs for it.
1209  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1210  // lumi is done and no longer needs its EventSetup IOVs.
1211  espController_->eventSetupForInstance(
1212  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1213  sentry.completedSuccessfully();
1214  } catch (...) {
1215  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1216  }
1218  });
1219 
1220  } else {
1222 
1223  // This holder will be used to wait until the EventSetup IOVs are ready
1224  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1225 
1226  try {
1227  SendSourceTerminationSignalIfException sentry(actReg_.get());
1228 
1229  // Pass in iSync to let the EventSetup system know which run and lumi
1230  // need to be processed and prepare IOVs for it.
1231  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1232  // lumi is done and no longer needs its EventSetup IOVs.
1233  espController_->eventSetupForInstance(
1234  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1235  sentry.completedSuccessfully();
1236 
1237  } catch (...) {
1238  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1239  }
1240  }
1241  }
1242 
1244  {
1245  //all streams are sharing the same status at the moment
1246  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1247  status->needToContinueLumi();
1248  status->startProcessingEvents();
1249  }
1250 
1251  unsigned int streamIndex = 0;
1252  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1253  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1254  handleNextEventForStreamAsync(std::move(h), streamIndex);
1255  }));
1256  }
1257  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1258  handleNextEventForStreamAsync(std::move(h), streamIndex);
1259  }));
1260  }
1261 
// Handle an exception that surfaced while ending a luminosity block.
//
// iPtr   - exception to record; it is dereferenced unconditionally, so it must not be null.
// holder - task holder to notify; only a copy of it is signalled, the caller's holder is untouched.
//
// The exception is offered to setDeferredException(). When that call returns true
// (this exception became the deferred one), the waiting task is told about it.
1262  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1263  if (setDeferredException(*iPtr)) {
// Signal through a copy so the holder passed in by reference stays usable by the caller.
1264  WaitingTaskHolder tmp(holder);
1265  tmp.doneWaiting(*iPtr);
1266  } else {
// NOTE(review): the embedded numbering jumps from 1266 to 1268, so the statement that
// belongs in this branch was dropped from this listing; recover it from the real source.
1268  }
1269  }
1270 
1272  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1273  // Get some needed info out of the status object before moving
1274  // it into finalTaskForThisLumi.
1275  auto& lp = *(iLumiStatus->lumiPrincipal());
1276  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1277  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1278  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1279  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1280 
1281  auto finalTaskForThisLumi = edm::make_waiting_task(
1282  tbb::task::allocate_root(),
1283  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1284  std::exception_ptr ptr;
1285  if (iPtr) {
1286  handleEndLumiExceptions(iPtr, iTask);
1287  } else {
1288  try {
1290  if (looper_) {
1291  auto& lumiPrincipal = *(status->lumiPrincipal());
1292  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1293  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1294  }
1295  } catch (...) {
1296  ptr = std::current_exception();
1297  }
1298  }
1300 
1301  // Try hard to clean up resources so the
1302  // process can terminate in a controlled
1303  // fashion even after exceptions have occurred.
1304 
1305  try {
1307  } catch (...) {
1308  if (not ptr) {
1309  ptr = std::current_exception();
1310  }
1311  }
1312 
1313  try {
1314  status->resumeGlobalLumiQueue();
1316  } catch (...) {
1317  if (not ptr) {
1318  ptr = std::current_exception();
1319  }
1320  }
1321 
1322  try {
1323  // This call to status.resetResources() must occur before iTask is destroyed.
1324  // Otherwise there will be a data race which could result in endRun
1325  // being delayed until it is too late to successfully call it.
1326  status->resetResources();
1327  status.reset();
1328  } catch (...) {
1329  if (not ptr) {
1330  ptr = std::current_exception();
1331  }
1332  }
1333 
1334  if (ptr) {
1335  handleEndLumiExceptions(&ptr, iTask);
1336  }
1337  });
1338 
1339  auto writeT = edm::make_waiting_task(
1340  tbb::task::allocate_root(),
1341  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1342  std::exception_ptr const* iExcept) mutable {
1343  if (iExcept) {
1344  task.doneWaiting(*iExcept);
1345  } else {
1346  //Only call writeLumi if beginLumi succeeded
1347  if (didGlobalBeginSucceed) {
1348  writeLumiAsync(std::move(task), lumiPrincipal);
1349  }
1350  }
1351  });
1352 
1353  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1354 
1356 
1357  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1358  *schedule_,
1359  lp,
1360  ts,
1361  es,
1362  eventSetupImpls,
1363  serviceToken_,
1364  subProcesses_,
1365  cleaningUpAfterException);
1366  }
1367 
1369  unsigned int iStreamIndex,
1370  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1371  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1372  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1373  if (iPtr) {
1374  handleEndLumiExceptions(iPtr, iTask);
1375  }
1376  auto status = streamLumiStatus_[iStreamIndex];
1377  //reset status before releasing queue else get race condition
1378  streamLumiStatus_[iStreamIndex].reset();
1380  streamQueues_[iStreamIndex].resume();
1381 
1382  //are we the last one?
1383  if (status->streamFinishedLumi()) {
1385  }
1386  });
1387 
1388  edm::WaitingTaskHolder lumiDoneTask{t};
1389 
1390  iLumiStatus->setEndTime();
1391 
1392  if (iLumiStatus->didGlobalBeginSucceed()) {
1393  auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1394  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1395  lumiPrincipal.endTime());
1396  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1397 
1398  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1399 
1401  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1402  *schedule_,
1403  iStreamIndex,
1404  lumiPrincipal,
1405  ts,
1406  es,
1407  &iLumiStatus->eventSetupImpls(),
1408  serviceToken_,
1409  subProcesses_,
1410  cleaningUpAfterException);
1411  }
1412  }
1413 
1415  if (streamLumiActive_.load() > 0) {
1416  auto globalWaitTask = make_empty_waiting_task();
1417  globalWaitTask->increment_ref_count();
1418  {
1419  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1420  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1421  if (streamLumiStatus_[i]) {
1422  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1423  }
1424  }
1425  }
1426  globalWaitTask->wait_for_all();
1427  if (globalWaitTask->exceptionPtr() != nullptr) {
1428  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1429  }
1430  }
1431  }
1432 
// Create a RunPrincipal for the run the input source is positioned on, let the source
// fill it, insert it into principalCache_, and return the pair
// (reduced ProcessHistoryID of the new principal, input_->run()).
//
// Throws edm::Exception(edm::errors::LogicError) from the guard at the top.
// NOTE(review): the guard's condition (listing line 1434) is missing from this listing.
1433  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1435  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1436  << "Illegal attempt to insert run into cache\n"
1437  << "Contact a Framework Developer\n";
1438  }
// NOTE(review): listing lines 1441 and 1445 (two constructor arguments and the closing
// of this call) are missing from this listing.
1439  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1440  preg(),
1442  historyAppender_.get(),
1443  0,
1444  true,
1446  {
// The sentry emits the source early-termination signal if readRun() throws;
// completedSuccessfully() disarms it on the normal path.
1447  SendSourceTerminationSignalIfException sentry(actReg_.get());
1448  input_->readRun(*rp, *historyAppender_);
1449  sentry.completedSuccessfully();
1450  }
// The source and the freshly read principal must agree on the reduced process history.
1451  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1452  principalCache_.insert(rp);
1453  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1454  }
1455 
1456  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1457  principalCache_.merge(input_->runAuxiliary(), preg());
1458  auto runPrincipal = principalCache_.runPrincipalPtr();
1459  {
1460  SendSourceTerminationSignalIfException sentry(actReg_.get());
1461  input_->readAndMergeRun(*runPrincipal);
1462  sentry.completedSuccessfully();
1463  }
1464  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1465  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1466  }
1467 
1470  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1471  << "Illegal attempt to insert lumi into cache\n"
1472  << "Run is invalid\n"
1473  << "Contact a Framework Developer\n";
1474  }
1476  assert(lbp);
1477  lbp->setAux(*input_->luminosityBlockAuxiliary());
1478  {
1479  SendSourceTerminationSignalIfException sentry(actReg_.get());
1480  input_->readLuminosityBlock(*lbp, *historyAppender_);
1481  sentry.completedSuccessfully();
1482  }
1483  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1484  iStatus.lumiPrincipal() = std::move(lbp);
1485  }
1486 
1488  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1489  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1490  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1491  input_->processHistoryRegistry().reducedProcessHistoryID(
1492  input_->luminosityBlockAuxiliary()->processHistoryID()));
1493  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1494  assert(lumiOK);
1495  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1496  {
1497  SendSourceTerminationSignalIfException sentry(actReg_.get());
1498  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1499  sentry.completedSuccessfully();
1500  }
1501  return input_->luminosityBlock();
1502  }
1503 
1505  ProcessHistoryID const& phid,
1506  RunNumber_t run,
1507  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1508  auto subsT = edm::make_waiting_task(
1509  tbb::task::allocate_root(),
1510  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1511  if (iExcept) {
1512  task.doneWaiting(*iExcept);
1513  } else {
1515  for (auto& s : subProcesses_) {
1516  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1517  }
1518  }
1519  });
1521  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1522  principalCache_.runPrincipal(phid, run),
1523  &processContext_,
1524  actReg_.get(),
1525  mergeableRunProductMetadata);
1526  }
1527 
1529  principalCache_.deleteRun(phid, run);
1530  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1531  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1532  }
1533 
1535  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1536  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1537  if (iExcept) {
1538  task.doneWaiting(*iExcept);
1539  } else {
1541  for (auto& s : subProcesses_) {
1542  s.writeLumiAsync(task, lumiPrincipal);
1543  }
1544  }
1545  });
1547 
1548  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1549 
1550  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1551  }
1552 
1554  for (auto& s : subProcesses_) {
1555  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1556  }
1557  iStatus.lumiPrincipal()->clearPrincipal();
1558  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1559  }
1560 
1562  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1563  iStatus.endLumi();
1564  return false;
1565  }
1566 
1567  if (iStatus.wasEventProcessingStopped()) {
1568  return false;
1569  }
1570 
1571  if (shouldWeStop()) {
1573  iStatus.stopProcessingEvents();
1574  iStatus.endLumi();
1575  return false;
1576  }
1577 
1579  try {
1580  //need to use lock in addition to the serial task queue because
1581  // of delayed provenance reading and reading data in response to
1582  // edm::Refs etc
1583  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1584 
1585  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1586  if (InputSource::IsLumi == itemType) {
1587  iStatus.haveContinuedLumi();
1588  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1589  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1590  readAndMergeLumi(iStatus);
1591  itemType = nextTransitionType();
1592  }
1593  if (InputSource::IsLumi == itemType) {
1594  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1595  input_->luminosityBlockAuxiliary()->beginTime()));
1596  }
1597  }
1598  if (InputSource::IsEvent != itemType) {
1599  iStatus.stopProcessingEvents();
1600 
1601  //IsFile may continue processing the lumi and
1602  // looper_ can cause the input source to declare a new IsRun which is actually
1603  // just a continuation of the previous run
1604  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1605  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1606  iStatus.endLumi();
1607  }
1608  return false;
1609  }
1610  readEvent(iStreamIndex);
1611  } catch (...) {
1612  bool expected = false;
1613  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1614  deferredExceptionPtr_ = std::current_exception();
1615  iStatus.endLumi();
1616  }
1617  return false;
1618  }
1619  return true;
1620  }
1621 
// Queue the work that reads and processes the next event for stream iStreamIndex.
//
// iTask        - holder that is signalled when this stream stops (with or without an exception).
// iStreamIndex - index of the stream, used to look up streamLumiStatus_.
//
// The work is pushed onto sourceResourcesAcquirer_.serialQueueChain(). After each event is
// processed the function re-invokes itself from the completion task, so a stream keeps
// going until readNextEventForStream() returns false or an exception is reported.
1622  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1623  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
// NOTE(review): listing line 1624 is missing from this listing.
1625  auto& status = streamLumiStatus_[iStreamIndex];
1626  try {
1627  if (readNextEventForStream(iStreamIndex, *status)) {
// Completion task for the event just read: either loops to the next event or,
// if processing reported an exception, ends the lumi for this stream.
1628  auto recursionTask = make_waiting_task(
1629  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1630  if (iPtr) {
1631  // Try to end the stream properly even if an exception was
1632  // thrown on an event.
1633  bool expected = false;
1634  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1635  // This is the case where the exception in iPtr is the primary
1636  // exception and we want to see its message.
1637  deferredExceptionPtr_ = *iPtr;
// Report through a copy; iTask itself is still needed for streamEndLumiAsync below.
1638  WaitingTaskHolder tempHolder(iTask);
1639  tempHolder.doneWaiting(*iPtr);
1640  }
1641  streamEndLumiAsync(std::move(iTask), iStreamIndex, streamLumiStatus_[iStreamIndex]);
1642  //the stream will stop now
1643  return;
1644  }
1645  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1646  });
1647 
1648  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1649  } else {
1650  //the stream will stop now
1651  if (status->isLumiEnding()) {
// Start the next lumi only if the last source transition was a lumi and the
// status reports that no one has started it yet.
1652  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1653  status->startNextLumi();
1654  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1655  }
1656  streamEndLumiAsync(std::move(iTask), iStreamIndex, status);
1657  } else {
// Lumi is not ending: report completion with an empty exception_ptr.
1658  iTask.doneWaiting(std::exception_ptr{});
1659  }
1660  }
1661  } catch (...) {
1662  // It is unlikely we will ever get in here ...
1663  // But if we do try to clean up and propagate the exception
1664  if (streamLumiStatus_[iStreamIndex]) {
1665  streamEndLumiAsync(iTask, iStreamIndex, streamLumiStatus_[iStreamIndex]);
1666  }
1667  bool expected = false;
1668  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1669  auto e = std::current_exception();
// NOTE(review): listing line 1670 is missing from this listing; as shown, the flag is
// set here but nothing visible stores e into deferredExceptionPtr_ -- check the real source.
1671  iTask.doneWaiting(e);
1672  }
1673  }
1674  });
1675  }
1676 
1677  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1678  //TODO this will have to become per stream
1679  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1680  StreamContext streamContext(event.streamID(), &processContext_);
1681 
1682  SendSourceTerminationSignalIfException sentry(actReg_.get());
1683  input_->readEvent(event, streamContext);
1684 
1685  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1686  sentry.completedSuccessfully();
1687 
1688  FDEBUG(1) << "\treadEvent\n";
1689  }
1690 
1691  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1692  tbb::task::spawn(
1693  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1694  }
1695 
// Process the event currently held by the EventPrincipal of stream iStreamIndex.
//
// iHolder      - signalled (with the exception, if any) once the event is fully finished.
// iStreamIndex - stream whose EventPrincipal and lumi status are used.
//
// Task chain built here, in execution order:
//   schedule_->processOneEventAsync -> [SubProcesses, only if any exist] -> finalizeEventTask
1696  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1697  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1698 
// NOTE(review): listing lines 1699-1700 are missing from this listing; `rng` is used
// below without a visible declaration.
1701  if (rng.isAvailable()) {
1702  Event ev(*pep, ModuleDescription(), nullptr);
1703  rng->postEventRead(ev);
1704  }
1705 
// Last step for the event: optional looper pass, clear the principal, then notify iHolder.
1706  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1707  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1708  //NOTE: If we have a looper we only have one Stream
1709  if (looper_) {
1710  ServiceRegistry::Operate operateLooper(serviceToken_);
1711  processEventWithLooper(*pep, iStreamIndex);
1712  }
1713 
1714  FDEBUG(1) << "\tprocessEvent\n";
1715  pep->clearEventPrincipal();
1716  if (iPtr) {
1717  iHolder.doneWaiting(*iPtr);
1718  } else {
1719  iHolder.doneWaiting(std::exception_ptr());
1720  }
1721  }));
1722  WaitingTaskHolder afterProcessTask;
1723  if (subProcesses_.empty()) {
// No SubProcesses: the schedule hands over directly to the finalize step.
1724  afterProcessTask = std::move(finalizeEventTask);
1725  } else {
1726  //Need to run SubProcesses after schedule has finished
1727  // with the event
1728  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1729  tbb::task::allocate_root(),
1730  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1731  if (not iPtr) {
1732  //when run with 1 thread, we want the order to be what
1733  // it was before. This requires reversing the order since
1734  // tasks are run last one in first one out
1735  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1736  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1737  }
1738  } else {
// The schedule reported an exception: skip the SubProcesses and forward it to the finalize step.
1739  finalizeEventTask.doneWaiting(*iPtr);
1740  }
1741  }));
1742  }
1743 
1744  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1745  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, es, serviceToken_);
1746  }
1747 
// Run the looper on one event and act on the ProcessingController it fills in.
//
// iPrincipal   - the event handed to looper_->doDuringLoop().
// iStreamIndex - stream whose lumi status supplies the EventSetupImpl.
//
// The do/while repeats the looper call until pc.lastOperationSucceeded() is true.
// If the looper's final status is not EDLooperBase::kContinue, shouldWeStop_ is set.
1748  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1749  bool randomAccess = input_->randomAccess();
1750  ProcessingController::ForwardState forwardState = input_->forwardState();
1751  ProcessingController::ReverseState reverseState = input_->reverseState();
1752  ProcessingController pc(forwardState, reverseState, randomAccess);
1753 
// NOTE(review): listing line 1754 is missing from this listing; `status` is assigned
// below without a visible declaration.
1755  do {
1756  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1757  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1758  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1759 
1760  bool succeeded = true;
1761  if (randomAccess) {
// NOTE(review): listing lines 1762 and 1764 are missing from this listing; presumably they
// are the conditions selecting between the two source repositioning calls below -- confirm.
1763  input_->skipEvents(-2);
1765  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1766  }
1767  }
1768  pc.setLastOperationSucceeded(succeeded);
1769  } while (!pc.lastOperationSucceeded());
1770  if (status != EDLooperBase::kContinue) {
1771  shouldWeStop_ = true;
// NOTE(review): listing line 1772 is missing from this listing.
1773  }
1774  }
1775 
1777  FDEBUG(1) << "\tshouldWeStop\n";
1778  if (shouldWeStop_)
1779  return true;
1780  if (!subProcesses_.empty()) {
1781  for (auto const& subProcess : subProcesses_) {
1782  if (subProcess.terminate()) {
1783  return true;
1784  }
1785  }
1786  return false;
1787  }
1788  return schedule_->terminate();
1789  }
1790 
1792 
1794 
1796 
1797  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1798  bool expected = false;
1799  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1800  deferredExceptionPtr_ = iException;
1801  return true;
1802  }
1803  return false;
1804  }
1805 
1807  std::unique_ptr<LogSystem> s;
1808  for (auto worker : schedule_->allWorkers()) {
1809  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1810  if (not s) {
1811  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1812  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1813  }
1814  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1815  }
1816  }
1817  }
1818 } // namespace edm
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:60
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
Definition: Exception.cc:177
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:48
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:70
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:66
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:40
void clear()
Not thread safe.
Definition: Registry.cc:40
Timestamp const & endTime() const
Definition: RunPrincipal.h:68
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
virtual void endOfJob()
Definition: EDLooperBase.cc:90
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
int totalEvents() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void writeLumi(LuminosityBlockNumber_t lumi)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:693
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:78
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:165
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
Asynchronously pushes the functor iAction into the queue.
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
tmp
align.sh
Definition: createJobs.py:716
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
Transition requestedTransition() const
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:22