CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
38 
40 
48 
53 
55 
67 
68 #include "MessageForSource.h"
69 #include "MessageForParent.h"
71 
72 #include "boost/range/adaptor/reversed.hpp"
73 
74 #include <cassert>
75 #include <exception>
76 #include <iomanip>
77 #include <iostream>
78 #include <utility>
79 #include <sstream>
80 
81 #include <sys/ipc.h>
82 #include <sys/msg.h>
83 
84 #include "tbb/task.h"
85 
86 //Used for CPU affinity
87 #ifndef __APPLE__
88 #include <sched.h>
89 #endif
90 
91 namespace {
92  //Sentry class to only send a signal if an
93  // exception occurs. An exception is identified
94  // by the destructor being called without first
95  // calling completedSuccessfully().
96  class SendSourceTerminationSignalIfException {
97  public:
98  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if (reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() { reg_ = nullptr; }
105 
106  private:
107  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
108  };
109 
110 } // namespace
111 
112 namespace edm {
113 
114  // ---------------------------------------------------------------
115  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
116  CommonParams const& common,
117  std::shared_ptr<ProductRegistry> preg,
118  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
119  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
120  std::shared_ptr<ActivityRegistry> areg,
121  std::shared_ptr<ProcessConfiguration const> processConfiguration,
122  PreallocationConfiguration const& allocations) {
123  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
124  if (main_input == nullptr) {
126  << "There must be exactly one source in the configuration.\n"
127  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
128  }
129 
130  std::string modtype(main_input->getParameter<std::string>("@module_type"));
131 
132  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
134  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
135  filler->fill(descriptions);
136 
137  try {
138  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
139  } catch (cms::Exception& iException) {
140  std::ostringstream ost;
141  ost << "Validating configuration of input source of type " << modtype;
142  iException.addContext(ost.str());
143  throw;
144  }
145 
146  main_input->registerIt();
147 
148  // Fill in "ModuleDescription", in case the input source produces
149  // any EDProducts, which would be registered in the ProductRegistry.
150  // Also fill in the process history item for this process.
151  // There is no module label for the unnamed input source, so
152  // just use "source".
153  // Only the tracked parameters belong in the process configuration.
154  ModuleDescription md(main_input->id(),
155  main_input->getParameter<std::string>("@module_type"),
156  "source",
157  processConfiguration.get(),
158  ModuleDescription::getUniqueID());
159 
160  InputSourceDescription isdesc(md,
161  preg,
162  branchIDListHelper,
163  thinnedAssociationsHelper,
164  areg,
165  common.maxEventsInput_,
166  common.maxLumisInput_,
168  allocations);
169 
170  areg->preSourceConstructionSignal_(md);
171  std::unique_ptr<InputSource> input;
172  try {
173  //even if we have an exception, send the signal
174  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
175  convertException::wrap([&]() {
176  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
179  });
180  } catch (cms::Exception& iException) {
181  std::ostringstream ost;
182  ost << "Constructing input source of type " << modtype;
183  iException.addContext(ost.str());
184  throw;
185  }
186  return input;
187  }
188 
189  // ---------------------------------------------------------------
190  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
192  ParameterSet& params) {
193  std::shared_ptr<EDLooperBase> vLooper;
194 
195  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
196 
197  if (loopers.empty()) {
198  return vLooper;
199  }
200 
201  assert(1 == loopers.size());
202 
203  for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
204  ++itName) {
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
208  }
209  return vLooper;
210  }
211 
212  // ---------------------------------------------------------------
213  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
214  ServiceToken const& iToken,
216  std::vector<std::string> const& defaultServices,
217  std::vector<std::string> const& forcedServices)
218  : actReg_(),
219  preg_(),
220  branchIDListHelper_(),
221  serviceToken_(),
222  input_(),
223  espController_(new eventsetup::EventSetupsController),
224  esp_(),
225  act_table_(),
226  processConfiguration_(),
227  schedule_(),
228  subProcesses_(),
229  historyAppender_(new HistoryAppender),
230  fb_(),
231  looper_(),
232  deferredExceptionPtrIsSet_(false),
233  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
234  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
235  principalCache_(),
236  beginJobCalled_(false),
237  shouldWeStop_(false),
238  fileModeNoMerge_(false),
239  exceptionMessageFiles_(),
240  exceptionMessageRuns_(),
241  exceptionMessageLumis_(false),
242  forceLooperToEnd_(false),
243  looperBeginJobRun_(false),
244  forceESCacheClearOnNewRun_(false),
245  eventSetupDataToExcludeFromPrefetching_() {
246  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
247  processDesc->addServices(defaultServices, forcedServices);
248  init(processDesc, iToken, iLegacy);
249  }
250 
251  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
252  std::vector<std::string> const& defaultServices,
253  std::vector<std::string> const& forcedServices)
254  : actReg_(),
255  preg_(),
257  serviceToken_(),
258  input_(),
259  espController_(new eventsetup::EventSetupsController),
260  esp_(),
261  act_table_(),
263  schedule_(),
264  subProcesses_(),
266  fb_(),
267  looper_(),
269  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
270  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
271  principalCache_(),
283  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
284  processDesc->addServices(defaultServices, forcedServices);
286  }
287 
288  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
289  ServiceToken const& token,
291  : actReg_(),
292  preg_(),
294  serviceToken_(),
295  input_(),
296  espController_(new eventsetup::EventSetupsController),
297  esp_(),
298  act_table_(),
300  schedule_(),
301  subProcesses_(),
303  fb_(),
304  looper_(),
306  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
307  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
308  principalCache_(),
320  init(processDesc, token, legacy);
321  }
322 
323  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
324  ServiceToken const& iToken,
326  //std::cerr << processDesc->dump() << std::endl;
327 
328  // register the empty parentage vector , once and for all
330 
331  // register the empty parameter set, once and for all.
333 
334  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
335 
336  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
337  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
338  bool const hasSubProcesses = !subProcessVParameterSet.empty();
339 
340  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
341  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
342  // set in here if the parameters were not explicitly set.
343  validateTopLevelParameterSets(parameterSet.get());
344 
345  // Now set some parameters specific to the main process.
346  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
347  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
348  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
349  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
350  << fileMode << ".\n"
351  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
352  } else {
353  fileModeNoMerge_ = (fileMode == "NOMERGE");
354  }
355  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
356 
357  //threading
358  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
359 
360  // Even if numberOfThreads was set to zero in the Python configuration, the code
361  // in cmsRun.cpp should have reset it to something else.
362  assert(nThreads != 0);
363 
364  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
365  if (nStreams == 0) {
366  nStreams = nThreads;
367  }
368  if (nThreads > 1 or nStreams > 1) {
369  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
370  }
371  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
372  if (nConcurrentRuns != 1) {
373  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
374  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
375  }
376  unsigned int nConcurrentLumis =
377  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
378  if (nConcurrentLumis == 0) {
379  nConcurrentLumis = nConcurrentRuns;
380  }
381 
382  //Check that relationships between threading parameters makes sense
383  /*
384  if(nThreads<nStreams) {
385  //bad
386  }
387  if(nConcurrentRuns>nStreams) {
388  //bad
389  }
390  if(nConcurrentRuns>nConcurrentLumis) {
391  //bad
392  }
393  */
394  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
395 
396  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
397 
398  // Now do general initialization
400 
401  //initialize the services
402  auto& serviceSets = processDesc->getServicesPSets();
403  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
404  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
405 
406  //make the services available
408 
409  if (nStreams > 1) {
411  handler->willBeUsingThreads();
412  }
413 
414  // intialize miscellaneous items
415  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
416 
417  // intialize the event setup provider
418  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
419 
420  // initialize the looper, if any
421  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
422  if (looper_) {
423  looper_->setActionTable(items.act_table_.get());
424  looper_->attachTo(*items.actReg_);
425 
426  //For now loopers make us run only 1 transition at a time
427  nStreams = 1;
428  nConcurrentLumis = 1;
429  nConcurrentRuns = 1;
430  }
431 
432  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
433 
434  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
435  streamQueues_.resize(nStreams);
436  streamLumiStatus_.resize(nStreams);
437 
438  // initialize the input source
439  input_ = makeInput(*parameterSet,
440  *common,
441  items.preg(),
442  items.branchIDListHelper(),
443  items.thinnedAssociationsHelper(),
444  items.actReg_,
445  items.processConfiguration(),
447 
448  // intialize the Schedule
449  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
450 
451  // set the data members
452  act_table_ = std::move(items.act_table_);
453  actReg_ = items.actReg_;
454  preg_ = items.preg();
456  branchIDListHelper_ = items.branchIDListHelper();
457  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
458  processConfiguration_ = items.processConfiguration();
460  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
461 
462  FDEBUG(2) << parameterSet << std::endl;
463 
465  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
466  // Reusable event principal
467  auto ep = std::make_shared<EventPrincipal>(preg(),
471  historyAppender_.get(),
472  index);
474  }
475 
476  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
477  auto lp =
478  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
480  }
481 
482  // fill the subprocesses, if there are any
483  subProcesses_.reserve(subProcessVParameterSet.size());
484  for (auto& subProcessPSet : subProcessVParameterSet) {
485  subProcesses_.emplace_back(subProcessPSet,
486  *parameterSet,
487  preg(),
492  *actReg_,
493  token,
496  &processContext_);
497  }
498  }
499 
501  // Make the services available while everything is being deleted.
502  ServiceToken token = getToken();
503  ServiceRegistry::Operate op(token);
504 
505  // manually destroy all these thing that may need the services around
506  // propagate_const<T> has no reset() function
507  espController_ = nullptr;
508  esp_ = nullptr;
509  schedule_ = nullptr;
510  input_ = nullptr;
511  looper_ = nullptr;
512  actReg_ = nullptr;
513 
516  }
517 
519  if (beginJobCalled_)
520  return;
521  beginJobCalled_ = true;
522  bk::beginJob();
523 
524  // StateSentry toerror(this); // should we add this ?
525  //make the services available
527 
532  actReg_->preallocateSignal_(bounds);
533  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
535 
536  //NOTE: this may throw
538  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
539 
542  }
543  //NOTE: This implementation assumes 'Job' means one call
544  // the EventProcessor::run
545  // If it really means once per 'application' then this code will
546  // have to be changed.
547  // Also have to deal with case where have 'run' then new Module
548  // added and do 'run'
549  // again. In that case the newly added Module needs its 'beginJob'
550  // to be called.
551 
552  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
553  // For now we delay calling beginOfJob until first beginOfRun
554  //if(looper_) {
555  // looper_->beginOfJob(es);
556  //}
557  try {
558  convertException::wrap([&]() { input_->doBeginJob(); });
559  } catch (cms::Exception& ex) {
560  ex.addContext("Calling beginJob for the source");
561  throw;
562  }
563  espController_->finishConfiguration();
564  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
565  // toerror.succeeded(); // should we add this?
566  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
567  actReg_->postBeginJobSignal_();
568 
569  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
570  schedule_->beginStream(i);
571  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
572  }
573  }
574 
576  // Collects exceptions, so we don't throw before all operations are performed.
578  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
579 
580  //make the services available
582 
583  //NOTE: this really should go elsewhere in the future
584  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
585  c.call([this, i]() { this->schedule_->endStream(i); });
586  for (auto& subProcess : subProcesses_) {
587  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
588  }
589  }
590  auto actReg = actReg_.get();
591  c.call([actReg]() { actReg->preEndJobSignal_(); });
592  schedule_->endJob(c);
593  for (auto& subProcess : subProcesses_) {
594  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
595  }
596  c.call(std::bind(&InputSource::doEndJob, input_.get()));
597  if (looper_) {
598  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
599  }
600  c.call([actReg]() { actReg->postEndJobSignal_(); });
601  if (c.hasThrown()) {
602  c.rethrow();
603  }
604  }
605 
607 
608  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
609  return schedule_->getAllModuleDescriptions();
610  }
611 
612  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
613 
614  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
615 
616  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
617 
618  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
619 
620  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
621 
622  void EventProcessor::getTriggerReport(TriggerReport& rep) const { schedule_->getTriggerReport(rep); }
623 
624  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
625 
626  namespace {
627 #include "TransitionProcessors.icc"
628  }
629 
631  bool returnValue = false;
632 
633  // Look for a shutdown signal
634  if (shutdown_flag.load(std::memory_order_acquire)) {
635  returnValue = true;
636  returnCode = epSignal;
637  }
638  return returnValue;
639  }
640 
642  if (deferredExceptionPtrIsSet_.load()) {
644  return InputSource::IsStop;
645  }
646 
647  SendSourceTerminationSignalIfException sentry(actReg_.get());
648  InputSource::ItemType itemType;
649  //For now, do nothing with InputSource::IsSynchronize
650  do {
651  itemType = input_->nextItemType();
652  } while (itemType == InputSource::IsSynchronize);
653 
654  lastSourceTransition_ = itemType;
655  sentry.completedSuccessfully();
656 
658 
659  if (checkForAsyncStopRequest(returnCode)) {
660  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
662  }
663 
664  return lastSourceTransition_;
665  }
666 
667  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
668  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
669  }
670 
672 
676  {
677  beginJob(); //make sure this was called
678 
679  // make the services available
681 
683  try {
684  FilesProcessor fp(fileModeNoMerge_);
685 
686  convertException::wrap([&]() {
687  bool firstTime = true;
688  do {
689  if (not firstTime) {
691  rewindInput();
692  } else {
693  firstTime = false;
694  }
695  startingNewLoop();
696 
697  auto trans = fp.processFiles(*this);
698 
699  fp.normalEnd();
700 
701  if (deferredExceptionPtrIsSet_.load()) {
702  std::rethrow_exception(deferredExceptionPtr_);
703  }
704  if (trans != InputSource::IsStop) {
705  //problem with the source
706  doErrorStuff();
707 
708  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
709  }
710  } while (not endOfLoop());
711  }); // convertException::wrap
712 
713  } // Try block
714  catch (cms::Exception& e) {
716  std::string message(
717  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
718  e.addAdditionalInfo(message);
719  if (e.alreadyPrinted()) {
720  LogAbsolute("Additional Exceptions") << message;
721  }
722  }
723  if (!exceptionMessageRuns_.empty()) {
725  if (e.alreadyPrinted()) {
726  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
727  }
728  }
729  if (!exceptionMessageFiles_.empty()) {
731  if (e.alreadyPrinted()) {
732  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
733  }
734  }
735  throw;
736  }
737  }
738 
739  return returnCode;
740  }
741 
743  FDEBUG(1) << " \treadFile\n";
744  size_t size = preg_->size();
745  SendSourceTerminationSignalIfException sentry(actReg_.get());
746 
748 
749  fb_ = input_->readFile();
750  if (size < preg_->size()) {
752  }
755  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
756  }
757  sentry.completedSuccessfully();
758  }
759 
760  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
761  if (fb_.get() != nullptr) {
762  SendSourceTerminationSignalIfException sentry(actReg_.get());
763  input_->closeFile(fb_.get(), cleaningUpAfterException);
764  sentry.completedSuccessfully();
765  }
766  FDEBUG(1) << "\tcloseInputFile\n";
767  }
768 
770  if (fb_.get() != nullptr) {
771  schedule_->openOutputFiles(*fb_);
772  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
773  }
774  FDEBUG(1) << "\topenOutputFiles\n";
775  }
776 
778  if (fb_.get() != nullptr) {
779  schedule_->closeOutputFiles();
780  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
781  }
782  FDEBUG(1) << "\tcloseOutputFiles\n";
783  }
784 
787  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
788  if (fb_.get() != nullptr) {
789  schedule_->respondToOpenInputFile(*fb_);
790  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
791  }
792  FDEBUG(1) << "\trespondToOpenInputFile\n";
793  }
794 
796  if (fb_.get() != nullptr) {
797  schedule_->respondToCloseInputFile(*fb_);
798  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
799  }
800  FDEBUG(1) << "\trespondToCloseInputFile\n";
801  }
802 
804  shouldWeStop_ = false;
805  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
806  // until after we've called beginOfJob
807  if (looper_ && looperBeginJobRun_) {
808  looper_->doStartingNewLoop();
809  }
810  FDEBUG(1) << "\tstartingNewLoop\n";
811  }
812 
814  if (looper_) {
815  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
816  looper_->setModuleChanger(&changer);
817  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
818  looper_->setModuleChanger(nullptr);
820  return true;
821  else
822  return false;
823  }
824  FDEBUG(1) << "\tendOfLoop\n";
825  return true;
826  }
827 
829  input_->repeat();
830  input_->rewind();
831  FDEBUG(1) << "\trewind\n";
832  }
833 
835  looper_->prepareForNextLoop(esp_.get());
836  FDEBUG(1) << "\tprepareForNextLoop\n";
837  }
838 
840  FDEBUG(1) << "\tshouldWeCloseOutput\n";
841  if (!subProcesses_.empty()) {
842  for (auto const& subProcess : subProcesses_) {
843  if (subProcess.shouldWeCloseOutput()) {
844  return true;
845  }
846  }
847  return false;
848  }
849  return schedule_->shouldWeCloseOutput();
850  }
851 
853  FDEBUG(1) << "\tdoErrorStuff\n";
854  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
855  << "and went to the error state\n"
856  << "Will attempt to terminate processing normally\n"
857  << "(IF using the looper the next loop will be attempted)\n"
858  << "This likely indicates a bug in an input module or corrupted input or both\n";
859  }
860 
863  bool& globalBeginSucceeded,
864  bool& eventSetupForInstanceSucceeded) {
865  globalBeginSucceeded = false;
866  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
867  {
868  SendSourceTerminationSignalIfException sentry(actReg_.get());
869 
870  input_->doBeginRun(runPrincipal, &processContext_);
871  sentry.completedSuccessfully();
872  }
873 
874  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
876  espController_->forceCacheClear();
877  }
878  {
879  SendSourceTerminationSignalIfException sentry(actReg_.get());
880  espController_->eventSetupForInstance(ts);
881  eventSetupForInstanceSucceeded = true;
882  sentry.completedSuccessfully();
883  }
884  auto const& es = esp_->eventSetup();
885  if (looper_ && looperBeginJobRun_ == false) {
886  looper_->copyInfo(ScheduleInfo(schedule_.get()));
887  looper_->beginOfJob(es);
888  looperBeginJobRun_ = true;
889  looper_->doStartingNewLoop();
890  }
891  {
893  auto globalWaitTask = make_empty_waiting_task();
894  globalWaitTask->increment_ref_count();
895  beginGlobalTransitionAsync<Traits>(
896  WaitingTaskHolder(globalWaitTask.get()), *schedule_, runPrincipal, ts, es, serviceToken_, subProcesses_);
897  globalWaitTask->wait_for_all();
898  if (globalWaitTask->exceptionPtr() != nullptr) {
899  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
900  }
901  }
902  globalBeginSucceeded = true;
903  FDEBUG(1) << "\tbeginRun " << run << "\n";
904  if (looper_) {
905  looper_->doBeginRun(runPrincipal, es, &processContext_);
906  }
907  {
908  //To wait, the ref count has to be 1+#streams
909  auto streamLoopWaitTask = make_empty_waiting_task();
910  streamLoopWaitTask->increment_ref_count();
911 
913 
914  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
915  *schedule_,
917  runPrincipal,
918  ts,
919  es,
921  subProcesses_);
922 
923  streamLoopWaitTask->wait_for_all();
924  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
925  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
926  }
927  }
928  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
929  if (looper_) {
930  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
931  }
932  }
933 
936  bool globalBeginSucceeded,
937  bool cleaningUpAfterException,
938  bool eventSetupForInstanceSucceeded) {
939  if (eventSetupForInstanceSucceeded) {
940  //If we skip empty runs, this would be called conditionally
941  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
942 
943  if (globalBeginSucceeded) {
945  t->increment_ref_count();
946  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
947  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
948  mergeableRunProductMetadata->preWriteRun();
949  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
950  t->wait_for_all();
951  mergeableRunProductMetadata->postWriteRun();
952  if (t->exceptionPtr()) {
953  std::rethrow_exception(*t->exceptionPtr());
954  }
955  }
956  }
957  deleteRunFromCache(phid, run);
958  }
959 
962  bool globalBeginSucceeded,
963  bool cleaningUpAfterException) {
964  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
965  runPrincipal.setEndTime(input_->timestamp());
966 
967  IOVSyncValue ts(
969  runPrincipal.endTime());
970  {
971  SendSourceTerminationSignalIfException sentry(actReg_.get());
972  espController_->eventSetupForInstance(ts);
973  sentry.completedSuccessfully();
974  }
975  auto const& es = esp_->eventSetup();
976  if (globalBeginSucceeded) {
977  //To wait, the ref count has to be 1+#streams
978  auto streamLoopWaitTask = make_empty_waiting_task();
979  streamLoopWaitTask->increment_ref_count();
980 
982 
983  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
984  *schedule_,
986  runPrincipal,
987  ts,
988  es,
991  cleaningUpAfterException);
992 
993  streamLoopWaitTask->wait_for_all();
994  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
995  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
996  }
997  }
998  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
999  if (looper_) {
1000  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1001  }
1002  {
1003  auto globalWaitTask = make_empty_waiting_task();
1004  globalWaitTask->increment_ref_count();
1005 
1007  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1008  *schedule_,
1009  runPrincipal,
1010  ts,
1011  es,
1012  serviceToken_,
1013  subProcesses_,
1014  cleaningUpAfterException);
1015  globalWaitTask->wait_for_all();
1016  if (globalWaitTask->exceptionPtr() != nullptr) {
1017  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1018  }
1019  }
1020  FDEBUG(1) << "\tendRun " << run << "\n";
1021  if (looper_) {
1022  looper_->doEndRun(runPrincipal, es, &processContext_);
1023  }
1024  }
1025 
1026  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1027  auto waitTask = make_empty_waiting_task();
1028  waitTask->increment_ref_count();
1029 
1030  if (streamLumiActive_ > 0) {
1032  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1033  } else {
1034  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1035  input_->luminosityBlockAuxiliary()->beginTime()),
1036  iRunResource,
1037  WaitingTaskHolder{waitTask.get()});
1038  }
1039  waitTask->wait_for_all();
1040 
1041  if (waitTask->exceptionPtr() != nullptr) {
1042  std::rethrow_exception(*(waitTask->exceptionPtr()));
1043  }
1044  return lastTransitionType();
1045  }
1046 
1048  std::shared_ptr<void> const& iRunResource,
1049  edm::WaitingTaskHolder iHolder) {
1050  if (iHolder.taskHasFailed()) {
1051  return;
1052  }
1053 
1054  // We must be careful with the status object here and in code this function calls. IF we want
1055  // endRun to be called, then the status object must be destroyed before the things waiting on
1056  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1057  // endRun to be called much later than it should be, because it is holding iRunResource).
1058  auto status =
1059  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1060 
1061  auto lumiWork = [this, iHolder, status = std::move(status)](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1062  if (iHolder.taskHasFailed()) {
1063  status.reset();
1064  return;
1065  }
1066 
1067  status->setResumer(std::move(iResumer));
1068 
1069  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1070  //make the services available
1072 
1073  try {
1074  readLuminosityBlock(*status);
1075 
1076  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1077  {
1078  SendSourceTerminationSignalIfException sentry(actReg_.get());
1079 
1080  input_->doBeginLumi(lumiPrincipal, &processContext_);
1081  sentry.completedSuccessfully();
1082  }
1083 
1085  if (rng.isAvailable()) {
1086  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1087  rng->preBeginLumi(lb);
1088  }
1089 
1090  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1091 
1092  //Task to start the stream beginLumis
1093  auto beginStreamsTask = make_waiting_task(
1094  tbb::task::allocate_root(),
1095  [this, holder = iHolder, status = std::move(status), ts](std::exception_ptr const* iPtr) mutable {
1096  if (iPtr) {
1097  status.reset();
1098  holder.doneWaiting(*iPtr);
1099  } else {
1100  status->globalBeginDidSucceed();
1101  auto const& es = esp_->eventSetup();
1102  if (looper_) {
1103  try {
1104  //make the services available
1106  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1107  } catch (...) {
1108  status.reset();
1109  holder.doneWaiting(std::current_exception());
1110  return;
1111  }
1112  }
1114 
1115  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1116  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1117  streamQueues_[i].pause();
1118 
1119  auto eventTask = edm::make_waiting_task(
1120  tbb::task::allocate_root(), [this, i, h = holder](std::exception_ptr const* iPtr) mutable {
1121  if (iPtr) {
1123  tmp.doneWaiting(*iPtr);
1125  } else {
1127  }
1128  });
1129  auto& event = principalCache_.eventPrincipal(i);
1132  auto lp = status->lumiPrincipal();
1133  event.setLuminosityBlockPrincipal(lp.get());
1134  beginStreamTransitionAsync<Traits>(
1135  WaitingTaskHolder{eventTask}, *schedule_, i, *lp, ts, es, serviceToken_, subProcesses_);
1136  status.reset();
1137  });
1138  }
1139  status.reset();
1140  }
1141  }); // beginStreamTask
1142 
1143  //task to start the global begin lumi
1144  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1145  auto const& es = esp_->eventSetup();
1146  {
1148  beginGlobalTransitionAsync<Traits>(
1149  beginStreamsHolder, *schedule_, lumiPrincipal, ts, es, serviceToken_, subProcesses_);
1150  }
1151  } catch (...) {
1152  status.reset();
1153  iHolder.doneWaiting(std::current_exception());
1154  }
1155  }); // task in sourceResourcesAcquirer
1156  }; // end lumiWork
1157 
1158  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1159  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1160  if (espController_->isWithinValidityInterval(iSync)) {
1161  iovQueue_.pause();
1162  lumiQueue_->pushAndPause(std::move(lumiWork));
1163  } else {
1164  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1165  iovQueue_.push([this, iHolder, lumiWork, iSync]() mutable {
1166  try {
1167  SendSourceTerminationSignalIfException sentry(actReg_.get());
1168  espController_->eventSetupForInstance(iSync);
1169  sentry.completedSuccessfully();
1170  } catch (...) {
1171  iHolder.doneWaiting(std::current_exception());
1172  return;
1173  }
1174  iovQueue_.pause();
1175  lumiQueue_->pushAndPause(std::move(lumiWork));
1176  });
1177  }
1178  }
1179 
1181  {
1182  //all streams are sharing the same status at the moment
1183  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1184  status->needToContinueLumi();
1185  status->startProcessingEvents();
1186  }
1187 
1188  unsigned int streamIndex = 0;
1189  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1190  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1191  handleNextEventForStreamAsync(std::move(h), streamIndex);
1192  }));
1193  }
1194  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1195  handleNextEventForStreamAsync(std::move(h), streamIndex);
1196  }));
1197  }
1198 
1199  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1200  if (setDeferredException(*iPtr)) {
1201  WaitingTaskHolder tmp(holder);
1202  tmp.doneWaiting(*iPtr);
1203  } else {
1205  }
1206  }
1207 
1209  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1210  // Get some needed info out of the status object before moving
1211  // it into finalTaskForThisLumi.
1212  auto& lp = *(iLumiStatus->lumiPrincipal());
1213  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1214  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1215 
1216  auto finalTaskForThisLumi = edm::make_waiting_task(
1217  tbb::task::allocate_root(),
1218  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1219  std::exception_ptr ptr;
1220  if (iPtr) {
1221  handleEndLumiExceptions(iPtr, iTask);
1222  } else {
1223  try {
1225  if (looper_) {
1226  auto& lp = *(status->lumiPrincipal());
1227  auto const& es = esp_->eventSetup();
1228  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1229  }
1230  } catch (...) {
1231  ptr = std::current_exception();
1232  }
1233  }
1235 
1236  // Try hard to clean up resources so the
1237  // process can terminate in a controlled
1238  // fashion even after exceptions have occurred.
1239 
1240  try {
1242  } catch (...) {
1243  if (not ptr) {
1244  ptr = std::current_exception();
1245  }
1246  }
1247 
1248  try {
1249  //release our hold on the IOV
1250  iovQueue_.resume();
1251  } catch (...) {
1252  if (not ptr) {
1253  ptr = std::current_exception();
1254  }
1255  }
1256 
1257  try {
1258  status->resumeGlobalLumiQueue();
1259  } catch (...) {
1260  if (not ptr) {
1261  ptr = std::current_exception();
1262  }
1263  }
1264 
1265  try {
1266  // This call to status.reset() must occur before iTask is destroyed.
1267  // Otherwise there will be a data race which could result in endRun
1268  // being delayed until it is too late to successfully call it.
1269  status.reset();
1270  } catch (...) {
1271  if (not ptr) {
1272  ptr = std::current_exception();
1273  }
1274  }
1275 
1276  if (ptr) {
1277  handleEndLumiExceptions(&ptr, iTask);
1278  }
1279  });
1280 
1281  auto writeT = edm::make_waiting_task(
1282  tbb::task::allocate_root(),
1283  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1284  std::exception_ptr const* iExcept) mutable {
1285  if (iExcept) {
1286  task.doneWaiting(*iExcept);
1287  } else {
1288  //Only call writeLumi if beginLumi succeeded
1289  if (didGlobalBeginSucceed) {
1290  writeLumiAsync(std::move(task), lumiPrincipal);
1291  }
1292  }
1293  });
1294 
1295  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1296 
1298  auto const& es = esp_->eventSetup();
1299 
1300  endGlobalTransitionAsync<Traits>(
1301  WaitingTaskHolder(writeT), *schedule_, lp, ts, es, serviceToken_, subProcesses_, cleaningUpAfterException);
1302  }
1303 
1305  unsigned int iStreamIndex,
1306  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1307  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1308  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1309  if (iPtr) {
1310  handleEndLumiExceptions(iPtr, iTask);
1311  }
1312  auto status = streamLumiStatus_[iStreamIndex];
1313  //reset status before releasing queue else get race condtion
1314  streamLumiStatus_[iStreamIndex].reset();
1316  streamQueues_[iStreamIndex].resume();
1317 
1318  //are we the last one?
1319  if (status->streamFinishedLumi()) {
1321  } else {
1322  status.reset();
1323  }
1324  });
1325 
1326  edm::WaitingTaskHolder lumiDoneTask{t};
1327 
1328  iLumiStatus->setEndTime();
1329 
1330  if (iLumiStatus->didGlobalBeginSucceed()) {
1331  auto& lumiPrincipal = *iLumiStatus->lumiPrincipal();
1332  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1333  lumiPrincipal.endTime());
1334  auto const& es = esp_->eventSetup();
1335 
1336  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1337 
1339  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1340  *schedule_,
1341  iStreamIndex,
1342  lumiPrincipal,
1343  ts,
1344  es,
1345  serviceToken_,
1346  subProcesses_,
1347  cleaningUpAfterException);
1348  }
1349  iLumiStatus.reset();
1350  }
1351 
1353  if (streamLumiActive_.load() > 0) {
1354  auto globalWaitTask = make_empty_waiting_task();
1355  globalWaitTask->increment_ref_count();
1356  {
1357  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1358  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1359  if (streamLumiStatus_[i]) {
1360  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1361  }
1362  }
1363  }
1364  globalWaitTask->wait_for_all();
1365  if (globalWaitTask->exceptionPtr() != nullptr) {
1366  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1367  }
1368  }
1369  }
1370 
1371  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1373  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1374  << "Illegal attempt to insert run into cache\n"
1375  << "Contact a Framework Developer\n";
1376  }
1377  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1378  preg(),
1380  historyAppender_.get(),
1381  0,
1382  true,
1384  {
1385  SendSourceTerminationSignalIfException sentry(actReg_.get());
1386  input_->readRun(*rp, *historyAppender_);
1387  sentry.completedSuccessfully();
1388  }
1389  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1390  principalCache_.insert(rp);
1391  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1392  }
1393 
1394  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1395  principalCache_.merge(input_->runAuxiliary(), preg());
1396  auto runPrincipal = principalCache_.runPrincipalPtr();
1397  {
1398  SendSourceTerminationSignalIfException sentry(actReg_.get());
1399  input_->readAndMergeRun(*runPrincipal);
1400  sentry.completedSuccessfully();
1401  }
1402  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1403  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1404  }
1405 
1408  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1409  << "Illegal attempt to insert lumi into cache\n"
1410  << "Run is invalid\n"
1411  << "Contact a Framework Developer\n";
1412  }
1414  assert(lbp);
1415  lbp->setAux(*input_->luminosityBlockAuxiliary());
1416  {
1417  SendSourceTerminationSignalIfException sentry(actReg_.get());
1418  input_->readLuminosityBlock(*lbp, *historyAppender_);
1419  sentry.completedSuccessfully();
1420  }
1421  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1422  iStatus.lumiPrincipal() = std::move(lbp);
1423  }
1424 
1426  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1427  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1428  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1429  input_->processHistoryRegistry().reducedProcessHistoryID(
1430  input_->luminosityBlockAuxiliary()->processHistoryID()));
1431  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1432  assert(lumiOK);
1433  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1434  {
1435  SendSourceTerminationSignalIfException sentry(actReg_.get());
1436  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1437  sentry.completedSuccessfully();
1438  }
1439  return input_->luminosityBlock();
1440  }
1441 
1443  ProcessHistoryID const& phid,
1444  RunNumber_t run,
1445  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1446  auto subsT = edm::make_waiting_task(
1447  tbb::task::allocate_root(),
1448  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1449  if (iExcept) {
1450  task.doneWaiting(*iExcept);
1451  } else {
1453  for (auto& s : subProcesses_) {
1454  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1455  }
1456  }
1457  });
1459  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1460  principalCache_.runPrincipal(phid, run),
1461  &processContext_,
1462  actReg_.get(),
1463  mergeableRunProductMetadata);
1464  }
1465 
1467  principalCache_.deleteRun(phid, run);
1468  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1469  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1470  }
1471 
1473  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1474  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1475  if (iExcept) {
1476  task.doneWaiting(*iExcept);
1477  } else {
1479  for (auto& s : subProcesses_) {
1480  s.writeLumiAsync(task, lumiPrincipal);
1481  }
1482  }
1483  });
1485 
1486  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1487 
1488  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1489  }
1490 
1492  for (auto& s : subProcesses_) {
1493  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1494  }
1495  iStatus.lumiPrincipal()->clearPrincipal();
1496  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1497  }
1498 
1500  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1501  iStatus.endLumi();
1502  return false;
1503  }
1504 
1505  if (iStatus.wasEventProcessingStopped()) {
1506  return false;
1507  }
1508 
1509  if (shouldWeStop()) {
1511  iStatus.stopProcessingEvents();
1512  iStatus.endLumi();
1513  return false;
1514  }
1515 
1517  try {
1518  //need to use lock in addition to the serial task queue because
1519  // of delayed provenance reading and reading data in response to
1520  // edm::Refs etc
1521  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1522 
1523  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1524  if (InputSource::IsLumi == itemType) {
1525  iStatus.haveContinuedLumi();
1526  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1527  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1528  readAndMergeLumi(iStatus);
1529  itemType = nextTransitionType();
1530  }
1531  if (InputSource::IsLumi == itemType) {
1532  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1533  input_->luminosityBlockAuxiliary()->beginTime()));
1534  }
1535  }
1536  if (InputSource::IsEvent != itemType) {
1537  iStatus.stopProcessingEvents();
1538 
1539  //IsFile may continue processing the lumi and
1540  // looper_ can cause the input source to declare a new IsRun which is actually
1541  // just a continuation of the previous run
1542  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1543  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1544  iStatus.endLumi();
1545  }
1546  return false;
1547  }
1548  readEvent(iStreamIndex);
1549  } catch (...) {
1550  bool expected = false;
1551  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1552  deferredExceptionPtr_ = std::current_exception();
1553  iStatus.endLumi();
1554  }
1555  return false;
1556  }
1557  return true;
1558  }
1559 
1560  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1561  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1563  auto& status = streamLumiStatus_[iStreamIndex];
1564  try {
1565  if (readNextEventForStream(iStreamIndex, *status)) {
1566  auto recursionTask = make_waiting_task(
1567  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1568  if (iPtr) {
1569  // Try to end the stream properly even if an exception was
1570  // thrown on an event.
1571  bool expected = false;
1572  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1573  // This is the case where the exception in iPtr is the primary
1574  // exception and we want to see its message.
1575  deferredExceptionPtr_ = *iPtr;
1576  WaitingTaskHolder tempHolder(iTask);
1577  tempHolder.doneWaiting(*iPtr);
1578  }
1579  streamEndLumiAsync(std::move(iTask), iStreamIndex, streamLumiStatus_[iStreamIndex]);
1580  //the stream will stop now
1581  return;
1582  }
1583  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1584  });
1585 
1586  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1587  } else {
1588  //the stream will stop now
1589  if (status->isLumiEnding()) {
1590  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1591  status->startNextLumi();
1592  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1593  }
1594  streamEndLumiAsync(std::move(iTask), iStreamIndex, status);
1595  } else {
1596  iTask.doneWaiting(std::exception_ptr{});
1597  }
1598  }
1599  } catch (...) {
1600  // It is unlikely we will ever get in here ...
1601  // But if we do try to clean up and propagate the exception
1602  if (streamLumiStatus_[iStreamIndex]) {
1603  streamEndLumiAsync(iTask, iStreamIndex, streamLumiStatus_[iStreamIndex]);
1604  }
1605  bool expected = false;
1606  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1607  auto e = std::current_exception();
1609  iTask.doneWaiting(e);
1610  }
1611  }
1612  });
1613  }
1614 
1615  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1616  //TODO this will have to become per stream
1617  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1618  StreamContext streamContext(event.streamID(), &processContext_);
1619 
1620  SendSourceTerminationSignalIfException sentry(actReg_.get());
1621  input_->readEvent(event, streamContext);
1622 
1623  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1624  sentry.completedSuccessfully();
1625 
1626  FDEBUG(1) << "\treadEvent\n";
1627  }
1628 
1629  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1630  tbb::task::spawn(
1631  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1632  }
1633 
1634  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1635  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1636 
1639  if (rng.isAvailable()) {
1640  Event ev(*pep, ModuleDescription(), nullptr);
1641  rng->postEventRead(ev);
1642  }
1643 
1644  WaitingTaskHolder finalizeEventTask(
1645  make_waiting_task(tbb::task::allocate_root(), [this, pep, iHolder](std::exception_ptr const* iPtr) mutable {
1646  //NOTE: If we have a looper we only have one Stream
1647  if (looper_) {
1649  processEventWithLooper(*pep);
1650  }
1651 
1652  FDEBUG(1) << "\tprocessEvent\n";
1653  pep->clearEventPrincipal();
1654  if (iPtr) {
1655  iHolder.doneWaiting(*iPtr);
1656  } else {
1657  iHolder.doneWaiting(std::exception_ptr());
1658  }
1659  }));
1660  WaitingTaskHolder afterProcessTask;
1661  if (subProcesses_.empty()) {
1662  afterProcessTask = std::move(finalizeEventTask);
1663  } else {
1664  //Need to run SubProcesses after schedule has finished
1665  // with the event
1666  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1667  tbb::task::allocate_root(), [this, pep, finalizeEventTask](std::exception_ptr const* iPtr) mutable {
1668  if (not iPtr) {
1669  //when run with 1 thread, we want to the order to be what
1670  // it was before. This requires reversing the order since
1671  // tasks are run last one in first one out
1672  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1673  subProcess.doEventAsync(finalizeEventTask, *pep);
1674  }
1675  } else {
1676  finalizeEventTask.doneWaiting(*iPtr);
1677  }
1678  }));
1679  }
1680 
1681  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, esp_->eventSetup(), serviceToken_);
1682  }
1683 
1685  bool randomAccess = input_->randomAccess();
1686  ProcessingController::ForwardState forwardState = input_->forwardState();
1687  ProcessingController::ReverseState reverseState = input_->reverseState();
1688  ProcessingController pc(forwardState, reverseState, randomAccess);
1689 
1691  do {
1692  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1693  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1694 
1695  bool succeeded = true;
1696  if (randomAccess) {
1698  input_->skipEvents(-2);
1700  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1701  }
1702  }
1703  pc.setLastOperationSucceeded(succeeded);
1704  } while (!pc.lastOperationSucceeded());
1705  if (status != EDLooperBase::kContinue) {
1706  shouldWeStop_ = true;
1708  }
1709  }
1710 
1712  FDEBUG(1) << "\tshouldWeStop\n";
1713  if (shouldWeStop_)
1714  return true;
1715  if (!subProcesses_.empty()) {
1716  for (auto const& subProcess : subProcesses_) {
1717  if (subProcess.terminate()) {
1718  return true;
1719  }
1720  }
1721  return false;
1722  }
1723  return schedule_->terminate();
1724  }
1725 
1727 
1729 
1731 
1732  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1733  bool expected = false;
1734  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1735  deferredExceptionPtr_ = iException;
1736  return true;
1737  }
1738  return false;
1739  }
1740 
1742  std::unique_ptr<LogSystem> s;
1743  for (auto worker : schedule_->allWorkers()) {
1744  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1745  if (not s) {
1746  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1747  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1748  }
1749  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1750  }
1751  }
1752  }
1753 } // namespace edm
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:60
void adjustIndexesAfterProductRegistryAddition()
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
Definition: Exception.cc:177
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:48
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:70
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:66
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:40
void clear()
Not thread safe.
Definition: Registry.cc:40
Timestamp const & endTime() const
Definition: RunPrincipal.h:68
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
virtual void endOfJob()
Definition: EDLooperBase.cc:90
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
int totalEvents() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void writeLumi(LuminosityBlockNumber_t lumi)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:648
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:78
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:165
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:22