
EventProcessor.cc
// ... (framework #include directives elided in this listing) ...
79 #include "MessageForSource.h"
80 #include "MessageForParent.h"
82 #include "RunProcessingStatus.h"
83 
84 #include "boost/range/adaptor/reversed.hpp"
85 
86 #include <cassert>
87 #include <exception>
88 #include <iomanip>
89 #include <iostream>
90 #include <utility>
91 #include <sstream>
92 
93 #include <sys/ipc.h>
94 #include <sys/msg.h>
95 
96 #include "oneapi/tbb/task.h"
97 
98 //Used for CPU affinity
99 #ifndef __APPLE__
100 #include <sched.h>
101 #endif
102 
103 namespace {
104  class PauseQueueSentry {
105  public:
106  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
107  ~PauseQueueSentry() { queue_.resume(); }
108 
109  private:
110  edm::SerialTaskQueue& queue_;
111  };
112 } // namespace
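 
 // A minimal sketch of how the RAII sentry above is meant to be used (the
 // queue and the work here are hypothetical, not from this file); the same
 // pattern appears later with streamQueuesInserter_. Newly queued tasks do not
 // start while the sentry is alive, and the queue is resumed when the scope
 // exits, even if an exception is thrown:
 //
 //   void reorganize(edm::SerialTaskQueue& queue) {
 //     PauseQueueSentry sentry(queue);  // queue.pause()
 //     // ... safely rearrange or enqueue work here ...
 //   }                                  // ~PauseQueueSentry() -> queue.resume()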
113 
114 namespace edm {
115 
116  namespace chain = waiting_task::chain;
117 
118  // ---------------------------------------------------------------
119  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
120  ParameterSet& params,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
125  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126  std::shared_ptr<ActivityRegistry> areg,
127  std::shared_ptr<ProcessConfiguration const> processConfiguration,
128  PreallocationConfiguration const& allocations) {
129  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
130  if (main_input == nullptr) {
131  throw Exception(errors::Configuration)
132  << "There must be exactly one source in the configuration.\n"
133  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134  }
135 
136  std::string modtype(main_input->getParameter<std::string>("@module_type"));
137 
138  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
139  ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
140  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
141  filler->fill(descriptions);
142 
143  try {
144  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
145  } catch (cms::Exception& iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  moduleIndex);
165 
166  InputSourceDescription isdesc(md,
167  preg,
168  branchIDListHelper,
169  processBlockHelper,
170  thinnedAssociationsHelper,
171  areg,
172  common.maxEventsInput_,
173  common.maxLumisInput_,
174  common.maxSecondsUntilRampdown_,
175  allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
181  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  } catch (cms::Exception& iException) {
188  std::ostringstream ost;
189  ost << "Constructing input source of type " << modtype;
190  iException.addContext(ost.str());
191  throw;
192  }
193  return input;
194  }
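 
 // The try block above relies on a small scope-guard idiom: a shared_ptr that
 // owns nothing but carries a custom deleter, so the "post" signal fires even
 // if construction throws. A self-contained sketch of the same idiom (the
 // function and the messages are illustrative, not framework code):
 //
 //   #include <iostream>
 //   #include <memory>
 //
 //   void doWork() {
 //     std::cout << "pre\n";
 //     std::shared_ptr<void> sentry(nullptr, [](void*) { std::cout << "post\n"; });
 //     // ... work that may throw; "post" is still printed during unwinding ...
 //   }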
195 
196  // ---------------------------------------------------------------
197  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
198  eventsetup::EventSetupProvider& cp,
199  ParameterSet& params,
200  std::vector<std::string> const& loopers) {
201  std::shared_ptr<EDLooperBase> vLooper;
202 
203  assert(1 == loopers.size());
204 
205  for (auto const& looperName : loopers) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
207  // Unlikely we would ever need the ModuleTypeResolver in Looper
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
215  ServiceToken const& iToken,
216  serviceregistry::ServiceLegacy iLegacy,
217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
265  processConfiguration_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
271  deferredExceptionPtrIsSet_(false),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
278  exceptionMessageFiles_(),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
283  forceESCacheClearOnNewRun_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
287  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
288  }
289 
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
291  ServiceToken const& token,
292  serviceregistry::ServiceLegacy legacy)
293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
302  processConfiguration_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
308  deferredExceptionPtrIsSet_(false),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
315  exceptionMessageFiles_(),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
320  forceESCacheClearOnNewRun_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
326  ServiceToken const& iToken,
327  serviceregistry::ServiceLegacy iLegacy) {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector, once and for all
332 
333  // register the empty parameter set, once and for all.
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top-level parameter sets. Default values are also
344  // set here if the parameters were not explicitly set.
345  validateTopLevelParameterSets(parameterSet.get());
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal to nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
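 
 // Worked example of the formula above (numbers are illustrative only):
 // with nConcurrentRuns = 2 and nConcurrentLumis = 4,
 //   maxConcurrentIOVs = 3*2 - 2 + (4 - 2) = 6,
 // while in the common case nConcurrentRuns = 1, nConcurrentLumis = N,
 //   maxConcurrentIOVs = 3*1 - 2 + (N - 1) = N,
 // i.e. it reduces to nConcurrentLumis, as stated in the comment above.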
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
416  deleteNonConsumedUnscheduledModules_ =
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if there is a SubProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
433  modulesToIgnoreForDeleteEarly_ =
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
446  ServiceRegistry::Operate operate(serviceToken_);
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
450  Service<RootHandlers> handler;
451  handler->willBeUsingThreads();
452  }
453 
454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
464  looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in the presence of a looper do not delete modules
469  deleteNonConsumedUnscheduledModules_ = false;
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
606 
607  EventProcessor::~EventProcessor() {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these things that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
624 
628  task.waitNoThrow();
629  assert(task.done());
630  }
631 
632  void EventProcessor::beginJob() {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
640  ServiceRegistry::Operate operate(serviceToken_);
641 
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
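 
 // The merge above keeps consumedBySubProcesses sorted (std::merge requires
 // both input ranges to be sorted). A self-contained sketch of the same
 // pattern with plain ints (illustrative only):
 //
 //   #include <algorithm>
 //   #include <iterator>
 //   #include <vector>
 //
 //   std::vector<int> merged{1, 4, 7};
 //   std::vector<int> incoming{2, 4, 9};
 //   std::vector<int> tmp;
 //   tmp.reserve(merged.size() + incoming.size());
 //   std::merge(merged.begin(), merged.end(), incoming.begin(), incoming.end(), std::back_inserter(tmp));
 //   std::swap(merged, tmp);  // merged == {1, 2, 4, 4, 7, 9}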
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules, to avoid non-consumed, never-run modules keeping the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
698  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
699 
702  }
703  if (preallocations_.numberOfRuns() > 1) {
705  }
707 
708  //NOTE: This implementation assumes 'Job' means one call
709  // to EventProcessor::run.
710  // If it really means once per 'application' then this code will
711  // have to be changed.
712  // Also have to deal with the case where 'run' is called, then a new
713  // Module is added, and 'run' is called
714  // again. In that case the newly added Module needs its 'beginJob'
715  // to be called.
716 
717  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
718  // For now we delay calling beginOfJob until first beginOfRun
719  //if(looper_) {
720  // looper_->beginOfJob(es);
721  //}
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728  espController_->finishConfiguration();
729  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToResolverIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
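 
 // beginStream above is dispatched with the edm::waiting_task::chain helpers
 // used throughout this file. A condensed sketch of the idiom (lambda bodies
 // are placeholders, not framework code): each stage receives the holder for
 // the next stage, an exception short-circuits to the final holder, and
 // FinalWaitingTask lets the caller block until the whole chain has finished.
 //
 //   using namespace edm::waiting_task::chain;
 //   oneapi::tbb::task_group group;
 //   edm::FinalWaitingTask done{group};
 //   first([](auto nextTask) {
 //     // stage 1: may schedule asynchronous work tied to nextTask
 //   }) | then([](auto nextTask) {
 //     // stage 2: runs once stage 1 and the work it scheduled have finished
 //   }) | runLast(edm::WaitingTaskHolder(group, &done));
 //   done.wait();  // surfaces any exception stored along the chain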
762 
764  // Collects exceptions, so we don't throw before all operations are performed.
765  ExceptionCollector c(
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
769  ServiceRegistry::Operate operate(serviceToken_);
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
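 
 // endJob() above routes every step through the collector "c" so that all
 // cleanup runs even when individual steps throw, and only rethrows at the
 // end. A self-contained sketch of that collect-then-rethrow pattern in plain
 // C++ (illustrative only; the real ExceptionCollector also aggregates the
 // messages):
 //
 //   #include <exception>
 //   #include <functional>
 //   #include <vector>
 //
 //   void runAll(std::vector<std::function<void()>> const& steps) {
 //     std::vector<std::exception_ptr> errors;
 //     for (auto const& step : steps) {
 //       try { step(); } catch (...) { errors.push_back(std::current_exception()); }
 //     }
 //     if (!errors.empty()) std::rethrow_exception(errors.front());
 //   }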
829 
831 
832  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
833  return schedule_->getAllModuleDescriptions();
834  }
835 
836  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
837 
838  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
839 
840  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
841 
842  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
843 
844  namespace {
845 #include "TransitionProcessors.icc"
846  }
847 
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
854  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shut down the job early.";
856  jr->reportShutdownSignal();
858  }
859  return returnValue;
860  }
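 
 // checkForAsyncStopRequest() polls a flag that an external signal handler is
 // expected to set. A self-contained sketch of that pattern (names are
 // illustrative, not the framework's; assumes the atomic is lock-free so it is
 // safe to touch from a signal handler):
 //
 //   #include <atomic>
 //   #include <csignal>
 //
 //   std::atomic<bool> stop_requested{false};
 //
 //   extern "C" void onSignal(int) { stop_requested.store(true, std::memory_order_release); }
 //
 //   void installHandler() { std::signal(SIGINT, onSignal); }
 //
 //   bool shouldStop() { return stop_requested.load(std::memory_order_acquire); }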
861 
862  InputSource::ItemType EventProcessor::nextTransitionType() {
863  SendSourceTerminationSignalIfException sentry(actReg_.get());
864  InputSource::ItemType itemType;
865  //For now, do nothing with InputSource::IsSynchronize
866  do {
867  itemType = input_->nextItemType();
868  } while (itemType == InputSource::IsSynchronize);
869 
870  lastSourceTransition_ = itemType;
871  sentry.completedSuccessfully();
872 
874 
876  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
878  }
879 
880  return lastSourceTransition_;
881  }
882 
883  EventProcessor::StatusCode EventProcessor::runToCompletion() {
884  beginJob(); //make sure this was called
885 
886  // make the services available
887  ServiceRegistry::Operate operate(serviceToken_);
888  actReg_->beginProcessingSignal_();
889  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
890  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
891  try {
892  FilesProcessor fp(fileModeNoMerge_);
893 
894  convertException::wrap([&]() {
895  bool firstTime = true;
896  do {
897  if (not firstTime) {
899  rewindInput();
900  } else {
901  firstTime = false;
902  }
903  startingNewLoop();
904 
905  auto trans = fp.processFiles(*this);
906 
907  fp.normalEnd();
908 
909  if (deferredExceptionPtrIsSet_.load()) {
910  std::rethrow_exception(deferredExceptionPtr_);
911  }
912  if (trans != InputSource::IsStop) {
913  //problem with the source
914  doErrorStuff();
915 
916  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
917  }
918  } while (not endOfLoop());
919  }); // convertException::wrap
920 
921  } // Try block
922  catch (cms::Exception& e) {
923  if (exceptionMessageLumis_) {
924  std::string message(
925  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
926  e.addAdditionalInfo(message);
927  if (e.alreadyPrinted()) {
928  LogAbsolute("Additional Exceptions") << message;
929  }
930  }
931  if (exceptionMessageRuns_) {
932  std::string message(
933  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
934  e.addAdditionalInfo(message);
935  if (e.alreadyPrinted()) {
936  LogAbsolute("Additional Exceptions") << message;
937  }
938  }
939  if (!exceptionMessageFiles_.empty()) {
940  e.addAdditionalInfo(exceptionMessageFiles_);
941  if (e.alreadyPrinted()) {
942  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
943  }
944  }
945  throw;
946  }
947  return epSuccess;
948  }
949 
950  void EventProcessor::readFile() {
951  FDEBUG(1) << " \treadFile\n";
952  size_t size = preg_->size();
954 
955  if (streamRunActive_ > 0) {
956  streamRunStatus_[0]->runPrincipal()->preReadFile();
957  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
958  }
959 
960  if (streamLumiActive_ > 0) {
961  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
962  }
963 
964  fb_ = input_->readFile();
965  if (size < preg_->size()) {
967  }
970  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
971  }
972  sentry.completedSuccessfully();
973  }
974 
975  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
976  if (fileBlockValid()) {
978  input_->closeFile(fb_.get(), cleaningUpAfterException);
979  sentry.completedSuccessfully();
980  }
981  FDEBUG(1) << "\tcloseInputFile\n";
982  }
983 
984  void EventProcessor::openOutputFiles() {
985  if (fileBlockValid()) {
986  schedule_->openOutputFiles(*fb_);
987  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
988  }
989  FDEBUG(1) << "\topenOutputFiles\n";
990  }
991 
992  void EventProcessor::closeOutputFiles() {
993  schedule_->closeOutputFiles();
994  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
995  processBlockHelper_->clearAfterOutputFilesClose();
996  FDEBUG(1) << "\tcloseOutputFiles\n";
997  }
998 
999  void EventProcessor::respondToOpenInputFile() {
1000  if (fileBlockValid()) {
1002  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1003  schedule_->respondToOpenInputFile(*fb_);
1004  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1005  }
1006  FDEBUG(1) << "\trespondToOpenInputFile\n";
1007  }
1008 
1009  void EventProcessor::respondToCloseInputFile() {
1010  if (fileBlockValid()) {
1011  schedule_->respondToCloseInputFile(*fb_);
1012  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1013  }
1014  FDEBUG(1) << "\trespondToCloseInputFile\n";
1015  }
1016 
1017  void EventProcessor::startingNewLoop() {
1018  shouldWeStop_ = false;
1019  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1020  // until after we've called beginOfJob
1021  if (looper_ && looperBeginJobRun_) {
1022  looper_->doStartingNewLoop();
1023  }
1024  FDEBUG(1) << "\tstartingNewLoop\n";
1025  }
1026 
1027  bool EventProcessor::endOfLoop() {
1028  if (looper_) {
1029  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1030  looper_->setModuleChanger(&changer);
1031  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1032  looper_->setModuleChanger(nullptr);
1034  return true;
1035  else
1036  return false;
1037  }
1038  FDEBUG(1) << "\tendOfLoop\n";
1039  return true;
1040  }
1041 
1042  void EventProcessor::rewindInput() {
1043  input_->repeat();
1044  input_->rewind();
1045  FDEBUG(1) << "\trewind\n";
1046  }
1047 
1048  void EventProcessor::prepareForNextLoop() {
1049  looper_->prepareForNextLoop(esp_.get());
1050  FDEBUG(1) << "\tprepareForNextLoop\n";
1051  }
1052 
1053  bool EventProcessor::shouldWeCloseOutput() const {
1054  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1055  if (!subProcesses_.empty()) {
1056  for (auto const& subProcess : subProcesses_) {
1057  if (subProcess.shouldWeCloseOutput()) {
1058  return true;
1059  }
1060  }
1061  return false;
1062  }
1063  return schedule_->shouldWeCloseOutput();
1064  }
1065 
1066  void EventProcessor::doErrorStuff() {
1067  FDEBUG(1) << "\tdoErrorStuff\n";
1068  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1069  << "and went to the error state\n"
1070  << "Will attempt to terminate processing normally\n"
1071  << "(IF using the looper the next loop will be attempted)\n"
1072  << "This likely indicates a bug in an input module or corrupted input or both\n";
1073  }
1074 
1075  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1076  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1077  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1078 
1080  FinalWaitingTask globalWaitTask{taskGroup_};
1081 
1082  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1083  beginGlobalTransitionAsync<Traits>(
1084  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1085 
1086  globalWaitTask.wait();
1087  beginProcessBlockSucceeded = true;
1088  }
1089 
1091  input_->fillProcessBlockHelper();
1093  while (input_->nextProcessBlock(processBlockPrincipal)) {
1094  readProcessBlock(processBlockPrincipal);
1095 
1097  FinalWaitingTask globalWaitTask{taskGroup_};
1098 
1099  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1100  beginGlobalTransitionAsync<Traits>(
1101  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1102 
1103  globalWaitTask.wait();
1104 
1105  FinalWaitingTask writeWaitTask{taskGroup_};
1107  writeWaitTask.wait();
1108 
1109  processBlockPrincipal.clearPrincipal();
1110  for (auto& s : subProcesses_) {
1111  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1112  }
1113  }
1114  }
1115 
1116  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1117  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1118 
1120  FinalWaitingTask globalWaitTask{taskGroup_};
1121 
1122  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1123  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1124  *schedule_,
1125  transitionInfo,
1126  serviceToken_,
1127  subProcesses_,
1128  cleaningUpAfterException);
1129  globalWaitTask.wait();
1130 
1131  if (beginProcessBlockSucceeded) {
1132  FinalWaitingTask writeWaitTask{taskGroup_};
1134  writeWaitTask.wait();
1135  }
1136 
1137  processBlockPrincipal.clearPrincipal();
1138  for (auto& s : subProcesses_) {
1139  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1140  }
1141  }
1142 
1144  FinalWaitingTask waitTask{taskGroup_};
1146  if (streamRunActive_ == 0) {
1147  assert(streamLumiActive_ == 0);
1148 
1149  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1150  WaitingTaskHolder{taskGroup_, &waitTask});
1151  } else {
1153 
1154  auto runStatus = streamRunStatus_[0];
1155 
1156  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1157  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1158  readAndMergeRun(*runStatus);
1160  }
1161 
1162  WaitingTaskHolder holder{taskGroup_, &waitTask};
1163  runStatus->setHolderOfTaskInProcessRuns(holder);
1164  if (streamLumiActive_ > 0) {
1166  continueLumiAsync(std::move(holder));
1167  } else {
1169  }
1170  }
1171  waitTask.wait();
1172  return lastTransitionType();
1173  }
1174 
1175  void EventProcessor::beginRunAsync(IOVSyncValue const& iSync, WaitingTaskHolder iHolder) {
1176  if (iHolder.taskHasFailed()) {
1177  return;
1178  }
1179 
1180  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1181 
1182  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1183 
1184  chain::first([this, &status, iSync](auto nextTask) {
1185  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1186  nextTask,
1187  status->endIOVWaitingTasks(),
1188  status->eventSetupImpls(),
1190  actReg_.get(),
1191  serviceToken_,
1193  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1194  CMS_SA_ALLOW try {
1195  if (iException) {
1196  WaitingTaskHolder copyHolder(nextTask);
1197  copyHolder.doneWaiting(*iException);
1198  // Finish handling the exception in the task pushed to runQueue_
1199  }
1201 
1202  runQueue_->pushAndPause(
1203  *nextTask.group(),
1204  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1205  CMS_SA_ALLOW try {
1206  if (postRunQueueTask.taskHasFailed()) {
1207  status->resetBeginResources();
1209  return;
1210  }
1211 
1212  status->setResumer(std::move(iResumer));
1213 
1215  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1216  CMS_SA_ALLOW try {
1218 
1219  if (postSourceTask.taskHasFailed()) {
1220  status->resetBeginResources();
1222  status->resumeGlobalRunQueue();
1223  return;
1224  }
1225 
1226  status->setRunPrincipal(readRun());
1227 
1228  RunPrincipal& runPrincipal = *status->runPrincipal();
1229  {
1231  input_->doBeginRun(runPrincipal, &processContext_);
1232  sentry.completedSuccessfully();
1233  }
1234 
1235  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1236  if (looper_ && looperBeginJobRun_ == false) {
1237  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1238 
1239  oneapi::tbb::task_group group;
1240  FinalWaitingTask waitTask{group};
1241  using namespace edm::waiting_task::chain;
1242  chain::first([this, &es](auto nextTask) {
1243  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1244  }) | then([this, &es](auto nextTask) mutable {
1245  looper_->beginOfJob(es);
1246  looperBeginJobRun_ = true;
1247  looper_->doStartingNewLoop();
1248  }) | runLast(WaitingTaskHolder(group, &waitTask));
1249  waitTask.wait();
1250  }
1251 
1252  using namespace edm::waiting_task::chain;
1253  chain::first([this, status](auto nextTask) mutable {
1254  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1255  status->setStopBeforeProcessingRun(true);
1256  nextTask.doneWaiting(std::current_exception());
1257  }
1258  }) | then([this, status, &es](auto nextTask) {
1259  if (status->stopBeforeProcessingRun()) {
1260  return;
1261  }
1262  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1264  beginGlobalTransitionAsync<Traits>(
1265  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1266  }) | then([status](auto nextTask) mutable {
1267  if (status->stopBeforeProcessingRun()) {
1268  return;
1269  }
1270  status->globalBeginDidSucceed();
1271  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1272  if (status->stopBeforeProcessingRun()) {
1273  return;
1274  }
1275  looper_->prefetchAsync(
1276  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1277  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1278  if (status->stopBeforeProcessingRun()) {
1279  return;
1280  }
1281  ServiceRegistry::Operate operateLooper(serviceToken_);
1282  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1283  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1284  bool precedingTasksSucceeded = true;
1285  if (iException) {
1286  precedingTasksSucceeded = false;
1287  WaitingTaskHolder copyHolder(holder);
1288  copyHolder.doneWaiting(*iException);
1289  }
1290 
1291  if (status->stopBeforeProcessingRun()) {
1292  // We just quit now if there was a failure when merging runs
1293  status->resetBeginResources();
1295  status->resumeGlobalRunQueue();
1296  return;
1297  }
1298  CMS_SA_ALLOW try {
1299  // Under normal circumstances, this task runs after endRun has completed for all streams
1300  // and global endLumi has completed for all lumis contained in this run
1301  auto globalEndRunTask =
1302  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1303  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1304  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1306  });
1307  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1308  } catch (...) {
1309  status->resetBeginResources();
1311  status->resumeGlobalRunQueue();
1312  holder.doneWaiting(std::current_exception());
1313  return;
1314  }
1315 
1316  // After this point we are committed to end the run via endRunAsync
1317 
1319 
1320  // The only purpose of the pause is to cause stream begin run to execute before
1321  // global begin lumi in the single threaded case (maintains consistency with
1322  // the order that existed before concurrent runs were implemented).
1323  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1324 
1325  CMS_SA_ALLOW try {
1327  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1328  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1329  CMS_SA_ALLOW try {
1330  streamQueues_[i].push(
1331  *holder.group(),
1332  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1334  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1335  });
1336  } catch (...) {
1337  if (status->streamFinishedBeginRun()) {
1338  WaitingTaskHolder copyHolder(holder);
1339  copyHolder.doneWaiting(std::current_exception());
1340  status->resetBeginResources();
1343  }
1344  }
1345  }
1346  });
1347  } catch (...) {
1348  WaitingTaskHolder copyHolder(holder);
1349  copyHolder.doneWaiting(std::current_exception());
1350  status->resetBeginResources();
1353  }
1355  }) | runLast(postSourceTask);
1356  } catch (...) {
1357  status->resetBeginResources();
1359  status->resumeGlobalRunQueue();
1360  postSourceTask.doneWaiting(std::current_exception());
1361  }
1362  }); // task in sourceResourcesAcquirer
1363  } catch (...) {
1364  status->resetBeginResources();
1366  status->resumeGlobalRunQueue();
1367  postRunQueueTask.doneWaiting(std::current_exception());
1368  }
1369  }); // task in runQueue
1370  } catch (...) {
1371  status->resetBeginResources();
1373  nextTask.doneWaiting(std::current_exception());
1374  }
1375  }) | chain::runLast(std::move(iHolder));
1376  }
1377 
1378  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1379  std::shared_ptr<RunProcessingStatus> status,
1380  bool precedingTasksSucceeded,
1381  WaitingTaskHolder iHolder) {
1382  // These shouldn't throw
1383  streamQueues_[iStream].pause();
1384  ++streamRunActive_;
1385  streamRunStatus_[iStream] = std::move(status);
1386 
1387  CMS_SA_ALLOW try {
1388  using namespace edm::waiting_task::chain;
1389  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1390  if (precedingTasksSucceeded) {
1391  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1392  RunTransitionInfo transitionInfo(
1393  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1395  beginStreamTransitionAsync<Traits>(
1396  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1397  }
1398  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1399  if (exceptionFromBeginStreamRun) {
1400  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1401  }
1402  releaseBeginRunResources(iStream);
1403  }) | runLast(iHolder);
1404  } catch (...) {
1405  releaseBeginRunResources(iStream);
1406  iHolder.doneWaiting(std::current_exception());
1407  }
1408  }
1409 
1410  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1411  auto& status = streamRunStatus_[iStream];
1412  if (status->streamFinishedBeginRun()) {
1413  status->resetBeginResources();
1415  }
1416  streamQueues_[iStream].resume();
1417  }
1418 
1419  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1420  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1421  iRunStatus->setEndTime();
1422  IOVSyncValue ts(
1424  runPrincipal.endTime());
1425  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1426  WaitingTaskHolder copyHolder(iHolder);
1427  copyHolder.doneWaiting(std::current_exception());
1428  }
1429 
1430  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1431  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1432  nextTask,
1433  iRunStatus->endIOVWaitingTasksEndRun(),
1434  iRunStatus->eventSetupImplsEndRun(),
1436  actReg_.get(),
1437  serviceToken_);
1438  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1439  if (iException) {
1440  iRunStatus->setEndingEventSetupSucceeded(false);
1441  handleEndRunExceptions(*iException, nextTask);
1442  }
1444  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1445  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1446  CMS_SA_ALLOW try {
1447  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1448  streamQueues_[i].pause();
1449  streamEndRunAsync(std::move(nextTask), i);
1450  });
1451  } catch (...) {
1452  WaitingTaskHolder copyHolder(nextTask);
1453  copyHolder.doneWaiting(std::current_exception());
1454  }
1455  }
1456  });
1457 
1459  CMS_SA_ALLOW try {
1460  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1461  } catch (...) {
1462  WaitingTaskHolder copyHolder(nextTask);
1463  copyHolder.doneWaiting(std::current_exception());
1464  }
1465  }
1466  }) | chain::runLast(std::move(iHolder));
1467  }
1468 
1469  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1470  if (holder.taskHasFailed()) {
1472  } else {
1473  WaitingTaskHolder tmp(holder);
1474  tmp.doneWaiting(iException);
1475  }
1476  }
1477 
1478  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1479  auto& runPrincipal = *(iRunStatus->runPrincipal());
1480  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1481  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1482  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1483  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1484  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1485 
1486  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1487  using namespace edm::waiting_task::chain;
1488  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1489  auto nextTask) {
1490  if (endingEventSetupSucceeded) {
1491  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1493  endGlobalTransitionAsync<Traits>(
1494  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1495  }
1496  }) |
1497  ifThen(looper_ && endingEventSetupSucceeded,
1498  [this, &runPrincipal, &es](auto nextTask) {
1499  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1500  }) |
1501  ifThen(looper_ && endingEventSetupSucceeded,
1502  [this, &runPrincipal, &es](auto nextTask) {
1504  looper_->doEndRun(runPrincipal, es, &processContext_);
1505  }) |
1506  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1507  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1508  mergeableRunProductMetadata->preWriteRun();
1509  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1510  }) |
1511  then([status = std::move(iRunStatus),
1512  this,
1513  didGlobalBeginSucceed,
1514  mergeableRunProductMetadata,
1515  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1516  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1517  mergeableRunProductMetadata->postWriteRun();
1518  }
1519  if (iException) {
1520  handleEndRunExceptions(*iException, nextTask);
1521  }
1523 
1524  std::exception_ptr ptr;
1525 
1526  // Try hard to clean up resources so the
1527  // process can terminate in a controlled
1528  // fashion even after exceptions have occurred.
1529  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1530  if (not ptr) {
1531  ptr = std::current_exception();
1532  }
1533  }
1534  CMS_SA_ALLOW try {
1535  status->resumeGlobalRunQueue();
1537  } catch (...) {
1538  if (not ptr) {
1539  ptr = std::current_exception();
1540  }
1541  }
1542  CMS_SA_ALLOW try {
1543  status->resetEndResources();
1544  status.reset();
1545  } catch (...) {
1546  if (not ptr) {
1547  ptr = std::current_exception();
1548  }
1549  }
1550 
1551  if (ptr && !iException) {
1552  handleEndRunExceptions(ptr, nextTask);
1553  }
1554  }) |
1555  runLast(std::move(iTask));
1556  }
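 
 // The tail of globalEndRunAsync above runs several cleanup steps and keeps
 // only the first exception, so a failure during cleanup cannot hide the
 // original error. A compact sketch of that "first exception wins" pattern in
 // plain C++ (the cleanup bodies are placeholders):
 //
 //   std::exception_ptr first;
 //   auto attempt = [&first](auto&& f) {
 //     try { f(); } catch (...) { if (!first) first = std::current_exception(); }
 //   };
 //   attempt([] { /* clear the run principal */ });
 //   attempt([] { /* resume the global run queue */ });
 //   attempt([] { /* reset end-of-run resources */ });
 //   if (first) std::rethrow_exception(first);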
1557 
1558  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1559  CMS_SA_ALLOW try {
1560  if (!streamRunStatus_[iStreamIndex]) {
1561  if (exceptionRunStatus_->streamFinishedRun()) {
1562  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1563  exceptionRunStatus_.reset();
1564  }
1565  return;
1566  }
1567 
1568  auto runDoneTask =
1569  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1570  if (iException) {
1571  handleEndRunExceptions(*iException, iTask);
1572  }
1573 
1574  auto runStatus = streamRunStatus_[iStreamIndex];
1575 
1576  //reset status before releasing the queue, else we get a race condition
1577  if (runStatus->streamFinishedRun()) {
1578  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1579  }
1580  streamRunStatus_[iStreamIndex].reset();
1581  --streamRunActive_;
1582  streamQueues_[iStreamIndex].resume();
1583  });
1584 
1585  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1586 
1587  auto runStatus = streamRunStatus_[iStreamIndex].get();
1588 
1589  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1590  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1591  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1592  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1593 
1594  auto& runPrincipal = *runStatus->runPrincipal();
1596  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1597  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1598  *schedule_,
1599  iStreamIndex,
1600  transitionInfo,
1601  serviceToken_,
1602  subProcesses_,
1603  cleaningUpAfterException);
1604  }
1605  } catch (...) {
1606  handleEndRunExceptions(std::current_exception(), iTask);
1607  }
1608  }
1609 
1610  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1611  if (streamRunActive_ > 0) {
1612  FinalWaitingTask waitTask{taskGroup_};
1613 
1614  auto runStatus = streamRunStatus_[0].get();
1615  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1616  WaitingTaskHolder holder{taskGroup_, &waitTask};
1617  runStatus->setHolderOfTaskInProcessRuns(holder);
1619  endRunAsync(streamRunStatus_[0], std::move(holder));
1620  waitTask.wait();
1621  }
1622  }
1623 
1624  void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1625  std::shared_ptr<RunProcessingStatus> iRunStatus,
1626  edm::WaitingTaskHolder iHolder) {
1627  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1628 
1629  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1630  chain::first([this, &iSync, &status](auto nextTask) {
1631  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1632  nextTask,
1633  status->endIOVWaitingTasks(),
1634  status->eventSetupImpls(),
1636  actReg_.get(),
1637  serviceToken_);
1638  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1639  CMS_SA_ALLOW try {
1640  //the call to doneWaiting will cause the count to decrement
1641  if (iException) {
1642  WaitingTaskHolder copyHolder(nextTask);
1643  copyHolder.doneWaiting(*iException);
1644  }
1645 
1646  lumiQueue_->pushAndPause(
1647  *nextTask.group(),
1648  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1649  CMS_SA_ALLOW try {
1650  if (postLumiQueueTask.taskHasFailed()) {
1651  status->resetResources();
1653  endRunAsync(iRunStatus, postLumiQueueTask);
1654  return;
1655  }
1656 
1657  status->setResumer(std::move(iResumer));
1658 
1660  *postLumiQueueTask.group(),
1661  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1662  CMS_SA_ALLOW try {
1664 
1665  if (postSourceTask.taskHasFailed()) {
1666  status->resetResources();
1668  endRunAsync(iRunStatus, postSourceTask);
1669  return;
1670  }
1671 
1672  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1673 
1674  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1675  {
1677  input_->doBeginLumi(lumiPrincipal, &processContext_);
1678  sentry.completedSuccessfully();
1679  }
1680 
1682  if (rng.isAvailable()) {
1683  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1684  rng->preBeginLumi(lb);
1685  }
1686 
1687  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1688 
1689  using namespace edm::waiting_task::chain;
1690  chain::first([this, status](auto nextTask) mutable {
1692  firstItemAfterLumiMerge_ = true;
1693  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1694  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1696  beginGlobalTransitionAsync<Traits>(
1697  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1698  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1699  looper_->prefetchAsync(
1700  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1701  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1702  status->globalBeginDidSucceed();
1703  ServiceRegistry::Operate operateLooper(serviceToken_);
1704  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1705  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1706  if (iException) {
1707  status->resetResources();
1709  WaitingTaskHolder copyHolder(holder);
1710  copyHolder.doneWaiting(*iException);
1711  endRunAsync(iRunStatus, holder);
1712  } else {
1713  if (not looper_) {
1714  status->globalBeginDidSucceed();
1715  }
1716 
1717  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1718 
1719  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1721 
1722  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1723  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1724  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1725  streamQueues_[i].pause();
1726 
1727  auto& event = principalCache_.eventPrincipal(i);
1728  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1729  // held by the container as this lambda may not finish executing before all the tasks it
1730  // spawns have already started to run.
1731  auto eventSetupImpls = &status->eventSetupImpls();
1732  auto lp = status->lumiPrincipal().get();
1735  event.setLuminosityBlockPrincipal(lp);
1736  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1737  using namespace edm::waiting_task::chain;
1738  chain::first([this, i, &transitionInfo](auto nextTask) {
1739  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1740  *schedule_,
1741  i,
1742  transitionInfo,
1743  serviceToken_,
1744  subProcesses_);
1745  }) |
1746  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1747  auto nextTask) {
1748  if (exceptionFromBeginStreamLumi) {
1749  WaitingTaskHolder copyHolder(nextTask);
1750  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1751  }
1753  }) |
1754  runLast(std::move(holder));
1755  });
1756  } // end for loop over streams
1757  });
1758  }
1759  }) | runLast(postSourceTask);
1760  } catch (...) {
1761  status->resetResources();
1763  WaitingTaskHolder copyHolder(postSourceTask);
1764  copyHolder.doneWaiting(std::current_exception());
1765  endRunAsync(iRunStatus, postSourceTask);
1766  }
1767  }); // task in sourceResourcesAcquirer
1768  } catch (...) {
1769  status->resetResources();
1771  WaitingTaskHolder copyHolder(postLumiQueueTask);
1772  copyHolder.doneWaiting(std::current_exception());
1773  endRunAsync(iRunStatus, postLumiQueueTask);
1774  }
1775  }); // task in lumiQueue
1776  } catch (...) {
1777  status->resetResources();
1779  WaitingTaskHolder copyHolder(nextTask);
1780  copyHolder.doneWaiting(std::current_exception());
1781  endRunAsync(iRunStatus, nextTask);
1782  }
1783  }) | chain::runLast(std::move(iHolder));
1784  }
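
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: beginLumiAsync above builds a
// chain of continuations in which each step can see the std::exception_ptr
// produced by earlier steps, so a failure skips the remaining begin-lumi work
// but still reaches the final holder. The standalone analogue below uses only
// the standard library; runChain, Step and the individual steps are invented
// names for illustration.

#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <vector>

// Each step sees the first exception (if any) raised by earlier steps.
using Step = std::function<void(std::exception_ptr&)>;

void runChain(std::vector<Step> const& steps) {
  std::exception_ptr error;
  for (auto const& step : steps) {
    try {
      step(error);
    } catch (...) {
      if (!error) {
        error = std::current_exception();  // remember only the first failure
      }
    }
  }
  if (error) {
    std::rethrow_exception(error);  // deliver it to the caller at the very end
  }
}

int main() {
  try {
    runChain({[](std::exception_ptr& e) { if (!e) std::cout << "begin lumi work\n"; },
              [](std::exception_ptr& e) { if (!e) throw std::runtime_error("a step failed"); },
              [](std::exception_ptr&) { std::cout << "cleanup step always runs\n"; }});
  } catch (std::exception const& ex) {
    std::cout << "chain finished with: " << ex.what() << "\n";
  }
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------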
1785 
1786  void EventProcessor::continueLumiAsync(WaitingTaskHolder iHolder) {
1787  chain::first([this](auto nextTask) {
1788  //all streams are sharing the same status at the moment
1789  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1791 
1792  while (lastTransitionType() == InputSource::IsLumi and
1793  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1796  }
1797  firstItemAfterLumiMerge_ = true;
1798  }) | chain::then([this](auto nextTask) mutable {
1799  unsigned int streamIndex = 0;
1800  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1801  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1802  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1803  }
1804  nextTask.group()->run(
1805  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1806  }) | chain::runLast(std::move(iHolder));
1807  }
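
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: continueLumiAsync above hands
// the per-stream work for all but one stream to the TBB arena and runs the last
// one directly, while copies of the waiting-task holder keep the follow-up task
// alive until every stream has finished. In the simplified standalone analogue
// below, nStreams, work and the shared_ptr "holder" are invented; the shared_ptr
// deleter plays the role of the holder's final task.

#include <condition_variable>
#include <cstdio>
#include <memory>
#include <mutex>
#include <oneapi/tbb/task_arena.h>

int main() {
  constexpr unsigned int nStreams = 4;  // stands in for preallocations_.numberOfStreams()

  std::mutex m;
  std::condition_variable cv;
  bool allDone = false;

  // Fires only when the last copy of the holder goes away, i.e. after the
  // work for every stream has completed.
  std::shared_ptr<void> holder(nullptr, [&](void*) {
    std::lock_guard<std::mutex> lk(m);
    allDone = true;
    cv.notify_one();
  });

  auto work = [](unsigned int stream, std::shared_ptr<void> const&) {
    std::printf("handling next event for stream %u\n", stream);
  };

  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
  unsigned int stream = 0;
  for (; stream < nStreams - 1; ++stream) {
    // all but the last stream are enqueued to the TBB arena
    arena.enqueue([stream, holder, &work] { work(stream, holder); });
  }
  work(stream, holder);  // the last stream's work runs right here
  holder.reset();        // release this function's copy of the holder

  std::unique_lock<std::mutex> lk(m);
  cv.wait(lk, [&] { return allDone; });
  std::puts("all streams handled");
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------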
1808 
1809  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1810  if (holder.taskHasFailed()) {
1812  } else {
1813  WaitingTaskHolder tmp(holder);
1814  tmp.doneWaiting(iException);
1815  }
1816  }
1817 
1818  void EventProcessor::globalEndLumiAsync(WaitingTaskHolder iTask,
1819  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1820  // Get some needed info out of the status object before moving
1821  // it into the final task of the chain below.
1822  auto& lp = *(iLumiStatus->lumiPrincipal());
1823  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1824  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1825  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1826  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1827 
1828  using namespace edm::waiting_task::chain;
1829  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1830  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1831 
1832  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1833  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1834  endGlobalTransitionAsync<Traits>(
1835  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1836  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1837  //Only call writeLumi if beginLumi succeeded
1838  if (didGlobalBeginSucceed) {
1839  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1840  }
1841  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1842  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1843  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1844  //any thrown exception auto propagates to nextTask via the chain
1846  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1847  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1848  if (iException) {
1849  handleEndLumiExceptions(*iException, nextTask);
1850  }
1852 
1853  std::exception_ptr ptr;
1854 
1855  // Try hard to clean up resources so the
1856  // process can terminate in a controlled
1857  // fashion even after exceptions have occurred.
1858  // Caught exception is passed to handleEndLumiExceptions()
1859  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1860  if (not ptr) {
1861  ptr = std::current_exception();
1862  }
1863  }
1864  // Caught exception is passed to handleEndLumiExceptions()
1865  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1866  if (not ptr) {
1867  ptr = std::current_exception();
1868  }
1869  }
1870  // Caught exception is passed to handleEndLumiExceptions()
1871  CMS_SA_ALLOW try {
1872  status->resetResources();
1873  status->globalEndRunHolderDoneWaiting();
1874  status.reset();
1875  } catch (...) {
1876  if (not ptr) {
1877  ptr = std::current_exception();
1878  }
1879  }
1880 
1881  if (ptr && !iException) {
1882  handleEndLumiExceptions(ptr, nextTask);
1883  }
1884  }) | runLast(std::move(iTask));
1885  }
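
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: the end-of-lumi code above
// runs every cleanup step even when earlier steps throw, and only the first
// captured exception is forwarded. Standalone analogue; the three cleanup
// functions are invented.

#include <exception>
#include <iostream>
#include <stdexcept>

void releaseQueue() { /* pretend to resume a paused queue */ }
void resetResources() { throw std::runtime_error("resetting resources failed"); }
void notifyWaiters() { std::cout << "waiters notified\n"; }

int main() {
  std::exception_ptr firstError;

  // Run every step, remembering only the first failure.
  try { releaseQueue(); } catch (...) {
    if (!firstError) firstError = std::current_exception();
  }
  try { resetResources(); } catch (...) {
    if (!firstError) firstError = std::current_exception();
  }
  try { notifyWaiters(); } catch (...) {
    if (!firstError) firstError = std::current_exception();
  }

  if (firstError) {
    try {
      std::rethrow_exception(firstError);
    } catch (std::exception const& ex) {
      std::cout << "cleanup reported: " << ex.what() << "\n";
    }
  }
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------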
1886 
1887  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1888  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1889  auto status = streamLumiStatus_[iStreamIndex];
1890  if (iException) {
1891  handleEndLumiExceptions(*iException, iTask);
1892  }
1893 
1894  // reset the status before releasing the queue, else we get a race condition
1895  streamLumiStatus_[iStreamIndex].reset();
1897  streamQueues_[iStreamIndex].resume();
1898 
1899  //are we the last one?
1900  if (status->streamFinishedLumi()) {
1901  globalEndLumiAsync(iTask, std::move(status));
1902  }
1903  });
1904 
1905  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1906 
1907  // Need to be sure the lumi status is released before lumiDoneTask can ever be called,
1908  // therefore we do not want to hold the shared_ptr
1909  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1910  lumiStatus->setEndTime();
1911 
1912  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1913  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1914  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1915 
1916  if (lumiStatus->didGlobalBeginSucceed()) {
1917  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1918  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1919  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1920  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1921  *schedule_,
1922  iStreamIndex,
1923  transitionInfo,
1924  serviceToken_,
1925  subProcesses_,
1926  cleaningUpAfterException);
1927  }
1928  }
1929 
1930  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1931  if (streamRunActive_ == 0) {
1932  assert(streamLumiActive_ == 0);
1933  } else {
1935  if (streamLumiActive_ > 0) {
1936  FinalWaitingTask globalWaitTask{taskGroup_};
1938  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1939  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1940  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1941  }
1942  globalWaitTask.wait();
1943  }
1944  }
1945  }
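
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: endUnfinishedLumi above
// launches one stream-end task per stream and blocks until all of them have
// finished. A minimal analogue using oneapi::tbb::task_group in place of the
// framework's task group and FinalWaitingTask; nStreams is invented.

#include <cstdio>
#include <oneapi/tbb/task_group.h>

int main() {
  constexpr unsigned int nStreams = 4;  // stands in for preallocations_.numberOfStreams()
  oneapi::tbb::task_group group;

  for (unsigned int i = 0; i < nStreams; ++i) {
    group.run([i] { std::printf("ending lumi for stream %u\n", i); });
  }
  group.wait();  // block here until every per-stream end task has completed
  std::puts("all streams have ended the luminosity block");
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------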
1946 
1947  void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1949  input_->readProcessBlock(processBlockPrincipal);
1950  sentry.completedSuccessfully();
1951  }
1952 
1953  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1955  assert(rp);
1956  rp->setAux(*input_->runAuxiliary());
1957  {
1959  input_->readRun(*rp, *historyAppender_);
1960  sentry.completedSuccessfully();
1961  }
1962  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1963  return rp;
1964  }
1965 
1966  void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
1967  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1968 
1969  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1970  assert(runOK);
1971  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1972  {
1974  input_->readAndMergeRun(runPrincipal);
1975  sentry.completedSuccessfully();
1976  }
1977  }
1978 
1979  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1981  assert(lbp);
1982  lbp->setAux(*input_->luminosityBlockAuxiliary());
1983  {
1985  input_->readLuminosityBlock(*lbp, *historyAppender_);
1986  sentry.completedSuccessfully();
1987  }
1988  lbp->setRunPrincipal(std::move(rp));
1989  return lbp;
1990  }
1991 
1992  void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1993  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1994  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1995  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1996  input_->processHistoryRegistry().reducedProcessHistoryID(
1997  input_->luminosityBlockAuxiliary()->processHistoryID()));
1998  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1999  assert(lumiOK);
2000  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2001  {
2003  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2004  sentry.completedSuccessfully();
2005  }
2006  }
2007 
2008  void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2009  using namespace edm::waiting_task;
2010  chain::first([&](auto nextTask) {
2012  schedule_->writeProcessBlockAsync(
2013  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2014  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2016  for (auto& s : subProcesses_) {
2017  s.writeProcessBlockAsync(nextTask, processBlockType);
2018  }
2019  }) | chain::runLast(std::move(task));
2020  }
2021 
2022  void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2023  RunPrincipal const& runPrincipal,
2024  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2025  using namespace edm::waiting_task;
2026  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2027  chain::first([&](auto nextTask) {
2029  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2030  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2032  for (auto& s : subProcesses_) {
2033  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2034  }
2035  }) | chain::runLast(std::move(task));
2036  }
2037  }
2038 
2039  void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2040  for (auto& s : subProcesses_) {
2041  s.clearRunPrincipal(*iStatus.runPrincipal());
2042  }
2043  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2044  iStatus.runPrincipal()->clearPrincipal();
2045  }
2046 
2047  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2048  using namespace edm::waiting_task;
2049  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2050  chain::first([&](auto nextTask) {
2052 
2053  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2054  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2055  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2057  for (auto& s : subProcesses_) {
2058  s.writeLumiAsync(nextTask, lumiPrincipal);
2059  }
2060  }) | chain::runLast(std::move(task));
2061  }
2062  }
2063 
2064  void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2065  for (auto& s : subProcesses_) {
2066  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2067  }
2068  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2069  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2070  iStatus.lumiPrincipal()->clearPrincipal();
2071  }
2072 
2073  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2074  WaitingTaskHolder iHolder) {
2075  auto group = iHolder.group();
2076  sourceResourcesAcquirer_.serialQueueChain().push(
2077  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2078  CMS_SA_ALLOW try {
2080 
2081  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2082 
2084  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2085  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2086  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2087  status->setStopBeforeProcessingRun(true);
2088  return;
2089  }
2092  }
2093  } catch (...) {
2094  status->setStopBeforeProcessingRun(true);
2095  holder.doneWaiting(std::current_exception());
2096  }
2097  });
2098  }
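
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: while merging run or lumi
// entries the input source is protected by a std::recursive_mutex, so a nested
// read triggered from inside the merge loop can lock it again without
// deadlocking. Simplified standalone analogue; FakeSource and its entries are
// invented.

#include <cstddef>
#include <cstdio>
#include <mutex>
#include <utility>
#include <vector>

struct FakeSource {
  std::recursive_mutex mutex;  // analogue of *sourceMutex_
  std::vector<std::pair<int, int>> entries{{1, 10}, {1, 11}, {2, 20}};  // (run, payload)
  std::size_t next = 0;

  int currentRun() {
    // re-entrant: fine even if the caller already holds the mutex
    std::lock_guard<std::recursive_mutex> guard(mutex);
    return entries[next].first;
  }
};

int main() {
  FakeSource source;
  int activeRun = source.currentRun();

  std::lock_guard<std::recursive_mutex> guard(source.mutex);
  // Merge consecutive entries as long as they belong to the run being processed.
  while (source.next < source.entries.size() && source.currentRun() == activeRun) {
    std::printf("merging entry %d into run %d\n", source.entries[source.next].second, activeRun);
    ++source.next;
  }
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------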
2099 
2100  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2101  WaitingTaskHolder iHolder) {
2102  auto group = iHolder.group();
2103  sourceResourcesAcquirer_.serialQueueChain().push(
2104  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2105  CMS_SA_ALLOW try {
2107 
2108  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2109 
2111  while (lastTransitionType() == InputSource::IsLumi and
2112  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2113  readAndMergeLumi(*iLumiStatus);
2115  }
2116  } catch (...) {
2117  holder.doneWaiting(std::current_exception());
2118  }
2119  });
2120  }
2121 
2122  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2123  WaitingTaskHolder iHolder) {
2125  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2126  iHolder.doneWaiting(std::exception_ptr{});
2127  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2128  CMS_SA_ALLOW try {
2129  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2130  input_->luminosityBlockAuxiliary()->beginTime()),
2131  iRunStatus,
2132  iHolder);
2133  } catch (...) {
2134  WaitingTaskHolder copyHolder(iHolder);
2135  iHolder.doneWaiting(std::current_exception());
2136  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2137  }
2138  } else {
2139  // Note that endRunAsync will call beginRunAsync for the following run
2140  // if appropriate.
2141  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2142  }
2143  }
2144 
2145  bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2146  unsigned int iStreamIndex,
2147  LuminosityBlockProcessingStatus& iStatus) {
2148  // This function returns true if it successfully reads an event for the stream, which
2149  // requires both that an event is next and that there are no problems or requests to stop.
2150 
2151  if (iTask.taskHasFailed()) {
2152  // We want all streams to stop or all streams to pause. If we are already in the
2153  // middle of pausing streams, then finish pausing all of them and the lumi will be
2154  // ended later. Otherwise, just end it now.
2157  }
2158  return false;
2159  }
2160 
2161  // Did another stream already stop or pause this lumi?
2163  return false;
2164  }
2165 
2166  // Are output modules or the looper requesting we stop?
2167  if (shouldWeStop()) {
2170  return false;
2171  }
2172 
2174 
2175  // need to use lock in addition to the serial task queue because
2176  // of delayed provenance reading and reading data in response to
2177  // edm::Refs etc
2178  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2179 
2180  // If we didn't already call nextTransitionType while merging lumis, call it here.
2181  // This asks the input source what is next and also checks for signals.
2182 
2183  InputSource::ItemType itemType = firstItemAfterLumiMerge_ ? lastTransitionType() : nextTransitionType();
2184  firstItemAfterLumiMerge_ = false;
2185 
2186  if (InputSource::IsEvent != itemType) {
2187  // IsFile may continue processing the lumi and
2188  // looper_ can cause the input source to declare a new IsRun which is actually
2189  // just a continuation of the previous run
2190  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2191  (InputSource::IsRun == itemType and
2192  (iStatus.lumiPrincipal()->run() != input_->run() or
2193  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2195  } else {
2197  }
2198  return false;
2199  }
2200  readEvent(iStreamIndex);
2201  return true;
2202  }
2203 
2204  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2205  auto group = iTask.group();
2206  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2207  CMS_SA_ALLOW try {
2208  auto status = streamLumiStatus_[iStreamIndex].get();
2210 
2211  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2212  auto recursionTask =
2213  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2214  if (iEventException) {
2215  WaitingTaskHolder copyHolder(iTask);
2216  copyHolder.doneWaiting(*iEventException);
2217  // Intentionally, we don't return here. The recursive call to
2218  // handleNextEvent takes care of immediately ending the run properly
2219  // using the same code it uses to end the run in other situations.
2220  }
2221  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2222  });
2223 
2224  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2225  } else {
2226  // the stream will stop processing this lumi now
2228  if (not status->haveStartedNextLumiOrEndedRun()) {
2229  status->startNextLumiOrEndRun();
2230  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2231  CMS_SA_ALLOW try {
2232  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2233  input_->luminosityBlockAuxiliary()->beginTime()),
2234  streamRunStatus_[iStreamIndex],
2235  iTask);
2236  } catch (...) {
2237  WaitingTaskHolder copyHolder(iTask);
2238  copyHolder.doneWaiting(std::current_exception());
2239  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2240  }
2241  } else {
2242  // If appropriate, this will also start the next run.
2243  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2244  }
2245  }
2246  streamEndLumiAsync(iTask, iStreamIndex);
2247  } else {
2248  assert(status->eventProcessingState() ==
2250  auto runStatus = streamRunStatus_[iStreamIndex].get();
2251 
2252  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2253  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2254  }
2255  }
2256  }
2257  } catch (...) {
2258  WaitingTaskHolder copyHolder(iTask);
2259  copyHolder.doneWaiting(std::current_exception());
2260  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2261  }
2262  });
2263  }
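
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: handleNextEventForStreamAsync
// above processes one item and then re-submits itself, so each stream keeps
// draining events until the source is exhausted. In the simplified analogue
// below an atomic counter stands in for the input source; nextEvent,
// totalEvents and nStreams are invented.

#include <atomic>
#include <cstdio>
#include <oneapi/tbb/task_group.h>

std::atomic<int> nextEvent{0};
constexpr int totalEvents = 10;

// Handle one event for the given stream, then re-submit this same function so
// the stream keeps going until there is nothing left to read.
void handleNextEventForStream(oneapi::tbb::task_group& group, unsigned int stream) {
  int event = nextEvent.fetch_add(1);
  if (event >= totalEvents) {
    return;  // no more events: this stream stops
  }
  std::printf("stream %u processed event %d\n", stream, event);
  group.run([&group, stream] { handleNextEventForStream(group, stream); });
}

int main() {
  constexpr unsigned int nStreams = 4;
  oneapi::tbb::task_group group;
  for (unsigned int s = 0; s < nStreams; ++s) {
    group.run([&group, s] { handleNextEventForStream(group, s); });
  }
  group.wait();  // returns once every stream has run out of events
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------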
2264 
2265  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2266  //TODO this will have to become per stream
2267  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2268  StreamContext streamContext(event.streamID(), &processContext_);
2269 
2271  input_->readEvent(event, streamContext);
2272 
2273  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2274  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2275  sentry.completedSuccessfully();
2276 
2277  FDEBUG(1) << "\treadEvent\n";
2278  }
2279 
2280  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2281  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2282  }
2283 
2284  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2285  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2286 
2289  if (rng.isAvailable()) {
2290  Event ev(*pep, ModuleDescription(), nullptr);
2291  rng->postEventRead(ev);
2292  }
2293 
2294  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2295  using namespace edm::waiting_task::chain;
2296  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2297  EventTransitionInfo info(*pep, es);
2298  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2299  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2300  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2301  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2302  }
2303  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2304  //NOTE: behavior change. Previously, if an exception happened, the looper was still called. Now it will not be called.
2305  ServiceRegistry::Operate operateLooper(serviceToken_);
2306  processEventWithLooper(*pep, iStreamIndex);
2307  }) | then([pep](auto nextTask) {
2308  FDEBUG(1) << "\tprocessEvent\n";
2309  pep->clearEventPrincipal();
2310  }) | runLast(iHolder);
2311  }
2312 
2313  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2314  bool randomAccess = input_->randomAccess();
2315  ProcessingController::ForwardState forwardState = input_->forwardState();
2316  ProcessingController::ReverseState reverseState = input_->reverseState();
2317  ProcessingController pc(forwardState, reverseState, randomAccess);
2318 
2320  do {
2321  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2322  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2323  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2324 
2325  bool succeeded = true;
2326  if (randomAccess) {
2328  input_->skipEvents(-2);
2330  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2331  }
2332  }
2334  } while (!pc.lastOperationSucceeded());
2336  shouldWeStop_ = true;
2337  }
2338  }
2339 
2340  bool EventProcessor::shouldWeStop() const {
2341  FDEBUG(1) << "\tshouldWeStop\n";
2342  if (shouldWeStop_)
2343  return true;
2344  if (!subProcesses_.empty()) {
2345  for (auto const& subProcess : subProcesses_) {
2346  if (subProcess.terminate()) {
2347  return true;
2348  }
2349  }
2350  return false;
2351  }
2352  return schedule_->terminate();
2353  }
2354 
2356 
2358 
2360 
2361  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2362  bool expected = false;
2363  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2364  deferredExceptionPtr_ = iException;
2365  return true;
2366  }
2367  return false;
2368  }
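
// ---------------------------------------------------------------------------
// Illustrative sketch, not from EventProcessor.cc: setDeferredException above
// uses compare_exchange_strong so that when several streams fail concurrently
// only the first exception is stored and later reported. Standalone analogue
// with plain std::thread workers; the two globals mirror
// deferredExceptionPtrIsSet_ and deferredExceptionPtr_ only by analogy.

#include <atomic>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

std::atomic<bool> exceptionIsSet{false};
std::exception_ptr deferredException;

// Returns true only for the thread that wins the race; the stored
// exception_ptr is safe to read once all threads have been joined.
bool recordFirstException(std::exception_ptr e) {
  bool expected = false;
  if (exceptionIsSet.compare_exchange_strong(expected, true)) {
    deferredException = e;
    return true;
  }
  return false;
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([i] {
      try {
        throw std::runtime_error("failure in worker " + std::to_string(i));
      } catch (...) {
        recordFirstException(std::current_exception());
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }

  try {
    std::rethrow_exception(deferredException);  // exactly one failure was kept
  } catch (std::exception const& ex) {
    std::cout << "first recorded failure: " << ex.what() << "\n";
  }
}

// End of illustrative sketch.
// ---------------------------------------------------------------------------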
2369 
2370  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2371  cms::Exception ex("ModulesSynchingOnLumis");
2372  ex << "The framework is configured to use at least two streams, but the following modules\n"
2373  << "require synchronizing on LuminosityBlock boundaries:";
2374  bool found = false;
2375  for (auto worker : schedule_->allWorkers()) {
2376  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2377  found = true;
2378  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2379  }
2380  }
2381  if (found) {
2382  ex << "\n\nThe situation can be fixed by either\n"
2383  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2384  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2385  throw ex;
2386  }
2387  }
2388 
2389  void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2390  std::unique_ptr<LogSystem> s;
2391  for (auto worker : schedule_->allWorkers()) {
2392  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2393  if (not s) {
2394  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2395  (*s) << "The following modules require synchronizing on Run boundaries:";
2396  }
2397  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2398  }
2399  }
2400  }
2401 
2402  void EventProcessor::warnAboutLegacyModules() const {
2403  std::unique_ptr<LogSystem> s;
2404  for (auto worker : schedule_->allWorkers()) {
2405  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2406  if (not s) {
2407  s = std::make_unique<LogSystem>("LegacyModules");
2408  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2409  "is going to end soon. These modules need to be converted to have type\n"
2410  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2411  }
2412  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2413  }
2414  }
2415  }
2416 } // namespace edm