
EventProcessor.cc
79 #include "MessageForSource.h"
80 #include "MessageForParent.h"
82 #include "RunProcessingStatus.h"
83 
84 #include "boost/range/adaptor/reversed.hpp"
85 
86 #include <cassert>
87 #include <exception>
88 #include <iomanip>
89 #include <iostream>
90 #include <utility>
91 #include <sstream>
92 
93 #include <sys/ipc.h>
94 #include <sys/msg.h>
95 
96 #include "oneapi/tbb/task.h"
97 
98 //Used for CPU affinity
99 #ifndef __APPLE__
100 #include <sched.h>
101 #endif
102 
103 namespace {
104  class PauseQueueSentry {
105  public:
106  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
107  ~PauseQueueSentry() { queue_.resume(); }
108 
109  private:
110  edm::SerialTaskQueue& queue_;
111  };
112 } // namespace
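 // PauseQueueSentry is an RAII guard: the queue stays paused for exactly the
 // lifetime of the sentry and is resumed even if the enclosing scope exits via
 // an exception. A minimal usage sketch (illustrative only; 'myQueue' is a
 // hypothetical edm::SerialTaskQueue, not something defined in this file):
 //
 //   edm::SerialTaskQueue myQueue;
 //   {
 //     PauseQueueSentry sentry(myQueue);  // calls myQueue.pause()
 //     // tasks pushed to myQueue here do not start until the sentry is destroyed
 //   }                                    // ~PauseQueueSentry() calls myQueue.resume()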
113 
114 namespace edm {
115 
116  namespace chain = waiting_task::chain;
117 
118  // ---------------------------------------------------------------
119  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
 120  ParameterSet& params,
 121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
125  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126  std::shared_ptr<ActivityRegistry> areg,
127  std::shared_ptr<ProcessConfiguration const> processConfiguration,
128  PreallocationConfiguration const& allocations) {
129  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
130  if (main_input == nullptr) {
 131  throw Exception(errors::Configuration)
 132  << "There must be exactly one source in the configuration.\n"
133  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134  }
135 
136  std::string modtype(main_input->getParameter<std::string>("@module_type"));
137 
138  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
 139  ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
 140  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
141  filler->fill(descriptions);
142 
143  try {
144  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
145  } catch (cms::Exception& iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  moduleIndex);
165 
166  InputSourceDescription isdesc(md,
167  preg,
168  branchIDListHelper,
169  processBlockHelper,
170  thinnedAssociationsHelper,
171  areg,
172  common.maxEventsInput_,
173  common.maxLumisInput_,
174  common.maxSecondsUntilRampdown_,
175  allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
181  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
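 //  (Here the shared_ptr owns no object; it is used purely as a scope guard whose
 //   custom deleter emits postSourceConstructionSignal_ when 'sentry' goes out of
 //   scope, whether the construction below succeeds or throws.)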
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  } catch (cms::Exception& iException) {
188  std::ostringstream ost;
189  ost << "Constructing input source of type " << modtype;
190  iException.addContext(ost.str());
191  throw;
192  }
193  return input;
194  }
195 
196  // ---------------------------------------------------------------
197  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
 198  eventsetup::EventSetupProvider& cp,
 199  ParameterSet& params,
 200  std::vector<std::string> const& loopers) {
201  std::shared_ptr<EDLooperBase> vLooper;
202 
203  assert(1 == loopers.size());
204 
205  for (auto const& looperName : loopers) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
207  // Unlikely we would ever need the ModuleTypeResolver in Looper
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
215  ServiceToken const& iToken,
 216  serviceregistry::ServiceLegacy iLegacy,
 217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
265  processConfiguration_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
271  deferredExceptionPtrIsSet_(false),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
278  exceptionMessageFiles_(),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
283  forceESCacheClearOnNewRun_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
 287  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
 288  }
289 
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
291  ServiceToken const& token,
 292  serviceregistry::ServiceLegacy legacy)
 293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
302  processConfiguration_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
308  deferredExceptionPtrIsSet_(false),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
315  exceptionMessageFiles_(),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
320  forceESCacheClearOnNewRun_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
326  ServiceToken const& iToken,
 327  serviceregistry::ServiceLegacy iLegacy) {
 328  //std::cerr << processDesc->dump() << std::endl;
329 
 330  // register the empty parentage vector, once and for all
332 
333  // register the empty parameter set, once and for all.
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
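 //  Worked example (illustrative): with numberOfThreads=8 and the other three
 //  options left at 0, the logic above yields nStreams = 8 (defaults to nThreads),
 //  nConcurrentLumis = 2 (default, not exceeding nStreams), and
 //  nConcurrentRuns = 2 (capped at nConcurrentLumis). With numberOfStreams=1
 //  everything collapses to 1, since lumis are capped at streams and runs at lumis.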
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
 407  // This maximum simplifies to being equal to nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
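 //  For example, nConcurrentRuns = 1 and nConcurrentLumis = 4 give
 //  3*1 - 2 + (4 - 1) = 4 (just nConcurrentLumis, as noted above), while
 //  nConcurrentRuns = 2 and nConcurrentLumis = 2 give 3*2 - 2 + 0 = 4.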
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
 416  deleteNonConsumedUnscheduledModules_ =
 417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
 418  //for now, if we have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
 433  modulesToIgnoreForDeleteEarly_ =
 434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
 438  ScheduleItems items;
 439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
 454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
 457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
 468  // in the presence of a looper, do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
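 //  The block above uses oneTBB's task_group to construct the Schedule and the
 //  InputSource concurrently and then join before combining their product
 //  registries. A minimal standalone sketch of the same idiom (illustrative
 //  only, independent of the framework types used here):
 //
 //    #include "oneapi/tbb/task_group.h"
 //
 //    oneapi::tbb::task_group g;
 //    g.run([] { /* build component A */ });
 //    g.run([] { /* build component B */ });
 //    g.wait();  // both tasks have completed; their results can now be combined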
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
606 
608  // Make the services available while everything is being deleted.
611 
 612  // manually destroy all these things that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
624 
628  task.waitNoThrow();
629  assert(task.done());
630  }
631 
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
 689  // modules to avoid non-consumed, non-run modules keeping the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
700  }
701  if (preallocations_.numberOfRuns() > 1) {
703  }
704 
 705  //NOTE: This implementation assumes 'Job' means one call
 706  // to EventProcessor::run
707  // If it really means once per 'application' then this code will
708  // have to be changed.
 709  // Also have to deal with the case where 'run' is called, then a new Module
 710  // is added, and 'run' is called
 711  // again. In that case the newly added Module needs its 'beginJob'
 712  // to be called.
713 
714  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
715  // For now we delay calling beginOfJob until first beginOfRun
716  //if(looper_) {
717  // looper_->beginOfJob(es);
718  //}
719  espController_->finishConfiguration();
720  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
721  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728 
729  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToResolverIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
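 //  The first(...) | then(...) | ifThen(...) | runLast(...) expressions used above
 //  and throughout this file come from edm::waiting_task::chain: roughly, each
 //  stage receives a WaitingTaskHolder ('nextTask') for the stage that follows it,
 //  and runLast(...) runs the whole chain, completing the supplied holder at the
 //  end. A schematic sketch of the shape (illustrative, not tied to any particular
 //  transition in this file):
 //
 //    using namespace edm::waiting_task::chain;
 //    first([](auto nextTask) { /* stage 1 */ }) |
 //        then([](auto nextTask) { /* stage 2, after stage 1 */ }) |
 //        ifThen(condition, [](auto nextTask) { /* optional stage */ }) |
 //        runLast(std::move(finalHolder));  // finalHolder: an edm::WaitingTaskHolder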
762 
764  // Collects exceptions, so we don't throw before all operations are performed.
 765  ExceptionCollector c(
 766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
829 
831 
832  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
833  return schedule_->getAllModuleDescriptions();
834  }
835 
836  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
837 
838  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
839 
840  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
841 
842  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
843 
844  namespace {
845 #include "TransitionProcessors.icc"
846  }
847 
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
854  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
856  jr->reportShutdownSignal();
858  }
859  return returnValue;
860  }
861 
862  namespace {
863  struct SourceNextGuard {
864  SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
865  ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
866  edm::ActivityRegistry& act_;
867  };
868  } // namespace
869 
872  InputSource::ItemTypeInfo itemTypeInfo;
873  {
874  SourceNextGuard guard(*actReg_.get());
875  //For now, do nothing with InputSource::IsSynchronize
876  do {
877  itemTypeInfo = input_->nextItemType();
878  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
879  }
880  lastSourceTransition_ = itemTypeInfo;
881  sentry.completedSuccessfully();
882 
884 
886  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
888  }
889 
890  return lastSourceTransition_;
891  }
892 
893  void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
894  WaitingTaskHolder nextTask) {
895  auto group = nextTask.group();
897  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
898  CMS_SA_ALLOW try {
900  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
903  runStatus->runPrincipal()->run() == input_->run() &&
904  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
906  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
907  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
908  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
909  }
910  } catch (...) {
911  nextHolder.doneWaiting(std::current_exception());
912  }
913  });
914  }
915 
917  beginJob(); //make sure this was called
918 
919  // make the services available
921  actReg_->beginProcessingSignal_();
922  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
923  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
924  try {
925  FilesProcessor fp(fileModeNoMerge_);
926 
927  convertException::wrap([&]() {
928  bool firstTime = true;
929  do {
930  if (not firstTime) {
932  rewindInput();
933  } else {
934  firstTime = false;
935  }
936  startingNewLoop();
937 
938  auto trans = fp.processFiles(*this);
939 
940  fp.normalEnd();
941 
942  if (deferredExceptionPtrIsSet_.load()) {
943  std::rethrow_exception(deferredExceptionPtr_);
944  }
945  if (trans != InputSource::ItemType::IsStop) {
946  //problem with the source
947  doErrorStuff();
948 
949  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
950  }
951  } while (not endOfLoop());
952  }); // convertException::wrap
953 
954  } // Try block
955  catch (cms::Exception& e) {
 956  if (exceptionMessageLumis_) {
 957  std::string message(
958  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
959  e.addAdditionalInfo(message);
960  if (e.alreadyPrinted()) {
961  LogAbsolute("Additional Exceptions") << message;
962  }
963  }
964  if (exceptionMessageRuns_) {
965  std::string message(
966  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
967  e.addAdditionalInfo(message);
968  if (e.alreadyPrinted()) {
969  LogAbsolute("Additional Exceptions") << message;
970  }
971  }
972  if (!exceptionMessageFiles_.empty()) {
973  e.addAdditionalInfo(exceptionMessageFiles_);
974  if (e.alreadyPrinted()) {
975  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
976  }
977  }
978  throw;
979  }
980  return epSuccess;
981  }
982 
984  FDEBUG(1) << " \treadFile\n";
985  size_t size = preg_->size();
987 
988  if (streamRunActive_ > 0) {
989  streamRunStatus_[0]->runPrincipal()->preReadFile();
990  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
991  }
992 
993  if (streamLumiActive_ > 0) {
994  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
995  }
996 
997  fb_ = input_->readFile();
998  if (size < preg_->size()) {
1000  }
1003  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1004  }
1005  sentry.completedSuccessfully();
1006  }
1007 
1008  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1009  if (fileBlockValid()) {
1011  input_->closeFile(fb_.get(), cleaningUpAfterException);
1012  sentry.completedSuccessfully();
1013  }
1014  FDEBUG(1) << "\tcloseInputFile\n";
1015  }
1016 
1018  if (fileBlockValid()) {
1019  schedule_->openOutputFiles(*fb_);
1020  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1021  }
1022  FDEBUG(1) << "\topenOutputFiles\n";
1023  }
1024 
1026  schedule_->closeOutputFiles();
1027  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1028  processBlockHelper_->clearAfterOutputFilesClose();
1029  FDEBUG(1) << "\tcloseOutputFiles\n";
1030  }
1031 
1033  if (fileBlockValid()) {
1035  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1036  schedule_->respondToOpenInputFile(*fb_);
1037  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1038  }
1039  FDEBUG(1) << "\trespondToOpenInputFile\n";
1040  }
1041 
1043  if (fileBlockValid()) {
1044  schedule_->respondToCloseInputFile(*fb_);
1045  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1046  }
1047  FDEBUG(1) << "\trespondToCloseInputFile\n";
1048  }
1049 
1051  shouldWeStop_ = false;
1052  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1053  // until after we've called beginOfJob
1054  if (looper_ && looperBeginJobRun_) {
1055  looper_->doStartingNewLoop();
1056  }
1057  FDEBUG(1) << "\tstartingNewLoop\n";
1058  }
1059 
1061  if (looper_) {
1062  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1063  looper_->setModuleChanger(&changer);
1064  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1065  looper_->setModuleChanger(nullptr);
 1066  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
 1067  return true;
1068  else
1069  return false;
1070  }
1071  FDEBUG(1) << "\tendOfLoop\n";
1072  return true;
1073  }
1074 
1076  input_->repeat();
1077  input_->rewind();
1078  FDEBUG(1) << "\trewind\n";
1079  }
1080 
1082  looper_->prepareForNextLoop(esp_.get());
1083  FDEBUG(1) << "\tprepareForNextLoop\n";
1084  }
1085 
1087  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1088  if (!subProcesses_.empty()) {
1089  for (auto const& subProcess : subProcesses_) {
1090  if (subProcess.shouldWeCloseOutput()) {
1091  return true;
1092  }
1093  }
1094  return false;
1095  }
1096  return schedule_->shouldWeCloseOutput();
1097  }
1098 
1100  FDEBUG(1) << "\tdoErrorStuff\n";
1101  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1102  << "and went to the error state\n"
1103  << "Will attempt to terminate processing normally\n"
1104  << "(IF using the looper the next loop will be attempted)\n"
1105  << "This likely indicates a bug in an input module or corrupted input or both\n";
1106  }
1107 
1108  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1109  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1110  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1111 
1113  FinalWaitingTask globalWaitTask{taskGroup_};
1114 
1115  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1116  beginGlobalTransitionAsync<Traits>(
1117  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1118 
1119  globalWaitTask.wait();
1120  beginProcessBlockSucceeded = true;
1121  }
1122 
1124  input_->fillProcessBlockHelper();
1126  while (input_->nextProcessBlock(processBlockPrincipal)) {
1127  readProcessBlock(processBlockPrincipal);
1128 
1130  FinalWaitingTask globalWaitTask{taskGroup_};
1131 
1132  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1133  beginGlobalTransitionAsync<Traits>(
1134  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1135 
1136  globalWaitTask.wait();
1137 
1138  FinalWaitingTask writeWaitTask{taskGroup_};
1140  writeWaitTask.wait();
1141 
1142  processBlockPrincipal.clearPrincipal();
1143  for (auto& s : subProcesses_) {
1144  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1145  }
1146  }
1147  }
1148 
1149  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1150  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1151 
1153  FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1157  *schedule_,
1158  transitionInfo,
1159  serviceToken_,
1160  subProcesses_,
1161  cleaningUpAfterException);
1162  globalWaitTask.wait();
1163 
1164  if (beginProcessBlockSucceeded) {
1165  FinalWaitingTask writeWaitTask{taskGroup_};
1167  writeWaitTask.wait();
1168  }
1169 
1170  processBlockPrincipal.clearPrincipal();
1171  for (auto& s : subProcesses_) {
1172  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1173  }
1174  }
1175 
1177  FinalWaitingTask waitTask{taskGroup_};
1179  if (streamRunActive_ == 0) {
1180  assert(streamLumiActive_ == 0);
1181 
1182  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1183  WaitingTaskHolder{taskGroup_, &waitTask});
1184  } else {
1186 
1187  auto runStatus = streamRunStatus_[0];
1188 
1190  runStatus->runPrincipal()->run() == input_->run() and
1191  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1192  readAndMergeRun(*runStatus);
1194  }
1195 
1196  setNeedToCallNext(false);
1197 
1198  WaitingTaskHolder holder{taskGroup_, &waitTask};
1199  runStatus->setHolderOfTaskInProcessRuns(holder);
1200  if (streamLumiActive_ > 0) {
1202  continueLumiAsync(std::move(holder));
1203  } else {
1205  }
1206  }
1207  waitTask.wait();
1208  return lastTransitionType();
1209  }
1210 
1212  if (iHolder.taskHasFailed()) {
1213  return;
1214  }
1215 
1216  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1217 
1218  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1219 
1220  chain::first([this, &status, iSync](auto nextTask) {
1221  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1222  nextTask,
1223  status->endIOVWaitingTasks(),
1224  status->eventSetupImpls(),
1226  actReg_.get(),
1227  serviceToken_,
1229  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1230  CMS_SA_ALLOW try {
1231  if (iException) {
1232  WaitingTaskHolder copyHolder(nextTask);
1233  copyHolder.doneWaiting(*iException);
1234  // Finish handling the exception in the task pushed to runQueue_
1235  }
1237 
1238  runQueue_->pushAndPause(
1239  *nextTask.group(),
1240  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1241  CMS_SA_ALLOW try {
1242  if (postRunQueueTask.taskHasFailed()) {
1243  status->resetBeginResources();
1245  return;
1246  }
1247 
1248  status->setResumer(std::move(iResumer));
1249 
1251  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1252  CMS_SA_ALLOW try {
1254 
1255  if (postSourceTask.taskHasFailed()) {
1256  status->resetBeginResources();
1258  status->resumeGlobalRunQueue();
1259  return;
1260  }
1261 
1262  status->setRunPrincipal(readRun());
1263 
1264  RunPrincipal& runPrincipal = *status->runPrincipal();
1265  {
1267  input_->doBeginRun(runPrincipal, &processContext_);
1268  sentry.completedSuccessfully();
1269  }
1270 
1271  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1272  if (looper_ && looperBeginJobRun_ == false) {
1273  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1274 
1275  oneapi::tbb::task_group group;
1276  FinalWaitingTask waitTask{group};
1277  using namespace edm::waiting_task::chain;
1278  chain::first([this, &es](auto nextTask) {
1279  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1280  }) | then([this, &es](auto nextTask) mutable {
1281  looper_->beginOfJob(es);
1282  looperBeginJobRun_ = true;
1283  looper_->doStartingNewLoop();
1284  }) | runLast(WaitingTaskHolder(group, &waitTask));
1285  waitTask.wait();
1286  }
1287 
1288  using namespace edm::waiting_task::chain;
1289  chain::first([this, status](auto nextTask) mutable {
1290  CMS_SA_ALLOW try {
1293  } else {
1294  setNeedToCallNext(true);
1295  }
1296  } catch (...) {
1297  status->setStopBeforeProcessingRun(true);
1298  nextTask.doneWaiting(std::current_exception());
1299  }
1300  }) | then([this, status, &es](auto nextTask) {
1301  if (status->stopBeforeProcessingRun()) {
1302  return;
1303  }
1304  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1306  beginGlobalTransitionAsync<Traits>(
1307  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1308  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1309  if (status->stopBeforeProcessingRun()) {
1310  return;
1311  }
1312  looper_->prefetchAsync(
1313  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1314  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1315  if (status->stopBeforeProcessingRun()) {
1316  return;
1317  }
1318  ServiceRegistry::Operate operateLooper(serviceToken_);
1319  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1320  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1321  if (iException) {
1322  WaitingTaskHolder copyHolder(holder);
1323  copyHolder.doneWaiting(*iException);
1324  } else {
1325  status->globalBeginDidSucceed();
1326  }
1327 
1328  if (status->stopBeforeProcessingRun()) {
1329  // We just quit now if there was a failure when merging runs
1330  status->resetBeginResources();
1332  status->resumeGlobalRunQueue();
1333  return;
1334  }
1335  CMS_SA_ALLOW try {
1336  // Under normal circumstances, this task runs after endRun has completed for all streams
1337  // and global endLumi has completed for all lumis contained in this run
1338  auto globalEndRunTask =
1339  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1340  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1341  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1343  });
1344  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1345  } catch (...) {
1346  status->resetBeginResources();
1348  status->resumeGlobalRunQueue();
1349  holder.doneWaiting(std::current_exception());
1350  return;
1351  }
1352 
1353  // After this point we are committed to end the run via endRunAsync
1354 
1356 
1357  // The only purpose of the pause is to cause stream begin run to execute before
1358  // global begin lumi in the single threaded case (maintains consistency with
1359  // the order that existed before concurrent runs were implemented).
1360  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1361 
1362  CMS_SA_ALLOW try {
1363  streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1364  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1365  CMS_SA_ALLOW try {
1366  streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1368  });
1369  } catch (...) {
1370  if (status->streamFinishedBeginRun()) {
1371  WaitingTaskHolder copyHolder(holder);
1372  copyHolder.doneWaiting(std::current_exception());
1373  status->resetBeginResources();
1376  }
1377  }
1378  }
1379  });
1380  } catch (...) {
1381  WaitingTaskHolder copyHolder(holder);
1382  copyHolder.doneWaiting(std::current_exception());
1383  status->resetBeginResources();
1386  }
1388  }) | runLast(postSourceTask);
1389  } catch (...) {
1390  status->resetBeginResources();
1392  status->resumeGlobalRunQueue();
1393  postSourceTask.doneWaiting(std::current_exception());
1394  }
1395  }); // task in sourceResourcesAcquirer
1396  } catch (...) {
1397  status->resetBeginResources();
1399  status->resumeGlobalRunQueue();
1400  postRunQueueTask.doneWaiting(std::current_exception());
1401  }
1402  }); // task in runQueue
1403  } catch (...) {
1404  status->resetBeginResources();
1406  nextTask.doneWaiting(std::current_exception());
1407  }
1408  }) | chain::runLast(std::move(iHolder));
1409  }
1410 
1411  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1412  std::shared_ptr<RunProcessingStatus> status,
1413  WaitingTaskHolder iHolder) noexcept {
1414  // These shouldn't throw
1415  streamQueues_[iStream].pause();
1416  ++streamRunActive_;
1417  streamRunStatus_[iStream] = std::move(status);
1418 
1419  CMS_SA_ALLOW try {
1420  using namespace edm::waiting_task::chain;
1421  chain::first([this, iStream](auto nextTask) {
1422  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1423  if (rs.didGlobalBeginSucceed()) {
1424  RunTransitionInfo transitionInfo(
1425  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1427  beginStreamTransitionAsync<Traits>(
1428  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1429  }
1430  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1431  if (exceptionFromBeginStreamRun) {
1432  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1433  }
1434  releaseBeginRunResources(iStream);
1435  }) | runLast(iHolder);
1436  } catch (...) {
1437  releaseBeginRunResources(iStream);
1438  iHolder.doneWaiting(std::current_exception());
1439  }
1440  }
1441 
1442  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1443  auto& status = streamRunStatus_[iStream];
1444  if (status->streamFinishedBeginRun()) {
1445  status->resetBeginResources();
1447  }
1448  streamQueues_[iStream].resume();
1449  }
1450 
1451  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1452  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1453  iRunStatus->setEndTime();
1454  IOVSyncValue ts(
1456  runPrincipal.endTime());
1457  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1458  WaitingTaskHolder copyHolder(iHolder);
1459  copyHolder.doneWaiting(std::current_exception());
1460  }
1461 
1462  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1463  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1464  nextTask,
1465  iRunStatus->endIOVWaitingTasksEndRun(),
1466  iRunStatus->eventSetupImplsEndRun(),
1468  actReg_.get(),
1469  serviceToken_);
1470  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1471  if (iException) {
1472  iRunStatus->setEndingEventSetupSucceeded(false);
1473  handleEndRunExceptions(*iException, nextTask);
1474  }
1476  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1477  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1478  CMS_SA_ALLOW try {
1479  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1480  streamQueues_[i].pause();
1481  streamEndRunAsync(std::move(nextTask), i);
1482  });
1483  } catch (...) {
1484  WaitingTaskHolder copyHolder(nextTask);
1485  copyHolder.doneWaiting(std::current_exception());
1486  }
1487  }
1488  });
1489 
1491  CMS_SA_ALLOW try {
1492  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1493  } catch (...) {
1494  WaitingTaskHolder copyHolder(nextTask);
1495  copyHolder.doneWaiting(std::current_exception());
1496  }
1497  }
1498  }) | chain::runLast(std::move(iHolder));
1499  }
1500 
1501  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1502  if (holder.taskHasFailed()) {
1504  } else {
1505  WaitingTaskHolder tmp(holder);
1506  tmp.doneWaiting(iException);
1507  }
1508  }
1509 
1510  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1511  auto& runPrincipal = *(iRunStatus->runPrincipal());
1512  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1513  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1514  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1515  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1516  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1517 
1518  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1519  using namespace edm::waiting_task::chain;
1520  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1521  auto nextTask) {
1522  if (endingEventSetupSucceeded) {
1523  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1525  endGlobalTransitionAsync<Traits>(
1526  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1527  }
1528  }) |
1529  ifThen(looper_ && endingEventSetupSucceeded,
1530  [this, &runPrincipal, &es](auto nextTask) {
1531  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1532  }) |
1533  ifThen(looper_ && endingEventSetupSucceeded,
1534  [this, &runPrincipal, &es](auto nextTask) {
1536  looper_->doEndRun(runPrincipal, es, &processContext_);
1537  }) |
1538  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1539  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1540  mergeableRunProductMetadata->preWriteRun();
1541  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1542  }) |
1543  then([status = std::move(iRunStatus),
1544  this,
1545  didGlobalBeginSucceed,
1546  mergeableRunProductMetadata,
1547  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1548  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1549  mergeableRunProductMetadata->postWriteRun();
1550  }
1551  if (iException) {
1552  handleEndRunExceptions(*iException, nextTask);
1553  }
1555 
1556  std::exception_ptr ptr;
1557 
1558  // Try hard to clean up resources so the
1559  // process can terminate in a controlled
1560  // fashion even after exceptions have occurred.
1561  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1562  if (not ptr) {
1563  ptr = std::current_exception();
1564  }
1565  }
1566  CMS_SA_ALLOW try {
1567  status->resumeGlobalRunQueue();
1569  } catch (...) {
1570  if (not ptr) {
1571  ptr = std::current_exception();
1572  }
1573  }
1574  CMS_SA_ALLOW try {
1575  status->resetEndResources();
1576  status.reset();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582 
1583  if (ptr && !iException) {
1584  handleEndRunExceptions(ptr, nextTask);
1585  }
1586  }) |
1587  runLast(std::move(iTask));
1588  }
1589 
1590  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1591  CMS_SA_ALLOW try {
1592  if (!streamRunStatus_[iStreamIndex]) {
1593  if (exceptionRunStatus_->streamFinishedRun()) {
1594  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1595  exceptionRunStatus_.reset();
1596  }
1597  return;
1598  }
1599 
1600  auto runDoneTask =
1601  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1602  if (iException) {
1603  handleEndRunExceptions(*iException, iTask);
1604  }
1605 
1606  auto runStatus = streamRunStatus_[iStreamIndex];
1607 
 1608  //reset status before releasing the queue, else we get a race condition
1609  if (runStatus->streamFinishedRun()) {
1610  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1611  }
1612  streamRunStatus_[iStreamIndex].reset();
1613  --streamRunActive_;
1614  streamQueues_[iStreamIndex].resume();
1615  });
1616 
1617  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1618 
1619  auto runStatus = streamRunStatus_[iStreamIndex].get();
1620 
1621  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1622  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1623  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1624  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1625 
1626  auto& runPrincipal = *runStatus->runPrincipal();
1628  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1629  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1630  *schedule_,
1631  iStreamIndex,
1632  transitionInfo,
1633  serviceToken_,
1634  subProcesses_,
1635  cleaningUpAfterException);
1636  }
1637  } catch (...) {
1638  handleEndRunExceptions(std::current_exception(), iTask);
1639  }
1640  }
1641 
1642  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1643  if (streamRunActive_ > 0) {
1644  FinalWaitingTask waitTask{taskGroup_};
1645 
1646  auto runStatus = streamRunStatus_[0].get();
1647  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1648  WaitingTaskHolder holder{taskGroup_, &waitTask};
1649  runStatus->setHolderOfTaskInProcessRuns(holder);
1651  endRunAsync(streamRunStatus_[0], std::move(holder));
1652  waitTask.wait();
1653  }
1654  }
1655 
1657  std::shared_ptr<RunProcessingStatus> iRunStatus,
1658  edm::WaitingTaskHolder iHolder) {
1659  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1660 
1661  auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1662  chain::first([this, &iSync, &status](auto nextTask) {
1663  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1664  nextTask,
1665  status->endIOVWaitingTasks(),
1666  status->eventSetupImpls(),
1668  actReg_.get(),
1669  serviceToken_);
1670  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1671  CMS_SA_ALLOW try {
1672  //the call to doneWaiting will cause the count to decrement
1673  if (iException) {
1674  WaitingTaskHolder copyHolder(nextTask);
1675  copyHolder.doneWaiting(*iException);
1676  }
1677 
1678  lumiQueue_->pushAndPause(
1679  *nextTask.group(),
1680  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1681  CMS_SA_ALLOW try {
1682  if (postLumiQueueTask.taskHasFailed()) {
1683  status->resetResources();
1685  endRunAsync(iRunStatus, postLumiQueueTask);
1686  return;
1687  }
1688 
1689  status->setResumer(std::move(iResumer));
1690 
1692  *postLumiQueueTask.group(),
1693  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1694  CMS_SA_ALLOW try {
1696 
1697  if (postSourceTask.taskHasFailed()) {
1698  status->resetResources();
1700  endRunAsync(iRunStatus, postSourceTask);
1701  return;
1702  }
1703 
1704  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1705 
1706  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1707  {
1709  input_->doBeginLumi(lumiPrincipal, &processContext_);
1710  sentry.completedSuccessfully();
1711  }
1712 
1714  if (rng.isAvailable()) {
1715  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1716  rng->preBeginLumi(lb);
1717  }
1718 
1719  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1720 
1721  using namespace edm::waiting_task::chain;
1722  chain::first([this, status](auto nextTask) mutable {
1725  } else {
1726  setNeedToCallNext(true);
1727  }
1728  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1729  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1731  beginGlobalTransitionAsync<Traits>(
1732  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1733  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1734  looper_->prefetchAsync(
1735  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1736  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1737  ServiceRegistry::Operate operateLooper(serviceToken_);
1738  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1739  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1740  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1741 
1742  if (iException) {
1743  WaitingTaskHolder copyHolder(holder);
1744  copyHolder.doneWaiting(*iException);
1745  globalEndLumiAsync(holder, status);
1746  endRunAsync(iRunStatus, holder);
1747  } else {
1748  status->globalBeginDidSucceed();
1749 
1750  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1752 
1753  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1754  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1755  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1756  if (!status->shouldStreamStartLumi()) {
1757  return;
1758  }
1759  streamQueues_[i].pause();
1760 
1761  auto& event = principalCache_.eventPrincipal(i);
1762  auto eventSetupImpls = &status->eventSetupImpls();
1763  auto lp = status->lumiPrincipal().get();
1766  event.setLuminosityBlockPrincipal(lp);
1767  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1768  using namespace edm::waiting_task::chain;
1769  chain::first([this, i, &transitionInfo](auto nextTask) {
1770  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1771  *schedule_,
1772  i,
1773  transitionInfo,
1774  serviceToken_,
1775  subProcesses_);
1776  }) |
1777  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1778  auto nextTask) {
1779  if (exceptionFromBeginStreamLumi) {
1780  WaitingTaskHolder copyHolder(nextTask);
1781  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1782  }
1783  handleNextEventForStreamAsync(std::move(nextTask), i);
1784  }) |
1785  runLast(std::move(holder));
1786  });
1787  } // end for loop over streams
1788  });
1789  }
1790  }) | runLast(postSourceTask);
1791  } catch (...) {
1792  status->resetResources();
1793  queueWhichWaitsForIOVsToFinish_.resume();
1794  WaitingTaskHolder copyHolder(postSourceTask);
1795  copyHolder.doneWaiting(std::current_exception());
1796  endRunAsync(iRunStatus, postSourceTask);
1797  }
1798  }); // task in sourceResourcesAcquirer
1799  } catch (...) {
1800  status->resetResources();
1801  queueWhichWaitsForIOVsToFinish_.resume();
1802  WaitingTaskHolder copyHolder(postLumiQueueTask);
1803  copyHolder.doneWaiting(std::current_exception());
1804  endRunAsync(iRunStatus, postLumiQueueTask);
1805  }
1806  }); // task in lumiQueue
1807  } catch (...) {
1808  status->resetResources();
1809  queueWhichWaitsForIOVsToFinish_.resume();
1810  WaitingTaskHolder copyHolder(nextTask);
1811  copyHolder.doneWaiting(std::current_exception());
1812  endRunAsync(iRunStatus, nextTask);
1813  }
1814  }) | chain::runLast(std::move(iHolder));
1815  }
1816 
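// continueLumiAsync resumes event processing for the luminosity block already in
// flight: all streams share the status of stream 0, and handleNextEventForStreamAsync
// is enqueued for every stream (the last one directly in the task group, the others
// through the TBB task arena).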
1817  void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1818  chain::first([this](auto nextTask) {
1819  //all streams are sharing the same status at the moment
1820  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1822 
1823  if (lastTransitionType() == InputSource::ItemType::IsLumi and
1824  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1827  }
1828  }) | chain::then([this](auto nextTask) mutable {
1829  unsigned int streamIndex = 0;
1830  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1831  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1832  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1833  }
1834  nextTask.group()->run(
1835  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1836  }) | chain::runLast(std::move(iHolder));
1837  }
1838 
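// handleEndLumiExceptions funnels exceptions thrown while ending a lumi into one place:
// if the holder has already been marked as failed, only the first failure is kept;
// otherwise the exception is propagated through a copy of the holder.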
1839  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1840  if (holder.taskHasFailed()) {
1841  setDeferredException(iException);
1842  } else {
1843  WaitingTaskHolder tmp(holder);
1844  tmp.doneWaiting(iException);
1845  }
1846  }
1847 
1848  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1849  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1850  // Get some needed info out of the status object before moving
1851  // it into finalTaskForThisLumi.
1852  auto& lp = *(iLumiStatus->lumiPrincipal());
1853  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1854  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1855  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1856  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1857 
1858  using namespace edm::waiting_task::chain;
1859  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1860  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1861 
1862  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1863  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1864  endGlobalTransitionAsync<Traits>(
1865  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1866  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1867  //Only call writeLumi if beginLumi succeeded
1868  if (didGlobalBeginSucceed) {
1869  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1870  }
1871  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1872  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1873  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1874  //any thrown exception auto propagates to nextTask via the chain
1875  ServiceRegistry::Operate operate(serviceToken_);
1876  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1877  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1878  if (iException) {
1879  handleEndLumiExceptions(*iException, nextTask);
1880  }
1881  ServiceRegistry::Operate operate(serviceToken_);
1882 
1883  std::exception_ptr ptr;
1884 
1885  // Try hard to clean up resources so the
1886  // process can terminate in a controlled
1887  // fashion even after exceptions have occurred.
1888  // Caught exception is passed to handleEndLumiExceptions()
1889  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1890  if (not ptr) {
1891  ptr = std::current_exception();
1892  }
1893  }
1894  // Caught exception is passed to handleEndLumiExceptions()
1895  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1896  if (not ptr) {
1897  ptr = std::current_exception();
1898  }
1899  }
1900  // Caught exception is passed to handleEndLumiExceptions()
1901  CMS_SA_ALLOW try {
1902  status->resetResources();
1903  status->globalEndRunHolderDoneWaiting();
1904  status.reset();
1905  } catch (...) {
1906  if (not ptr) {
1907  ptr = std::current_exception();
1908  }
1909  }
1910 
1911  if (ptr && !iException) {
1912  handleEndLumiExceptions(ptr, nextTask);
1913  }
1914  }) | runLast(std::move(iTask));
1915  }
1916 
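// streamEndLumiAsync runs the end-lumi transition for a single stream. The waiting task
// it creates releases the per-stream lumi status and resumes that stream's queue; the
// last stream to finish the lumi is responsible for the global end-lumi handling.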
1917  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1918  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1919  auto status = streamLumiStatus_[iStreamIndex];
1920  if (iException) {
1921  handleEndLumiExceptions(*iException, iTask);
1922  }
1923 
1924  // reset the status before releasing the queue, otherwise we get a race condition
1925  streamLumiStatus_[iStreamIndex].reset();
1926  --streamLumiActive_;
1927  streamQueues_[iStreamIndex].resume();
1928 
1929  //are we the last one?
1930  if (status->streamFinishedLumi()) {
1931  globalEndLumiAsync(iTask, std::move(status));
1932  }
1933  });
1934 
1935  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1936 
1937  // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1938  // therefore we do not want to hold the shared_ptr
1939  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1940  lumiStatus->setEndTime();
1941 
1942  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1943  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1944  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1945 
1946  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1947  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1948  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1949  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1950  *schedule_,
1951  iStreamIndex,
1952  transitionInfo,
1953  serviceToken_,
1954  subProcesses_,
1955  cleaningUpAfterException);
1956  }
1957 
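// endUnfinishedLumi is used during shutdown and error cleanup: if a lumi is still
// active it marks the shared status as having no more events and synchronously waits
// for the end-lumi transition of every stream to complete.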
1958  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1959  if (streamRunActive_ == 0) {
1960  assert(streamLumiActive_ == 0);
1961  } else {
1963  if (streamLumiActive_ > 0) {
1964  FinalWaitingTask globalWaitTask{taskGroup_};
1966  streamLumiStatus_[0]->noMoreEventsInLumi();
1967  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1968  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1969  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1970  }
1971  globalWaitTask.wait();
1972  }
1973  }
1974  }
1975 
1976  void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1977  SendSourceTerminationSignalIfException sentry(actReg_.get());
1978  input_->readProcessBlock(processBlockPrincipal);
1979  sentry.completedSuccessfully();
1980  }
1981 
1982  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1983  auto rp = principalCache_.getAvailableRunPrincipalPtr();
1984  assert(rp);
1985  rp->setAux(*input_->runAuxiliary());
1986  {
1987  SendSourceTerminationSignalIfException sentry(actReg_.get());
1988  input_->readRun(*rp, *historyAppender_);
1989  sentry.completedSuccessfully();
1990  }
1991  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1992  return rp;
1993  }
1994 
1995  void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1996  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1997 
1998  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1999  assert(runOK);
2000  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2001  {
2002  SendSourceTerminationSignalIfException sentry(actReg_.get());
2003  input_->readAndMergeRun(runPrincipal);
2004  sentry.completedSuccessfully();
2005  }
2006  }
2007 
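// readLuminosityBlock takes a free LuminosityBlockPrincipal from the principal cache,
// lets the input source fill it (the sentry signals early source termination if the
// read throws), and attaches the current RunPrincipal to it.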
2008  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
2009  auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2010  assert(lbp);
2011  lbp->setAux(*input_->luminosityBlockAuxiliary());
2012  {
2013  SendSourceTerminationSignalIfException sentry(actReg_.get());
2014  input_->readLuminosityBlock(*lbp, *historyAppender_);
2015  sentry.completedSuccessfully();
2016  }
2017  lbp->setRunPrincipal(std::move(rp));
2018  return lbp;
2019  }
2020 
2021  void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
2022  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2023  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2024  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2025  input_->processHistoryRegistry().reducedProcessHistoryID(
2026  input_->luminosityBlockAuxiliary()->processHistoryID()));
2027  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2028  assert(lumiOK);
2029  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2030  {
2031  SendSourceTerminationSignalIfException sentry(actReg_.get());
2032  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2033  sentry.completedSuccessfully();
2034  }
2035  }
2036 
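// The write*Async helpers below share one pattern: the Schedule writes the data product
// first, then (only when SubProcesses exist) each SubProcess writes its own copy, and
// the caller-supplied task is completed last.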
2037  void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2038  using namespace edm::waiting_task;
2039  chain::first([&](auto nextTask) {
2040  ServiceRegistry::Operate op(serviceToken_);
2041  schedule_->writeProcessBlockAsync(
2042  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2043  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2044  ServiceRegistry::Operate op(serviceToken_);
2045  for (auto& s : subProcesses_) {
2046  s.writeProcessBlockAsync(nextTask, processBlockType);
2047  }
2048  }) | chain::runLast(std::move(task));
2049  }
2050 
2051  void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2052  RunPrincipal const& runPrincipal,
2053  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2054  using namespace edm::waiting_task;
2055  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2056  chain::first([&](auto nextTask) {
2057  ServiceRegistry::Operate op(serviceToken_);
2058  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2059  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2060  ServiceRegistry::Operate op(serviceToken_);
2061  for (auto& s : subProcesses_) {
2062  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2063  }
2064  }) | chain::runLast(std::move(task));
2065  }
2066  }
2067 
2068  void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2069  for (auto& s : subProcesses_) {
2070  s.clearRunPrincipal(*iStatus.runPrincipal());
2071  }
2072  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2073  iStatus.runPrincipal()->clearPrincipal();
2074  }
2075 
2076  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2077  using namespace edm::waiting_task;
2078  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2079  chain::first([&](auto nextTask) {
2080  ServiceRegistry::Operate op(serviceToken_);
2081 
2082  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2083  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2084  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2085  ServiceRegistry::Operate op(serviceToken_);
2086  for (auto& s : subProcesses_) {
2087  s.writeLumiAsync(nextTask, lumiPrincipal);
2088  }
2089  }) | chain::runLast(std::move(task));
2090  }
2091  }
2092 
2093  void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2094  for (auto& s : subProcesses_) {
2095  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2096  }
2097  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2098  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2099  iStatus.lumiPrincipal()->clearPrincipal();
2100  }
2101 
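// readAndMergeRunEntriesAsync keeps pulling source entries that belong to the same run
// (same run number and reduced process history) and merges them into the current
// RunPrincipal, holding both the source serial queue and the source mutex;
// needToCallNext() records whether the next transition still has to be asked for later.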
2102  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2103  WaitingTaskHolder iHolder) {
2104  auto group = iHolder.group();
2105  sourceResourcesAcquirer_.serialQueueChain().push(
2106  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2107  CMS_SA_ALLOW try {
2108  ServiceRegistry::Operate operate(serviceToken_);
2109 
2110  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2111 
2113  setNeedToCallNext(false);
2114 
2115  while (lastTransitionType() == InputSource::ItemType::IsRun and
2116  status->runPrincipal()->run() == input_->run() and
2117  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2118  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2119  status->setStopBeforeProcessingRun(true);
2120  return;
2121  }
2122  readAndMergeRun(*status);
2123  if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2124  setNeedToCallNext(true);
2125  return;
2126  }
2127  nextTransitionType();
2128  }
2129  } catch (...) {
2130  status->setStopBeforeProcessingRun(true);
2131  holder.doneWaiting(std::current_exception());
2132  }
2133  });
2134  }
2135 
2136  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2137  WaitingTaskHolder iHolder) {
2138  auto group = iHolder.group();
2139  sourceResourcesAcquirer_.serialQueueChain().push(
2140  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2141  CMS_SA_ALLOW try {
2142  ServiceRegistry::Operate operate(serviceToken_);
2143 
2144  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2145 
2147  setNeedToCallNext(false);
2148 
2149  while (lastTransitionType() == InputSource::ItemType::IsLumi and
2150  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2151  readAndMergeLumi(*iLumiStatus);
2152  if (lastTransitionType().itemPosition() == InputSource::ItemPosition::LastItemToBeMerged) {
2153  setNeedToCallNext(true);
2154  return;
2155  }
2156  nextTransitionType();
2157  }
2158  } catch (...) {
2159  holder.doneWaiting(std::current_exception());
2160  }
2161  });
2162  }
2163 
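// handleNextItemAfterMergingRunEntries decides what follows once run entries have been
// merged: a new luminosity block starts beginLumiAsync; otherwise the run is ended
// (endRunAsync in turn begins the following run when appropriate).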
2164  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2165  WaitingTaskHolder iHolder) {
2166  chain::first([this, iRunStatus](auto nextTask) mutable {
2167  if (needToCallNext()) {
2168  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2169  }
2170  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2172  if (iException) {
2173  WaitingTaskHolder copyHolder(nextTask);
2174  copyHolder.doneWaiting(*iException);
2175  }
2176  if (lastTransitionType() == InputSource::ItemType::IsFile) {
2177  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2178  return;
2179  }
2180  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2181  CMS_SA_ALLOW try {
2182  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2183  input_->luminosityBlockAuxiliary()->beginTime()),
2184  iRunStatus,
2185  nextTask);
2186  return;
2187  } catch (...) {
2188  WaitingTaskHolder copyHolder(nextTask);
2189  copyHolder.doneWaiting(std::current_exception());
2190  }
2191  }
2192  // Note that endRunAsync will call beginRunAsync for the following run
2193  // if appropriate.
2194  endRunAsync(iRunStatus, std::move(nextTask));
2195  }) | chain::runLast(std::move(iHolder));
2196  }
2197 
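// readNextEventForStream asks the source, under the source mutex, what the next item is
// and reads an event only when that item is an event and nothing has asked the job to
// stop; any other answer records how this stream's lumi should end and returns false.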
2198  bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2199  unsigned int iStreamIndex,
2200  LuminosityBlockProcessingStatus& iStatus) {
2201  // This function returns true only if it successfully reads an event for the stream, which
2202  // requires both that the next item is an event and that there are no problems or requests to stop.
2203 
2204  if (iTask.taskHasFailed()) {
2205  // We want all streams to stop or all streams to pause. If we are already in the
2206  // middle of pausing streams, then finish pausing all of them and the lumi will be
2207  // ended later. Otherwise, just end it now.
2210  }
2211  return false;
2212  }
2213 
2214  // Did another stream already stop or pause this lumi?
2216  return false;
2217  }
2218 
2219  // Are output modules or the looper requesting we stop?
2220  if (shouldWeStop()) {
2223  return false;
2224  }
2225 
2227 
2228  // need to use lock in addition to the serial task queue because
2229  // of delayed provenance reading and reading data in response to
2230  // edm::Refs etc
2231  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2232 
2233  // If we didn't already call nextTransitionType while merging lumis, call it here.
2234  // This asks the input source what is next and also checks for signals.
2235 
2236  InputSource::ItemTypeInfo itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
2237  setNeedToCallNext(true);
2238 
2239  if (InputSource::ItemType::IsEvent != itemType) {
2240  // IsFile may continue processing the lumi and
2241  // looper_ can cause the input source to declare a new IsRun which is actually
2242  // just a continuation of the previous run
2244  (InputSource::ItemType::IsRun == itemType and
2245  (iStatus.lumiPrincipal()->run() != input_->run() or
2246  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2247  if (itemType == InputSource::ItemType::IsLumi &&
2248  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2249  throw Exception(errors::LogicError)
2250  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2251  << "but the next lumi entry has the same lumi number.\n"
2252  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2253  }
2255  } else {
2257  }
2258  return false;
2259  }
2260  readEvent(iStreamIndex);
2261  return true;
2262  }
2263 
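// handleNextEventForStreamAsync is the per-stream event loop: it re-posts itself after
// each processed event, and once no further event is available it either begins the
// next lumi or run (first stream to get here) or ends this stream's part of the lumi.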
2264  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2265  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2266  streamEndLumiAsync(iTask, iStreamIndex);
2267  return;
2268  }
2269  auto group = iTask.group();
2270  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2271  CMS_SA_ALLOW try {
2272  auto status = streamLumiStatus_[iStreamIndex].get();
2273  ServiceRegistry::Operate operate(serviceToken_);
2274 
2275  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2276  auto recursionTask =
2277  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2278  if (iEventException) {
2279  WaitingTaskHolder copyHolder(iTask);
2280  copyHolder.doneWaiting(*iEventException);
2281  // Intentionally, we don't return here. The recursive call to
2282  // handleNextEvent takes care of immediately ending the run properly
2283  // using the same code it uses to end the run in other situations.
2284  }
2285  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2286  });
2287 
2288  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2289  } else {
2290  // the stream will stop processing this lumi now
2292  if (not status->haveStartedNextLumiOrEndedRun()) {
2293  status->noMoreEventsInLumi();
2294  status->startNextLumiOrEndRun();
2296  CMS_SA_ALLOW try {
2297  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2298  input_->luminosityBlockAuxiliary()->beginTime()),
2299  streamRunStatus_[iStreamIndex],
2300  iTask);
2301  } catch (...) {
2302  WaitingTaskHolder copyHolder(iTask);
2303  copyHolder.doneWaiting(std::current_exception());
2304  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2305  }
2306  } else {
2307  // If appropriate, this will also start the next run.
2308  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2309  }
2310  }
2311  streamEndLumiAsync(iTask, iStreamIndex);
2312  } else {
2313  assert(status->eventProcessingState() ==
2315  auto runStatus = streamRunStatus_[iStreamIndex].get();
2316 
2317  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2318  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2319  }
2320  }
2321  }
2322  } catch (...) {
2323  WaitingTaskHolder copyHolder(iTask);
2324  copyHolder.doneWaiting(std::current_exception());
2325  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2326  }
2327  });
2328  }
2329 
2330  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2331  //TODO this will have to become per stream
2332  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2333  StreamContext streamContext(event.streamID(), &processContext_);
2334 
2335  SendSourceTerminationSignalIfException sentry(actReg_.get());
2336  input_->readEvent(event, streamContext);
2337 
2338  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2339  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2340  sentry.completedSuccessfully();
2341 
2342  FDEBUG(1) << "\treadEvent\n";
2343  }
2344 
2345  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2346  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2347  }
2348 
2349  namespace {
2350  struct ClearEventGuard {
2351  ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2352  : act_(iReg), context_(iContext) {
2353  iReg.preClearEventSignal_.emit(iContext);
2354  }
2355  ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2356  edm::ActivityRegistry& act_;
2357  edm::StreamContext const& context_;
2358  };
2359  } // namespace
2360 
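// processEventAsyncImpl pushes one event through the Schedule, then through any
// SubProcesses (in reverse order), optionally through the looper, and finally clears
// the EventPrincipal while emitting the pre/post clear-event signals.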
2361  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2362  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2363 
2364  ServiceRegistry::Operate operate(serviceToken_);
2365  Service<RandomNumberGenerator> rng;
2366  if (rng.isAvailable()) {
2367  Event ev(*pep, ModuleDescription(), nullptr);
2368  rng->postEventRead(ev);
2369  }
2370 
2371  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2372  using namespace edm::waiting_task::chain;
2373  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2374  EventTransitionInfo info(*pep, es);
2375  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2376  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2377  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2378  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2379  }
2380  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2381  //NOTE: behavior change. Previously, if an exception happened, the looper was still called; now it is not.
2382  ServiceRegistry::Operate operateLooper(serviceToken_);
2383  processEventWithLooper(*pep, iStreamIndex);
2384  }) | then([this, pep](auto nextTask) {
2385  FDEBUG(1) << "\tprocessEvent\n";
2386  StreamContext streamContext(pep->streamID(),
2387  StreamContext::Transition::kEvent,
2388  pep->id(),
2389  pep->runPrincipal().index(),
2390  pep->luminosityBlockPrincipal().index(),
2391  pep->time(),
2392  &processContext_);
2393  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2394  pep->clearEventPrincipal();
2395  }) | runLast(iHolder);
2396  }
2397 
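// processEventWithLooper lets the EDLooper act on the event and, for random-access
// sources, repositions the input (previous or specified event) until the requested
// operation succeeds; a looper request to stop sets shouldWeStop_.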
2398  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2399  bool randomAccess = input_->randomAccess();
2400  ProcessingController::ForwardState forwardState = input_->forwardState();
2401  ProcessingController::ReverseState reverseState = input_->reverseState();
2402  ProcessingController pc(forwardState, reverseState, randomAccess);
2403 
2404  EDLooperBase::Status status = EDLooperBase::kContinue;
2405  do {
2406  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2407  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2408  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2409 
2410  bool succeeded = true;
2411  if (randomAccess) {
2412  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2413  input_->skipEvents(-2);
2414  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2415  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2416  }
2417  }
2418  pc.setLastOperationSucceeded(succeeded);
2419  } while (!pc.lastOperationSucceeded());
2420  if (status != EDLooperBase::kContinue) {
2421  shouldWeStop_ = true;
2422  }
2423  }
2424 
2425  bool EventProcessor::shouldWeStop() const {
2426  FDEBUG(1) << "\tshouldWeStop\n";
2427  if (shouldWeStop_)
2428  return true;
2429  if (!subProcesses_.empty()) {
2430  for (auto const& subProcess : subProcesses_) {
2431  if (subProcess.terminate()) {
2432  return true;
2433  }
2434  }
2435  return false;
2436  }
2437  return schedule_->terminate();
2438  }
2439 
2440  void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2441 
2442  void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2443 
2444  void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2445 
2446  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2447  bool expected = false;
2448  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2449  deferredExceptionPtr_ = iException;
2450  return true;
2451  }
2452  return false;
2453  }
2454 
2455  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2456  cms::Exception ex("ModulesSynchingOnLumis");
2457  ex << "The framework is configured to use at least two streams, but the following modules\n"
2458  << "require synchronizing on LuminosityBlock boundaries:";
2459  bool found = false;
2460  for (auto worker : schedule_->allWorkers()) {
2461  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2462  found = true;
2463  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2464  }
2465  }
2466  if (found) {
2467  ex << "\n\nThe situation can be fixed by either\n"
2468  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2469  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2470  throw ex;
2471  }
2472  }
2473 
2474  void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2475  std::unique_ptr<LogSystem> s;
2476  for (auto worker : schedule_->allWorkers()) {
2477  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2478  if (not s) {
2479  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2480  (*s) << "The following modules require synchronizing on Run boundaries:";
2481  }
2482  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2483  }
2484  }
2485  }
2486 } // namespace edm