CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
8 
45 
47 
55 
60 
65 
78 
79 #include "MessageForSource.h"
80 #include "MessageForParent.h"
82 #include "RunProcessingStatus.h"
83 
84 #include "boost/range/adaptor/reversed.hpp"
85 
86 #include <cassert>
87 #include <exception>
88 #include <iomanip>
89 #include <iostream>
90 #include <utility>
91 #include <sstream>
92 
93 #include <sys/ipc.h>
94 #include <sys/msg.h>
95 
96 #include "oneapi/tbb/task.h"
97 
98 //Used for CPU affinity
99 #ifndef __APPLE__
100 #include <sched.h>
101 #endif
102 
103 namespace {
104  class PauseQueueSentry {
105  public:
106  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
107  ~PauseQueueSentry() { queue_.resume(); }
108 
109  private:
110  edm::SerialTaskQueue& queue_;
111  };
112 } // namespace
113 
114 namespace edm {
115 
116  namespace chain = waiting_task::chain;
117 
118  // ---------------------------------------------------------------
// Builds the job's input source from the "@main_input" parameter set.
//
// Steps visible in this listing:
//  1. Fetch the "@main_input" PSet and complain if it is absent.
//  2. Validate the PSet against the description filled in by `filler`,
//     using the label "source"; a cms::Exception gets extra context and is
//     rethrown.
//  3. Register the PSet and build the ModuleDescription (label "source")
//     and the InputSourceDescription.
//  4. Emit preSourceConstructionSignal_, construct the source through
//     InputSourceFactory, and connect its pre/postEventReadFromSource
//     signals to the ones in `areg`.
//
// Returns the constructed source. Exceptions thrown during validation or
// construction propagate as cms::Exception with added context.
//
// NOTE(review): this listing omits original lines 120, 131 and 139.
// Presumably 120 declares the `params` argument used below, 131 starts the
// `throw` whose message continues on 132-133, and 139 supplies the
// expression that initializes `filler` -- confirm against the full file.
119  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
125  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126  std::shared_ptr<ActivityRegistry> areg,
127  std::shared_ptr<ProcessConfiguration const> processConfiguration,
128  PreallocationConfiguration const& allocations) {
129  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
130  if (main_input == nullptr) {
132  << "There must be exactly one source in the configuration.\n"
133  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134  }
135 
136  std::string modtype(main_input->getParameter<std::string>("@module_type"));
137 
138  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
140  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
141  filler->fill(descriptions);
142 
// Validate the source configuration; annotate any cms::Exception with the
// source type before letting it propagate.
143  try {
144  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
145  } catch (cms::Exception& iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  moduleIndex);
165 
166  InputSourceDescription isdesc(md,
167  preg,
168  branchIDListHelper,
169  processBlockHelper,
170  thinnedAssociationsHelper,
171  areg,
172  common.maxEventsInput_,
173  common.maxLumisInput_,
174  common.maxSecondsUntilRampdown_,
175  allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
// The shared_ptr holds no object; only its custom deleter matters. The
// deleter runs when `sentry` leaves this try scope, on both the normal
// and the exceptional path, and emits postSourceConstructionSignal_.
181  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  } catch (cms::Exception& iException) {
188  std::ostringstream ost;
189  ost << "Constructing input source of type " << modtype;
190  iException.addContext(ost.str());
191  throw;
192  }
193  return input;
194  }
195 
196  // ---------------------------------------------------------------
// Creates the looper named in `loopers` and adds it to `esController`
// through eventsetup::LooperFactory.
//
// Exactly one looper name is expected (enforced by the assert). The loop
// still iterates over the vector, so the returned pointer is the looper
// created for the last (i.e. only) name.
//
// NOTE(review): original lines 198-199 are missing from this listing;
// presumably they declare the `cp` and `params` arguments used in the
// body -- confirm against the full file.
197  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
200  std::vector<std::string> const& loopers) {
201  std::shared_ptr<EDLooperBase> vLooper;
202 
203  assert(1 == loopers.size());
204 
205  for (auto const& looperName : loopers) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
207  // Unlikely we would ever need the ModuleTypeResolver in Looper
// The trailing nullptr is the argument the comment above refers to.
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
// Constructs an EventProcessor from a parameter set plus an existing
// ServiceToken.
//
// The initializer list value-initializes the members and builds
// moduleTypeResolverMaker_ from *parameterSet while the unique_ptr is still
// owned here; the body then moves the parameter set into a new ProcessDesc,
// adds the default and forced services to it, and delegates to init().
//
// NOTE(review): original line 216 is missing from this listing; presumably
// it declares the `iLegacy` argument that is forwarded to init() -- confirm.
// NOTE(review): createAcquirerForSourceDelayedReader() is called twice, once
// for `.first` and once for `.second`; confirm both calls refer to the same
// underlying acquirer/mutex pair.
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
215  ServiceToken const& iToken,
217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
// Constructs an EventProcessor from a parameter set alone (no externally
// supplied ServiceToken).
//
// The initializer list value-initializes the members and builds
// moduleTypeResolverMaker_ from *parameterSet before the body moves the
// parameter set into a new ProcessDesc and adds the default and forced
// services to it.
//
// NOTE(review): original line 287 is missing from this listing; presumably
// it is the call to init() that the sibling constructors make at this
// point -- confirm against the full file.
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
265  processConfiguration_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
271  deferredExceptionPtrIsSet_(false),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
278  exceptionMessageFiles_(),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
283  forceESCacheClearOnNewRun_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
289 
// Constructs an EventProcessor from an already-built ProcessDesc and an
// existing ServiceToken.
//
// moduleTypeResolverMaker_ is built from the ProcessDesc's process PSet;
// all remaining work is delegated to init().
//
// NOTE(review): original line 292 is missing from this listing; presumably
// it declares the `legacy` argument that is forwarded to init() -- confirm.
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
291  ServiceToken const& token,
293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
302  processConfiguration_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
308  deferredExceptionPtrIsSet_(false),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
315  exceptionMessageFiles_(),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
320  forceESCacheClearOnNewRun_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
// Shared initialization used by the constructors.
//
// Phases visible in this listing, in order:
//  1. Take the process PSet and pop the SubProcess PSets out of it.
//  2. Read the untracked "options" PSet: fileMode (must be "NOMERGE" or
//     "FULLMERGE", otherwise a Configuration exception is thrown), the
//     thread/stream/concurrent-lumi/concurrent-run counts, and the
//     early-delete settings.
//  3. Initialize services, the EventSetup provider and (if configured) the
//     looper.
//  4. Create the run/lumi queues and per-stream status vectors, then build
//     the schedule's modules and the input source concurrently in a
//     tbb::task_group.
//  5. Copy the results from `items` into the data members, create the
//     principals, and create the SubProcesses.
// If anything in phases 3-5 throws, the service-dependent members are reset
// to nullptr before the exception is rethrown.
//
// NOTE(review): many original lines are missing from this listing (see the
// gaps in the embedded line numbers, starting with 327 in the signature).
// Names such as `iLegacy`, `items`, `handler` and `tns` are presumably
// declared on omitted lines -- confirm against the full file.
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
326  ServiceToken const& iToken,
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector, once and for all
332 
333  // register the empty parameter set, once and for all.
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
// A configured value of 0 means "use the default": nThreads for streams,
// 2 for concurrent lumis, nConcurrentLumis for concurrent runs.
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
// NOTE(review): the body of this branch (original line 377) is missing from
// this listing; presumably it caps nConcurrentLumis at nStreams -- confirm.
376  if (nConcurrentLumis > nStreams) {
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
// Record each (product, reference) pair from "holdsReferencesToDeleteEarly".
// This is only done when early deletion is configured at all.
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
// The source registers into a temporary registry while it is built in
// parallel with the modules; the registry is merged into items.preg()
// after group.wait() below.
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
// One principal is created per preallocated stream, run and luminosity
// block, followed by two ProcessBlock principals (`pb` and `pbForInput`).
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
606 
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
624 
628  task.waitNoThrow();
629  assert(task.done());
630  }
631 
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
639 
644  actReg_->preallocateSignal_(bounds);
645  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
647 
648  std::vector<ModuleProcessName> consumedBySubProcesses;
650  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
651  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
652  if (consumedBySubProcesses.empty()) {
653  consumedBySubProcesses = std::move(c);
654  } else if (not c.empty()) {
655  std::vector<ModuleProcessName> tmp;
656  tmp.reserve(consumedBySubProcesses.size() + c.size());
657  std::merge(consumedBySubProcesses.begin(),
658  consumedBySubProcesses.end(),
659  c.begin(),
660  c.end(),
661  std::back_inserter(tmp));
662  std::swap(consumedBySubProcesses, tmp);
663  }
664  });
665 
666  // Note: all these may throw
669  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
670  not unusedModules.empty()) {
672 
673  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
674  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
675  "and "
676  "therefore they are deleted before beginJob transition.";
677  for (auto const& description : unusedModules) {
678  l << "\n " << description->moduleLabel();
679  }
680  });
681  for (auto const& description : unusedModules) {
682  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
683  }
684  }
685  }
686  // Initialize after the deletion of non-consumed unscheduled
687  // modules to avoid non-consumed non-run modules to keep the
688  // products unnecessarily alive
689  if (not branchesToDeleteEarly_.empty()) {
690  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
691  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
692  auto referencesToBranches = std::move(referencesToBranches_);
693  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
694  }
695 
698  }
699  if (preallocations_.numberOfRuns() > 1) {
701  }
702 
703  //NOTE: This implementation assumes 'Job' means one call
704  // the EventProcessor::run
705  // If it really means once per 'application' then this code will
706  // have to be changed.
707  // Also have to deal with case where have 'run' then new Module
708  // added and do 'run'
709  // again. In that case the newly added Module needs its 'beginJob'
710  // to be called.
711 
712  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
713  // For now we delay calling beginOfJob until first beginOfRun
714  //if(looper_) {
715  // looper_->beginOfJob(es);
716  //}
717  espController_->finishConfiguration();
718  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
719  try {
720  convertException::wrap([&]() { input_->doBeginJob(); });
721  } catch (cms::Exception& ex) {
722  ex.addContext("Calling beginJob for the source");
723  throw;
724  }
725 
727 
728  // If we execute the beginJob transition for any module then we execute it
729  // for all of the modules. We save the first exception and rethrow that
730  // after they all complete.
731  std::exception_ptr firstException;
732  CMS_SA_ALLOW try {
733  schedule_->beginJob(
734  *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, pathsAndConsumesOfModules_, processContext_);
735  } catch (...) {
736  firstException = std::current_exception();
737  }
738  if (looper_ && !firstException) {
739  CMS_SA_ALLOW try {
740  constexpr bool mustPrefetchMayGet = true;
741  auto const processBlockLookup = preg_->productLookup(InProcess);
742  auto const runLookup = preg_->productLookup(InRun);
743  auto const lumiLookup = preg_->productLookup(InLumi);
744  auto const eventLookup = preg_->productLookup(InEvent);
745  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
746  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
747  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
748  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
749  looper_->updateLookup(esp_->recordsToResolverIndices());
750  } catch (...) {
751  firstException = std::current_exception();
752  }
753  }
754  for (auto& subProcess : subProcesses_) {
755  CMS_SA_ALLOW try { subProcess.doBeginJob(); } catch (...) {
756  if (!firstException) {
757  firstException = std::current_exception();
758  }
759  }
760  }
761  if (firstException) {
762  std::rethrow_exception(firstException);
763  }
764 
765  beginJobSucceeded_ = true;
766  beginStreams();
767  }
768 
770  // This will process streams concurrently, but not modules in the
771  // same stream or SubProcesses.
772  oneapi::tbb::task_group group;
773  FinalWaitingTask finalWaitingTask{group};
774  using namespace edm::waiting_task::chain;
775  {
776  WaitingTaskHolder taskHolder(group, &finalWaitingTask);
777  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
778  first([this, i](auto nextTask) {
779  std::exception_ptr exceptionPtr;
780  {
782  CMS_SA_ALLOW try { schedule_->beginStream(i); } catch (...) {
783  exceptionPtr = std::current_exception();
784  }
785  for (auto& subProcess : subProcesses_) {
786  CMS_SA_ALLOW try { subProcess.doBeginStream(i); } catch (...) {
787  if (!exceptionPtr) {
788  exceptionPtr = std::current_exception();
789  }
790  }
791  }
792  }
793  nextTask.doneWaiting(exceptionPtr);
794  }) | lastTask(taskHolder);
795  }
796  }
797  finalWaitingTask.wait();
798  }
799 
// Runs the endStream transition for every preallocated stream.
//
// One task per stream is chained onto `taskHolder`; each task calls
// schedule_->endStream() and then doEndStream() on every SubProcess for
// that stream index. `collector` and the local `collectorMutex` are passed
// to each of those calls. The function is noexcept and waits with
// waitNoThrow(), so nothing is thrown from here.
//
// NOTE(review): original line 813 is missing from this listing (first
// statement inside the inner block of the lambda) -- confirm against the
// full file.
800  void EventProcessor::endStreams(ExceptionCollector& collector) noexcept {
801  std::mutex collectorMutex;
802 
803  // This will process streams concurrently, but not modules in the
804  // same stream or SubProcesses.
805  oneapi::tbb::task_group group;
806  FinalWaitingTask finalWaitingTask{group};
807  using namespace edm::waiting_task::chain;
808  {
// taskHolder lives only in this scope; the wait happens after it is
// destroyed.
809  WaitingTaskHolder taskHolder(group, &finalWaitingTask);
810  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
811  first([this, i, &collector, &collectorMutex](auto nextTask) {
812  {
814  schedule_->endStream(i, collector, collectorMutex);
815  for (auto& subProcess : subProcesses_) {
816  subProcess.doEndStream(i, collector, collectorMutex);
817  }
818  }
819  }) | lastTask(taskHolder);
820  }
821  }
822  finalWaitingTask.waitNoThrow();
823  }
824 
826  // Collects exceptions, so we don't throw before all operations are performed.
828  "Multiple exceptions were thrown while executing endStream and endJob. An exception message follows for "
829  "each.\n");
830 
831  //make the services available
833 
834  if (beginJobSucceeded_) {
835  endStreams(c);
836  }
837 
839  schedule_->endJob(c);
840  for (auto& subProcess : subProcesses_) {
841  subProcess.doEndJob(c);
842  }
843  c.call(std::bind(&InputSource::doEndJob, input_.get()));
844  if (looper_) {
845  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
846  }
847  if (c.hasThrown()) {
848  c.rethrow();
849  }
850  }
851  }
852 
854 
855  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
856  return schedule_->getAllModuleDescriptions();
857  }
858 
859  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
860 
861  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
862 
863  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
864 
865  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
866 
867  namespace {
868 #include "TransitionProcessors.icc"
869  }
870 
872  bool returnValue = false;
873 
874  // Look for a shutdown signal
875  if (shutdown_flag.load(std::memory_order_acquire)) {
876  returnValue = true;
877  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
879  jr->reportShutdownSignal();
881  }
882  return returnValue;
883  }
884 
885  namespace {
886  struct SourceNextGuard {
887  SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
888  ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
889  edm::ActivityRegistry& act_;
890  };
891  } // namespace
892 
895  InputSource::ItemTypeInfo itemTypeInfo;
896  {
897  SourceNextGuard guard(*actReg_.get());
898  //For now, do nothing with InputSource::IsSynchronize
899  do {
900  itemTypeInfo = input_->nextItemType();
901  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
902  }
903  lastSourceTransition_ = itemTypeInfo;
904  sentry.completedSuccessfully();
905 
907 
909  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
911  }
912 
913  return lastSourceTransition_;
914  }
915 
// Asynchronous variant of asking the source for its next transition.
//
// The work is wrapped in a lambda that takes ownership of `iRunStatus` and
// `nextTask` (both are moved into the capture). Inside the lambda the
// recursive source mutex is held via std::lock_guard. If the next entry has
// the same run number and reduced ProcessHistoryID as the run in
// `runStatus` (together with a condition on omitted lines), the visible
// message about a probable InputSource bug is produced. Any exception is
// handed to `nextHolder` through doneWaiting().
//
// NOTE(review): original lines 919, 922-925 and 928 are missing from this
// listing. Presumably they hold the call that submits the lambda, the start
// of the `if` condition, and the start of the `throw` -- confirm against the
// full file. How `nextHolder` is completed on the success path is not
// visible here.
916  void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
917  WaitingTaskHolder nextTask) {
918  auto group = nextTask.group();
920  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
921  CMS_SA_ALLOW try {
923  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
926  runStatus->runPrincipal()->run() == input_->run() &&
927  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
929  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
930  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
931  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
932  }
933  } catch (...) {
// Report the failure to the waiting task instead of letting it escape
// the lambda.
934  nextHolder.doneWaiting(std::current_exception());
935  }
936  });
937  }
938 
940  beginJob(); //make sure this was called
941 
942  // make the services available
944  actReg_->beginProcessingSignal_();
945  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
946  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
947  try {
948  FilesProcessor fp(fileModeNoMerge_);
949 
950  convertException::wrap([&]() {
951  bool firstTime = true;
952  do {
953  if (not firstTime) {
955  rewindInput();
956  } else {
957  firstTime = false;
958  }
959  startingNewLoop();
960 
961  auto trans = fp.processFiles(*this);
962 
963  fp.normalEnd();
964 
965  if (deferredExceptionPtrIsSet_.load()) {
966  std::rethrow_exception(deferredExceptionPtr_);
967  }
968  if (trans != InputSource::ItemType::IsStop) {
969  //problem with the source
970  doErrorStuff();
971 
972  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
973  }
974  } while (not endOfLoop());
975  }); // convertException::wrap
976 
977  } // Try block
978  catch (cms::Exception& e) {
980  std::string message(
981  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
982  e.addAdditionalInfo(message);
983  if (e.alreadyPrinted()) {
984  LogAbsolute("Additional Exceptions") << message;
985  }
986  }
987  if (exceptionMessageRuns_) {
988  std::string message(
989  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
990  e.addAdditionalInfo(message);
991  if (e.alreadyPrinted()) {
992  LogAbsolute("Additional Exceptions") << message;
993  }
994  }
995  if (!exceptionMessageFiles_.empty()) {
996  e.addAdditionalInfo(exceptionMessageFiles_);
997  if (e.alreadyPrinted()) {
998  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
999  }
1000  }
1001  throw;
1002  }
1003  return epSuccess;
1004  }
1005 
1007  FDEBUG(1) << " \treadFile\n";
1008  size_t size = preg_->size();
1010 
1011  if (streamRunActive_ > 0) {
1012  streamRunStatus_[0]->runPrincipal()->preReadFile();
1013  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
1014  }
1015 
1016  if (streamLumiActive_ > 0) {
1017  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
1018  }
1019 
1020  fb_ = input_->readFile();
1021  if (size < preg_->size()) {
1023  }
1026  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1027  }
1028  sentry.completedSuccessfully();
1029  }
1030 
// Closes the current input file, if there is a valid file block.
//
// `cleaningUpAfterException` is forwarded unchanged to
// InputSource::closeFile(). After closeFile() returns,
// sentry.completedSuccessfully() is called. The FDEBUG trace is written
// whether or not a file was closed.
//
// NOTE(review): original line 1033 is missing from this listing; presumably
// it declares `sentry` -- confirm against the full file.
1031  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1032  if (fileBlockValid()) {
1034  input_->closeFile(fb_.get(), cleaningUpAfterException);
1035  sentry.completedSuccessfully();
1036  }
1037  FDEBUG(1) << "\tcloseInputFile\n";
1038  }
1039 
1041  if (fileBlockValid()) {
1042  schedule_->openOutputFiles(*fb_);
1043  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1044  }
1045  FDEBUG(1) << "\topenOutputFiles\n";
1046  }
1047 
1049  schedule_->closeOutputFiles();
1050  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1051  processBlockHelper_->clearAfterOutputFilesClose();
1052  FDEBUG(1) << "\tcloseOutputFiles\n";
1053  }
1054 
1056  if (fileBlockValid()) {
1058  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1059  schedule_->respondToOpenInputFile(*fb_);
1060  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1061  }
1062  FDEBUG(1) << "\trespondToOpenInputFile\n";
1063  }
1064 
1066  if (fileBlockValid()) {
1067  schedule_->respondToCloseInputFile(*fb_);
1068  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1069  }
1070  FDEBUG(1) << "\trespondToCloseInputFile\n";
1071  }
1072 
1074  shouldWeStop_ = false;
1075  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1076  // until after we've called beginOfJob
1077  if (looper_ && looperBeginJobRun_) {
1078  looper_->doStartingNewLoop();
1079  }
1080  FDEBUG(1) << "\tstartingNewLoop\n";
1081  }
1082 
1084  if (looper_) {
1085  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1086  looper_->setModuleChanger(&changer);
1087  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1088  looper_->setModuleChanger(nullptr);
1090  return true;
1091  else
1092  return false;
1093  }
1094  FDEBUG(1) << "\tendOfLoop\n";
1095  return true;
1096  }
1097 
1099  input_->repeat();
1100  input_->rewind();
1101  FDEBUG(1) << "\trewind\n";
1102  }
1103 
1105  looper_->prepareForNextLoop(esp_.get());
1106  FDEBUG(1) << "\tprepareForNextLoop\n";
1107  }
1108 
1110  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1111  if (!subProcesses_.empty()) {
1112  for (auto const& subProcess : subProcesses_) {
1113  if (subProcess.shouldWeCloseOutput()) {
1114  return true;
1115  }
1116  }
1117  return false;
1118  }
1119  return schedule_->shouldWeCloseOutput();
1120  }
1121 
1123  FDEBUG(1) << "\tdoErrorStuff\n";
1124  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1125  << "and went to the error state\n"
1126  << "Will attempt to terminate processing normally\n"
1127  << "(IF using the looper the next loop will be attempted)\n"
1128  << "This likely indicates a bug in an input module or corrupted input or both\n";
1129  }
1130 
// Fills the process block principal held by principalCache_ with this
// process's name, starts the global begin transition for it, and blocks until
// that transition's waiting task has finished.
//
// beginProcessBlockSucceeded is an out-parameter: it is set to true after
// globalWaitTask.wait() returns. The visible code never writes false to it.
// NOTE(review): `Traits` is not declared in the visible lines (listing line
// 1135 is omitted) — presumably a local using-declaration; confirm in the full
// file.
1131  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1132  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1133  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1134 
1136  FinalWaitingTask globalWaitTask{taskGroup_};
1137 
1138  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1139  beginGlobalTransitionAsync<Traits>(
1140  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1141 
  // Synchronous from the caller's point of view: wait for the async transition.
1142  globalWaitTask.wait();
1143  beginProcessBlockSucceeded = true;
1144  }
1145 
1147  input_->fillProcessBlockHelper();
1149  while (input_->nextProcessBlock(processBlockPrincipal)) {
1150  readProcessBlock(processBlockPrincipal);
1151 
1153  FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156  beginGlobalTransitionAsync<Traits>(
1157  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1158 
1159  globalWaitTask.wait();
1160 
1161  FinalWaitingTask writeWaitTask{taskGroup_};
1163  writeWaitTask.wait();
1164 
1165  processBlockPrincipal.clearPrincipal();
1166  for (auto& s : subProcesses_) {
1167  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1168  }
1169  }
1170  }
1171 
// Runs the global end transition for the process block principal held by
// principalCache_, waits for it, and then clears that principal and the
// ProcessBlockType::New process block principal of every sub-process.
//
// cleaningUpAfterException is forwarded to endGlobalTransitionAsync.
// beginProcessBlockSucceeded gates the second wait (writeWaitTask) below.
// NOTE(review): `Traits` and whatever call hands writeWaitTask to a writer are
// not in the visible lines (listing lines 1175 and 1189 are omitted) — confirm
// against the full file.
1172  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1173  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1174 
1176  FinalWaitingTask globalWaitTask{taskGroup_};
1177 
1178  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1179  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1180  *schedule_,
1181  transitionInfo,
1182  serviceToken_,
1183  subProcesses_,
1184  cleaningUpAfterException);
1185  globalWaitTask.wait();
1186 
  // Only entered when the matching begin transition was reported as successful.
1187  if (beginProcessBlockSucceeded) {
1188  FinalWaitingTask writeWaitTask{taskGroup_};
1190  writeWaitTask.wait();
1191  }
1192 
  // Cleanup happens unconditionally, regardless of beginProcessBlockSucceeded.
1193  processBlockPrincipal.clearPrincipal();
1194  for (auto& s : subProcesses_) {
1195  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1196  }
1197  }
1198 
1200  FinalWaitingTask waitTask{taskGroup_};
1202  if (streamRunActive_ == 0) {
1203  assert(streamLumiActive_ == 0);
1204 
1205  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1206  WaitingTaskHolder{taskGroup_, &waitTask});
1207  } else {
1209 
1210  auto runStatus = streamRunStatus_[0];
1211 
1213  runStatus->runPrincipal()->run() == input_->run() and
1214  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1215  readAndMergeRun(*runStatus);
1217  }
1218 
1219  setNeedToCallNext(false);
1220 
1221  WaitingTaskHolder holder{taskGroup_, &waitTask};
1222  runStatus->setHolderOfTaskInProcessRuns(holder);
1223  if (streamLumiActive_ > 0) {
1225  continueLumiAsync(std::move(holder));
1226  } else {
1228  }
1229  }
1230  waitTask.wait();
1231  return lastTransitionType();
1232  }
1233 
1235  if (iHolder.taskHasFailed()) {
1236  return;
1237  }
1238 
1239  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1240 
1241  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1242 
1243  chain::first([this, &status, iSync](auto nextTask) {
1244  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1245  nextTask,
1246  status->endIOVWaitingTasks(),
1247  status->eventSetupImpls(),
1249  actReg_.get(),
1250  serviceToken_,
1252  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1253  CMS_SA_ALLOW try {
1254  if (iException) {
1255  WaitingTaskHolder copyHolder(nextTask);
1256  copyHolder.doneWaiting(*iException);
1257  // Finish handling the exception in the task pushed to runQueue_
1258  }
1260 
1261  runQueue_->pushAndPause(
1262  *nextTask.group(),
1263  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1264  CMS_SA_ALLOW try {
1265  if (postRunQueueTask.taskHasFailed()) {
1266  status->resetBeginResources();
1268  return;
1269  }
1270 
1271  status->setResumer(std::move(iResumer));
1272 
1274  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1275  CMS_SA_ALLOW try {
1277 
1278  if (postSourceTask.taskHasFailed()) {
1279  status->resetBeginResources();
1281  status->resumeGlobalRunQueue();
1282  return;
1283  }
1284 
1285  status->setRunPrincipal(readRun());
1286 
1287  RunPrincipal& runPrincipal = *status->runPrincipal();
1288  {
1290  input_->doBeginRun(runPrincipal, &processContext_);
1291  sentry.completedSuccessfully();
1292  }
1293 
1294  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1295  if (looper_ && looperBeginJobRun_ == false) {
1296  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1297 
1298  oneapi::tbb::task_group group;
1299  FinalWaitingTask waitTask{group};
1300  using namespace edm::waiting_task::chain;
1301  chain::first([this, &es](auto nextTask) {
1302  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1303  }) | then([this, &es](auto nextTask) mutable {
1304  looper_->beginOfJob(es);
1305  looperBeginJobRun_ = true;
1306  looper_->doStartingNewLoop();
1307  }) | runLast(WaitingTaskHolder(group, &waitTask));
1308  waitTask.wait();
1309  }
1310 
1311  using namespace edm::waiting_task::chain;
1312  chain::first([this, status](auto nextTask) mutable {
1313  CMS_SA_ALLOW try {
1316  } else {
1317  setNeedToCallNext(true);
1318  }
1319  } catch (...) {
1320  status->setStopBeforeProcessingRun(true);
1321  nextTask.doneWaiting(std::current_exception());
1322  }
1323  }) | then([this, status, &es](auto nextTask) {
1324  if (status->stopBeforeProcessingRun()) {
1325  return;
1326  }
1327  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1329  beginGlobalTransitionAsync<Traits>(
1330  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1331  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1332  if (status->stopBeforeProcessingRun()) {
1333  return;
1334  }
1335  looper_->prefetchAsync(
1336  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1337  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1338  if (status->stopBeforeProcessingRun()) {
1339  return;
1340  }
1341  ServiceRegistry::Operate operateLooper(serviceToken_);
1342  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1343  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1344  if (iException) {
1345  WaitingTaskHolder copyHolder(holder);
1346  copyHolder.doneWaiting(*iException);
1347  } else {
1348  status->globalBeginDidSucceed();
1349  }
1350 
1351  if (status->stopBeforeProcessingRun()) {
1352  // We just quit now if there was a failure when merging runs
1353  status->resetBeginResources();
1355  status->resumeGlobalRunQueue();
1356  return;
1357  }
1358  CMS_SA_ALLOW try {
1359  // Under normal circumstances, this task runs after endRun has completed for all streams
1360  // and global endLumi has completed for all lumis contained in this run
1361  auto globalEndRunTask =
1362  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1363  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1364  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1366  });
1367  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1368  } catch (...) {
1369  status->resetBeginResources();
1371  status->resumeGlobalRunQueue();
1372  holder.doneWaiting(std::current_exception());
1373  return;
1374  }
1375 
1376  // After this point we are committed to end the run via endRunAsync
1377 
1379 
1380  // The only purpose of the pause is to cause stream begin run to execute before
1381  // global begin lumi in the single threaded case (maintains consistency with
1382  // the order that existed before concurrent runs were implemented).
1383  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1384 
1385  CMS_SA_ALLOW try {
1386  streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1387  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1388  CMS_SA_ALLOW try {
1389  streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1391  });
1392  } catch (...) {
1393  if (status->streamFinishedBeginRun()) {
1394  WaitingTaskHolder copyHolder(holder);
1395  copyHolder.doneWaiting(std::current_exception());
1396  status->resetBeginResources();
1399  }
1400  }
1401  }
1402  });
1403  } catch (...) {
1404  WaitingTaskHolder copyHolder(holder);
1405  copyHolder.doneWaiting(std::current_exception());
1406  status->resetBeginResources();
1409  }
1411  }) | runLast(postSourceTask);
1412  } catch (...) {
1413  status->resetBeginResources();
1415  status->resumeGlobalRunQueue();
1416  postSourceTask.doneWaiting(std::current_exception());
1417  }
1418  }); // task in sourceResourcesAcquirer
1419  } catch (...) {
1420  status->resetBeginResources();
1422  status->resumeGlobalRunQueue();
1423  postRunQueueTask.doneWaiting(std::current_exception());
1424  }
1425  }); // task in runQueue
1426  } catch (...) {
1427  status->resetBeginResources();
1429  nextTask.doneWaiting(std::current_exception());
1430  }
1431  }) | chain::runLast(std::move(iHolder));
1432  }
1433 
// Starts the begin-run transition for one stream.
//
// iStream  index into streamQueues_ / streamRunStatus_.
// status   run status; it is moved into streamRunStatus_[iStream].
// iHolder  task that terminates the chain built below (passed to runLast) and
//          that receives any exception thrown while building the chain.
//
// The stream's queue is paused and streamRunActive_ is incremented before the
// chain is built; releaseBeginRunResources(iStream) is called from the chain's
// second step, or from the catch block if building the chain throws.
// NOTE(review): `Traits` is not declared in the visible lines (listing line
// 1449 is omitted) — confirm in the full file.
1434  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1435  std::shared_ptr<RunProcessingStatus> status,
1436  WaitingTaskHolder iHolder) noexcept {
1437  // These shouldn't throw
1438  streamQueues_[iStream].pause();
1439  ++streamRunActive_;
1440  streamRunStatus_[iStream] = std::move(status);
1441 
1442  CMS_SA_ALLOW try {
1443  using namespace edm::waiting_task::chain;
1444  chain::first([this, iStream](auto nextTask) {
1445  RunProcessingStatus& rs = *streamRunStatus_[iStream];
  // The stream transition is only started when the global begin run succeeded.
  // NOTE(review): in the other case nextTask is not signalled explicitly here;
  // presumably the chain advances when nextTask is destroyed — confirm.
1446  if (rs.didGlobalBeginSucceed()) {
1447  RunTransitionInfo transitionInfo(
1448  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1450  beginStreamTransitionAsync<Traits>(
1451  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1452  }
1453  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
  // Forward a failure from the previous step, then release resources either way.
1454  if (exceptionFromBeginStreamRun) {
1455  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1456  }
1457  releaseBeginRunResources(iStream);
1458  }) | runLast(iHolder);
1459  } catch (...) {
1460  releaseBeginRunResources(iStream);
1461  iHolder.doneWaiting(std::current_exception());
1462  }
1463  }
1464 
// Called once per stream after its begin-run work is done (see
// streamBeginRunAsync). Always resumes streamQueues_[iStream]; additionally
// resets the run status's begin resources when streamFinishedBeginRun()
// returns true.
// NOTE(review): streamFinishedBeginRun() presumably reports true only for the
// last stream to finish — confirm in RunProcessingStatus. Listing line 1469 is
// omitted, so the if-body may contain a further statement not visible here.
1465  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1466  auto& status = streamRunStatus_[iStream];
1467  if (status->streamFinishedBeginRun()) {
1468  status->resetBeginResources();
1470  }
1471  streamQueues_[iStream].resume();
1472  }
1473 
// Begins ending the run described by iRunStatus.
//
// Steps visible here:
//  1. Record the end time on the status and build the IOVSyncValue `ts` from
//     the run principal's end time (its first argument is on an omitted line).
//  2. Emit esSyncIOVQueuingSignal_; an exception from the signal is reported
//     through a copy of iHolder, and execution still continues to step 3
//     because the catch block does not return.
//  3. Queue the EventSetup work for `ts`, then push one task per stream that
//     pauses the stream's queue and calls streamEndRunAsync.
//  4. Inside a block whose opening condition is on an omitted line, call
//     beginRunAsync for the run currently reported by input_.
//
// NOTE(review): listing lines 1478, 1490, 1498 and 1513 are omitted, so one
// argument of each multi-line call and the condition guarding step 4 are not
// visible — confirm against the full file.
1474  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1475  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1476  iRunStatus->setEndTime();
1477  IOVSyncValue ts(
1479  runPrincipal.endTime());
1480  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1481  WaitingTaskHolder copyHolder(iHolder);
1482  copyHolder.doneWaiting(std::current_exception());
1483  }
1484 
  // iRunStatus and ts are captured by reference in the first step only.
  // NOTE(review): that is safe only if this step runs before endRunAsync
  // returns — confirm against the waiting_task::chain semantics.
1485  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1486  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1487  nextTask,
1488  iRunStatus->endIOVWaitingTasksEndRun(),
1489  iRunStatus->eventSetupImplsEndRun(),
1491  actReg_.get(),
1492  serviceToken_);
1493  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
  // A failure in the EventSetup step is remembered on the status so that the
  // later end-run steps can test endingEventSetupSucceeded().
1494  if (iException) {
1495  iRunStatus->setEndingEventSetupSucceeded(false);
1496  handleEndRunExceptions(*iException, nextTask);
1497  }
1499  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1500  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1501  CMS_SA_ALLOW try {
1502  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1503  streamQueues_[i].pause();
1504  streamEndRunAsync(std::move(nextTask), i);
1505  });
1506  } catch (...) {
1507  WaitingTaskHolder copyHolder(nextTask);
1508  copyHolder.doneWaiting(std::current_exception());
1509  }
1510  }
1511  });
1512 
1514  CMS_SA_ALLOW try {
1515  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1516  } catch (...) {
1517  WaitingTaskHolder copyHolder(nextTask);
1518  copyHolder.doneWaiting(std::current_exception());
1519  }
1520  }
1521  }) | chain::runLast(std::move(iHolder));
1522  }
1523 
// Reports iException through `holder` unless holder.taskHasFailed() is already
// true. In the not-yet-failed case a copy of the holder is made and
// doneWaiting(iException) is called on the copy (holder itself is a const
// reference).
// NOTE(review): the body of the taskHasFailed() branch is on an omitted
// listing line (1526), so what happens to a second exception is not visible
// here — confirm against the full file.
1524  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1525  if (holder.taskHasFailed()) {
1527  } else {
1528  WaitingTaskHolder tmp(holder);
1529  tmp.doneWaiting(iException);
1530  }
1531  }
1532 
// Runs the global end-of-run sequence for iRunStatus as a task chain that ends
// in iTask.
//
// Everything the chain needs from the status object is read into locals first,
// because iRunStatus is moved into the final step's capture (`status`).
//
// Chain steps, in order:
//  - global end transition, only if endingEventSetupSucceeded;
//  - looper prefetch and looper doEndRun, only if looper_ is set and
//    endingEventSetupSucceeded;
//  - preWriteRun + writeRunAsync, only if the global begin succeeded and
//    endingEventSetupSucceeded;
//  - final cleanup step, which always runs and receives any exception from the
//    earlier steps.
//
// NOTE(review): listing lines 1547, 1558, 1577 and 1591 are omitted; `Traits`
// and possibly further statements are therefore not visible — confirm against
// the full file.
1533  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1534  auto& runPrincipal = *(iRunStatus->runPrincipal());
1535  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1536  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1537  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1538  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1539  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1540 
1541  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1542  using namespace edm::waiting_task::chain;
1543  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1544  auto nextTask) {
1545  if (endingEventSetupSucceeded) {
1546  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1548  endGlobalTransitionAsync<Traits>(
1549  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1550  }
1551  }) |
1552  ifThen(looper_ && endingEventSetupSucceeded,
1553  [this, &runPrincipal, &es](auto nextTask) {
1554  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1555  }) |
1556  ifThen(looper_ && endingEventSetupSucceeded,
1557  [this, &runPrincipal, &es](auto nextTask) {
1559  looper_->doEndRun(runPrincipal, es, &processContext_);
1560  }) |
1561  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1562  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1563  mergeableRunProductMetadata->preWriteRun();
1564  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1565  }) |
1566  then([status = std::move(iRunStatus),
1567  this,
1568  didGlobalBeginSucceed,
1569  mergeableRunProductMetadata,
1570  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
  // postWriteRun is paired with the preWriteRun step above: same condition.
1571  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1572  mergeableRunProductMetadata->postWriteRun();
1573  }
1574  if (iException) {
1575  handleEndRunExceptions(*iException, nextTask);
1576  }
1578 
  // Holds the first exception thrown by the cleanup attempts below; later
  // ones are dropped (each catch only stores when ptr is still empty).
1579  std::exception_ptr ptr;
1580 
1581  // Try hard to clean up resources so the
1582  // process can terminate in a controlled
1583  // fashion even after exceptions have occurred.
1584  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1585  if (not ptr) {
1586  ptr = std::current_exception();
1587  }
1588  }
1589  CMS_SA_ALLOW try {
1590  status->resumeGlobalRunQueue();
1592  } catch (...) {
1593  if (not ptr) {
1594  ptr = std::current_exception();
1595  }
1596  }
1597  CMS_SA_ALLOW try {
1598  status->resetEndResources();
1599  status.reset();
1600  } catch (...) {
1601  if (not ptr) {
1602  ptr = std::current_exception();
1603  }
1604  }
1605 
  // A cleanup exception is only reported when no earlier step already failed.
1606  if (ptr && !iException) {
1607  handleEndRunExceptions(ptr, nextTask);
1608  }
1609  }) |
1610  runLast(std::move(iTask));
1611  }
1612 
// Ends the run on one stream.
//
// iTask         task used for exception reporting and whose group hosts the
//               per-stream completion task created here.
// iStreamIndex  index into streamRunStatus_ / streamQueues_.
//
// If the stream has no run status, the function falls back to
// exceptionRunStatus_: when its streamFinishedRun() returns true the global
// end-run holder is signalled and exceptionRunStatus_ is reset; then it
// returns without doing any stream transition.
//
// Otherwise a completion task (runDoneTask) is created. When it runs, it
// reports any exception, signals the global end-run holder if
// streamFinishedRun() returns true, clears the stream's run status, decrements
// streamRunActive_ and resumes the stream's queue. The end stream transition
// is started with that task only when both didGlobalBeginSucceed() and
// endingEventSetupSucceeded() are true.
// NOTE(review): when that condition is false nothing here runs runDoneTask
// explicitly; presumably destroying runDoneTaskHolder triggers it — confirm.
// `Traits` is on an omitted listing line (1650).
1613  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1614  CMS_SA_ALLOW try {
1615  if (!streamRunStatus_[iStreamIndex]) {
1616  if (exceptionRunStatus_->streamFinishedRun()) {
1617  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1618  exceptionRunStatus_.reset();
1619  }
1620  return;
1621  }
1622 
1623  auto runDoneTask =
1624  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1625  if (iException) {
1626  handleEndRunExceptions(*iException, iTask);
1627  }
1628 
  // Local shared_ptr copy keeps the status usable for the checks below.
1629  auto runStatus = streamRunStatus_[iStreamIndex];
1630 
1631  //reset status before releasing queue else get race condition
1632  if (runStatus->streamFinishedRun()) {
1633  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1634  }
1635  streamRunStatus_[iStreamIndex].reset();
1636  --streamRunActive_;
1637  streamQueues_[iStreamIndex].resume();
1638  });
1639 
1640  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1641 
  // Raw pointer on purpose: no additional shared_ptr owner is created here.
1642  auto runStatus = streamRunStatus_[iStreamIndex].get();
1643 
1644  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1645  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1646  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1647  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1648 
1649  auto& runPrincipal = *runStatus->runPrincipal();
1651  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1652  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1653  *schedule_,
1654  iStreamIndex,
1655  transitionInfo,
1656  serviceToken_,
1657  subProcesses_,
1658  cleaningUpAfterException);
1659  }
1660  } catch (...) {
1661  handleEndRunExceptions(std::current_exception(), iTask);
1662  }
1663  }
1664 
// Ends a run that is still active, synchronously. Does nothing when
// streamRunActive_ is 0.
//
// The status of stream 0 is used for the run: cleaningUpAfterException is
// stored on it, a holder on the local waitTask is registered as the
// "holder of task in process runs", endRunAsync is started with the same
// holder, and the function blocks in waitTask.wait().
// NOTE(review): listing line 1673 is omitted, so a statement between
// setHolderOfTaskInProcessRuns and endRunAsync may be missing from this view.
1665  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1666  if (streamRunActive_ > 0) {
1667  FinalWaitingTask waitTask{taskGroup_};
1668 
1669  auto runStatus = streamRunStatus_[0].get();
1670  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1671  WaitingTaskHolder holder{taskGroup_, &waitTask};
1672  runStatus->setHolderOfTaskInProcessRuns(holder);
1674  endRunAsync(streamRunStatus_[0], std::move(holder));
1675  waitTask.wait();
1676  }
1677  }
1678 
1680  std::shared_ptr<RunProcessingStatus> iRunStatus,
1681  edm::WaitingTaskHolder iHolder) {
1682  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1683 
1684  auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1685  chain::first([this, &iSync, &status](auto nextTask) {
1686  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1687  nextTask,
1688  status->endIOVWaitingTasks(),
1689  status->eventSetupImpls(),
1691  actReg_.get(),
1692  serviceToken_);
1693  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1694  CMS_SA_ALLOW try {
1695  //the call to doneWaiting will cause the count to decrement
1696  if (iException) {
1697  WaitingTaskHolder copyHolder(nextTask);
1698  copyHolder.doneWaiting(*iException);
1699  }
1700 
1701  lumiQueue_->pushAndPause(
1702  *nextTask.group(),
1703  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1704  CMS_SA_ALLOW try {
1705  if (postLumiQueueTask.taskHasFailed()) {
1706  status->resetResources();
1708  endRunAsync(iRunStatus, postLumiQueueTask);
1709  return;
1710  }
1711 
1712  status->setResumer(std::move(iResumer));
1713 
1715  *postLumiQueueTask.group(),
1716  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1717  CMS_SA_ALLOW try {
1719 
1720  if (postSourceTask.taskHasFailed()) {
1721  status->resetResources();
1723  endRunAsync(iRunStatus, postSourceTask);
1724  return;
1725  }
1726 
1727  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1728 
1729  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1730  {
1732  input_->doBeginLumi(lumiPrincipal, &processContext_);
1733  sentry.completedSuccessfully();
1734  }
1735 
1737  if (rng.isAvailable()) {
1738  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1739  rng->preBeginLumi(lb);
1740  }
1741 
1742  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1743 
1744  using namespace edm::waiting_task::chain;
1745  chain::first([this, status](auto nextTask) mutable {
1748  } else {
1749  setNeedToCallNext(true);
1750  }
1751  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1752  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1754  beginGlobalTransitionAsync<Traits>(
1755  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1756  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1757  looper_->prefetchAsync(
1758  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1759  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1760  ServiceRegistry::Operate operateLooper(serviceToken_);
1761  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1762  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1763  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1764 
1765  if (iException) {
1766  WaitingTaskHolder copyHolder(holder);
1767  copyHolder.doneWaiting(*iException);
1768  globalEndLumiAsync(holder, status);
1769  endRunAsync(iRunStatus, holder);
1770  } else {
1771  status->globalBeginDidSucceed();
1772 
1773  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1775 
1776  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1777  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1778  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1779  if (!status->shouldStreamStartLumi()) {
1780  return;
1781  }
1782  streamQueues_[i].pause();
1783 
1784  auto& event = principalCache_.eventPrincipal(i);
1785  auto eventSetupImpls = &status->eventSetupImpls();
1786  auto lp = status->lumiPrincipal().get();
1789  event.setLuminosityBlockPrincipal(lp);
1790  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1791  using namespace edm::waiting_task::chain;
1792  chain::first([this, i, &transitionInfo](auto nextTask) {
1793  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1794  *schedule_,
1795  i,
1796  transitionInfo,
1797  serviceToken_,
1798  subProcesses_);
1799  }) |
1800  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1801  auto nextTask) {
1802  if (exceptionFromBeginStreamLumi) {
1803  WaitingTaskHolder copyHolder(nextTask);
1804  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1805  }
1807  }) |
1808  runLast(std::move(holder));
1809  });
1810  } // end for loop over streams
1811  });
1812  }
1813  }) | runLast(postSourceTask);
1814  } catch (...) {
1815  status->resetResources();
1817  WaitingTaskHolder copyHolder(postSourceTask);
1818  copyHolder.doneWaiting(std::current_exception());
1819  endRunAsync(iRunStatus, postSourceTask);
1820  }
1821  }); // task in sourceResourcesAcquirer
1822  } catch (...) {
1823  status->resetResources();
1825  WaitingTaskHolder copyHolder(postLumiQueueTask);
1826  copyHolder.doneWaiting(std::current_exception());
1827  endRunAsync(iRunStatus, postLumiQueueTask);
1828  }
1829  }); // task in lumiQueue
1830  } catch (...) {
1831  status->resetResources();
1833  WaitingTaskHolder copyHolder(nextTask);
1834  copyHolder.doneWaiting(std::current_exception());
1835  endRunAsync(iRunStatus, nextTask);
1836  }
1837  }) | chain::runLast(std::move(iHolder));
1838  }
1839 
1841  chain::first([this](auto nextTask) {
1842  //all streams are sharing the same status at the moment
1843  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1845 
1847  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1850  }
1851  }) | chain::then([this](auto nextTask) mutable {
1852  unsigned int streamIndex = 0;
1853  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1854  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1855  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1856  }
1857  nextTask.group()->run(
1858  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1859  }) | chain::runLast(std::move(iHolder));
1860  }
1861 
// Lumi counterpart of handleEndRunExceptions: reports iException through
// `holder` unless holder.taskHasFailed() is already true. In the
// not-yet-failed case doneWaiting(iException) is called on a copy of the
// holder (holder itself is a const reference).
// NOTE(review): the body of the taskHasFailed() branch is on an omitted
// listing line (1864) and is not visible here — confirm against the full file.
1862  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1863  if (holder.taskHasFailed()) {
1865  } else {
1866  WaitingTaskHolder tmp(holder);
1867  tmp.doneWaiting(iException);
1868  }
1869  }
1870 
1872  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1873  // Get some needed info out of the status object before moving
1874  // it into finalTaskForThisLumi.
1875  auto& lp = *(iLumiStatus->lumiPrincipal());
1876  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1877  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1878  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1879  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1880 
1881  using namespace edm::waiting_task::chain;
1882  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1883  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1884 
1885  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1887  endGlobalTransitionAsync<Traits>(
1888  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1889  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1890  //Only call writeLumi if beginLumi succeeded
1891  if (didGlobalBeginSucceed) {
1892  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1893  }
1894  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1895  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1896  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1897  //any thrown exception auto propagates to nextTask via the chain
1899  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1900  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1901  if (iException) {
1902  handleEndLumiExceptions(*iException, nextTask);
1903  }
1905 
1906  std::exception_ptr ptr;
1907 
1908  // Try hard to clean up resources so the
1909  // process can terminate in a controlled
1910  // fashion even after exceptions have occurred.
1911  // Caught exception is passed to handleEndLumiExceptions()
1912  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1913  if (not ptr) {
1914  ptr = std::current_exception();
1915  }
1916  }
1917  // Caught exception is passed to handleEndLumiExceptions()
1918  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1919  if (not ptr) {
1920  ptr = std::current_exception();
1921  }
1922  }
1923  // Caught exception is passed to handleEndLumiExceptions()
1924  CMS_SA_ALLOW try {
1925  status->resetResources();
1926  status->globalEndRunHolderDoneWaiting();
1927  status.reset();
1928  } catch (...) {
1929  if (not ptr) {
1930  ptr = std::current_exception();
1931  }
1932  }
1933 
1934  if (ptr && !iException) {
1935  handleEndLumiExceptions(ptr, nextTask);
1936  }
1937  }) | runLast(std::move(iTask));
1938  }
1939 
// Ends the current luminosity block on one stream.
//
// iTask         task used for exception reporting and whose group hosts the
//               completion task `t` created here.
// iStreamIndex  index into streamLumiStatus_ / streamQueues_.
//
// The completion task takes its own shared_ptr copy of the stream's lumi
// status, reports any exception, clears streamLumiStatus_[iStreamIndex],
// resumes the stream's queue and finally checks streamFinishedLumi().
// The end stream transition itself is started at the bottom with that task.
// NOTE(review): listing lines 1949, 1954 and 1970 are omitted; the body of the
// "last one" branch and the declaration of `Traits` are therefore not visible
// — confirm against the full file.
1940  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1941  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1942  auto status = streamLumiStatus_[iStreamIndex];
1943  if (iException) {
1944  handleEndLumiExceptions(*iException, iTask);
1945  }
1946 
1947  // reset status before releasing queue else get race condition
1948  streamLumiStatus_[iStreamIndex].reset();
1950  streamQueues_[iStreamIndex].resume();
1951 
1952  //are we the last one?
1953  if (status->streamFinishedLumi()) {
1955  }
1956  });
1957 
1958  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1959 
1960  // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1961  // therefore we do not want to hold the shared_ptr
1962  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1963  lumiStatus->setEndTime();
1964 
1965  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1966  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1967  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1968 
1969  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1971  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1972  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1973  *schedule_,
1974  iStreamIndex,
1975  transitionInfo,
1976  serviceToken_,
1977  subProcesses_,
1978  cleaningUpAfterException);
1979  }
1980 
// Ends a luminosity block that is still active, synchronously.
//
// With no active run the function only asserts that no lumi is active either.
// With an active run and an active lumi it marks the lumi status of stream 0
// as having no more events, stores cleaningUpAfterException on it, starts
// streamEndLumiAsync for every stream with holders on one shared waiting task,
// and blocks until that task completes.
// NOTE(review): listing lines 1985 and 1988 are omitted, so statements before
// the inner `if` and before noMoreEventsInLumi() may be missing from this view.
1981  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1982  if (streamRunActive_ == 0) {
1983  assert(streamLumiActive_ == 0);
1984  } else {
1986  if (streamLumiActive_ > 0) {
1987  FinalWaitingTask globalWaitTask{taskGroup_};
1989  streamLumiStatus_[0]->noMoreEventsInLumi();
1990  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1991  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1992  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1993  }
1994  globalWaitTask.wait();
1995  }
1996  }
1997  }
1998 
2001  input_->readProcessBlock(processBlockPrincipal);
2002  sentry.completedSuccessfully();
2003  }
2004 
2005  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
      // Fills a RunPrincipal from the input source and returns it.
      // The principal gets the source's current run auxiliary, then input_->readRun() loads it.
      // NOTE(review): original line 2006, which must declare `rp`, is absent from this listing.
2007  assert(rp);
2008  rp->setAux(*input_->runAuxiliary());
2009  {
      // NOTE(review): original line 2010, which must declare `sentry`, is absent from this listing.
2011  input_->readRun(*rp, *historyAppender_);
2012  sentry.completedSuccessfully();
2013  }
      // The principal just read must agree with the source on the reduced process history.
2014  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2015  return rp;
2016  }
2017 
2019  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
2020 
2021  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
2022  assert(runOK);
2023  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2024  {
2026  input_->readAndMergeRun(runPrincipal);
2027  sentry.completedSuccessfully();
2028  }
2029  }
2030 
2031  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
      // Fills a LuminosityBlockPrincipal from the input source, attaches it to the run principal
      // `rp` (ownership of the shared_ptr is moved into the lumi principal) and returns it.
      // NOTE(review): original line 2032, which must declare `lbp`, is absent from this listing.
2033  assert(lbp);
2034  lbp->setAux(*input_->luminosityBlockAuxiliary());
2035  {
      // NOTE(review): original line 2036, which must declare `sentry`, is absent from this listing.
2037  input_->readLuminosityBlock(*lbp, *historyAppender_);
2038  sentry.completedSuccessfully();
2039  }
2040  lbp->setRunPrincipal(std::move(rp));
2041  return lbp;
2042  }
2043 
2045  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2046  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2047  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2048  input_->processHistoryRegistry().reducedProcessHistoryID(
2049  input_->luminosityBlockAuxiliary()->processHistoryID()));
2050  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2051  assert(lumiOK);
2052  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2053  {
2055  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2056  sentry.completedSuccessfully();
2057  }
2058  }
2059 
2061  using namespace edm::waiting_task;
2062  chain::first([&](auto nextTask) {
2064  schedule_->writeProcessBlockAsync(
2065  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2066  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2068  for (auto& s : subProcesses_) {
2069  s.writeProcessBlockAsync(nextTask, processBlockType);
2070  }
2071  }) | chain::runLast(std::move(task));
2072  }
2073 
2075  RunPrincipal const& runPrincipal,
2076  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2077  using namespace edm::waiting_task;
2078  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2079  chain::first([&](auto nextTask) {
2081  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2082  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2084  for (auto& s : subProcesses_) {
2085  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2086  }
2087  }) | chain::runLast(std::move(task));
2088  }
2089  }
2090 
2092  for (auto& s : subProcesses_) {
2093  s.clearRunPrincipal(*iStatus.runPrincipal());
2094  }
2095  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2096  iStatus.runPrincipal()->clearPrincipal();
2097  }
2098 
2100  using namespace edm::waiting_task;
2101  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2102  chain::first([&](auto nextTask) {
2104 
2105  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2106  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2107  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2109  for (auto& s : subProcesses_) {
2110  s.writeLumiAsync(nextTask, lumiPrincipal);
2111  }
2113  }
2114  }
2115 
2117  for (auto& s : subProcesses_) {
2118  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2119  }
2120  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2121  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2122  iStatus.lumiPrincipal()->clearPrincipal();
2123  }
2124 
2125  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2126  WaitingTaskHolder iHolder) {
      // Schedules a task, in iHolder's task group, that inspects following input entries while
      // holding the source mutex. Both arguments are moved into the task's closure.
      // NOTE(review): this listing is missing original lines 2128, 2131, 2135, 2138, 2145, 2146
      // and 2150, so the call that receives the lambda, the loop/condition headers and the
      // merge call itself are not visible. Comments below describe only what is shown.
2127  auto group = iHolder.group();
2129  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2130  CMS_SA_ALLOW try {
2132 
      // Hold the (recursive) source mutex for the remainder of the task.
2133  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2134 
2136  setNeedToCallNext(false);
2137 
      // Visible part of the condition: the source's current run number and reduced process
      // history ID must match the run principal held by `status`.
2139  status->runPrincipal()->run() == input_->run() and
2140  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
      // If the task held in the run status has already failed, flag the run to be stopped
      // before processing and give up.
2141  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2142  status->setStopBeforeProcessingRun(true);
2143  return;
2144  }
2147  setNeedToCallNext(true);
2148  return;
2149  }
2151  }
2152  } catch (...) {
      // On any exception: mark the run as not to be processed and forward the exception to the holder.
2153  status->setStopBeforeProcessingRun(true);
2154  holder.doneWaiting(std::current_exception());
2155  }
2156  });
2157  }
2158 
2159  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2160  WaitingTaskHolder iHolder) {
      // Schedules a task, in iHolder's task group, that calls readAndMergeLumi() on the given
      // lumi status while holding the source mutex. Both arguments are moved into the closure.
      // NOTE(review): this listing is missing original lines 2162, 2165, 2169, 2172, 2175 and
      // 2179, so the call that receives the lambda and the loop/condition headers are not
      // visible. Comments below describe only what is shown.
2161  auto group = iHolder.group();
2163  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2164  CMS_SA_ALLOW try {
2166 
      // Hold the (recursive) source mutex for the remainder of the task.
2167  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2168 
2170  setNeedToCallNext(false);
2171 
      // Visible part of the condition: the source's current lumi number must equal the
      // lumi number of the principal being merged into.
2173  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2174  readAndMergeLumi(*iLumiStatus);
2176  setNeedToCallNext(true);
2177  return;
2178  }
2180  }
2181  } catch (...) {
      // Forward any exception to the waiting holder instead of letting it escape the task.
2182  holder.doneWaiting(std::current_exception());
2183  }
2184  });
2185  }
2186 
2187  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2188  WaitingTaskHolder iHolder) {
      // Two-step task chain that decides what follows once run entries have been merged:
      //   1. optionally ask the source for the next transition type;
      //   2. depending on lastTransitionType(), begin the next lumi or end the run.
      // iHolder is the final task of the chain.
2189  chain::first([this, iRunStatus](auto nextTask) mutable {
      // Skip the query to the source when needToCallNext() is false.
2190  if (needToCallNext()) {
2191  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2192  }
2193  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
      // NOTE(review): original line 2194 is absent from this listing.
      // Report an exception from the previous step through a copy of the holder; execution
      // continues afterwards (there is no return inside this branch).
2195  if (iException) {
2196  WaitingTaskHolder copyHolder(nextTask);
2197  copyHolder.doneWaiting(*iException);
2198  }
      // NOTE(review): original line 2199 is absent from this listing; the closing brace on
      // line 2202 implies it opened a conditional block around the next two statements.
2200  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2201  return;
2202  }
      // Next item is a lumi and nothing has failed so far: start that lumi.
2203  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2204  CMS_SA_ALLOW try {
2205  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2206  input_->luminosityBlockAuxiliary()->beginTime()),
2207  iRunStatus,
2208  nextTask);
2209  return;
2210  } catch (...) {
      // Report the failure, then fall through to endRunAsync below.
2211  WaitingTaskHolder copyHolder(nextTask);
2212  copyHolder.doneWaiting(std::current_exception());
2213  }
2214  }
2215  // Note that endRunAsync will call beginRunAsync for the following run
2216  // if appropriate.
2217  endRunAsync(iRunStatus, std::move(nextTask));
2218  }) | chain::runLast(std::move(iHolder));
2219  }
2220 
2222  unsigned int iStreamIndex,
2224  // This function returns true if it successfully reads an event for the stream and that
2225  // requires both that an event is next and there are no problems or requests to stop.
2226 
2227  if (iTask.taskHasFailed()) {
2228  // We want all streams to stop or all streams to pause. If we are already in the
2229  // middle of pausing streams, then finish pausing all of them and the lumi will be
2230  // ended later. Otherwise, just end it now.
2233  }
2234  return false;
2235  }
2236 
2237  // Did another stream already stop or pause this lumi?
2239  return false;
2240  }
2241 
2242  // Are output modules or the looper requesting we stop?
2243  if (shouldWeStop()) {
2246  return false;
2247  }
2248 
2250 
2251  // need to use lock in addition to the serial task queue because
2252  // of delayed provenance reading and reading data in response to
2253  // edm::Refs etc
2254  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2255 
2256  // If we didn't already call nextTransitionType while merging lumis, call it here.
2257  // This asks the input source what is next and also checks for signals.
2258 
2260  setNeedToCallNext(true);
2261 
2262  if (InputSource::ItemType::IsEvent != itemType) {
2263  // IsFile may continue processing the lumi and
2264  // looper_ can cause the input source to declare a new IsRun which is actually
2265  // just a continuation of the previous run
2267  (InputSource::ItemType::IsRun == itemType and
2268  (iStatus.lumiPrincipal()->run() != input_->run() or
2269  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2270  if (itemType == InputSource::ItemType::IsLumi &&
2271  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2273  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2274  << "but the next lumi entry has the same lumi number.\n"
2275  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2276  }
2278  } else {
2280  }
2281  return false;
2282  }
2283  readEvent(iStreamIndex);
2284  return true;
2285  }
2286 
2287  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
      // Per-stream event loop step. It either reads and processes the next event for stream
      // iStreamIndex and then re-enters itself, or winds the stream's current lumi down.
      // NOTE(review): this listing is missing original lines 2296, 2314, 2318 and 2337; the
      // brace structure shows that 2314 and 2318 opened conditional blocks.

      // The next lumi has already started or the run has ended: only end the lumi on this stream.
2288  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2289  streamEndLumiAsync(iTask, iStreamIndex);
2290  return;
2291  }
2292  auto group = iTask.group();
      // The rest runs as a task pushed onto the source's serial queue chain.
2293  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2294  CMS_SA_ALLOW try {
      // Raw pointer only; the shared_ptr stays in streamLumiStatus_.
2295  auto status = streamLumiStatus_[iStreamIndex].get();
2297 
2298  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
      // An event was read: process it, then re-enter this function for the same stream.
2299  auto recursionTask =
2300  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2301  if (iEventException) {
2302  WaitingTaskHolder copyHolder(iTask);
2303  copyHolder.doneWaiting(*iEventException);
2304  // Intentionally, we don't return here. The recursive call to
2305  // handleNextEvent takes care of immediately ending the run properly
2306  // using the same code it uses to end the run in other situations.
2307  }
2308  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2309  });
2310 
2311  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2312  } else {
2313  // the stream will stop processing this lumi now
      // NOTE(review): original line 2314 is absent from this listing.
      // Only the first stream to get here marks the status and triggers the follow-up
      // (begin next lumi or end run); later streams see the flag already set.
2315  if (not status->haveStartedNextLumiOrEndedRun()) {
2316  status->noMoreEventsInLumi();
2317  status->startNextLumiOrEndRun();
      // NOTE(review): original line 2318 is absent from this listing.
2319  CMS_SA_ALLOW try {
2320  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2321  input_->luminosityBlockAuxiliary()->beginTime()),
2322  streamRunStatus_[iStreamIndex],
2323  iTask);
2324  } catch (...) {
      // Starting the next lumi failed: report the exception and end the run instead.
2325  WaitingTaskHolder copyHolder(iTask);
2326  copyHolder.doneWaiting(std::current_exception());
2327  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2328  }
2329  } else {
2330  // If appropriate, this will also start the next run.
2331  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2332  }
2333  }
2334  streamEndLumiAsync(iTask, iStreamIndex);
2335  } else {
2336  assert(status->eventProcessingState() ==
      // NOTE(review): original line 2337 (the right-hand side of this assert) is absent from this listing.
2338  auto runStatus = streamRunStatus_[iStreamIndex].get();
2339 
      // Release the task held in the run status, if there is one, without an exception.
2340  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2341  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2342  }
2343  }
2344  }
2345  } catch (...) {
      // Report the exception through a copy of the holder, then re-enter this function.
2346  WaitingTaskHolder copyHolder(iTask);
2347  copyHolder.doneWaiting(std::current_exception());
2348  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2349  }
2350  });
2351  }
2352 
2353  void EventProcessor::readEvent(unsigned int iStreamIndex) {
      // Reads the next event from the input source into the event principal of stream
      // iStreamIndex, then records the source's timestamp on that stream's run and lumi status.
2354  //TODO this will have to become per stream
2355  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2356  StreamContext streamContext(event.streamID(), &processContext_);
2357 
      // NOTE(review): original line 2358, which must declare `sentry`, is absent from this listing.
2359  input_->readEvent(event, streamContext);
2360 
2361  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2362  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2363  sentry.completedSuccessfully();
2364 
2365  FDEBUG(1) << "\treadEvent\n";
2366  }
2367 
2368  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
      // Runs processEventAsyncImpl for stream iStreamIndex as a task in iHolder's task group.
      // The holder is copied into the closure.
2369  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2370  }
2371 
2372  namespace {
      // Scope guard around clearing an event: the constructor emits preClearEventSignal_ and
      // the destructor emits postClearEventSignal_, both with the same StreamContext.
      // Both members are references, so the registry and context must outlive the guard.
2373  struct ClearEventGuard {
2374  ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2375  : act_(iReg), context_(iContext) {
2376  iReg.preClearEventSignal_.emit(iContext);
2377  }
2378  ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2379  edm::ActivityRegistry& act_;
2380  edm::StreamContext const& context_;
2381  };
2382  } // namespace
2383 
2384  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
      // Processes the event held in stream iStreamIndex's event principal as a task chain:
      // schedule -> sub-processes (if any) -> looper (if any) -> clear the event principal.
      // iHolder is the final task of the chain.
2385  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2386 
      // NOTE(review): original lines 2387-2388, which must declare `rng`, are absent from this listing.
2389  if (rng.isAvailable()) {
2390  Event ev(*pep, ModuleDescription(), nullptr);
2391  rng->postEventRead(ev);
2392  }
2393 
2394  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2395  using namespace edm::waiting_task::chain;
      // Step 1: run the schedule on this event.
2396  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2397  EventTransitionInfo info(*pep, es);
2398  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
      // Step 2 (only when sub-processes exist): hand the event to each sub-process, iterating in reverse order.
2399  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2400  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2401  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2402  }
      // Step 3 (only when a looper is configured): let the looper see the event.
2403  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2404  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2405  ServiceRegistry::Operate operateLooper(serviceToken_);
2406  processEventWithLooper(*pep, iStreamIndex);
      // Step 4: clear the event principal, bracketed by the pre/post clear-event signals.
2407  }) | then([this, pep](auto nextTask) {
2408  FDEBUG(1) << "\tprocessEvent\n";
2409  StreamContext streamContext(pep->streamID(),
      // NOTE(review): original line 2410 (one constructor argument) is absent from this listing.
2411  pep->id(),
2412  pep->runPrincipal().index(),
2413  pep->luminosityBlockPrincipal().index(),
2414  pep->time(),
2415  &processContext_);
2416  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2417  pep->clearEventPrincipal();
2418  }) | runLast(iHolder);
2419  }
2420 
2421  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
      // Passes the event to looper_->doDuringLoop() together with a ProcessingController built
      // from the input source's capabilities. When the source supports random access, it then
      // repositions the source (skipEvents / goToEvent). Repeats until
      // pc.lastOperationSucceeded() is true.
      // NOTE(review): this listing is missing original lines 2427, 2435, 2437, 2441 and 2443,
      // so the declaration of `status`, the conditions selecting skipEvents vs goToEvent, the
      // statement after the random-access block and the condition guarding
      // `shouldWeStop_ = true` are not visible.
2422  bool randomAccess = input_->randomAccess();
2423  ProcessingController::ForwardState forwardState = input_->forwardState();
2424  ProcessingController::ReverseState reverseState = input_->reverseState();
2425  ProcessingController pc(forwardState, reverseState, randomAccess);
2426 
2428  do {
2429  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2430  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2431  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2432 
2433  bool succeeded = true;
2434  if (randomAccess) {
2436  input_->skipEvents(-2);
2438  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2439  }
2440  }
2442  } while (!pc.lastOperationSucceeded());
2444  shouldWeStop_ = true;
2445  }
2446  }
2447 
2449  FDEBUG(1) << "\tshouldWeStop\n";
2450  if (shouldWeStop_)
2451  return true;
2452  if (!subProcesses_.empty()) {
2453  for (auto const& subProcess : subProcesses_) {
2454  if (subProcess.terminate()) {
2455  return true;
2456  }
2457  }
2458  return false;
2459  }
2460  return schedule_->terminate();
2461  }
2462 
2464 
2466 
2468 
2469  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
      // Stores iException as the deferred exception, but only for the first caller.
      // Returns true if this call stored the exception, false if one was already set
      // (in which case iException is dropped).
2470  bool expected = false;
      // Atomically flip the flag from false to true; only the caller that wins writes the pointer.
2471  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2472  deferredExceptionPtr_ = iException;
2473  return true;
2474  }
2475  return false;
2476  }
2477 
2479  cms::Exception ex("ModulesSynchingOnLumis");
2480  ex << "The framework is configured to use at least two streams, but the following modules\n"
2481  << "require synchronizing on LuminosityBlock boundaries:";
2482  bool found = false;
2483  for (auto worker : schedule_->allWorkers()) {
2484  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2485  found = true;
2486  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2487  }
2488  }
2489  if (found) {
2490  ex << "\n\nThe situation can be fixed by either\n"
2491  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2492  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2493  throw ex;
2494  }
2495  }
2496 
2498  std::unique_ptr<LogSystem> s;
2499  for (auto worker : schedule_->allWorkers()) {
2500  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2501  if (not s) {
2502  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2503  (*s) << "The following modules require synchronizing on Run boundaries:";
2504  }
2505  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2506  }
2507  }
2508  }
2509 } // namespace edm
LuminosityBlockNumber_t luminosityBlock() const
PreClearEvent preClearEventSignal_
signal is emitted before the data products in the Event are cleared
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void clearPrincipal()
Definition: Principal.cc:386
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
Log< level::System, false > LogSystem
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
void clear()
Not thread safe.
static InputSourceFactory const * get()
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
static const TGPicture * info(bool iBackgroundIsBlack)
void clearRunPrincipal(RunProcessingStatus &)
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
InputSource::ItemTypeInfo lastSourceTransition_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:210
std::unique_ptr< ExceptionToActionTable const > act_table_
int merge(int argc, char *argv[])
Definition: DiMuonVmerge.cc:28
static PFTauRenderPlugin instance
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
static std::mutex mutex
Definition: Proxy.cc:8
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
StreamID streamID() const
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
void endStreams(ExceptionCollector &) noexcept
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
std::unique_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
Definition: RunPrincipal.h:73
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:50
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ProcessBlockPrincipal & processBlockPrincipal() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
Definition: Signal.h:50
std::multimap< std::string, std::string > referencesToBranches_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ShouldWriteRun shouldWriteRun() const
Definition: RunPrincipal.h:86
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
Definition: Registry.cc:40
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
virtual void endOfJob()
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
PreSourceNextTransition preSourceNextTransitionSignal_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void doneWaiting(std::exception_ptr iExcept) noexcept
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
void writeLumi(LuminosityBlockNumber_t lumi)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:837
void reportShutdownSignal()
Definition: JobReport.cc:543
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
Definition: Exception.cc:169
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
HLT enums.
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:315
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:666
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
bool isAvailable() const
Definition: Service.h:40
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
#define get
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
static ParentageRegistry * instance()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)