EventProcessor.cc
 // (a number of FWCore #include directives are elided from this listing)
77 #include "MessageForSource.h"
78 #include "MessageForParent.h"
80 
81 #include "boost/range/adaptor/reversed.hpp"
82 
83 #include <cassert>
84 #include <exception>
85 #include <iomanip>
86 #include <iostream>
87 #include <utility>
88 #include <sstream>
89 
90 #include <sys/ipc.h>
91 #include <sys/msg.h>
92 
93 #include "oneapi/tbb/task.h"
94 
95 //Used for CPU affinity
96 #ifndef __APPLE__
97 #include <sched.h>
98 #endif
99 
100 namespace {
101  //Sentry class to only send a signal if an
102  // exception occurs. An exception is identified
103  // by the destructor being called without first
104  // calling completedSuccessfully().
105  class SendSourceTerminationSignalIfException {
106  public:
107  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
108  ~SendSourceTerminationSignalIfException() {
109  if (reg_) {
110  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
111  }
112  }
113  void completedSuccessfully() { reg_ = nullptr; }
114 
115  private:
116  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
117  };
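 // Usage sketch (mirrors readFile() further below): construct the sentry
 // before calling into the InputSource and mark success once the call returns:
 //   SendSourceTerminationSignalIfException sentry(actReg_.get());
 //   fb_ = input_->readFile();
 //   sentry.completedSuccessfully();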
118 } // namespace
119 
120 namespace edm {
121 
122  namespace chain = waiting_task::chain;
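 // The chain helpers compose asynchronous steps; a minimal sketch of the
 // pattern used repeatedly below (the lambda bodies and the final holder are
 // placeholders):
 //   chain::first([](auto nextTask) { /* start async work */ })
 //       | chain::then([](std::exception_ptr const* iPtr, auto nextTask) { /* continue or handle error */ })
 //       | chain::runLast(std::move(someWaitingTaskHolder));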
123 
124  // ---------------------------------------------------------------
125  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
126  ParameterSet& params,
127  CommonParams const& common,
128  std::shared_ptr<ProductRegistry> preg,
129  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
130  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
131  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
132  std::shared_ptr<ActivityRegistry> areg,
133  std::shared_ptr<ProcessConfiguration const> processConfiguration,
134  PreallocationConfiguration const& allocations) {
135  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
136  if (main_input == nullptr) {
137  throw Exception(errors::Configuration)
138  << "There must be exactly one source in the configuration.\n"
139  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
140  }
141 
142  std::string modtype(main_input->getParameter<std::string>("@module_type"));
143 
144  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
145  ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
146  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
147  filler->fill(descriptions);
148 
149  try {
150  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
151  } catch (cms::Exception& iException) {
152  std::ostringstream ost;
153  ost << "Validating configuration of input source of type " << modtype;
154  iException.addContext(ost.str());
155  throw;
156  }
157 
158  main_input->registerIt();
159 
160  // Fill in "ModuleDescription", in case the input source produces
161  // any EDProducts, which would be registered in the ProductRegistry.
162  // Also fill in the process history item for this process.
163  // There is no module label for the unnamed input source, so
164  // just use "source".
165  // Only the tracked parameters belong in the process configuration.
166  ModuleDescription md(main_input->id(),
167  main_input->getParameter<std::string>("@module_type"),
168  "source",
169  processConfiguration.get(),
170  moduleIndex);
171 
172  InputSourceDescription isdesc(md,
173  preg,
174  branchIDListHelper,
175  processBlockHelper,
176  thinnedAssociationsHelper,
177  areg,
178  common.maxEventsInput_,
179  common.maxLumisInput_,
180  common.maxSecondsUntilRampdown_,
181  allocations);
182 
183  areg->preSourceConstructionSignal_(md);
184  std::unique_ptr<InputSource> input;
185  try {
186  //even if we have an exception, send the signal
187  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
188  convertException::wrap([&]() {
189  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
190  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
191  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
192  });
193  } catch (cms::Exception& iException) {
194  std::ostringstream ost;
195  ost << "Constructing input source of type " << modtype;
196  iException.addContext(ost.str());
197  throw;
198  }
199  return input;
200  }
201 
202  // ---------------------------------------------------------------
203  void validateLooper(ParameterSet& pset) {
204  auto const modtype = pset.getParameter<std::string>("@module_type");
205  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
206  auto filler = ParameterSetDescriptionFillerPluginFactory::get()->create(modtype);
207  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
208  filler->fill(descriptions);
209  try {
210  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
211  } catch (cms::Exception& iException) {
212  iException.addContext(
213  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
214  throw;
215  }
216  }
217 
218  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
219  eventsetup::EventSetupProvider& cp,
220  ParameterSet& params,
221  std::vector<std::string> const& loopers) {
222  std::shared_ptr<EDLooperBase> vLooper;
223 
224  assert(1 == loopers.size());
225 
226  for (auto const& looperName : loopers) {
227  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
228  validateLooper(*providerPSet);
229  providerPSet->registerIt();
230  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
231  }
232  return vLooper;
233  }
234 
235  // ---------------------------------------------------------------
236  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
237  ServiceToken const& iToken,
238  serviceregistry::ServiceLegacy iLegacy,
239  std::vector<std::string> const& defaultServices,
240  std::vector<std::string> const& forcedServices)
241  : actReg_(),
242  preg_(),
243  branchIDListHelper_(),
244  serviceToken_(),
245  input_(),
246  espController_(new eventsetup::EventSetupsController),
247  esp_(),
248  act_table_(),
249  processConfiguration_(),
250  schedule_(),
251  subProcesses_(),
252  historyAppender_(new HistoryAppender),
253  fb_(),
254  looper_(),
255  deferredExceptionPtrIsSet_(false),
256  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
257  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
258  principalCache_(),
259  beginJobCalled_(false),
260  shouldWeStop_(false),
261  fileModeNoMerge_(false),
262  exceptionMessageFiles_(),
263  exceptionMessageRuns_(false),
264  exceptionMessageLumis_(false),
265  forceLooperToEnd_(false),
266  looperBeginJobRun_(false),
267  forceESCacheClearOnNewRun_(false),
268  eventSetupDataToExcludeFromPrefetching_() {
269  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
270  processDesc->addServices(defaultServices, forcedServices);
271  init(processDesc, iToken, iLegacy);
272  }
273 
274  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
275  std::vector<std::string> const& defaultServices,
276  std::vector<std::string> const& forcedServices)
277  : actReg_(),
278  preg_(),
279  branchIDListHelper_(),
280  serviceToken_(),
281  input_(),
282  espController_(new eventsetup::EventSetupsController),
283  esp_(),
284  act_table_(),
285  processConfiguration_(),
286  schedule_(),
287  subProcesses_(),
288  historyAppender_(new HistoryAppender),
289  fb_(),
290  looper_(),
291  deferredExceptionPtrIsSet_(false),
292  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
293  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
294  principalCache_(),
295  beginJobCalled_(false),
296  shouldWeStop_(false),
297  fileModeNoMerge_(false),
298  exceptionMessageFiles_(),
299  exceptionMessageRuns_(false),
300  exceptionMessageLumis_(false),
301  forceLooperToEnd_(false),
302  looperBeginJobRun_(false),
303  forceESCacheClearOnNewRun_(false),
304  eventSetupDataToExcludeFromPrefetching_() {
305  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
306  processDesc->addServices(defaultServices, forcedServices);
307  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
308  }
309 
310  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
311  ServiceToken const& token,
312  serviceregistry::ServiceLegacy legacy)
313  : actReg_(),
314  preg_(),
315  branchIDListHelper_(),
316  serviceToken_(),
317  input_(),
318  espController_(new eventsetup::EventSetupsController),
319  esp_(),
320  act_table_(),
321  processConfiguration_(),
322  schedule_(),
323  subProcesses_(),
324  historyAppender_(new HistoryAppender),
325  fb_(),
326  looper_(),
327  deferredExceptionPtrIsSet_(false),
328  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
329  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
330  principalCache_(),
331  beginJobCalled_(false),
332  shouldWeStop_(false),
333  fileModeNoMerge_(false),
334  exceptionMessageFiles_(),
335  exceptionMessageRuns_(false),
336  exceptionMessageLumis_(false),
337  forceLooperToEnd_(false),
338  looperBeginJobRun_(false),
339  forceESCacheClearOnNewRun_(false),
340  eventSetupDataToExcludeFromPrefetching_() {
341  init(processDesc, token, legacy);
342  }
343 
344  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
345  ServiceToken const& iToken,
346  serviceregistry::ServiceLegacy iLegacy) {
347  //std::cerr << processDesc->dump() << std::endl;
348 
349  // register the empty parentage vector, once and for all
351 
352  // register the empty parameter set, once and for all.
354 
355  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
356 
357  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
358  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
359  bool const hasSubProcesses = !subProcessVParameterSet.empty();
360 
361  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
362  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
363  // set in here if the parameters were not explicitly set.
364  validateTopLevelParameterSets(parameterSet.get());
365 
366  // Now set some parameters specific to the main process.
367  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
368  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
369  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
370  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
371  << fileMode << ".\n"
372  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
373  } else {
374  fileModeNoMerge_ = (fileMode == "NOMERGE");
375  }
376  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
378 
379  //threading
380  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
381 
382  // Even if numberOfThreads was set to zero in the Python configuration, the code
383  // in cmsRun.cpp should have reset it to something else.
384  assert(nThreads != 0);
385 
386  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
387  if (nStreams == 0) {
388  nStreams = nThreads;
389  }
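 // i.e. by default there is one stream (one event in flight) per thread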
390  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
391  if (nConcurrentRuns != 1) {
392  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
393  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
394  }
395  unsigned int nConcurrentLumis =
396  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
397  if (nConcurrentLumis == 0) {
398  nConcurrentLumis = 2;
399  }
400  if (nConcurrentLumis > nStreams) {
401  nConcurrentLumis = nStreams;
402  }
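 // e.g. leaving numberOfConcurrentLuminosityBlocks at 0 with 4 streams gives
 // 2 concurrent lumis; the value can never exceed the number of streams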
403  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
404  if (!loopers.empty()) {
405  //For now loopers make us run only 1 transition at a time
406  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
407  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
408  "of concurrent runs, and the number of concurrent lumis "
409  "are all being reset to 1. Loopers cannot currently support "
410  "values greater than 1.";
411  nStreams = 1;
412  nConcurrentLumis = 1;
413  nConcurrentRuns = 1;
414  }
415  }
416  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
417  if (dumpOptions) {
418  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
419  } else {
420  if (nThreads > 1 or nStreams > 1) {
421  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
422  }
423  }
424  // The number of concurrent IOVs is configured individually for each record in
425  // the class NumberOfConcurrentIOVs to values less than or equal to this.
426  unsigned int maxConcurrentIOVs = nConcurrentLumis;
427 
428  //Check that the relationships between threading parameters make sense
429  /*
430  if(nThreads<nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nStreams) {
434  //bad
435  }
436  if(nConcurrentRuns>nConcurrentLumis) {
437  //bad
438  }
439  */
440  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
441 
442  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
443  deleteNonConsumedUnscheduledModules_ =
444  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
445  //for now, if we have a SubProcess, don't allow early delete
446  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
447  if (not hasSubProcesses) {
448  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
449  }
450 
451  // Now do general initialization
452  ScheduleItems items;
453 
454  //initialize the services
455  auto& serviceSets = processDesc->getServicesPSets();
456  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
457  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
458 
459  //make the services available
461 
462  CMS_SA_ALLOW try {
463  if (nStreams > 1) {
464  edm::Service<RootHandlers> handler;
465  handler->willBeUsingThreads();
466  }
467 
468  // initialize miscellaneous items
469  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
470 
471  // initialize the event setup provider
472  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
473  esp_ = espController_->makeProvider(
474  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
475 
476  // initialize the looper, if any
477  if (!loopers.empty()) {
479  looper_->setActionTable(items.act_table_.get());
480  looper_->attachTo(*items.actReg_);
481 
482  // in presence of looper do not delete modules
484  }
485 
486  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
487 
488  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489  streamQueues_.resize(nStreams);
490  streamLumiStatus_.resize(nStreams);
491 
492  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
493 
494  {
495  std::optional<ScheduleItems::MadeModules> madeModules;
496 
497  //setup input and modules concurrently
498  tbb::task_group group;
499 
500  // initialize the input source
501  auto tempReg = std::make_shared<ProductRegistry>();
502  auto sourceID = ModuleDescription::getUniqueID();
503 
504  group.run([&, this]() {
505  // initialize the Schedule
508  madeModules = items.initModules(*parameterSet, tns, preallocations_, &processContext_);
509  });
510 
511  group.run([&, this, tempReg]() {
513  input_ = makeInput(sourceID,
514  *parameterSet,
515  *common,
516  /*items.preg(),*/ tempReg,
517  items.branchIDListHelper(),
519  items.thinnedAssociationsHelper(),
520  items.actReg_,
521  items.processConfiguration(),
523  });
524 
525  group.wait();
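 // both tasks have completed here, so the products declared by the source
 // (collected in tempReg) can be merged into the full registry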
526  items.preg()->addFromInput(*tempReg);
527  input_->switchTo(items.preg());
528 
529  {
531  schedule_ = items.finishSchedule(std::move(*madeModules),
532  *parameterSet,
533  tns,
534  hasSubProcesses,
538  }
539  }
540 
541  // set the data members
542  act_table_ = std::move(items.act_table_);
543  actReg_ = items.actReg_;
544  preg_ = items.preg();
546  branchIDListHelper_ = items.branchIDListHelper();
547  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
548  processConfiguration_ = items.processConfiguration();
550  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
551 
552  FDEBUG(2) << parameterSet << std::endl;
553 
555  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
556  // Reusable event principal
557  auto ep = std::make_shared<EventPrincipal>(preg(),
561  historyAppender_.get(),
562  index,
563  true /*primary process*/,
566  }
567 
568  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
569  auto lp =
570  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
572  }
573 
574  {
575  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
577 
578  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
580  }
581 
582  // fill the subprocesses, if there are any
583  subProcesses_.reserve(subProcessVParameterSet.size());
584  for (auto& subProcessPSet : subProcessVParameterSet) {
585  subProcesses_.emplace_back(subProcessPSet,
586  *parameterSet,
587  preg(),
593  *actReg_,
594  token,
597  &processContext_);
598  }
599  } catch (...) {
600  //in case of an exception, make sure Services are available
601  // during the following destructors
602  espController_ = nullptr;
603  esp_ = nullptr;
604  schedule_ = nullptr;
605  input_ = nullptr;
606  looper_ = nullptr;
607  actReg_ = nullptr;
608  throw;
609  }
610  }
611 
612  EventProcessor::~EventProcessor() {
613  // Make the services available while everything is being deleted.
616 
617  // manually destroy all these things that may need the services around
618  // propagate_const<T> has no reset() function
619  espController_ = nullptr;
620  esp_ = nullptr;
621  schedule_ = nullptr;
622  input_ = nullptr;
623  looper_ = nullptr;
624  actReg_ = nullptr;
625 
628  }
629 
633  task.waitNoThrow();
634  assert(task.done());
635  }
636 
637  void EventProcessor::beginJob() {
638  if (beginJobCalled_)
639  return;
640  beginJobCalled_ = true;
641  bk::beginJob();
642 
643  // StateSentry toerror(this); // should we add this ?
644  //make the services available
646 
651  actReg_->preallocateSignal_(bounds);
652  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
654 
655  std::vector<ModuleProcessName> consumedBySubProcesses;
657  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
658  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
659  if (consumedBySubProcesses.empty()) {
660  consumedBySubProcesses = std::move(c);
661  } else if (not c.empty()) {
662  std::vector<ModuleProcessName> tmp;
663  tmp.reserve(consumedBySubProcesses.size() + c.size());
664  std::merge(consumedBySubProcesses.begin(),
665  consumedBySubProcesses.end(),
666  c.begin(),
667  c.end(),
668  std::back_inserter(tmp));
669  std::swap(consumedBySubProcesses, tmp);
670  }
671  });
672 
673  // Note: all these may throw
676  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
677  not unusedModules.empty()) {
679 
680  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
681  l << "The following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
682  "and "
683  "therefore they are deleted before the beginJob transition.";
684  for (auto const& description : unusedModules) {
685  l << "\n " << description->moduleLabel();
686  }
687  });
688  for (auto const& description : unusedModules) {
689  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
690  }
691  }
692  }
693  // Initialize after the deletion of non-consumed unscheduled
694  // modules to avoid non-consumed, non-run modules keeping the
695  // products unnecessarily alive
696  if (not branchesToDeleteEarly_.empty()) {
697  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
699  }
700 
701  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
702 
705  }
707 
708  //NOTE: This implementation assumes 'Job' means one call
709  // to EventProcessor::run.
710  // If it really means once per 'application' then this code will
711  // have to be changed.
712  // Also have to deal with the case where 'run' is called, then a new Module is
713  // added, and 'run' is called
714  // again. In that case the newly added Module needs its 'beginJob'
715  // to be called.
716 
717  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
718  // For now we delay calling beginOfJob until first beginOfRun
719  //if(looper_) {
720  // looper_->beginOfJob(es);
721  //}
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728  espController_->finishConfiguration();
729  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
730  if (looper_) {
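 // keep the looper's product and EventSetup lookup tables in sync with the
 // registry and record indices set up above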
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToProxyIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
762 
763  void EventProcessor::endJob() {
764  // Collects exceptions, so we don't throw before all operations are performed.
765  ExceptionCollector c(
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
829 
831 
832  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
833  return schedule_->getAllModuleDescriptions();
834  }
835 
836  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
837 
838  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
839 
840  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
841 
842  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
843 
844  namespace {
845 #include "TransitionProcessors.icc"
846  }
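 // FilesProcessor, used by runToCompletion() below, comes from the included
 // TransitionProcessors.icc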
847 
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
855  }
856  return returnValue;
857  }
858 
860  if (deferredExceptionPtrIsSet_.load()) {
862  return InputSource::IsStop;
863  }
864 
865  SendSourceTerminationSignalIfException sentry(actReg_.get());
866  InputSource::ItemType itemType;
867  //For now, do nothing with InputSource::IsSynchronize
868  do {
869  itemType = input_->nextItemType();
870  } while (itemType == InputSource::IsSynchronize);
871 
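 // remember the transition so that later code (e.g. lastTransitionType()) can
 // report it without querying the source again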
872  lastSourceTransition_ = itemType;
873  sentry.completedSuccessfully();
874 
876 
878  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
880  }
881 
882  return lastSourceTransition_;
883  }
884 
885  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
886  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
887  }
888 
890 
892  beginJob(); //make sure this was called
893 
894  // make the services available
896 
897  try {
898  FilesProcessor fp(fileModeNoMerge_);
899 
900  convertException::wrap([&]() {
901  bool firstTime = true;
902  do {
903  if (not firstTime) {
905  rewindInput();
906  } else {
907  firstTime = false;
908  }
909  startingNewLoop();
910 
911  auto trans = fp.processFiles(*this);
912 
913  fp.normalEnd();
914 
915  if (deferredExceptionPtrIsSet_.load()) {
916  std::rethrow_exception(deferredExceptionPtr_);
917  }
918  if (trans != InputSource::IsStop) {
919  //problem with the source
920  doErrorStuff();
921 
922  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
923  }
924  } while (not endOfLoop());
925  }); // convertException::wrap
926 
927  } // Try block
928  catch (cms::Exception& e) {
929  if (exceptionMessageLumis_) {
930  std::string message(
931  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
932  e.addAdditionalInfo(message);
933  if (e.alreadyPrinted()) {
934  LogAbsolute("Additional Exceptions") << message;
935  }
936  }
937  if (exceptionMessageRuns_) {
938  std::string message(
939  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
940  e.addAdditionalInfo(message);
941  if (e.alreadyPrinted()) {
942  LogAbsolute("Additional Exceptions") << message;
943  }
944  }
945  if (!exceptionMessageFiles_.empty()) {
946  e.addAdditionalInfo(exceptionMessageFiles_);
947  if (e.alreadyPrinted()) {
948  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
949  }
950  }
951  throw;
952  }
953  return epSuccess;
954  }
955 
956  void EventProcessor::readFile() {
957  FDEBUG(1) << " \treadFile\n";
958  size_t size = preg_->size();
959  SendSourceTerminationSignalIfException sentry(actReg_.get());
960 
962 
963  fb_ = input_->readFile();
964  if (size < preg_->size()) {
966  }
969  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
970  }
971  sentry.completedSuccessfully();
972  }
973 
974  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
975  if (fileBlockValid()) {
976  SendSourceTerminationSignalIfException sentry(actReg_.get());
977  input_->closeFile(fb_.get(), cleaningUpAfterException);
978  sentry.completedSuccessfully();
979  }
980  FDEBUG(1) << "\tcloseInputFile\n";
981  }
982 
983  void EventProcessor::openOutputFiles() {
984  if (fileBlockValid()) {
985  schedule_->openOutputFiles(*fb_);
986  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
987  }
988  FDEBUG(1) << "\topenOutputFiles\n";
989  }
990 
991  void EventProcessor::closeOutputFiles() {
992  schedule_->closeOutputFiles();
993  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
994  processBlockHelper_->clearAfterOutputFilesClose();
995  FDEBUG(1) << "\tcloseOutputFiles\n";
996  }
997 
998  void EventProcessor::respondToOpenInputFile() {
999  if (fileBlockValid()) {
1000  for_all(subProcesses_,
1001  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1002  schedule_->respondToOpenInputFile(*fb_);
1003  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1004  }
1005  FDEBUG(1) << "\trespondToOpenInputFile\n";
1006  }
1007 
1008  void EventProcessor::respondToCloseInputFile() {
1009  if (fileBlockValid()) {
1010  schedule_->respondToCloseInputFile(*fb_);
1011  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1012  }
1013  FDEBUG(1) << "\trespondToCloseInputFile\n";
1014  }
1015 
1016  void EventProcessor::startingNewLoop() {
1017  shouldWeStop_ = false;
1018  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1019  // until after we've called beginOfJob
1020  if (looper_ && looperBeginJobRun_) {
1021  looper_->doStartingNewLoop();
1022  }
1023  FDEBUG(1) << "\tstartingNewLoop\n";
1024  }
1025 
1026  bool EventProcessor::endOfLoop() {
1027  if (looper_) {
1028  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1029  looper_->setModuleChanger(&changer);
1030  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1031  looper_->setModuleChanger(nullptr);
1032  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1033  return true;
1034  else
1035  return false;
1036  }
1037  FDEBUG(1) << "\tendOfLoop\n";
1038  return true;
1039  }
1040 
1041  void EventProcessor::rewindInput() {
1042  input_->repeat();
1043  input_->rewind();
1044  FDEBUG(1) << "\trewind\n";
1045  }
1046 
1047  void EventProcessor::prepareForNextLoop() {
1048  looper_->prepareForNextLoop(esp_.get());
1049  FDEBUG(1) << "\tprepareForNextLoop\n";
1050  }
1051 
1052  bool EventProcessor::shouldWeCloseOutput() const {
1053  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1054  if (!subProcesses_.empty()) {
1055  for (auto const& subProcess : subProcesses_) {
1056  if (subProcess.shouldWeCloseOutput()) {
1057  return true;
1058  }
1059  }
1060  return false;
1061  }
1062  return schedule_->shouldWeCloseOutput();
1063  }
1064 
1065  void EventProcessor::doErrorStuff() {
1066  FDEBUG(1) << "\tdoErrorStuff\n";
1067  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1068  << "and went to the error state\n"
1069  << "Will attempt to terminate processing normally\n"
1070  << "(IF using the looper the next loop will be attempted)\n"
1071  << "This likely indicates a bug in an input module or corrupted input or both\n";
1072  }
1073 
1074  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1075  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1076  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1077 
1079  FinalWaitingTask globalWaitTask{taskGroup_};
1080 
1081  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1082  beginGlobalTransitionAsync<Traits>(
1083  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1084 
1085  globalWaitTask.wait();
1086  beginProcessBlockSucceeded = true;
1087  }
1088 
1089  void EventProcessor::inputProcessBlocks() {
1090  input_->fillProcessBlockHelper();
1091  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1092  while (input_->nextProcessBlock(processBlockPrincipal)) {
1093  readProcessBlock(processBlockPrincipal);
1094 
1096  FinalWaitingTask globalWaitTask{taskGroup_};
1097 
1098  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1099  beginGlobalTransitionAsync<Traits>(
1100  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1101 
1102  globalWaitTask.wait();
1103 
1104  FinalWaitingTask writeWaitTask{taskGroup_};
1106  writeWaitTask.wait();
1107 
1108  processBlockPrincipal.clearPrincipal();
1109  for (auto& s : subProcesses_) {
1110  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1111  }
1112  }
1113  }
1114 
1115  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1116  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1117 
1119  FinalWaitingTask globalWaitTask{taskGroup_};
1120 
1121  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1122  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1123  *schedule_,
1124  transitionInfo,
1125  serviceToken_,
1126  subProcesses_,
1127  cleaningUpAfterException);
1128  globalWaitTask.wait();
1129 
1130  if (beginProcessBlockSucceeded) {
1131  FinalWaitingTask writeWaitTask{taskGroup_};
1133  writeWaitTask.wait();
1134  }
1135 
1136  processBlockPrincipal.clearPrincipal();
1137  for (auto& s : subProcesses_) {
1138  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1139  }
1140  }
1141 
1142  void EventProcessor::beginRun(ProcessHistoryID const& phid,
1143  RunNumber_t run,
1144  bool& globalBeginSucceeded,
1145  bool& eventSetupForInstanceSucceeded) {
1146  globalBeginSucceeded = false;
1147  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginRun(runPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1155  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1157  espController_->forceCacheClear();
1158  }
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1161  actReg_->preESSyncIOVSignal_.emit(ts);
1163  actReg_->postESSyncIOVSignal_.emit(ts);
1164  eventSetupForInstanceSucceeded = true;
1165  sentry.completedSuccessfully();
1166  }
1167  auto const& es = esp_->eventSetupImpl();
1168  if (looper_ && looperBeginJobRun_ == false) {
1169  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1170 
1171  FinalWaitingTask waitTask{taskGroup_};
1172  using namespace edm::waiting_task::chain;
1173  chain::first([this, &es](auto nextTask) {
1174  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1175  }) | then([this, &es](auto nextTask) mutable {
1176  looper_->beginOfJob(es);
1177  looperBeginJobRun_ = true;
1178  looper_->doStartingNewLoop();
1179  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1180 
1181  waitTask.wait();
1182  }
1183  {
1185  FinalWaitingTask globalWaitTask{taskGroup_};
1186 
1187  using namespace edm::waiting_task::chain;
1188  chain::first([&runPrincipal, &es, this](auto waitTask) {
1189  RunTransitionInfo transitionInfo(runPrincipal, es);
1190  beginGlobalTransitionAsync<Traits>(
1191  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1192  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1193  globalBeginSucceeded = true;
1194  FDEBUG(1) << "\tbeginRun " << run << "\n";
1195  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1196  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1197  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1198  looper_->doBeginRun(runPrincipal, es, &processContext_);
1199  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1200 
1201  globalWaitTask.wait();
1202  }
1203  {
1204  //To wait, the ref count has to be 1+#streams
1205  FinalWaitingTask streamLoopWaitTask{taskGroup_};
1206 
1208 
1209  RunTransitionInfo transitionInfo(runPrincipal, es);
1210  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1211  *schedule_,
1213  transitionInfo,
1214  serviceToken_,
1215  subProcesses_);
1216  streamLoopWaitTask.wait();
1217  }
1218  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1219  if (looper_) {
1220  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1221  }
1222  }
1223 
1224  void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid,
1225  RunNumber_t run,
1226  bool globalBeginSucceeded,
1227  bool cleaningUpAfterException,
1228  bool eventSetupForInstanceSucceeded) {
1229  if (eventSetupForInstanceSucceeded) {
1230  //If we skip empty runs, this would be called conditionally
1231  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1232 
1233  if (globalBeginSucceeded) {
1234  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1235  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1237  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1238  mergeableRunProductMetadata->preWriteRun();
1239  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1240  auto exceptn = t.waitNoThrow();
1241  mergeableRunProductMetadata->postWriteRun();
1242  if (exceptn) {
1243  std::rethrow_exception(exceptn);
1244  }
1245  }
1246  }
1247  }
1248  deleteRunFromCache(phid, run);
1249  }
1250 
1251  void EventProcessor::endRun(ProcessHistoryID const& phid,
1252  RunNumber_t run,
1253  bool globalBeginSucceeded,
1254  bool cleaningUpAfterException) {
1255  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1256  runPrincipal.setEndTime(input_->timestamp());
1257 
1258  IOVSyncValue ts(
1260  runPrincipal.endTime());
1261  {
1262  SendSourceTerminationSignalIfException sentry(actReg_.get());
1263  actReg_->preESSyncIOVSignal_.emit(ts);
1265  actReg_->postESSyncIOVSignal_.emit(ts);
1266  sentry.completedSuccessfully();
1267  }
1268  auto const& es = esp_->eventSetupImpl();
1269  if (globalBeginSucceeded) {
1270  //To wait, the ref count has to be 1+#streams
1271  FinalWaitingTask streamLoopWaitTask{taskGroup_};
1272 
1274 
1275  RunTransitionInfo transitionInfo(runPrincipal, es);
1276  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1277  *schedule_,
1279  transitionInfo,
1280  serviceToken_,
1281  subProcesses_,
1282  cleaningUpAfterException);
1283  streamLoopWaitTask.wait();
1284  }
1285  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1286  if (looper_) {
1287  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1288  }
1289  {
1290  FinalWaitingTask globalWaitTask{taskGroup_};
1291 
1292  using namespace edm::waiting_task::chain;
1293  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1294  RunTransitionInfo transitionInfo(runPrincipal, es);
1296  endGlobalTransitionAsync<Traits>(
1297  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1298  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1299  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1300  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1301  looper_->doEndRun(runPrincipal, es, &processContext_);
1302  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1303 
1304  globalWaitTask.wait();
1305  }
1306  FDEBUG(1) << "\tendRun " << run << "\n";
1307  }
1308 
1309  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1310  FinalWaitingTask waitTask{taskGroup_};
1311  if (streamLumiActive_ > 0) {
1313  // Continue after opening a new input file
1315  } else {
1316  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1317  input_->luminosityBlockAuxiliary()->beginTime()),
1318  iRunResource,
1319  WaitingTaskHolder{taskGroup_, &waitTask});
1320  }
1321  waitTask.wait();
1322  return lastTransitionType();
1323  }
1324 
1325  void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1326  std::shared_ptr<void> const& iRunResource,
1327  edm::WaitingTaskHolder iHolder) {
1328  if (iHolder.taskHasFailed()) {
1329  return;
1330  }
1331 
1332  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1333  // We must be careful with the status object here and in code this function calls. IF we want
1334  // endRun to be called, then we must call resetResources before the things waiting on
1335  // iHolder are allowed to proceed. Otherwise, there will be a race condition (possibly causing
1336  // endRun to be called much later than it should be, because status is holding iRunResource).
1337  // Note that this must be done explicitly. Relying on the destructor does not work well
1338  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1339  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1340  // destroyed inside this function and lumiWork.
1341  auto status =
1342  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1343  chain::first([&](auto nextTask) {
1344  auto asyncEventSetup = [](ActivityRegistry* actReg,
1345  auto* espController,
1346  auto& queue,
1348  auto& status,
1349  IOVSyncValue const& iSync) {
1350  queue.pause();
1351  CMS_SA_ALLOW try {
1352  SendSourceTerminationSignalIfException sentry(actReg);
1353  // Pass in iSync to let the EventSetup system know which run and lumi
1354  // need to be processed and prepare IOVs for it.
1355  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1356  // lumi is done and no longer needs its EventSetup IOVs.
1357  actReg->preESSyncIOVSignal_.emit(iSync);
1358  espController->eventSetupForInstanceAsync(
1359  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1360  sentry.completedSuccessfully();
1361  } catch (...) {
1362  task.doneWaiting(std::current_exception());
1363  }
1364  };
1365  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1366  // We only get here inside this block if there is an EventSetup
1367  // module not able to handle concurrent IOVs (usually an ESSource)
1368  // and the new sync value is outside the current IOV of that module.
1369  auto group = nextTask.group();
1371  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1372  asyncEventSetup(
1374  });
1375  } else {
1376  asyncEventSetup(
1377  actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1378  }
1379  }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1380  actReg_->postESSyncIOVSignal_.emit(iSync);
1381  //the call to doneWaiting will cause the count to decrement
1382  auto copyTask = nextTask;
1383  if (iPtr) {
1384  nextTask.doneWaiting(*iPtr);
1385  }
1386  auto group = copyTask.group();
1387  lumiQueue_->pushAndPause(
1388  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1389  if (task.taskHasFailed()) {
1390  status->resetResources();
1391  return;
1392  }
1393 
1394  status->setResumer(std::move(iResumer));
1395 
1396  auto group = task.group();
1398  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1399  //make the services available
1401  // Caught exception is propagated via WaitingTaskHolder
1402  CMS_SA_ALLOW try {
1404 
1405  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1406  {
1407  SendSourceTerminationSignalIfException sentry(actReg_.get());
1408 
1409  input_->doBeginLumi(lumiPrincipal, &processContext_);
1410  sentry.completedSuccessfully();
1411  }
1412 
1414  if (rng.isAvailable()) {
1415  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1416  rng->preBeginLumi(lb);
1417  }
1418 
1419  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1420 
1421  using namespace edm::waiting_task::chain;
1422  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1423  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1424  {
1425  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1427  beginGlobalTransitionAsync<Traits>(
1428  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1429  }
1430  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1431  looper_->prefetchAsync(
1432  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1433  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1434  status->globalBeginDidSucceed();
1435  //make the services available
1436  ServiceRegistry::Operate operateLooper(serviceToken_);
1437  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1438  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1439  if (iPtr) {
1440  status->resetResources();
1441  holder.doneWaiting(*iPtr);
1442  } else {
1443  if (not looper_) {
1444  status->globalBeginDidSucceed();
1445  }
1446  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1448 
1449  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1450  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1451  streamQueues_[i].pause();
1452 
1453  auto& event = principalCache_.eventPrincipal(i);
1454  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1455  // held by the container as this lambda may not finish executing before all the tasks it
1456  // spawns have already started to run.
1457  auto eventSetupImpls = &status->eventSetupImpls();
1458  auto lp = status->lumiPrincipal().get();
1461  event.setLuminosityBlockPrincipal(lp);
1462  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1463  using namespace edm::waiting_task::chain;
1464  chain::first([this, i, &transitionInfo](auto nextTask) {
1465  beginStreamTransitionAsync<Traits>(
1466  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1467  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1468  if (exceptionFromBeginStreamLumi) {
1469  WaitingTaskHolder tmp(nextTask);
1470  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1471  streamEndLumiAsync(nextTask, i);
1472  } else {
1474  }
1475  }) | runLast(holder);
1476  });
1477  }
1478  }
1479  }) | runLast(postQueueTask);
1480 
1481  } catch (...) {
1482  status->resetResources();
1483  postQueueTask.doneWaiting(std::current_exception());
1484  }
1485  }); // task in sourceResourcesAcquirer
1486  });
1487  }) | chain::runLast(std::move(iHolder));
1488  }
1489 
1491  {
1492  //all streams are sharing the same status at the moment
1493  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1494  status->needToContinueLumi();
1495  status->startProcessingEvents();
1496  }
1497 
1498  unsigned int streamIndex = 0;
1499  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1500  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1501  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1502  }
1503  iHolder.group()->run(
1504  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1505  }
1506 
1507  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1508  if (setDeferredException(*iPtr)) {
1509  WaitingTaskHolder tmp(holder);
1510  tmp.doneWaiting(*iPtr);
1511  } else {
1513  }
1514  }
1515 
1516  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1517  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1518  // Get some needed info out of the status object before moving
1519  // it into finalTaskForThisLumi.
1520  auto& lp = *(iLumiStatus->lumiPrincipal());
1521  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1522  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1523  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1524  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1525 
1526  using namespace edm::waiting_task::chain;
1527  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1528  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1529 
1530  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1532  endGlobalTransitionAsync<Traits>(
1533  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1534  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1535  //Only call writeLumi if beginLumi succeeded
1536  if (didGlobalBeginSucceed) {
1537  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1538  }
1539  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1540  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1541  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1542  //any thrown exception auto propagates to nextTask via the chain
1544  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1545  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1546  std::exception_ptr ptr;
1547  if (iPtr) {
1548  ptr = *iPtr;
1549  }
1551 
1552  // Try hard to clean up resources so the
1553  // process can terminate in a controlled
1554  // fashion even after exceptions have occurred.
1555  // Caught exception is passed to handleEndLumiExceptions()
1556  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1557  if (not ptr) {
1558  ptr = std::current_exception();
1559  }
1560  }
1561  // Caught exception is passed to handleEndLumiExceptions()
1562  CMS_SA_ALLOW try {
1563  status->resumeGlobalLumiQueue();
1565  } catch (...) {
1566  if (not ptr) {
1567  ptr = std::current_exception();
1568  }
1569  }
1570  // Caught exception is passed to handleEndLumiExceptions()
1571  CMS_SA_ALLOW try {
1572  // This call to status.resetResources() must occur before iTask is destroyed.
1573  // Otherwise there will be a data race which could result in endRun
1574  // being delayed until it is too late to successfully call it.
1575  status->resetResources();
1576  status.reset();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582 
1583  if (ptr) {
1584  handleEndLumiExceptions(&ptr, nextTask);
1585  }
1586  }) | runLast(std::move(iTask));
1587  }
1588 
1589  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1590  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1591  if (iPtr) {
1592  handleEndLumiExceptions(iPtr, iTask);
1593  }
1594  auto status = streamLumiStatus_[iStreamIndex];
1595  //reset status before releasing the queue, else we get a race condition
1596  streamLumiStatus_[iStreamIndex].reset();
1598  streamQueues_[iStreamIndex].resume();
1599 
1600  //are we the last one?
1601  if (status->streamFinishedLumi()) {
1603  }
1604  });
1605 
1606  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1607 
1608  //Need to be sure the lumi status is released before lumiDoneTask can ever be called;
1609  // therefore we do not want to hold the shared_ptr
1610  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1611  lumiStatus->setEndTime();
1612 
1613  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1614 
1615  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1616  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1617 
1618  if (lumiStatus->didGlobalBeginSucceed()) {
1619  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1620  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1621  lumiPrincipal.endTime());
1623  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1624  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1625  *schedule_,
1626  iStreamIndex,
1627  transitionInfo,
1628  serviceToken_,
1629  subProcesses_,
1630  cleaningUpAfterException);
1631  }
1632  }
1633 
1634  void EventProcessor::endUnfinishedLumi() {
1635  if (streamLumiActive_.load() > 0) {
1636  FinalWaitingTask globalWaitTask{taskGroup_};
1637  {
1638  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1639  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1640  if (streamLumiStatus_[i]) {
1641  streamEndLumiAsync(globalTaskHolder, i);
1642  }
1643  }
1644  }
1645  globalWaitTask.wait();
1646  }
1647  }
1648 
1649  void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1650  SendSourceTerminationSignalIfException sentry(actReg_.get());
1651  input_->readProcessBlock(processBlockPrincipal);
1652  sentry.completedSuccessfully();
1653  }
1654 
1655  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1656  if (principalCache_.hasRunPrincipal()) {
1657  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1658  << "Illegal attempt to insert run into cache\n"
1659  << "Contact a Framework Developer\n";
1660  }
1661  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1662  preg(),
1664  historyAppender_.get(),
1665  0,
1666  true,
1668  {
1669  SendSourceTerminationSignalIfException sentry(actReg_.get());
1670  input_->readRun(*rp, *historyAppender_);
1671  sentry.completedSuccessfully();
1672  }
1673  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1674  principalCache_.insert(rp);
1675  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1676  }
1677 
1678  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1679  principalCache_.merge(input_->runAuxiliary(), preg());
1680  auto runPrincipal = principalCache_.runPrincipalPtr();
1681  {
1682  SendSourceTerminationSignalIfException sentry(actReg_.get());
1683  input_->readAndMergeRun(*runPrincipal);
1684  sentry.completedSuccessfully();
1685  }
1686  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1687  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1688  }
1689 
1690  void EventProcessor::readLuminosityBlock(LuminosityBlockProcessingStatus& iStatus) {
1691  if (!principalCache_.hasRunPrincipal()) {
1692  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1693  << "Illegal attempt to insert lumi into cache\n"
1694  << "Run is invalid\n"
1695  << "Contact a Framework Developer\n";
1696  }
1698  assert(lbp);
1699  lbp->setAux(*input_->luminosityBlockAuxiliary());
1700  {
1701  SendSourceTerminationSignalIfException sentry(actReg_.get());
1702  input_->readLuminosityBlock(*lbp, *historyAppender_);
1703  sentry.completedSuccessfully();
1704  }
1705  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1706  iStatus.lumiPrincipal() = std::move(lbp);
1707  }
1708 
1710  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1711  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1712  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1713  input_->processHistoryRegistry().reducedProcessHistoryID(
1714  input_->luminosityBlockAuxiliary()->processHistoryID()));
1715  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1716  assert(lumiOK);
1717  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1718  {
1719  SendSourceTerminationSignalIfException sentry(actReg_.get());
1720  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1721  sentry.completedSuccessfully();
1722  }
1723  return input_->luminosityBlock();
1724  }
1725 
1727  using namespace edm::waiting_task;
1728  chain::first([&](auto nextTask) {
1730  schedule_->writeProcessBlockAsync(
1731  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1732  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1734  for (auto& s : subProcesses_) {
1735  s.writeProcessBlockAsync(nextTask, processBlockType);
1736  }
1737  }) | chain::runLast(std::move(task));
1738  }
1739 
1741  ProcessHistoryID const& phid,
1742  RunNumber_t run,
1743  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1744  using namespace edm::waiting_task;
1745  chain::first([&](auto nextTask) {
1747  schedule_->writeRunAsync(nextTask,
1748  principalCache_.runPrincipal(phid, run),
1749  &processContext_,
1750  actReg_.get(),
1751  mergeableRunProductMetadata);
1752  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1754  for (auto& s : subProcesses_) {
1755  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1756  }
1757  }) | chain::runLast(std::move(task));
1758  }
1759 
1760  void EventProcessor::deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) {
1761  principalCache_.deleteRun(phid, run);
1762  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1763  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1764  }
1765 
1766  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
1767  using namespace edm::waiting_task;
1768  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1769  chain::first([&](auto nextTask) {
1771 
1772  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1773  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1774  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1776  for (auto& s : subProcesses_) {
1777  s.writeLumiAsync(nextTask, lumiPrincipal);
1778  }
1779  }) | chain::lastTask(std::move(task));
1780  }
1781  }
1782 
1783  void EventProcessor::deleteLumiFromCache(LuminosityBlockProcessingStatus& iStatus) {
1784  for (auto& s : subProcesses_) {
1785  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1786  }
1787  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1788  iStatus.lumiPrincipal()->clearPrincipal();
1789  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1790  }
1791 
1792  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus) {
1793  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1794  iStatus.endLumi();
1795  return false;
1796  }
1797 
1798  if (iStatus.wasEventProcessingStopped()) {
1799  return false;
1800  }
1801 
1802  if (shouldWeStop()) {
1804  iStatus.stopProcessingEvents();
1805  iStatus.endLumi();
1806  return false;
1807  }
1808 
1810  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1811  CMS_SA_ALLOW try {
1812  //need to use lock in addition to the serial task queue because
1813  // of delayed provenance reading and reading data in response to
1814  // edm::Refs etc
1815  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1816 
1817  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1818  if (InputSource::IsLumi == itemType) {
1819  iStatus.haveContinuedLumi();
1820  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1821  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1822  readAndMergeLumi(iStatus);
1823  itemType = nextTransitionType();
1824  }
1825  if (InputSource::IsLumi == itemType) {
1826  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1827  input_->luminosityBlockAuxiliary()->beginTime()));
1828  }
1829  }
1830  if (InputSource::IsEvent != itemType) {
1831  iStatus.stopProcessingEvents();
1832 
1833  //IsFile may continue processing the lumi and
1834  // looper_ can cause the input source to declare a new IsRun which is actually
1835  // just a continuation of the previous run
1836  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1837  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1838  iStatus.endLumi();
1839  }
1840  return false;
1841  }
1842  readEvent(iStreamIndex);
1843  } catch (...) {
1844  bool expected = false;
1845  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1846  deferredExceptionPtr_ = std::current_exception();
1847  iStatus.endLumi();
1848  }
1849  return false;
1850  }
1851  return true;
1852  }
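The comment near the top of readNextEventForStream explains why the source is guarded by a std::recursive_mutex in addition to the serial task queue: a read can trigger a nested read on the same thread (delayed provenance, data pulled in through an edm::Ref), and that nested call must be able to re-acquire the lock. Below is a minimal sketch of that re-entrancy in plain standard C++; the function names are illustrative only.

#include <iostream>
#include <mutex>

std::recursive_mutex sourceMutex;  // plays the role of sourceMutex_

// Nested access that needs the same lock the caller already holds,
// e.g. a delayed read triggered while the primary read is in progress.
void readReferencedData() {
  std::lock_guard<std::recursive_mutex> guard(sourceMutex);
  std::cout << "nested read under the same lock\n";
}

void readEventData() {
  std::lock_guard<std::recursive_mutex> guard(sourceMutex);
  std::cout << "primary read\n";
  readReferencedData();  // would deadlock with a plain std::mutex
}

int main() { readEventData(); }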
1853 
1854  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1855  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1857  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1858  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1859  auto status = streamLumiStatus_[iStreamIndex].get();
1860  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1861  CMS_SA_ALLOW try {
1862  if (readNextEventForStream(iStreamIndex, *status)) {
1863  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1864  if (iPtr) {
1865  // Try to end the stream properly even if an exception was
1866  // thrown on an event.
1867  bool expected = false;
1868  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1869  // This is the case where the exception in iPtr is the primary
1870  // exception and we want to see its message.
1871  deferredExceptionPtr_ = *iPtr;
1872  WaitingTaskHolder tempHolder(iTask);
1873  tempHolder.doneWaiting(*iPtr);
1874  }
1875  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1876  //the stream will stop now
1877  return;
1878  }
1879  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1880  });
1881 
1882  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1883  } else {
1884  //the stream will stop now
1885  if (status->isLumiEnding()) {
1886  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1887  status->startNextLumi();
1888  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1889  }
1890  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1891  } else {
1892  iTask.doneWaiting(std::exception_ptr{});
1893  }
1894  }
1895  } catch (...) {
1896  // It is unlikely we will ever get in here ...
1897  // But if we do, try to clean up and propagate the exception
1898  if (streamLumiStatus_[iStreamIndex]) {
1899  streamEndLumiAsync(iTask, iStreamIndex);
1900  }
1901  bool expected = false;
1902  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1903  auto e = std::current_exception();
1904  deferredExceptionPtr_ = e;
1905  iTask.doneWaiting(e);
1906  }
1907  }
1908  });
1909  }
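handleNextEventForStreamAsync keeps only the first failure: deferredExceptionPtrIsSet_ is flipped with compare_exchange_strong, so exactly one caller stores its exception and every later exception is dropped while the stored one is rethrown later. Below is a small self-contained sketch of that first-exception-wins pattern; it assumes nothing beyond the standard library, and the names are illustrative rather than the actual EventProcessor members.

#include <atomic>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

std::atomic<bool> exceptionIsSet{false};
std::exception_ptr deferredException;  // written only by the thread that wins the flag

// Record the first exception only; later failures are ignored.
bool setDeferredException(std::exception_ptr e) {
  bool expected = false;
  if (exceptionIsSet.compare_exchange_strong(expected, true)) {
    deferredException = e;
    return true;
  }
  return false;
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([i] {
      try {
        throw std::runtime_error("failure in worker " + std::to_string(i));
      } catch (...) {
        setDeferredException(std::current_exception());
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  if (exceptionIsSet.load(std::memory_order_acquire)) {
    try {
      std::rethrow_exception(deferredException);
    } catch (std::exception const& ex) {
      std::cout << "kept first exception: " << ex.what() << "\n";
    }
  }
}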
1910 
1911  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1912  //TODO this will have to become per stream
1913  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1914  StreamContext streamContext(event.streamID(), &processContext_);
1915 
1916  SendSourceTerminationSignalIfException sentry(actReg_.get());
1917  input_->readEvent(event, streamContext);
1918 
1919  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1920  sentry.completedSuccessfully();
1921 
1922  FDEBUG(1) << "\treadEvent\n";
1923  }
1924 
1925  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1926  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1927  }
1928 
1929  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1930  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1931 
1932  ServiceRegistry::Operate operate(serviceToken_);
1933  Service<RandomNumberGenerator> rng;
1934  if (rng.isAvailable()) {
1935  Event ev(*pep, ModuleDescription(), nullptr);
1936  rng->postEventRead(ev);
1937  }
1938 
1939  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1940  using namespace edm::waiting_task::chain;
1941  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1942  EventTransitionInfo info(*pep, es);
1943  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1944  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1945  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1946  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1947  }
1948  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1949  //NOTE: behavior change. Previously, if an exception happened, the looper was still called; now it will not be called.
1950  ServiceRegistry::Operate operateLooper(serviceToken_);
1951  processEventWithLooper(*pep, iStreamIndex);
1952  }) | then([pep](auto nextTask) {
1953  FDEBUG(1) << "\tprocessEvent\n";
1954  pep->clearEventPrincipal();
1955  }) | runLast(iHolder);
1956  }
1957 
1958  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1959  bool randomAccess = input_->randomAccess();
1960  ProcessingController::ForwardState forwardState = input_->forwardState();
1961  ProcessingController::ReverseState reverseState = input_->reverseState();
1962  ProcessingController pc(forwardState, reverseState, randomAccess);
1963 
1964  EDLooperBase::Status status = EDLooperBase::kContinue;
1965  do {
1966  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1967  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1968  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1969 
1970  bool succeeded = true;
1971  if (randomAccess) {
1972  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1973  input_->skipEvents(-2);
1974  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1975  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1976  }
1977  }
1978  pc.setLastOperationSucceeded(succeeded);
1979  } while (!pc.lastOperationSucceeded());
1980  if (status != EDLooperBase::kContinue) {
1981  shouldWeStop_ = true;
1982  lastSourceTransition_ = InputSource::IsStop;
1983  }
1984  }
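processEventWithLooper keeps asking the input source to navigate (skip back, jump to a specified event) until ProcessingController reports that the last operation succeeded. A compact stand-in for that do/while shape, using only standard C++; NavigationController and goToRequestedEvent are invented names, not the framework's ProcessingController API.

#include <iostream>

// Invented stand-in for the controller used above: it only remembers whether
// the last requested navigation succeeded.
struct NavigationController {
  bool lastOperationSucceeded = true;
  void setLastOperationSucceeded(bool v) { lastOperationSucceeded = v; }
};

// Pretend source navigation that fails on the first attempt and then succeeds.
bool goToRequestedEvent(int attempt) { return attempt > 0; }

int main() {
  NavigationController pc;
  int attempt = 0;
  do {
    bool succeeded = goToRequestedEvent(attempt);
    ++attempt;
    pc.setLastOperationSucceeded(succeeded);
  } while (!pc.lastOperationSucceeded);
  std::cout << "navigation succeeded after " << attempt << " attempt(s)\n";
}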
1985 
1986  bool EventProcessor::shouldWeStop() const {
1987  FDEBUG(1) << "\tshouldWeStop\n";
1988  if (shouldWeStop_)
1989  return true;
1990  if (!subProcesses_.empty()) {
1991  for (auto const& subProcess : subProcesses_) {
1992  if (subProcess.terminate()) {
1993  return true;
1994  }
1995  }
1996  return false;
1997  }
1998  return schedule_->terminate();
1999  }
2000 
2001  void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2002 
2003  void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2004 
2005  void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2006 
2007  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2008  bool expected = false;
2009  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2010  deferredExceptionPtr_ = iException;
2011  return true;
2012  }
2013  return false;
2014  }
2015 
2016  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2017  cms::Exception ex("ModulesSynchingOnLumis");
2018  ex << "The framework is configured to use at least two streams, but the following modules\n"
2019  << "require synchronizing on LuminosityBlock boundaries:";
2020  bool found = false;
2021  for (auto worker : schedule_->allWorkers()) {
2022  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2023  found = true;
2024  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2025  }
2026  }
2027  if (found) {
2028  ex << "\n\nThe situation can be fixed by either\n"
2029  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2030  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2031  throw ex;
2032  }
2033  }
2034 
2035  void EventProcessor::warnAboutLegacyModules() const {
2036  std::unique_ptr<LogSystem> s;
2037  for (auto worker : schedule_->allWorkers()) {
2038  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2039  if (not s) {
2040  s = std::make_unique<LogSystem>("LegacyModules");
2041  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2042  "is going to end soon. These modules need to be converted to have type\n"
2043  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2044  }
2045  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2046  }
2047  }
2048  }
2049 } // namespace edm