CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
8 
45 
47 
55 
60 
65 
78 
79 #include "MessageForSource.h"
80 #include "MessageForParent.h"
82 #include "RunProcessingStatus.h"
83 
84 #include "boost/range/adaptor/reversed.hpp"
85 
86 #include <cassert>
87 #include <exception>
88 #include <iomanip>
89 #include <iostream>
90 #include <utility>
91 #include <sstream>
92 
93 #include <sys/ipc.h>
94 #include <sys/msg.h>
95 
96 #include "oneapi/tbb/task.h"
97 
98 //Used for CPU affinity
99 #ifndef __APPLE__
100 #include <sched.h>
101 #endif
102 
103 namespace {
104  class PauseQueueSentry {
105  public:
106  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
107  ~PauseQueueSentry() { queue_.resume(); }
108 
109  private:
110  edm::SerialTaskQueue& queue_;
111  };
112 } // namespace
113 
114 namespace edm {
115 
116  namespace chain = waiting_task::chain;
117 
118  // ---------------------------------------------------------------
119  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
125  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126  std::shared_ptr<ActivityRegistry> areg,
127  std::shared_ptr<ProcessConfiguration const> processConfiguration,
128  PreallocationConfiguration const& allocations) {
129  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
130  if (main_input == nullptr) {
132  << "There must be exactly one source in the configuration.\n"
133  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134  }
135 
136  std::string modtype(main_input->getParameter<std::string>("@module_type"));
137 
138  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
140  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
141  filler->fill(descriptions);
142 
143  try {
144  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
145  } catch (cms::Exception& iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  moduleIndex);
165 
166  InputSourceDescription isdesc(md,
167  preg,
168  branchIDListHelper,
169  processBlockHelper,
170  thinnedAssociationsHelper,
171  areg,
172  common.maxEventsInput_,
173  common.maxLumisInput_,
174  common.maxSecondsUntilRampdown_,
175  allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
181  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  } catch (cms::Exception& iException) {
188  std::ostringstream ost;
189  ost << "Constructing input source of type " << modtype;
190  iException.addContext(ost.str());
191  throw;
192  }
193  return input;
194  }
195 
196  // ---------------------------------------------------------------
197  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
200  std::vector<std::string> const& loopers) {
201  std::shared_ptr<EDLooperBase> vLooper;
202 
203  assert(1 == loopers.size());
204 
205  for (auto const& looperName : loopers) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
207  // Unlikely we would ever need the ModuleTypeResolver in Looper
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
215  ServiceToken const& iToken,
217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
265  processConfiguration_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
271  deferredExceptionPtrIsSet_(false),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
278  exceptionMessageFiles_(),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
283  forceESCacheClearOnNewRun_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
289 
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
291  ServiceToken const& token,
293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
302  processConfiguration_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
308  deferredExceptionPtrIsSet_(false),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
315  exceptionMessageFiles_(),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
320  forceESCacheClearOnNewRun_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
326  ServiceToken const& iToken,
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector, once and for all
332 
333  // register the empty parameter set, once and for all.
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
606 
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these things that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
624 
628  task.waitNoThrow();
629  assert(task.done());
630  }
631 
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules to avoid non-consumed non-run modules to keep the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
698  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
699 
702  }
703  if (preallocations_.numberOfRuns() > 1) {
705  }
707 
708  //NOTE: This implementation assumes 'Job' means one call to
709  // the EventProcessor::run
710  // If it really means once per 'application' then this code will
711  // have to be changed.
712  // Also have to deal with case where have 'run' then new Module
713  // added and do 'run'
714  // again. In that case the newly added Module needs its 'beginJob'
715  // to be called.
716 
717  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
718  // For now we delay calling beginOfJob until first beginOfRun
719  //if(looper_) {
720  // looper_->beginOfJob(es);
721  //}
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728  espController_->finishConfiguration();
729  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToProxyIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
762 
764  // Collects exceptions, so we don't throw before all operations are performed.
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
829 
831 
832  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
833  return schedule_->getAllModuleDescriptions();
834  }
835 
836  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
837 
838  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
839 
840  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
841 
842  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
843 
844  namespace {
845 #include "TransitionProcessors.icc"
846  }
847 
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
855  }
856  return returnValue;
857  }
858 
861  InputSource::ItemType itemType;
862  //For now, do nothing with InputSource::IsSynchronize
863  do {
864  itemType = input_->nextItemType();
865  } while (itemType == InputSource::IsSynchronize);
866 
867  lastSourceTransition_ = itemType;
868  sentry.completedSuccessfully();
869 
871 
873  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
875  }
876 
877  return lastSourceTransition_;
878  }
879 
881  beginJob(); //make sure this was called
882 
883  // make the services available
885 
886  try {
887  FilesProcessor fp(fileModeNoMerge_);
888 
889  convertException::wrap([&]() {
890  bool firstTime = true;
891  do {
892  if (not firstTime) {
894  rewindInput();
895  } else {
896  firstTime = false;
897  }
898  startingNewLoop();
899 
900  auto trans = fp.processFiles(*this);
901 
902  fp.normalEnd();
903 
904  if (deferredExceptionPtrIsSet_.load()) {
905  std::rethrow_exception(deferredExceptionPtr_);
906  }
907  if (trans != InputSource::IsStop) {
908  //problem with the source
909  doErrorStuff();
910 
911  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
912  }
913  } while (not endOfLoop());
914  }); // convertException::wrap
915 
916  } // Try block
917  catch (cms::Exception& e) {
919  std::string message(
920  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
921  e.addAdditionalInfo(message);
922  if (e.alreadyPrinted()) {
923  LogAbsolute("Additional Exceptions") << message;
924  }
925  }
926  if (exceptionMessageRuns_) {
927  std::string message(
928  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
929  e.addAdditionalInfo(message);
930  if (e.alreadyPrinted()) {
931  LogAbsolute("Additional Exceptions") << message;
932  }
933  }
934  if (!exceptionMessageFiles_.empty()) {
935  e.addAdditionalInfo(exceptionMessageFiles_);
936  if (e.alreadyPrinted()) {
937  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
938  }
939  }
940  throw;
941  }
942  return epSuccess;
943  }
944 
946  FDEBUG(1) << " \treadFile\n";
947  size_t size = preg_->size();
949 
950  if (streamRunActive_ > 0) {
951  streamRunStatus_[0]->runPrincipal()->preReadFile();
952  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
953  }
954 
955  if (streamLumiActive_ > 0) {
956  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
957  }
958 
959  fb_ = input_->readFile();
960  if (size < preg_->size()) {
962  }
965  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
966  }
967  sentry.completedSuccessfully();
968  }
969 
970  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
971  if (fileBlockValid()) {
973  input_->closeFile(fb_.get(), cleaningUpAfterException);
974  sentry.completedSuccessfully();
975  }
976  FDEBUG(1) << "\tcloseInputFile\n";
977  }
978 
980  if (fileBlockValid()) {
981  schedule_->openOutputFiles(*fb_);
982  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
983  }
984  FDEBUG(1) << "\topenOutputFiles\n";
985  }
986 
988  schedule_->closeOutputFiles();
989  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
990  processBlockHelper_->clearAfterOutputFilesClose();
991  FDEBUG(1) << "\tcloseOutputFiles\n";
992  }
993 
995  if (fileBlockValid()) {
997  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
998  schedule_->respondToOpenInputFile(*fb_);
999  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1000  }
1001  FDEBUG(1) << "\trespondToOpenInputFile\n";
1002  }
1003 
1005  if (fileBlockValid()) {
1006  schedule_->respondToCloseInputFile(*fb_);
1007  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1008  }
1009  FDEBUG(1) << "\trespondToCloseInputFile\n";
1010  }
1011 
1013  shouldWeStop_ = false;
1014  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1015  // until after we've called beginOfJob
1016  if (looper_ && looperBeginJobRun_) {
1017  looper_->doStartingNewLoop();
1018  }
1019  FDEBUG(1) << "\tstartingNewLoop\n";
1020  }
1021 
1023  if (looper_) {
1024  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1025  looper_->setModuleChanger(&changer);
1026  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1027  looper_->setModuleChanger(nullptr);
1029  return true;
1030  else
1031  return false;
1032  }
1033  FDEBUG(1) << "\tendOfLoop\n";
1034  return true;
1035  }
1036 
1038  input_->repeat();
1039  input_->rewind();
1040  FDEBUG(1) << "\trewind\n";
1041  }
1042 
1044  looper_->prepareForNextLoop(esp_.get());
1045  FDEBUG(1) << "\tprepareForNextLoop\n";
1046  }
1047 
1049  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1050  if (!subProcesses_.empty()) {
1051  for (auto const& subProcess : subProcesses_) {
1052  if (subProcess.shouldWeCloseOutput()) {
1053  return true;
1054  }
1055  }
1056  return false;
1057  }
1058  return schedule_->shouldWeCloseOutput();
1059  }
1060 
1062  FDEBUG(1) << "\tdoErrorStuff\n";
1063  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1064  << "and went to the error state\n"
1065  << "Will attempt to terminate processing normally\n"
1066  << "(IF using the looper the next loop will be attempted)\n"
1067  << "This likely indicates a bug in an input module or corrupted input or both\n";
1068  }
1069 
// Runs the global begin transition for the process block held in principalCache_ and
// blocks until it completes.
//   beginProcessBlockSucceeded: output flag, set to true only after globalWaitTask.wait()
//   returns; it is left untouched if control leaves earlier.
1070  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1071  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1072  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1073 
// NOTE(review): embedded line 1074 is missing from this listing; presumably it declares the
// `Traits` alias used by beginGlobalTransitionAsync<Traits> below -- verify.
1075  FinalWaitingTask globalWaitTask{taskGroup_};
1076 
1077  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1078  beginGlobalTransitionAsync<Traits>(
1079  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1080 
1081  globalWaitTask.wait();
1082  beginProcessBlockSucceeded = true;
1083  }
1084 
// Processes every process block offered by the input source: for each one it reads the
// block, runs a global begin transition, waits, waits on a second (write) task, and then
// clears the principal here and in every sub-process (ProcessBlockType::Input).
// NOTE(review): several lines are absent from this listing -- the signature (embedded 1085),
// the declaration of `processBlockPrincipal` (1087), presumably the `Traits` alias (1091) and
// the call that hands `writeWaitTask` to the writer (1101). The name is inferred; verify.
1086  input_->fillProcessBlockHelper();
1088  while (input_->nextProcessBlock(processBlockPrincipal)) {
1089  readProcessBlock(processBlockPrincipal);
1090 
1092  FinalWaitingTask globalWaitTask{taskGroup_};
1093 
1094  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1095  beginGlobalTransitionAsync<Traits>(
1096  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1097 
1098  globalWaitTask.wait();
1099 
1100  FinalWaitingTask writeWaitTask{taskGroup_};
1102  writeWaitTask.wait();
1103 
1104  processBlockPrincipal.clearPrincipal();
1105  for (auto& s : subProcesses_) {
1106  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1107  }
1108  }
1109  }
1110 
// Runs the global end transition for the process block, optionally waits for a write step,
// then clears the principal here and in every sub-process (ProcessBlockType::New).
//   cleaningUpAfterException: forwarded unchanged to endGlobalTransitionAsync.
//   beginProcessBlockSucceeded: the write wait is performed only when this is true.
1111  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1112  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1113 
// NOTE(review): embedded line 1114 is missing; presumably the `Traits` alias -- verify.
1115  FinalWaitingTask globalWaitTask{taskGroup_};
1116 
1117  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1118  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1119  *schedule_,
1120  transitionInfo,
1121  serviceToken_,
1122  subProcesses_,
1123  cleaningUpAfterException);
1124  globalWaitTask.wait();
1125 
1126  if (beginProcessBlockSucceeded) {
1127  FinalWaitingTask writeWaitTask{taskGroup_};
// NOTE(review): embedded line 1128 is missing; presumably it starts the asynchronous write
// that completes writeWaitTask -- verify.
1129  writeWaitTask.wait();
1130  }
1131 
1132  processBlockPrincipal.clearPrincipal();
1133  for (auto& s : subProcesses_) {
1134  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1135  }
1136  }
1137 
// Drives run processing synchronously: starts asynchronous work, blocks on waitTask, and
// returns lastTransitionType().
//  - No run active on any stream (streamRunActive_ == 0): asserts that no lumi is active and
//    starts a new run with an IOVSyncValue built from the input's run number and begin time.
//  - Otherwise: merges further run entries from the input into the run held by stream 0,
//    then either continues the active lumi or takes the else branch.
// NOTE(review): the signature (embedded 1138) and lines 1140, 1147, 1154, 1160 and 1163 are
// absent from this listing; in particular the body of the final `else` is not visible. The
// name "processRuns" is inferred from setHolderOfTaskInProcessRuns -- verify.
1139  FinalWaitingTask waitTask{taskGroup_};
1141  if (streamRunActive_ == 0) {
1142  assert(streamLumiActive_ == 0);
1143 
1144  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1145  WaitingTaskHolder{taskGroup_, &waitTask});
1146  } else {
1148 
1149  auto runStatus = streamRunStatus_[0];
1150 
// Keep merging while the next input item is a run with the same run number and the same
// reduced process history ID as the run already in progress.
1151  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1152  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1153  readAndMergeRun(*runStatus);
1155  }
1156 
1157  WaitingTaskHolder holder{taskGroup_, &waitTask};
1158  runStatus->setHolderOfTaskInProcessRuns(holder);
1159  if (streamLumiActive_ > 0) {
1161  continueLumiAsync(std::move(holder));
1162  } else {
1164  }
1165  }
1166  waitTask.wait();
1167  return lastTransitionType();
1168  }
1169 
// Asynchronously begins a run. Visible stages, in order:
//  1. Return immediately if iHolder already carries a failure.
//  2. Emit esSyncIOVQueuingSignal_ and create a shared RunProcessingStatus.
//  3. Queue EventSetup work for iSync via espController_.
//  4. Emit postESSyncIOVSignal_ and push a task onto runQueue_ (pushAndPause).
//  5. Inside that task: store the resumer, read the run (readRun), call input_->doBeginRun,
//     run the looper's one-time begin-job if it has not run yet, then chain: merge run
//     entries -> global begin run -> mark global begin succeeded -> looper prefetch/beginRun
//     -> create the global end-run task and push a stream begin-run task to each stream queue.
// Every catch(...) handler resets the status' begin resources and reports the exception to
// the relevant holder.
// NOTE(review): the signature (embedded 1170) is absent -- the name is inferred from the
// call sites that pass (IOVSyncValue, WaitingTaskHolder). Many other embedded lines are
// missing as well (e.g. 1184, 1187, 1195, 1204, 1210, 1213, 1226, 1259, 1314, 1322, 1329),
// so calls such as the push onto the source-resources queue are not visible here.
1171  if (iHolder.taskHasFailed()) {
1172  return;
1173  }
1174 
1175  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1176 
1177  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1178 
// The first link captures `status` and `iSync` by reference.
// NOTE(review): presumably safe because the first link runs before this function returns -- confirm.
1179  chain::first([this, &status, &iSync](auto nextTask) {
1180  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1181  nextTask,
1182  status->endIOVWaitingTasks(),
1183  status->eventSetupImpls(),
1185  actReg_.get(),
1186  serviceToken_,
1188  }) | chain::then([this, status, iSync](std::exception_ptr const* iException, auto nextTask) {
1189  CMS_SA_ALLOW try {
1190  if (iException) {
1191  WaitingTaskHolder copyHolder(nextTask);
1192  copyHolder.doneWaiting(*iException);
1193  // Finish handling the exception in the task pushed to runQueue_
1194  }
1196  actReg_->postESSyncIOVSignal_.emit(iSync);
1197 
1198  runQueue_->pushAndPause(
1199  *nextTask.group(),
1200  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1201  CMS_SA_ALLOW try {
1202  if (postRunQueueTask.taskHasFailed()) {
1203  status->resetBeginResources();
1205  return;
1206  }
1207 
1208  status->setResumer(std::move(iResumer));
1209 
1211  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1212  CMS_SA_ALLOW try {
1214 
1215  if (postSourceTask.taskHasFailed()) {
1216  status->resetBeginResources();
1218  status->resumeGlobalRunQueue();
1219  return;
1220  }
1221 
1222  status->setRunPrincipal(readRun());
1223 
1224  RunPrincipal& runPrincipal = *status->runPrincipal();
1225  {
1227  input_->doBeginRun(runPrincipal, &processContext_);
1228  sentry.completedSuccessfully();
1229  }
1230 
1231  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
// One-time looper initialisation: runs synchronously on a local task_group and sets
// looperBeginJobRun_ so it is not repeated.
1232  if (looper_ && looperBeginJobRun_ == false) {
1233  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1234 
1235  oneapi::tbb::task_group group;
1236  FinalWaitingTask waitTask{group};
1237  using namespace edm::waiting_task::chain;
1238  chain::first([this, &es](auto nextTask) {
1239  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1240  }) | then([this, &es](auto nextTask) mutable {
1241  looper_->beginOfJob(es);
1242  looperBeginJobRun_ = true;
1243  looper_->doStartingNewLoop();
1244  }) | runLast(WaitingTaskHolder(group, &waitTask));
1245  waitTask.wait();
1246  }
1247 
1248  using namespace edm::waiting_task::chain;
1249  chain::first([this, status](auto nextTask) mutable {
1250  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1251  status->setStopBeforeProcessingRun(true);
1252  nextTask.doneWaiting(std::current_exception());
1253  }
1254  }) | then([this, status, &es](auto nextTask) {
1255  if (status->stopBeforeProcessingRun()) {
1256  return;
1257  }
1258  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1260  beginGlobalTransitionAsync<Traits>(
1261  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1262  }) | then([status](auto nextTask) mutable {
1263  if (status->stopBeforeProcessingRun()) {
1264  return;
1265  }
1266  status->globalBeginDidSucceed();
1267  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1268  if (status->stopBeforeProcessingRun()) {
1269  return;
1270  }
1271  looper_->prefetchAsync(
1272  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1273  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1274  if (status->stopBeforeProcessingRun()) {
1275  return;
1276  }
1277  ServiceRegistry::Operate operateLooper(serviceToken_);
1278  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1279  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
// precedingTasksSucceeded is forwarded to the stream begin-run tasks pushed below.
1280  bool precedingTasksSucceeded = true;
1281  if (iException) {
1282  precedingTasksSucceeded = false;
1283  WaitingTaskHolder copyHolder(holder);
1284  copyHolder.doneWaiting(*iException);
1285  }
1286 
1287  if (status->stopBeforeProcessingRun()) {
1288  // We just quit now if there was a failure when merging runs
1289  status->resetBeginResources();
1291  status->resumeGlobalRunQueue();
1292  return;
1293  }
1294  CMS_SA_ALLOW try {
1295  // Under normal circumstances, this task runs after endRun has completed for all streams
1296  // and global endLumi has completed for all lumis contained in this run
1297  auto globalEndRunTask =
1298  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1299  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1300  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1302  });
1303  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1304  } catch (...) {
1305  status->resetBeginResources();
1307  status->resumeGlobalRunQueue();
1308  holder.doneWaiting(std::current_exception());
1309  return;
1310  }
1311 
1312  // After this point we are committed to end the run via endRunAsync
1313 
1315 
1316  // The only purpose of the pause is to cause stream begin run to execute before
1317  // global begin lumi in the single threaded case (maintains consistency with
1318  // the order that existed before concurrent runs were implemented).
1319  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1320 
1321  CMS_SA_ALLOW try {
1323  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1324  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1325  CMS_SA_ALLOW try {
1326  streamQueues_[i].push(
1327  *holder.group(),
1328  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1330  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1331  });
1332  } catch (...) {
1333  if (status->streamFinishedBeginRun()) {
1334  WaitingTaskHolder copyHolder(holder);
1335  copyHolder.doneWaiting(std::current_exception());
1336  status->resetBeginResources();
1339  }
1340  }
1341  }
1342  });
1343  } catch (...) {
1344  WaitingTaskHolder copyHolder(holder);
1345  copyHolder.doneWaiting(std::current_exception());
1346  status->resetBeginResources();
1349  }
1351  }) | runLast(postSourceTask);
1352  } catch (...) {
1353  status->resetBeginResources();
1355  status->resumeGlobalRunQueue();
1356  postSourceTask.doneWaiting(std::current_exception());
1357  }
1358  }); // task in sourceResourcesAcquirer
1359  } catch (...) {
1360  status->resetBeginResources();
1362  status->resumeGlobalRunQueue();
1363  postRunQueueTask.doneWaiting(std::current_exception());
1364  }
1365  }); // task in runQueue
1366  } catch (...) {
1367  status->resetBeginResources();
1369  nextTask.doneWaiting(std::current_exception());
1370  }
1371  }) | chain::runLast(std::move(iHolder));
1372  }
1373 
// Begins the run on one stream.
//   iStream: index into streamQueues_ / streamRunStatus_.
//   status: run status to associate with the stream (moved into streamRunStatus_[iStream]).
//   precedingTasksSucceeded: when false, the stream begin transition is skipped but the
//     bookkeeping and the release of begin-run resources still happen.
//   iHolder: task to notify on completion or failure.
// The stream queue is paused here and resumed by releaseBeginRunResources().
1374  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1375  std::shared_ptr<RunProcessingStatus> status,
1376  bool precedingTasksSucceeded,
1377  WaitingTaskHolder iHolder) {
1378  // These shouldn't throw
1379  streamQueues_[iStream].pause();
1380  ++streamRunActive_;
1381  streamRunStatus_[iStream] = std::move(status);
1382 
1383  CMS_SA_ALLOW try {
1384  using namespace edm::waiting_task::chain;
1385  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1386  if (precedingTasksSucceeded) {
1387  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1388  RunTransitionInfo transitionInfo(
1389  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
// NOTE(review): embedded line 1390 is missing; presumably the `Traits` alias -- verify.
1391  beginStreamTransitionAsync<Traits>(
1392  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1393  }
1394  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
// Resources are released whether or not the begin transition raised an exception.
1395  if (exceptionFromBeginStreamRun) {
1396  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1397  }
1398  releaseBeginRunResources(iStream);
1399  }) | runLast(iHolder);
1400  } catch (...) {
1401  releaseBeginRunResources(iStream);
1402  iHolder.doneWaiting(std::current_exception());
1403  }
1404  }
1405 
// Releases per-run begin resources for a stream and resumes that stream's queue.
// The status' begin resources are reset only when status->streamFinishedBeginRun() returns
// true; the stream queue is resumed unconditionally.
1406  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1407  auto& status = streamRunStatus_[iStream];
1408  if (status->streamFinishedBeginRun()) {
1409  status->resetBeginResources();
// NOTE(review): embedded line 1410 is missing from this listing; a further release step
// may occur here -- verify against the real file.
1411  }
1412  streamQueues_[iStream].resume();
1413  }
1414 
// Asynchronously ends a run.
//   iRunStatus: status of the run being ended (its end time is set here).
//   iHolder: task that receives completion / exceptions.
// Visible stages: emit esSyncIOVQueuingSignal_ for the end-of-run sync value, queue the
// end-run EventSetup work, emit postESSyncIOVSignal_, push a stream end-run task onto every
// stream queue (via streamQueuesInserter_), and then possibly begin the next run.
// Exceptions from signal emission and queue pushes are reported to the holder instead of
// propagating.
// NOTE(review): embedded lines 1419 (first IOVSyncValue argument), 1431, 1439 and 1459 are
// missing from this listing; 1459 presumably holds the condition guarding the beginRunAsync
// call near the end -- verify.
1415  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1416  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1417  iRunStatus->setEndTime();
1418  IOVSyncValue ts(
1420  runPrincipal.endTime());
1421  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1422  WaitingTaskHolder copyHolder(iHolder);
1423  copyHolder.doneWaiting(std::current_exception());
1424  }
1425 
1426  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1427  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1428  nextTask,
1429  iRunStatus->endIOVWaitingTasksEndRun(),
1430  iRunStatus->eventSetupImplsEndRun(),
1432  actReg_.get(),
1433  serviceToken_);
1434  }) | chain::then([this, iRunStatus, ts](std::exception_ptr const* iException, auto nextTask) {
1435  if (iException) {
// Record that the ending EventSetup failed; later end-run steps test this flag.
1436  iRunStatus->setEndingEventSetupSucceeded(false);
1437  handleEndRunExceptions(*iException, nextTask);
1438  }
1440  CMS_SA_ALLOW try { actReg_->postESSyncIOVSignal_.emit(ts); } catch (...) {
1441  WaitingTaskHolder copyHolder(nextTask);
1442  copyHolder.doneWaiting(std::current_exception());
1443  }
1444 
1445  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1446  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1447  CMS_SA_ALLOW try {
1448  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1449  streamQueues_[i].pause();
1450  streamEndRunAsync(std::move(nextTask), i);
1451  });
1452  } catch (...) {
1453  WaitingTaskHolder copyHolder(nextTask);
1454  copyHolder.doneWaiting(std::current_exception());
1455  }
1456  }
1457  });
1458 
1460  CMS_SA_ALLOW try {
1461  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1462  } catch (...) {
1463  WaitingTaskHolder copyHolder(nextTask);
1464  copyHolder.doneWaiting(std::current_exception());
1465  }
1466  }
1467  }) | chain::runLast(std::move(iHolder));
1468  }
1469 
// Routes an exception raised while ending a run.
//   iException: the exception to report.
//   holder: the task holder associated with the end-run work.
// If the holder has not failed yet, the exception is delivered through a copy of it
// (doneWaiting is called on the copy because `holder` is const).
1470  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1471  if (holder.taskHasFailed()) {
// NOTE(review): embedded line 1472 is missing from this listing, so what happens when the
// holder already carries a failure is not visible here -- verify against the real file.
1473  } else {
1474  WaitingTaskHolder tmp(holder);
1475  tmp.doneWaiting(iException);
1476  }
1477  }
1478 
// Asynchronously performs the global end-of-run work.
//   iTask: task that receives completion / exceptions.
//   iRunStatus: status of the run being ended; moved into the final chain link.
// Chain, in order: global end-run transition (only if the ending EventSetup succeeded),
// looper prefetch and doEndRun (looper present and EventSetup succeeded), run write (global
// begin succeeded and EventSetup succeeded), then cleanup.
// The cleanup link attempts every step even if an earlier one throws, keeps only the first
// cleanup exception, and reports it only when the chain itself had no exception.
// NOTE(review): embedded lines 1493, 1504, 1523 and 1537 are missing from this listing.
1479  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
// Values needed by the chain are read from iRunStatus up front, before it is moved into
// the final lambda's capture.
1480  auto& runPrincipal = *(iRunStatus->runPrincipal());
1481  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1483  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1484  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1486 
1487  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1488  using namespace edm::waiting_task::chain;
1489  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1490  auto nextTask) {
1491  if (endingEventSetupSucceeded) {
1492  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1494  endGlobalTransitionAsync<Traits>(
1495  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496  }
1497  }) |
1498  ifThen(looper_ && endingEventSetupSucceeded,
1499  [this, &runPrincipal, &es](auto nextTask) {
1500  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1501  }) |
1502  ifThen(looper_ && endingEventSetupSucceeded,
1503  [this, &runPrincipal, &es](auto nextTask) {
1505  looper_->doEndRun(runPrincipal, es, &processContext_);
1506  }) |
1507  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1509  mergeableRunProductMetadata->preWriteRun();
1510  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1511  }) |
1512  then([status = std::move(iRunStatus),
1513  this,
1514  didGlobalBeginSucceed,
1515  mergeableRunProductMetadata,
1516  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
// postWriteRun uses the same condition as the write link above, pairing it with preWriteRun.
1517  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518  mergeableRunProductMetadata->postWriteRun();
1519  }
1520  if (iException) {
1521  handleEndRunExceptions(*iException, nextTask);
1522  }
1524 
1525  std::exception_ptr ptr;
1526 
1527  // Try hard to clean up resources so the
1528  // process can terminate in a controlled
1529  // fashion even after exceptions have occurred.
1530  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1531  if (not ptr) {
1532  ptr = std::current_exception();
1533  }
1534  }
1535  CMS_SA_ALLOW try {
1536  status->resumeGlobalRunQueue();
1538  } catch (...) {
1539  if (not ptr) {
1540  ptr = std::current_exception();
1541  }
1542  }
1543  CMS_SA_ALLOW try {
1544  status->resetEndResources();
1545  status.reset();
1546  } catch (...) {
1547  if (not ptr) {
1548  ptr = std::current_exception();
1549  }
1550  }
1551 
1552  if (ptr && !iException) {
1553  handleEndRunExceptions(ptr, nextTask);
1554  }
1555  }) |
1556  runLast(std::move(iTask));
1557  }
1558 
// Ends the run on one stream.
//   iTask: task that receives completion / exceptions.
//   iStreamIndex: index into streamRunStatus_ / streamQueues_.
// If the stream has no run status, only exceptionRunStatus_ is consulted and the function
// returns. Otherwise a completion task is created that clears the stream's status,
// decrements streamRunActive_ and resumes the stream queue; the stream end transition is
// started only if global begin and the ending EventSetup both succeeded.
// Any exception thrown in the body is routed through handleEndRunExceptions.
1559  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1560  CMS_SA_ALLOW try {
1561  if (!streamRunStatus_[iStreamIndex]) {
1562  if (exceptionRunStatus_->streamFinishedRun()) {
1563  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1564  exceptionRunStatus_.reset();
1565  }
1566  return;
1567  }
1568 
1569  auto runDoneTask =
1570  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1571  if (iException) {
1572  handleEndRunExceptions(*iException, iTask);
1573  }
1574 
// A local shared_ptr copy is taken because the stream's slot is reset below.
1575  auto runStatus = streamRunStatus_[iStreamIndex];
1576 
1577  //reset status before releasing queue else get race condition
1578  if (runStatus->streamFinishedRun()) {
1579  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1580  }
1581  streamRunStatus_[iStreamIndex].reset();
1582  --streamRunActive_;
1583  streamQueues_[iStreamIndex].resume();
1584  });
1585 
1586  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1587 
// Raw pointer only: no additional shared ownership of the status is taken here.
1588  auto runStatus = streamRunStatus_[iStreamIndex].get();
1589 
1590  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1591  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1592  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1594 
1595  auto& runPrincipal = *runStatus->runPrincipal();
// NOTE(review): embedded line 1596 is missing; presumably the `Traits` alias -- verify.
1597  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1598  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1599  *schedule_,
1600  iStreamIndex,
1601  transitionInfo,
1602  serviceToken_,
1603  subProcesses_,
1604  cleaningUpAfterException);
1605  }
1606  } catch (...) {
1607  handleEndRunExceptions(std::current_exception(), iTask);
1608  }
1609  }
1610 
// Synchronously ends the run that is still active, if any (streamRunActive_ > 0).
//   cleaningUpAfterException: recorded on the run status before the run is ended.
// Uses the status held by stream 0, starts endRunAsync and blocks until it finishes.
1611  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1612  if (streamRunActive_ > 0) {
1613  FinalWaitingTask waitTask{taskGroup_};
1614 
1615  auto runStatus = streamRunStatus_[0].get();
1616  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1617  WaitingTaskHolder holder{taskGroup_, &waitTask};
1618  runStatus->setHolderOfTaskInProcessRuns(holder);
// NOTE(review): embedded line 1619 is missing from this listing -- verify what precedes
// the endRunAsync call in the real file.
1620  endRunAsync(streamRunStatus_[0], std::move(holder));
1621  waitTask.wait();
1622  }
1623  }
1624 
// Asynchronously begins a luminosity block within the run described by iRunStatus.
// Visible stages: emit esSyncIOVQueuingSignal_, create a LuminosityBlockProcessingStatus,
// queue EventSetup work for iSync, emit postESSyncIOVSignal_, push a task onto lumiQueue_
// (pushAndPause), read the lumi and call input_->doBeginLumi, run the global begin-lumi
// transition (plus looper steps when a looper exists), and finally push a stream
// begin-lumi task onto every stream queue.
// On any failure the status' resources are reset, the exception is reported to the holder,
// and endRunAsync(iRunStatus, ...) is started.
// NOTE(review): the first signature line (embedded 1625) is absent, so the function name and
// the declaration of `iSync` are inferred. Many other embedded lines are missing too (e.g.
// 1636, 1647, 1656, 1663, 1667, 1671, 1680, 1685, 1695, 1699, 1712, 1724, 1737-1738, 1756),
// including the declarations of `sentry` and `rng` -- verify against the real file.
1626  std::shared_ptr<RunProcessingStatus> iRunStatus,
1627  edm::WaitingTaskHolder iHolder) {
1628  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1629 
1630  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1631  chain::first([this, &iSync, &status](auto nextTask) {
1632  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1633  nextTask,
1634  status->endIOVWaitingTasks(),
1635  status->eventSetupImpls(),
1637  actReg_.get(),
1638  serviceToken_);
1639  }) | chain::then([this, status, iRunStatus, iSync](std::exception_ptr const* iException, auto nextTask) {
1640  CMS_SA_ALLOW try {
1641  //the call to doneWaiting will cause the count to decrement
1642  if (iException) {
1643  WaitingTaskHolder copyHolder(nextTask);
1644  copyHolder.doneWaiting(*iException);
1645  }
1646 
1648  actReg_->postESSyncIOVSignal_.emit(iSync);
1649 
1650  lumiQueue_->pushAndPause(
1651  *nextTask.group(),
1652  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1653  CMS_SA_ALLOW try {
1654  if (postLumiQueueTask.taskHasFailed()) {
1655  status->resetResources();
1657  endRunAsync(iRunStatus, postLumiQueueTask);
1658  return;
1659  }
1660 
1661  status->setResumer(std::move(iResumer));
1662 
1664  *postLumiQueueTask.group(),
1665  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1666  CMS_SA_ALLOW try {
1668 
1669  if (postSourceTask.taskHasFailed()) {
1670  status->resetResources();
1672  endRunAsync(iRunStatus, postSourceTask);
1673  return;
1674  }
1675 
1676  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1677 
1678  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1679  {
1681  input_->doBeginLumi(lumiPrincipal, &processContext_);
1682  sentry.completedSuccessfully();
1683  }
1684 
1686  if (rng.isAvailable()) {
1687  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1688  rng->preBeginLumi(lb);
1689  }
1690 
1691  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1692 
1693  using namespace edm::waiting_task::chain;
1694  chain::first([this, status](auto nextTask) mutable {
1696  firstItemAfterLumiMerge_ = true;
1697  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1698  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1700  beginGlobalTransitionAsync<Traits>(
1701  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1702  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1703  looper_->prefetchAsync(
1704  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1705  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1706  status->globalBeginDidSucceed();
1707  ServiceRegistry::Operate operateLooper(serviceToken_);
1708  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1709  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1710  if (iException) {
1711  status->resetResources();
1713  WaitingTaskHolder copyHolder(holder);
1714  copyHolder.doneWaiting(*iException);
1715  endRunAsync(iRunStatus, holder);
1716  } else {
// With a looper, globalBeginDidSucceed() was already called in the looper link above.
1717  if (not looper_) {
1718  status->globalBeginDidSucceed();
1719  }
1720 
1721  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1722 
1723  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1725 
1726  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1727  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1728  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1729  streamQueues_[i].pause();
1730 
1731  auto& event = principalCache_.eventPrincipal(i);
1732  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1733  // held by the container as this lambda may not finish executing before all the tasks it
1734  // spawns have already started to run.
1735  auto eventSetupImpls = &status->eventSetupImpls();
1736  auto lp = status->lumiPrincipal().get();
1739  event.setLuminosityBlockPrincipal(lp);
1740  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1741  using namespace edm::waiting_task::chain;
1742  chain::first([this, i, &transitionInfo](auto nextTask) {
1743  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1744  *schedule_,
1745  i,
1746  transitionInfo,
1747  serviceToken_,
1748  subProcesses_);
1749  }) |
1750  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1751  auto nextTask) {
1752  if (exceptionFromBeginStreamLumi) {
1753  WaitingTaskHolder copyHolder(nextTask);
1754  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1755  }
1757  }) |
1758  runLast(std::move(holder));
1759  });
1760  } // end for loop over streams
1761  });
1762  }
1763  }) | runLast(postSourceTask);
1764  } catch (...) {
1765  status->resetResources();
1767  WaitingTaskHolder copyHolder(postSourceTask);
1768  copyHolder.doneWaiting(std::current_exception());
1769  endRunAsync(iRunStatus, postSourceTask);
1770  }
1771  }); // task in sourceResourcesAcquirer
1772  } catch (...) {
1773  status->resetResources();
1775  WaitingTaskHolder copyHolder(postLumiQueueTask);
1776  copyHolder.doneWaiting(std::current_exception());
1777  endRunAsync(iRunStatus, postLumiQueueTask);
1778  }
1779  }); // task in lumiQueue
1780  } catch (...) {
1781  status->resetResources();
1783  WaitingTaskHolder copyHolder(nextTask);
1784  copyHolder.doneWaiting(std::current_exception());
1785  endRunAsync(iRunStatus, nextTask);
1786  }
1787  }) | chain::runLast(std::move(iHolder));
1788  }
1789 
// Resumes processing of the luminosity block that is already active: first loops while the
// next input item is a lumi with the same lumi number as the active one, then starts event
// handling on every stream.
// NOTE(review): the signature (embedded 1790) and lines 1794 and 1798-1799 are absent from
// this listing, so the body of the while loop is not visible; presumably it merges the
// additional lumi entries. The name is inferred from the call `continueLumiAsync(...)`
// elsewhere in this file -- verify.
1791  chain::first([this](auto nextTask) {
1792  //all streams are sharing the same status at the moment
1793  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1795 
1796  while (lastTransitionType() == InputSource::IsLumi and
1797  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1800  }
1801  firstItemAfterLumiMerge_ = true;
1802  }) | chain::then([this](auto nextTask) mutable {
// All streams but the last are started with arena.enqueue; the last one is started through
// the task group's run().
1803  unsigned int streamIndex = 0;
1804  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1805  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1806  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1807  }
1808  nextTask.group()->run(
1809  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1810  }) | chain::runLast(std::move(iHolder));
1811  }
1812 
// Routes an exception raised while ending a luminosity block.
//   iException: the exception to report.
//   holder: the task holder associated with the end-lumi work.
// If the holder has not failed yet, the exception is delivered through a copy of it
// (doneWaiting is called on the copy because `holder` is const).
1813  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1814  if (holder.taskHasFailed()) {
// NOTE(review): embedded line 1815 is missing from this listing, so what happens when the
// holder already carries a failure is not visible here -- verify against the real file.
1816  } else {
1817  WaitingTaskHolder tmp(holder);
1818  tmp.doneWaiting(iException);
1819  }
1820  }
1821 
// Asynchronously performs the global end-of-lumi work for iLumiStatus.
// Chain, in order: global end-lumi transition, lumi write (only if global begin succeeded),
// looper prefetch and doEndLuminosityBlock (only with a looper), then cleanup.
// The cleanup link attempts every step even if an earlier one throws, keeps only the first
// cleanup exception, and reports it only when the chain itself had no exception.
// NOTE(review): the first signature line (embedded 1822) is absent, so the function name and
// the declaration of `iTask` are inferred from usage. Embedded lines 1837, 1849 and 1855
// are missing as well -- verify against the real file.
1823  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1824  // Get some needed info out of the status object before moving
1825  // it into finalTaskForThisLumi.
1826  auto& lp = *(iLumiStatus->lumiPrincipal());
1827  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1828  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1829  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1830  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1831 
1832  using namespace edm::waiting_task::chain;
1833  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
// NOTE(review): `ts` is constructed but not used in any line visible in this listing.
1834  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1835 
1836  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1838  endGlobalTransitionAsync<Traits>(
1839  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1840  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1841  //Only call writeLumi if beginLumi succeeded
1842  if (didGlobalBeginSucceed) {
1843  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1844  }
1845  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1846  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1847  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1848  //any thrown exception auto propagates to nextTask via the chain
1850  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1851  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1852  if (iException) {
1853  handleEndLumiExceptions(*iException, nextTask);
1854  }
1856 
1857  std::exception_ptr ptr;
1858 
1859  // Try hard to clean up resources so the
1860  // process can terminate in a controlled
1861  // fashion even after exceptions have occurred.
1862  // Caught exception is passed to handleEndLumiExceptions()
1863  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1864  if (not ptr) {
1865  ptr = std::current_exception();
1866  }
1867  }
1868  // Caught exception is passed to handleEndLumiExceptions()
1869  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1870  if (not ptr) {
1871  ptr = std::current_exception();
1872  }
1873  }
1874  // Caught exception is passed to handleEndLumiExceptions()
1875  CMS_SA_ALLOW try {
1876  status->resetResources();
1877  status->globalEndRunHolderDoneWaiting();
1878  status.reset();
1879  } catch (...) {
1880  if (not ptr) {
1881  ptr = std::current_exception();
1882  }
1883  }
1884 
1885  if (ptr && !iException) {
1886  handleEndLumiExceptions(ptr, nextTask);
1887  }
1888  }) | runLast(std::move(iTask));
1889  }
1890 
// Ends the active luminosity block on one stream.
//   iTask: task that receives completion / exceptions.
//   iStreamIndex: index into streamLumiStatus_ / streamQueues_.
// A completion task is created that clears the stream's lumi status and resumes the stream
// queue; the stream end-lumi transition is started only if global begin succeeded.
// NOTE(review): embedded lines 1900, 1905 and 1922 are missing from this listing; 1905 is
// the body of the "last stream" branch and 1922 is presumably the `Traits` alias -- verify.
1891  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1892  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
// A local shared_ptr copy is taken because the stream's slot is reset below.
1893  auto status = streamLumiStatus_[iStreamIndex];
1894  if (iException) {
1895  handleEndLumiExceptions(*iException, iTask);
1896  }
1897 
1898  // reset status before releasing queue else get race condition
1899  streamLumiStatus_[iStreamIndex].reset();
1901  streamQueues_[iStreamIndex].resume();
1902 
1903  //are we the last one?
1904  if (status->streamFinishedLumi()) {
1906  }
1907  });
1908 
1909  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1910 
1911  // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1912  // therefore we do not want to hold the shared_ptr
1913  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1914  lumiStatus->setEndTime();
1915 
1916  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1917  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1918  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1919 
1920  if (lumiStatus->didGlobalBeginSucceed()) {
1921  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1923  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1924  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1925  *schedule_,
1926  iStreamIndex,
1927  transitionInfo,
1928  serviceToken_,
1929  subProcesses_,
1930  cleaningUpAfterException);
1931  }
1932  }
1933 
// Synchronously ends the luminosity block that is still active, if any.
//   cleaningUpAfterException: recorded on the lumi status held by stream 0.
// With no active run it only asserts that no lumi is active. Otherwise, when a lumi is
// active, it starts streamEndLumiAsync on every stream and blocks until all have finished.
// NOTE(review): embedded lines 1938 and 1941 are missing from this listing -- verify.
1934  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1935  if (streamRunActive_ == 0) {
1936  assert(streamLumiActive_ == 0);
1937  } else {
1939  if (streamLumiActive_ > 0) {
1940  FinalWaitingTask globalWaitTask{taskGroup_};
1942  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
// Each stream gets its own holder on the same wait task, so wait() returns only after
// every stream's holder has been released.
1943  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1944  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1945  }
1946  globalWaitTask.wait();
1947  }
1948  }
1949  }
1950 
// Reads a process block from the input source into processBlockPrincipal and then marks
// the surrounding sentry as completed successfully.
// NOTE(review): the signature and the declaration of `sentry` (embedded 1951-1952) are
// absent from this listing; the name is inferred from the call readProcessBlock(...) elsewhere
// in this file -- verify.
1953  input_->readProcessBlock(processBlockPrincipal);
1954  sentry.completedSuccessfully();
1955  }
1956 
// Creates and fills the RunPrincipal for the run the input source is positioned at.
// Returns: the filled principal. Asserts that it is non-null and that, after reading, its
// reduced process history ID matches the one reported by the input source.
// NOTE(review): embedded lines 1958 (where `rp` is obtained) and 1962 (declaration of
// `sentry`) are missing from this listing -- verify against the real file.
1957  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1959  assert(rp);
1960  rp->setAux(*input_->runAuxiliary());
1961  {
1963  input_->readRun(*rp, *historyAppender_);
1964  sentry.completedSuccessfully();
1965  }
1966  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1967  return rp;
1968  }
1969 
// Merges the run entry the input source is positioned at into the run principal held by
// iStatus: adjusts the principal to the current product registry (asserting success),
// merges the run auxiliary, then asks the input source to read and merge the run data.
// NOTE(review): the signature (embedded 1970) and the declaration of `sentry` (1977) are
// absent from this listing; the name is inferred from the call readAndMergeRun(*runStatus)
// elsewhere in this file -- verify.
1971  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1972 
1973  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1974  assert(runOK);
1975  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1976  {
1978  input_->readAndMergeRun(runPrincipal);
1979  sentry.completedSuccessfully();
1980  }
1981  }
1982 
// Reads the current luminosity block from the input source into a
// LuminosityBlockPrincipal, attaches the given run principal to it and returns it.
//
// rp: run principal handed to the lumi principal via setRunPrincipal(); it is moved
//     from, so the caller's copy of the argument is consumed.
// Returns: the shared_ptr `lbp`.
//
// NOTE(review): `lbp` and `sentry` are used but not declared in the visible lines
// (the embedded listing numbers skip 1984 and 1988); presumably they are defined on
// those missing lines -- confirm against the full file.
1983  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1985  assert(lbp);
// Set the principal's auxiliary from the source's current lumi auxiliary before reading.
1986  lbp->setAux(*input_->luminosityBlockAuxiliary());
1987  {
1989  input_->readLuminosityBlock(*lbp, *historyAppender_);
// Reached only if readLuminosityBlock() did not throw.
1990  sentry.completedSuccessfully();
1991  }
1992  lbp->setRunPrincipal(std::move(rp));
1993  return lbp;
1994  }
1995 
1997  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1998  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1999  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2000  input_->processHistoryRegistry().reducedProcessHistoryID(
2001  input_->luminosityBlockAuxiliary()->processHistoryID()));
2002  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2003  assert(lumiOK);
2004  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2005  {
2007  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2008  sentry.completedSuccessfully();
2009  }
2010  }
2011 
2013  using namespace edm::waiting_task;
2014  chain::first([&](auto nextTask) {
2016  schedule_->writeProcessBlockAsync(
2017  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2018  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2020  for (auto& s : subProcesses_) {
2021  s.writeProcessBlockAsync(nextTask, processBlockType);
2022  }
2023  }) | chain::runLast(std::move(task));
2024  }
2025 
2027  RunPrincipal const& runPrincipal,
2028  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2029  using namespace edm::waiting_task;
2030  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2031  chain::first([&](auto nextTask) {
2033  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2034  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2036  for (auto& s : subProcesses_) {
2037  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2038  }
2039  }) | chain::runLast(std::move(task));
2040  }
2041  }
2042 
2044  for (auto& s : subProcesses_) {
2045  s.clearRunPrincipal(*iStatus.runPrincipal());
2046  }
2047  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2048  iStatus.runPrincipal()->clearPrincipal();
2049  }
2050 
2052  using namespace edm::waiting_task;
2053  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2054  chain::first([&](auto nextTask) {
2056 
2057  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2058  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2059  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2061  for (auto& s : subProcesses_) {
2062  s.writeLumiAsync(nextTask, lumiPrincipal);
2063  }
2065  }
2066  }
2067 
2069  for (auto& s : subProcesses_) {
2070  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2071  }
2072  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2073  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2074  iStatus.lumiPrincipal()->clearPrincipal();
2075  }
2076 
// Builds a lambda that walks consecutive entries of the input source belonging to
// the run held by iRunStatus, and hands it (with iHolder's task group) to a call
// that is not visible here.
//
// iRunStatus: status object of the run; moved into the lambda as `status`.
// iHolder:    moved into the lambda as `holder`; receives any exception caught there.
//
// NOTE(review): the embedded listing numbers skip 2080, 2083, 2087 and 2094-2095.
// The call that receives `*group` and the lambda, and the statements executed on
// each loop iteration after the failure check, are therefore not visible. Presumably
// the lambda is queued for asynchronous execution and the loop merges the entry and
// advances the source -- confirm against the full file.
2077  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2078  WaitingTaskHolder iHolder) {
2079  auto group = iHolder.group();
2081  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2082  CMS_SA_ALLOW try {
2084 
// The source mutex is held for the rest of the try block.
2085  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2086 
// Continue while the last transition is a run whose run number and reduced process
// history ID both match the run principal held by `status`.
2088  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2089  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
// If the task held for the in-process runs has failed, mark the run as
// "stop before processing" and leave the lambda immediately.
2090  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2091  status->setStopBeforeProcessingRun(true);
2092  return;
2093  }
2096  }
2097  } catch (...) {
// On any exception: mark the run as "stop before processing" and pass the
// exception to the waiting task.
2098  status->setStopBeforeProcessingRun(true);
2099  holder.doneWaiting(std::current_exception());
2100  }
2101  });
2102  }
2103 
// Builds a lambda that calls readAndMergeLumi() for consecutive entries of the input
// source belonging to the luminosity block held by iLumiStatus, and hands it (with
// iHolder's task group) to a call that is not visible here.
//
// iLumiStatus: status object of the lumi; moved into the lambda.
// iHolder:     moved into the lambda as `holder`; receives any exception caught there.
//
// NOTE(review): the embedded listing numbers skip 2107, 2110, 2114 and 2118. The call
// that receives `*group` and the lambda is not visible, and neither is whatever
// advances the source inside the loop. Presumably the lambda is queued for
// asynchronous execution and line 2118 asks the source for the next transition --
// confirm against the full file.
2104  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2105  WaitingTaskHolder iHolder) {
2106  auto group = iHolder.group();
2108  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2109  CMS_SA_ALLOW try {
2111 
// The source mutex is held for the rest of the try block.
2112  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2113 
// Continue while the last transition is a lumi with the same luminosity block number
// as the lumi principal held by iLumiStatus.
2115  while (lastTransitionType() == InputSource::IsLumi and
2116  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2117  readAndMergeLumi(*iLumiStatus);
2119  }
2120  } catch (...) {
// Pass the exception to the waiting task instead of letting it escape the lambda.
2121  holder.doneWaiting(std::current_exception());
2122  }
2123  });
2124  }
2125 
// Decides what to do after the entries of a run have been merged, based on the last
// transition reported by the input source:
//   * first branch (condition not visible): signals both the task held for the
//     in-process runs and iHolder with an empty exception_ptr;
//   * last transition is IsLumi and iHolder has not failed: starts the lumi with
//     beginLumiAsync();
//   * otherwise: ends the run with endRunAsync().
//
// iRunStatus: status object of the current run.
// iHolder:    waiting task passed on to the next step or signalled directly.
//
// NOTE(review): the embedded listing numbers skip 2128, so the opening `if (...) {`
// of the first branch is not visible in this view.
2126  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2127  WaitingTaskHolder iHolder) {
2129  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2130  iHolder.doneWaiting(std::exception_ptr{});
2131  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2132  CMS_SA_ALLOW try {
// The IOV sync value is built from the source's current run/lumi (event number 0)
// and the begin time of the source's lumi auxiliary.
2133  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2134  input_->luminosityBlockAuxiliary()->beginTime()),
2135  iRunStatus,
2136  iHolder);
2137  } catch (...) {
// NOTE(review): copyHolder is created but never used; doneWaiting() is called on
// iHolder itself, which is then moved into endRunAsync(). The equivalent catch block
// in handleNextEventForStreamAsync calls doneWaiting() on the copy and passes the
// original holder on. Confirm whether `copyHolder.doneWaiting(...)` was intended here.
2138  WaitingTaskHolder copyHolder(iHolder);
2139  iHolder.doneWaiting(std::current_exception());
2140  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2141  }
2142  } else {
2143  // Note that endRunAsync will call beginRunAsync for the following run
2144  // if appropriate.
2145  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2146  }
2147  }
2148 
2150  unsigned int iStreamIndex,
2152  // This function returns true if it successfully reads an event for the stream and that
2153  // requires both that an event is next and there are no problems or requests to stop.
2154 
2155  if (iTask.taskHasFailed()) {
2156  // We want all streams to stop or all streams to pause. If we are already in the
2157  // middle of pausing streams, then finish pausing all of them and the lumi will be
2158  // ended later. Otherwise, just end it now.
2161  }
2162  return false;
2163  }
2164 
2165  // Did another stream already stop or pause this lumi?
2167  return false;
2168  }
2169 
2170  // Are output modules or the looper requesting we stop?
2171  if (shouldWeStop()) {
2174  return false;
2175  }
2176 
2178 
2179  // need to use lock in addition to the serial task queue because
2180  // of delayed provenance reading and reading data in response to
2181  // edm::Refs etc
2182  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2183 
2184  // If we didn't already call nextTransitionType while merging lumis, call it here.
2185  // This asks the input source what is next and also checks for signals.
2186 
2188  firstItemAfterLumiMerge_ = false;
2189 
2190  if (InputSource::IsEvent != itemType) {
2191  // IsFile may continue processing the lumi and
2192  // looper_ can cause the input source to declare a new IsRun which is actually
2193  // just a continuation of the previous run
2194  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2195  (InputSource::IsRun == itemType and
2196  (iStatus.lumiPrincipal()->run() != input_->run() or
2197  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2199  } else {
2201  }
2202  return false;
2203  }
2204  readEvent(iStreamIndex);
2205  return true;
2206  }
2207 
// Pushes a lambda onto the serial queue chain of sourceResourcesAcquirer_ that
// handles the next item for one stream.
//
// Inside the lambda:
//   * If readNextEventForStream() returns true, the event is processed with
//     processEventAsync(); when that finishes, recursionTask calls this function
//     again for the same stream.
//   * Otherwise the stream either ends its lumi (streamEndLumiAsync), after the
//     first stream to get here has started the next lumi or ended the run, or it
//     signals the task held for the in-process runs.
//   * Any exception is passed to iTask through a copy of the holder and this
//     function is called again.
//
// iTask:        waiting task passed along to every follow-up step.
// iStreamIndex: index of the stream being driven.
//
// NOTE(review): the embedded listing numbers skip 2213, 2231 and 2253. The condition
// that opens the branch containing streamEndLumiAsync() and the right-hand side of
// the assert in the final else branch are therefore not visible in this view.
2208  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2209  auto group = iTask.group();
2210  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2211  CMS_SA_ALLOW try {
// Raw pointer obtained with get(): the lambda does not keep its own shared_ptr to the lumi status.
2212  auto status = streamLumiStatus_[iStreamIndex].get();
2214 
2215  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2216  auto recursionTask =
2217  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2218  if (iEventException) {
2219  WaitingTaskHolder copyHolder(iTask);
2220  copyHolder.doneWaiting(*iEventException);
2221  // Intentionally, we don't return here. The recursive call to
2222  // handleNextEvent takes care of immediately ending the run properly
2223  // using the same code it uses to end the run in other situations.
2224  }
2225  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2226  });
2227 
2228  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2229  } else {
2230  // the stream will stop processing this lumi now
// Only the first caller that finds the flag unset starts the next lumi or ends the run.
2232  if (not status->haveStartedNextLumiOrEndedRun()) {
2233  status->startNextLumiOrEndRun();
2234  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2235  CMS_SA_ALLOW try {
2236  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2237  input_->luminosityBlockAuxiliary()->beginTime()),
2238  streamRunStatus_[iStreamIndex],
2239  iTask);
2240  } catch (...) {
// Report the failure through a copy so that iTask can still be passed to endRunAsync().
2241  WaitingTaskHolder copyHolder(iTask);
2242  copyHolder.doneWaiting(std::current_exception());
2243  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2244  }
2245  } else {
2246  // If appropriate, this will also start the next run.
2247  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2248  }
2249  }
2250  streamEndLumiAsync(iTask, iStreamIndex);
2251  } else {
2252  assert(status->eventProcessingState() ==
2254  auto runStatus = streamRunStatus_[iStreamIndex].get();
2255 
// Signal the task held for the in-process runs, if there is one, without an exception.
2256  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2257  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2258  }
2259  }
2260  }
2261  } catch (...) {
// Report the exception through a copy, then call this function again with the original holder.
2262  WaitingTaskHolder copyHolder(iTask);
2263  copyHolder.doneWaiting(std::current_exception());
2264  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2265  }
2266  });
2267  }
2268 
// Reads the next event from the input source into the event principal of the given
// stream, then updates the last timestamp of that stream's run and lumi status from
// the source.
//
// iStreamIndex: index used to select the event principal and the run/lumi status.
//
// NOTE(review): `sentry` is used but not declared in the visible lines (the embedded
// listing numbers skip 2274); presumably it is defined on that missing line --
// confirm against the full file.
2269  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2270  //TODO this will have to become per stream
2271  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2272  StreamContext streamContext(event.streamID(), &processContext_);
2273 
2275  input_->readEvent(event, streamContext);
2276 
2277  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2278  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
// Reached only if none of the calls above threw.
2279  sentry.completedSuccessfully();
2280 
2281  FDEBUG(1) << "\treadEvent\n";
2282  }
2283 
2284  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2285  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2286  }
2287 
// Runs the processing chain for the event held by the given stream:
//   1. schedule_->processOneEventAsync() for this process;
//   2. doEventAsync() on every sub-process, in reverse order, if there are any;
//   3. processEventWithLooper() if a looper is configured;
//   4. clearEventPrincipal() on the event principal;
// and finally hands over to iHolder via runLast().
//
// iHolder:      waiting task that ends the chain.
// iStreamIndex: index of the stream whose event principal and lumi status are used.
//
// NOTE(review): `rng` is used but not declared in the visible lines (the embedded
// listing numbers skip 2291-2292); presumably it is a service handle defined there
// -- confirm against the full file.
2288  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2289  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2290 
// If the service behind `rng` is available, notify it with a temporary Event that
// wraps the principal and a default-constructed ModuleDescription.
2293  if (rng.isAvailable()) {
2294  Event ev(*pep, ModuleDescription(), nullptr);
2295  rng->postEventRead(ev);
2296  }
2297 
// `es` is captured by reference in the first chain step below.
2298  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2299  using namespace edm::waiting_task::chain;
2300  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2301  EventTransitionInfo info(*pep, es);
2302  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2303  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2304  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2305  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2306  }
2307  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2308  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2309  ServiceRegistry::Operate operateLooper(serviceToken_);
2310  processEventWithLooper(*pep, iStreamIndex);
2311  }) | then([pep](auto nextTask) {
2312  FDEBUG(1) << "\tprocessEvent\n";
2313  pep->clearEventPrincipal();
2314  }) | runLast(iHolder);
2315  }
2316 
// Passes the event to the looper's doDuringLoop() and, when the input source
// supports random access, repositions the source (skipEvents / goToEvent). The
// do/while loop repeats until the ProcessingController reports that the last
// operation succeeded. Under a condition that is not visible here, shouldWeStop_
// is set to true.
//
// iPrincipal:   event principal handed to the looper.
// iStreamIndex: index of the stream whose lumi status provides the EventSetupImpl.
//
// NOTE(review): the embedded listing numbers skip 2323, 2331, 2333, 2337 and 2339.
// The declaration of `status`, the conditions guarding skipEvents(-2) and
// goToEvent(), the statement that follows the random-access block, and the
// condition guarding `shouldWeStop_ = true` are not visible in this view.
2317  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
// The controller is built from the source's forward/reverse state and random-access capability.
2318  bool randomAccess = input_->randomAccess();
2319  ProcessingController::ForwardState forwardState = input_->forwardState();
2320  ProcessingController::ReverseState reverseState = input_->reverseState();
2321  ProcessingController pc(forwardState, reverseState, randomAccess);
2322 
2324  do {
2325  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2326  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2327  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2328 
2329  bool succeeded = true;
2330  if (randomAccess) {
2332  input_->skipEvents(-2);
// goToEvent() reports through its return value whether the requested event was reached.
2334  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2335  }
2336  }
2338  } while (!pc.lastOperationSucceeded());
2340  shouldWeStop_ = true;
2341  }
2342  }
2343 
2345  FDEBUG(1) << "\tshouldWeStop\n";
2346  if (shouldWeStop_)
2347  return true;
2348  if (!subProcesses_.empty()) {
2349  for (auto const& subProcess : subProcesses_) {
2350  if (subProcess.terminate()) {
2351  return true;
2352  }
2353  }
2354  return false;
2355  }
2356  return schedule_->terminate();
2357  }
2358 
2360 
2362 
2364 
2365  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2366  bool expected = false;
2367  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2368  deferredExceptionPtr_ = iException;
2369  return true;
2370  }
2371  return false;
2372  }
2373 
2375  cms::Exception ex("ModulesSynchingOnLumis");
2376  ex << "The framework is configured to use at least two streams, but the following modules\n"
2377  << "require synchronizing on LuminosityBlock boundaries:";
2378  bool found = false;
2379  for (auto worker : schedule_->allWorkers()) {
2380  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2381  found = true;
2382  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2383  }
2384  }
2385  if (found) {
2386  ex << "\n\nThe situation can be fixed by either\n"
2387  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2388  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2389  throw ex;
2390  }
2391  }
2392 
2394  std::unique_ptr<LogSystem> s;
2395  for (auto worker : schedule_->allWorkers()) {
2396  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2397  if (not s) {
2398  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2399  (*s) << "The following modules require synchronizing on Run boundaries:";
2400  }
2401  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2402  }
2403  }
2404  }
2405 
2407  std::unique_ptr<LogSystem> s;
2408  for (auto worker : schedule_->allWorkers()) {
2409  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2410  if (not s) {
2411  s = std::make_unique<LogSystem>("LegacyModules");
2412  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2413  "is going to end soon. These modules need to be converted to have type\n"
2414  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2415  }
2416  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2417  }
2418  }
2419  }
2420 } // namespace edm
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
size
Write out results.
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void clearPrincipal()
Definition: Principal.cc:390
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void clear()
Not thread safe.
static InputSourceFactory const * get()
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
static const TGPicture * info(bool iBackgroundIsBlack)
void clearRunPrincipal(RunProcessingStatus &)
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
static std::mutex mutex
Definition: Proxy.cc:8
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
StreamID streamID() const
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
std::unique_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
Definition: RunPrincipal.h:73
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:50
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
fileMode
Definition: DMR_cfg.py:72
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
Definition: RunPrincipal.h:86
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
Definition: Registry.cc:40
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
virtual void endOfJob()
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
void writeLumi(LuminosityBlockNumber_t lumi)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
Definition: Exception.cc:165
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
HLT enums.
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:319
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:680
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
bool isAvailable() const
Definition: Service.h:40
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
#define get
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
static ParentageRegistry * instance()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)