
EventProcessor.cc
8 
45 
47 
55 
60 
65 
78 
79 #include "MessageForSource.h"
80 #include "MessageForParent.h"
82 #include "RunProcessingStatus.h"
83 
84 #include "boost/range/adaptor/reversed.hpp"
85 
86 #include <cassert>
87 #include <exception>
88 #include <iomanip>
89 #include <iostream>
90 #include <utility>
91 #include <sstream>
92 
93 #include <sys/ipc.h>
94 #include <sys/msg.h>
95 
96 #include "oneapi/tbb/task.h"
97 
98 //Used for CPU affinity
99 #ifndef __APPLE__
100 #include <sched.h>
101 #endif
102 
103 namespace {
104  class PauseQueueSentry {
105  public:
106  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
107  ~PauseQueueSentry() { queue_.resume(); }
108 
109  private:
110  edm::SerialTaskQueue& queue_;
111  };
112 } // namespace
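 // PauseQueueSentry above is a small RAII helper: the queue is paused for exactly the
 // lifetime of the sentry, so tasks pushed in the meantime are held back until the
 // sentry is destroyed and resume() runs. A minimal usage sketch (the queue and task
 // group names are illustrative assumptions, not taken from this file):
 //
 //   {
 //     PauseQueueSentry sentry(queue);                 // queue.pause()
 //     queue.push(group, [] { /* deferred work */ });  // held until the queue resumes
 //   }                                                 // ~PauseQueueSentry -> queue.resume()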
113 
114 namespace edm {
115 
116  namespace chain = waiting_task::chain;
117 
118  // ---------------------------------------------------------------
 119  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
 120  ParameterSet& params,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
125  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
126  std::shared_ptr<ActivityRegistry> areg,
127  std::shared_ptr<ProcessConfiguration const> processConfiguration,
128  PreallocationConfiguration const& allocations) {
129  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
 130  if (main_input == nullptr) {
 131  throw Exception(errors::Configuration)
132  << "There must be exactly one source in the configuration.\n"
133  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
134  }
135 
136  std::string modtype(main_input->getParameter<std::string>("@module_type"));
137 
 138  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
 139  ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
140  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
141  filler->fill(descriptions);
142 
143  try {
144  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
145  } catch (cms::Exception& iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  moduleIndex);
165 
166  InputSourceDescription isdesc(md,
167  preg,
168  branchIDListHelper,
169  processBlockHelper,
170  thinnedAssociationsHelper,
171  areg,
172  common.maxEventsInput_,
173  common.maxLumisInput_,
174  common.maxSecondsUntilRampdown_,
175  allocations);
176 
177  areg->preSourceConstructionSignal_(md);
178  std::unique_ptr<InputSource> input;
179  try {
180  //even if we have an exception, send the signal
181  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
182  convertException::wrap([&]() {
183  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
184  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
185  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
186  });
187  } catch (cms::Exception& iException) {
188  std::ostringstream ost;
189  ost << "Constructing input source of type " << modtype;
190  iException.addContext(ost.str());
191  throw;
192  }
193  return input;
194  }
195 
196  // ---------------------------------------------------------------
 197  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
 198  eventsetup::EventSetupProvider& cp,
 199  ParameterSet& params,
200  std::vector<std::string> const& loopers) {
201  std::shared_ptr<EDLooperBase> vLooper;
202 
203  assert(1 == loopers.size());
204 
205  for (auto const& looperName : loopers) {
206  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
207  // Unlikely we would ever need the ModuleTypeResolver in Looper
208  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
209  }
210  return vLooper;
211  }
212 
213  // ---------------------------------------------------------------
214  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
 215  ServiceToken const& iToken,
 216  serviceregistry::ServiceLegacy iLegacy,
217  std::vector<std::string> const& defaultServices,
218  std::vector<std::string> const& forcedServices)
219  : actReg_(),
220  preg_(),
221  branchIDListHelper_(),
222  serviceToken_(),
223  input_(),
224  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*parameterSet)),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
265  processConfiguration_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
271  deferredExceptionPtrIsSet_(false),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
278  exceptionMessageFiles_(),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
283  forceESCacheClearOnNewRun_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
 286  processDesc->addServices(defaultServices, forcedServices);
 287  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
288  }
289 
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
 291  ServiceToken const& token,
 292  serviceregistry::ServiceLegacy legacy)
293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
302  processConfiguration_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
308  deferredExceptionPtrIsSet_(false),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
315  exceptionMessageFiles_(),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
320  forceESCacheClearOnNewRun_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
 326  ServiceToken const& iToken,
 327  serviceregistry::ServiceLegacy iLegacy) {
328  //std::cerr << processDesc->dump() << std::endl;
329 
 330  // register the empty parentage vector, once and for all
 331  ParentageRegistry::instance()->insertMapped(Parentage());
332 
 333  // register the empty parameter set, once and for all.
 334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
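 // A worked example of the defaulting above (input values chosen for illustration):
 // with numberOfThreads = 8 and the stream/lumi/run counts all left at 0, the code yields
 // nStreams = 8 (follows nThreads), nConcurrentLumis = 2 (the default, already <= nStreams)
 // and nConcurrentRuns = 2 (zero, or anything above nConcurrentLumis, is set to nConcurrentLumis).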
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
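 // Checking the formula above: for nConcurrentRuns = 1 the result is always nConcurrentLumis
 // (e.g. 3*1 - 2 + (4 - 1) = 4), as the comment states; for nConcurrentRuns = 2 and
 // nConcurrentLumis = 4 it gives 3*2 - 2 + (4 - 2) = 6.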
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
 415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
 416  deleteNonConsumedUnscheduledModules_ =
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
 418  // for now, if there is a SubProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
 432  }
 433  modulesToIgnoreForDeleteEarly_ =
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
 454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
 457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
 468  // in the presence of a looper do not delete modules
 469  deleteNonConsumedUnscheduledModules_ = false;
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
606 
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
624 
628  task.waitNoThrow();
629  assert(task.done());
630  }
631 
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
 689  // modules, to avoid non-consumed, non-run modules keeping the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
700  }
701  if (preallocations_.numberOfRuns() > 1) {
703  }
704 
705  //NOTE: This implementation assumes 'Job' means one call
 706  // to EventProcessor::run
707  // If it really means once per 'application' then this code will
708  // have to be changed.
 709  // Also have to deal with the case where 'run' is called, then a new Module is
 710  // added, and 'run' is called
711  // again. In that case the newly added Module needs its 'beginJob'
712  // to be called.
713 
714  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
715  // For now we delay calling beginOfJob until first beginOfRun
716  //if(looper_) {
717  // looper_->beginOfJob(es);
718  //}
719  espController_->finishConfiguration();
720  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
721  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728 
729  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToResolverIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
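 // The chain:: composition used in beginJob builds a small task graph declaratively:
 // first(f) creates the initial task, then(g) / ifThen(cond, g) append follow-up tasks,
 // and lastTask(h) / runLast(h) hand completion to an existing WaitingTaskHolder.
 // A minimal sketch of the idiom (the callables and the holder are illustrative only):
 //
 //   using namespace edm::waiting_task::chain;
 //   first([](auto nextTask) { /* step 1 */ }) |
 //       then([](auto nextTask) { /* step 2, runs after step 1 completes */ }) |
 //       runLast(std::move(holder));   // holder is marked done when the chain finishes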
762 
764  // Collects exceptions, so we don't throw before all operations are performed.
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
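 // The ExceptionCollector pattern used in endJob lets every shutdown step run even if an
 // earlier one throws, and rethrows the accumulated failures once at the end. A minimal
 // sketch of the same pattern (stepOne/stepTwo are placeholders):
 //
 //   ExceptionCollector collector("Multiple exceptions were thrown.\n");
 //   collector.call([&]() { stepOne(); });
 //   collector.call([&]() { stepTwo(); });   // still executed if stepOne() threw
 //   if (collector.hasThrown()) {
 //     collector.rethrow();                  // single combined exception
 //   }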
829 
831 
832  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
833  return schedule_->getAllModuleDescriptions();
834  }
835 
836  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
837 
838  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
839 
840  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
841 
842  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
843 
844  namespace {
845 #include "TransitionProcessors.icc"
846  }
847 
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
854  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
856  jr->reportShutdownSignal();
858  }
859  return returnValue;
860  }
861 
862  namespace {
863  struct SourceNextGuard {
864  SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
865  ~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
866  edm::ActivityRegistry& act_;
867  };
868  } // namespace
869 
872  InputSource::ItemTypeInfo itemTypeInfo;
873  {
874  SourceNextGuard guard(*actReg_.get());
875  //For now, do nothing with InputSource::IsSynchronize
876  do {
877  itemTypeInfo = input_->nextItemType();
878  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
879  }
880  lastSourceTransition_ = itemTypeInfo;
881  sentry.completedSuccessfully();
882 
884 
886  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
888  }
889 
890  return lastSourceTransition_;
891  }
892 
893  void EventProcessor::nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
894  WaitingTaskHolder nextTask) {
895  auto group = nextTask.group();
897  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
898  CMS_SA_ALLOW try {
900  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
903  runStatus->runPrincipal()->run() == input_->run() &&
904  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
906  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
907  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
908  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
909  }
910  } catch (...) {
911  nextHolder.doneWaiting(std::current_exception());
912  }
913  });
914  }
915 
917  beginJob(); //make sure this was called
918 
919  // make the services available
921  actReg_->beginProcessingSignal_();
922  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
923  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
924  try {
925  FilesProcessor fp(fileModeNoMerge_);
926 
927  convertException::wrap([&]() {
928  bool firstTime = true;
929  do {
930  if (not firstTime) {
932  rewindInput();
933  } else {
934  firstTime = false;
935  }
936  startingNewLoop();
937 
938  auto trans = fp.processFiles(*this);
939 
940  fp.normalEnd();
941 
942  if (deferredExceptionPtrIsSet_.load()) {
943  std::rethrow_exception(deferredExceptionPtr_);
944  }
945  if (trans != InputSource::ItemType::IsStop) {
946  //problem with the source
947  doErrorStuff();
948 
949  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
950  }
951  } while (not endOfLoop());
952  }); // convertException::wrap
953 
954  } // Try block
 955  catch (cms::Exception& e) {
 956  if (exceptionMessageLumis_) {
957  std::string message(
958  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
959  e.addAdditionalInfo(message);
960  if (e.alreadyPrinted()) {
961  LogAbsolute("Additional Exceptions") << message;
962  }
963  }
964  if (exceptionMessageRuns_) {
965  std::string message(
966  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
967  e.addAdditionalInfo(message);
968  if (e.alreadyPrinted()) {
969  LogAbsolute("Additional Exceptions") << message;
970  }
971  }
972  if (!exceptionMessageFiles_.empty()) {
973  e.addAdditionalInfo(exceptionMessageFiles_);
974  if (e.alreadyPrinted()) {
975  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
976  }
977  }
978  throw;
979  }
980  return epSuccess;
981  }
982 
984  FDEBUG(1) << " \treadFile\n";
985  size_t size = preg_->size();
987 
988  if (streamRunActive_ > 0) {
989  streamRunStatus_[0]->runPrincipal()->preReadFile();
990  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
991  }
992 
993  if (streamLumiActive_ > 0) {
994  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
995  }
996 
997  fb_ = input_->readFile();
998  if (size < preg_->size()) {
1000  }
1003  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1004  }
1005  sentry.completedSuccessfully();
1006  }
1007 
1008  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
1009  if (fileBlockValid()) {
1011  input_->closeFile(fb_.get(), cleaningUpAfterException);
1012  sentry.completedSuccessfully();
1013  }
1014  FDEBUG(1) << "\tcloseInputFile\n";
1015  }
1016 
1018  if (fileBlockValid()) {
1019  schedule_->openOutputFiles(*fb_);
1020  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1021  }
1022  FDEBUG(1) << "\topenOutputFiles\n";
1023  }
1024 
1026  schedule_->closeOutputFiles();
1027  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1028  processBlockHelper_->clearAfterOutputFilesClose();
1029  FDEBUG(1) << "\tcloseOutputFiles\n";
1030  }
1031 
1033  if (fileBlockValid()) {
1035  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1036  schedule_->respondToOpenInputFile(*fb_);
1037  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1038  }
1039  FDEBUG(1) << "\trespondToOpenInputFile\n";
1040  }
1041 
1043  if (fileBlockValid()) {
1044  schedule_->respondToCloseInputFile(*fb_);
1045  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1046  }
1047  FDEBUG(1) << "\trespondToCloseInputFile\n";
1048  }
1049 
1051  shouldWeStop_ = false;
1052  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1053  // until after we've called beginOfJob
1054  if (looper_ && looperBeginJobRun_) {
1055  looper_->doStartingNewLoop();
1056  }
1057  FDEBUG(1) << "\tstartingNewLoop\n";
1058  }
1059 
1061  if (looper_) {
1062  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1063  looper_->setModuleChanger(&changer);
1064  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
 1065  looper_->setModuleChanger(nullptr);
 1066  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
1067  return true;
1068  else
1069  return false;
1070  }
1071  FDEBUG(1) << "\tendOfLoop\n";
1072  return true;
1073  }
1074 
1076  input_->repeat();
1077  input_->rewind();
1078  FDEBUG(1) << "\trewind\n";
1079  }
1080 
1082  looper_->prepareForNextLoop(esp_.get());
1083  FDEBUG(1) << "\tprepareForNextLoop\n";
1084  }
1085 
1087  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1088  if (!subProcesses_.empty()) {
1089  for (auto const& subProcess : subProcesses_) {
1090  if (subProcess.shouldWeCloseOutput()) {
1091  return true;
1092  }
1093  }
1094  return false;
1095  }
1096  return schedule_->shouldWeCloseOutput();
1097  }
1098 
1100  FDEBUG(1) << "\tdoErrorStuff\n";
1101  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1102  << "and went to the error state\n"
1103  << "Will attempt to terminate processing normally\n"
1104  << "(IF using the looper the next loop will be attempted)\n"
1105  << "This likely indicates a bug in an input module or corrupted input or both\n";
1106  }
1107 
1108  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1109  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1110  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1111 
1113  FinalWaitingTask globalWaitTask{taskGroup_};
1114 
1115  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1116  beginGlobalTransitionAsync<Traits>(
1117  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1118 
1119  globalWaitTask.wait();
1120  beginProcessBlockSucceeded = true;
1121  }
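 // beginProcessBlock shows the synchronization pattern used by the transition methods
 // below: a FinalWaitingTask tied to taskGroup_ is the completion point, a
 // WaitingTaskHolder referring to it is handed to the asynchronous transition, and
 // wait() blocks until all holders are done (rethrowing if the work failed). A sketch,
 // with doTransitionAsync standing in for any of the *TransitionAsync helpers:
 //
 //   FinalWaitingTask done{taskGroup_};
 //   doTransitionAsync(WaitingTaskHolder(taskGroup_, &done), ...);
 //   done.wait();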
1122 
1124  input_->fillProcessBlockHelper();
1126  while (input_->nextProcessBlock(processBlockPrincipal)) {
1127  readProcessBlock(processBlockPrincipal);
1128 
1130  FinalWaitingTask globalWaitTask{taskGroup_};
1131 
1132  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1133  beginGlobalTransitionAsync<Traits>(
1134  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1135 
1136  globalWaitTask.wait();
1137 
1138  FinalWaitingTask writeWaitTask{taskGroup_};
1140  writeWaitTask.wait();
1141 
1142  processBlockPrincipal.clearPrincipal();
1143  for (auto& s : subProcesses_) {
1144  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1145  }
1146  }
1147  }
1148 
1149  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1150  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1151 
1153  FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1157  *schedule_,
1158  transitionInfo,
1159  serviceToken_,
1160  subProcesses_,
1161  cleaningUpAfterException);
1162  globalWaitTask.wait();
1163 
1164  if (beginProcessBlockSucceeded) {
1165  FinalWaitingTask writeWaitTask{taskGroup_};
1167  writeWaitTask.wait();
1168  }
1169 
1170  processBlockPrincipal.clearPrincipal();
1171  for (auto& s : subProcesses_) {
1172  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1173  }
1174  }
1175 
1177  FinalWaitingTask waitTask{taskGroup_};
1179  if (streamRunActive_ == 0) {
1180  assert(streamLumiActive_ == 0);
1181 
1182  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1183  WaitingTaskHolder{taskGroup_, &waitTask});
1184  } else {
1186 
1187  auto runStatus = streamRunStatus_[0];
1188 
1190  runStatus->runPrincipal()->run() == input_->run() and
1191  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1192  readAndMergeRun(*runStatus);
1194  }
1195 
1196  setNeedToCallNext(false);
1197 
1198  WaitingTaskHolder holder{taskGroup_, &waitTask};
1199  runStatus->setHolderOfTaskInProcessRuns(holder);
1200  if (streamLumiActive_ > 0) {
1202  continueLumiAsync(std::move(holder));
1203  } else {
1205  }
1206  }
1207  waitTask.wait();
1208  return lastTransitionType();
1209  }
1210 
1212  if (iHolder.taskHasFailed()) {
1213  return;
1214  }
1215 
1216  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1217 
1218  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1219 
1220  chain::first([this, &status, iSync](auto nextTask) {
1221  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1222  nextTask,
1223  status->endIOVWaitingTasks(),
1224  status->eventSetupImpls(),
1226  actReg_.get(),
1227  serviceToken_,
1229  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1230  CMS_SA_ALLOW try {
1231  if (iException) {
1232  WaitingTaskHolder copyHolder(nextTask);
1233  copyHolder.doneWaiting(*iException);
1234  // Finish handling the exception in the task pushed to runQueue_
1235  }
1237 
1238  runQueue_->pushAndPause(
1239  *nextTask.group(),
1240  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1241  CMS_SA_ALLOW try {
1242  if (postRunQueueTask.taskHasFailed()) {
1243  status->resetBeginResources();
1245  return;
1246  }
1247 
1248  status->setResumer(std::move(iResumer));
1249 
1251  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1252  CMS_SA_ALLOW try {
1254 
1255  if (postSourceTask.taskHasFailed()) {
1256  status->resetBeginResources();
1258  status->resumeGlobalRunQueue();
1259  return;
1260  }
1261 
1262  status->setRunPrincipal(readRun());
1263 
1264  RunPrincipal& runPrincipal = *status->runPrincipal();
1265  {
1267  input_->doBeginRun(runPrincipal, &processContext_);
1268  sentry.completedSuccessfully();
1269  }
1270 
1271  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1272  if (looper_ && looperBeginJobRun_ == false) {
1273  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1274 
1275  oneapi::tbb::task_group group;
1276  FinalWaitingTask waitTask{group};
1277  using namespace edm::waiting_task::chain;
1278  chain::first([this, &es](auto nextTask) {
1279  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1280  }) | then([this, &es](auto nextTask) mutable {
1281  looper_->beginOfJob(es);
1282  looperBeginJobRun_ = true;
1283  looper_->doStartingNewLoop();
1284  }) | runLast(WaitingTaskHolder(group, &waitTask));
1285  waitTask.wait();
1286  }
1287 
1288  using namespace edm::waiting_task::chain;
1289  chain::first([this, status](auto nextTask) mutable {
1290  CMS_SA_ALLOW try {
1293  } else {
1294  setNeedToCallNext(true);
1295  }
1296  } catch (...) {
1297  status->setStopBeforeProcessingRun(true);
1298  nextTask.doneWaiting(std::current_exception());
1299  }
1300  }) | then([this, status, &es](auto nextTask) {
1301  if (status->stopBeforeProcessingRun()) {
1302  return;
1303  }
1304  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1306  beginGlobalTransitionAsync<Traits>(
1307  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1308  }) | then([status](auto nextTask) mutable {
1309  if (status->stopBeforeProcessingRun()) {
1310  return;
1311  }
1312  status->globalBeginDidSucceed();
1313  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1314  if (status->stopBeforeProcessingRun()) {
1315  return;
1316  }
1317  looper_->prefetchAsync(
1318  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1319  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1320  if (status->stopBeforeProcessingRun()) {
1321  return;
1322  }
1323  ServiceRegistry::Operate operateLooper(serviceToken_);
1324  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1325  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1326  bool precedingTasksSucceeded = true;
1327  if (iException) {
1328  precedingTasksSucceeded = false;
1329  WaitingTaskHolder copyHolder(holder);
1330  copyHolder.doneWaiting(*iException);
1331  }
1332 
1333  if (status->stopBeforeProcessingRun()) {
1334  // We just quit now if there was a failure when merging runs
1335  status->resetBeginResources();
1337  status->resumeGlobalRunQueue();
1338  return;
1339  }
1340  CMS_SA_ALLOW try {
1341  // Under normal circumstances, this task runs after endRun has completed for all streams
1342  // and global endLumi has completed for all lumis contained in this run
1343  auto globalEndRunTask =
1344  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1345  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1346  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1348  });
1349  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1350  } catch (...) {
1351  status->resetBeginResources();
1353  status->resumeGlobalRunQueue();
1354  holder.doneWaiting(std::current_exception());
1355  return;
1356  }
1357 
1358  // After this point we are committed to end the run via endRunAsync
1359 
1361 
1362  // The only purpose of the pause is to cause stream begin run to execute before
1363  // global begin lumi in the single threaded case (maintains consistency with
1364  // the order that existed before concurrent runs were implemented).
1365  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1366 
1367  CMS_SA_ALLOW try {
1369  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1370  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1371  CMS_SA_ALLOW try {
1372  streamQueues_[i].push(
1373  *holder.group(),
1374  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1376  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1377  });
1378  } catch (...) {
1379  if (status->streamFinishedBeginRun()) {
1380  WaitingTaskHolder copyHolder(holder);
1381  copyHolder.doneWaiting(std::current_exception());
1382  status->resetBeginResources();
1385  }
1386  }
1387  }
1388  });
1389  } catch (...) {
1390  WaitingTaskHolder copyHolder(holder);
1391  copyHolder.doneWaiting(std::current_exception());
1392  status->resetBeginResources();
1395  }
1397  }) | runLast(postSourceTask);
1398  } catch (...) {
1399  status->resetBeginResources();
1401  status->resumeGlobalRunQueue();
1402  postSourceTask.doneWaiting(std::current_exception());
1403  }
1404  }); // task in sourceResourcesAcquirer
1405  } catch (...) {
1406  status->resetBeginResources();
1408  status->resumeGlobalRunQueue();
1409  postRunQueueTask.doneWaiting(std::current_exception());
1410  }
1411  }); // task in runQueue
1412  } catch (...) {
1413  status->resetBeginResources();
1415  nextTask.doneWaiting(std::current_exception());
1416  }
1417  }) | chain::runLast(std::move(iHolder));
1418  }
1419 
1420  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1421  std::shared_ptr<RunProcessingStatus> status,
1422  bool precedingTasksSucceeded,
1423  WaitingTaskHolder iHolder) {
1424  // These shouldn't throw
1425  streamQueues_[iStream].pause();
1426  ++streamRunActive_;
1427  streamRunStatus_[iStream] = std::move(status);
1428 
1429  CMS_SA_ALLOW try {
1430  using namespace edm::waiting_task::chain;
1431  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1432  if (precedingTasksSucceeded) {
1433  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1434  RunTransitionInfo transitionInfo(
1435  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1437  beginStreamTransitionAsync<Traits>(
1438  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1439  }
1440  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1441  if (exceptionFromBeginStreamRun) {
1442  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1443  }
1444  releaseBeginRunResources(iStream);
1445  }) | runLast(iHolder);
1446  } catch (...) {
1447  releaseBeginRunResources(iStream);
1448  iHolder.doneWaiting(std::current_exception());
1449  }
1450  }
1451 
1452  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1453  auto& status = streamRunStatus_[iStream];
1454  if (status->streamFinishedBeginRun()) {
1455  status->resetBeginResources();
1457  }
1458  streamQueues_[iStream].resume();
1459  }
1460 
1461  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1462  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1463  iRunStatus->setEndTime();
1464  IOVSyncValue ts(
1466  runPrincipal.endTime());
1467  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1468  WaitingTaskHolder copyHolder(iHolder);
1469  copyHolder.doneWaiting(std::current_exception());
1470  }
1471 
1472  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1473  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1474  nextTask,
1475  iRunStatus->endIOVWaitingTasksEndRun(),
1476  iRunStatus->eventSetupImplsEndRun(),
1478  actReg_.get(),
1479  serviceToken_);
1480  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1481  if (iException) {
1482  iRunStatus->setEndingEventSetupSucceeded(false);
1483  handleEndRunExceptions(*iException, nextTask);
1484  }
1486  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1487  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1488  CMS_SA_ALLOW try {
1489  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1490  streamQueues_[i].pause();
1491  streamEndRunAsync(std::move(nextTask), i);
1492  });
1493  } catch (...) {
1494  WaitingTaskHolder copyHolder(nextTask);
1495  copyHolder.doneWaiting(std::current_exception());
1496  }
1497  }
1498  });
1499 
1501  CMS_SA_ALLOW try {
1502  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1503  } catch (...) {
1504  WaitingTaskHolder copyHolder(nextTask);
1505  copyHolder.doneWaiting(std::current_exception());
1506  }
1507  }
1508  }) | chain::runLast(std::move(iHolder));
1509  }
1510 
1511  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
 1512  if (holder.taskHasFailed()) {
 1513  setExceptionMessageRuns();
1514  } else {
1515  WaitingTaskHolder tmp(holder);
1516  tmp.doneWaiting(iException);
1517  }
1518  }
1519 
1520  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1521  auto& runPrincipal = *(iRunStatus->runPrincipal());
1522  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1523  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1524  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1525  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1526  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1527 
1528  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1529  using namespace edm::waiting_task::chain;
1530  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1531  auto nextTask) {
1532  if (endingEventSetupSucceeded) {
1533  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1535  endGlobalTransitionAsync<Traits>(
1536  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1537  }
1538  }) |
1539  ifThen(looper_ && endingEventSetupSucceeded,
1540  [this, &runPrincipal, &es](auto nextTask) {
1541  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1542  }) |
1543  ifThen(looper_ && endingEventSetupSucceeded,
1544  [this, &runPrincipal, &es](auto nextTask) {
1546  looper_->doEndRun(runPrincipal, es, &processContext_);
1547  }) |
1548  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1549  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1550  mergeableRunProductMetadata->preWriteRun();
1551  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1552  }) |
1553  then([status = std::move(iRunStatus),
1554  this,
1555  didGlobalBeginSucceed,
1556  mergeableRunProductMetadata,
1557  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1558  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1559  mergeableRunProductMetadata->postWriteRun();
1560  }
1561  if (iException) {
1562  handleEndRunExceptions(*iException, nextTask);
1563  }
1565 
1566  std::exception_ptr ptr;
1567 
1568  // Try hard to clean up resources so the
1569  // process can terminate in a controlled
1570  // fashion even after exceptions have occurred.
1571  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1572  if (not ptr) {
1573  ptr = std::current_exception();
1574  }
1575  }
1576  CMS_SA_ALLOW try {
1577  status->resumeGlobalRunQueue();
1579  } catch (...) {
1580  if (not ptr) {
1581  ptr = std::current_exception();
1582  }
1583  }
1584  CMS_SA_ALLOW try {
1585  status->resetEndResources();
1586  status.reset();
1587  } catch (...) {
1588  if (not ptr) {
1589  ptr = std::current_exception();
1590  }
1591  }
1592 
1593  if (ptr && !iException) {
1594  handleEndRunExceptions(ptr, nextTask);
1595  }
1596  }) |
1597  runLast(std::move(iTask));
1598  }
1599 
1600  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1601  CMS_SA_ALLOW try {
1602  if (!streamRunStatus_[iStreamIndex]) {
1603  if (exceptionRunStatus_->streamFinishedRun()) {
1604  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1605  exceptionRunStatus_.reset();
1606  }
1607  return;
1608  }
1609 
1610  auto runDoneTask =
1611  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1612  if (iException) {
1613  handleEndRunExceptions(*iException, iTask);
1614  }
1615 
1616  auto runStatus = streamRunStatus_[iStreamIndex];
1617 
 1618  // reset the status before releasing the queue, otherwise there is a race condition
1619  if (runStatus->streamFinishedRun()) {
1620  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1621  }
1622  streamRunStatus_[iStreamIndex].reset();
1623  --streamRunActive_;
1624  streamQueues_[iStreamIndex].resume();
1625  });
1626 
1627  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1628 
1629  auto runStatus = streamRunStatus_[iStreamIndex].get();
1630 
1631  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1632  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1633  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1634  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1635 
1636  auto& runPrincipal = *runStatus->runPrincipal();
1638  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1639  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1640  *schedule_,
1641  iStreamIndex,
1642  transitionInfo,
1643  serviceToken_,
1644  subProcesses_,
1645  cleaningUpAfterException);
1646  }
1647  } catch (...) {
1648  handleEndRunExceptions(std::current_exception(), iTask);
1649  }
1650  }
1651 
1652  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1653  if (streamRunActive_ > 0) {
1654  FinalWaitingTask waitTask{taskGroup_};
1655 
1656  auto runStatus = streamRunStatus_[0].get();
1657  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1658  WaitingTaskHolder holder{taskGroup_, &waitTask};
1659  runStatus->setHolderOfTaskInProcessRuns(holder);
1661  endRunAsync(streamRunStatus_[0], std::move(holder));
1662  waitTask.wait();
1663  }
1664  }
1665 
1667  std::shared_ptr<RunProcessingStatus> iRunStatus,
1668  edm::WaitingTaskHolder iHolder) {
1669  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1670 
1671  auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1672  chain::first([this, &iSync, &status](auto nextTask) {
1673  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1674  nextTask,
1675  status->endIOVWaitingTasks(),
1676  status->eventSetupImpls(),
1678  actReg_.get(),
1679  serviceToken_);
1680  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1681  CMS_SA_ALLOW try {
1682  //the call to doneWaiting will cause the count to decrement
1683  if (iException) {
1684  WaitingTaskHolder copyHolder(nextTask);
1685  copyHolder.doneWaiting(*iException);
1686  }
1687 
1688  lumiQueue_->pushAndPause(
1689  *nextTask.group(),
1690  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1691  CMS_SA_ALLOW try {
1692  if (postLumiQueueTask.taskHasFailed()) {
1693  status->resetResources();
1695  endRunAsync(iRunStatus, postLumiQueueTask);
1696  return;
1697  }
1698 
1699  status->setResumer(std::move(iResumer));
1700 
1702  *postLumiQueueTask.group(),
1703  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1704  CMS_SA_ALLOW try {
1706 
1707  if (postSourceTask.taskHasFailed()) {
1708  status->resetResources();
1710  endRunAsync(iRunStatus, postSourceTask);
1711  return;
1712  }
1713 
1714  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1715 
1716  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1717  {
1719  input_->doBeginLumi(lumiPrincipal, &processContext_);
1720  sentry.completedSuccessfully();
1721  }
1722 
1724  if (rng.isAvailable()) {
1725  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1726  rng->preBeginLumi(lb);
1727  }
1728 
1729  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1730 
1731  using namespace edm::waiting_task::chain;
1732  chain::first([this, status](auto nextTask) mutable {
1735  } else {
1736  setNeedToCallNext(true);
1737  }
1738  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1739  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1741  beginGlobalTransitionAsync<Traits>(
1742  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1743  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1744  looper_->prefetchAsync(
1745  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1746  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1747  status->globalBeginDidSucceed();
1748  ServiceRegistry::Operate operateLooper(serviceToken_);
1749  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1750  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1751  if (iException) {
1752  status->resetResources();
1754  WaitingTaskHolder copyHolder(holder);
1755  copyHolder.doneWaiting(*iException);
1756  endRunAsync(iRunStatus, holder);
1757  } else {
1758  if (not looper_) {
1759  status->globalBeginDidSucceed();
1760  }
1761 
1762  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1763 
1764  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1766 
1767  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1768  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1769  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1770  if (!status->shouldStreamStartLumi()) {
1771  return;
1772  }
1773  streamQueues_[i].pause();
1774 
1775  auto& event = principalCache_.eventPrincipal(i);
1776  auto eventSetupImpls = &status->eventSetupImpls();
1777  auto lp = status->lumiPrincipal().get();
1780  event.setLuminosityBlockPrincipal(lp);
1781  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1782  using namespace edm::waiting_task::chain;
1783  chain::first([this, i, &transitionInfo](auto nextTask) {
1784  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1785  *schedule_,
1786  i,
1787  transitionInfo,
1788  serviceToken_,
1789  subProcesses_);
1790  }) |
1791  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1792  auto nextTask) {
1793  if (exceptionFromBeginStreamLumi) {
1794  WaitingTaskHolder copyHolder(nextTask);
1795  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1796  }
1797  handleNextEventForStreamAsync(std::move(nextTask), i);
1798  }) |
1799  runLast(std::move(holder));
1800  });
1801  } // end for loop over streams
1802  });
1803  }
1804  }) | runLast(postSourceTask);
1805  } catch (...) {
1806  status->resetResources();
1807  queueWhichWaitsForIOVsToFinish_.resume();
1808  WaitingTaskHolder copyHolder(postSourceTask);
1809  copyHolder.doneWaiting(std::current_exception());
1810  endRunAsync(iRunStatus, postSourceTask);
1811  }
1812  }); // task in sourceResourcesAcquirer
1813  } catch (...) {
1814  status->resetResources();
1815  queueWhichWaitsForIOVsToFinish_.resume();
1816  WaitingTaskHolder copyHolder(postLumiQueueTask);
1817  copyHolder.doneWaiting(std::current_exception());
1818  endRunAsync(iRunStatus, postLumiQueueTask);
1819  }
1820  }); // task in lumiQueue
1821  } catch (...) {
1822  status->resetResources();
1823  queueWhichWaitsForIOVsToFinish_.resume();
1824  WaitingTaskHolder copyHolder(nextTask);
1825  copyHolder.doneWaiting(std::current_exception());
1826  endRunAsync(iRunStatus, nextTask);
1827  }
1828  }) | chain::runLast(std::move(iHolder));
1829  }
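The begin-lumi logic above is composed with the task-chain helpers from FWCore/Concurrency. Below is a minimal sketch of how those combinators fit together; exampleChain and its lambda bodies are hypothetical placeholders (not framework code), and it assumes only the chain_first.h and WaitingTaskHolder headers.

#include <exception>
#include <utility>
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/chain_first.h"

namespace {
  namespace chain = edm::waiting_task::chain;

  // Hypothetical helper: runs two steps in order and signals iHolder when the chain
  // finishes or fails. The bodies are placeholders, not the real transition code.
  void exampleChain(edm::WaitingTaskHolder iHolder) {
    chain::first([](auto nextTask) {
      // step 1: do (possibly asynchronous) work here; the next step runs once
      // nextTask is released or the work it was handed to completes
    }) | chain::then([](std::exception_ptr const* iException, auto nextTask) {
      // step 2: sees any exception produced by step 1 and may forward it
      if (iException) {
        edm::WaitingTaskHolder copy(nextTask);
        copy.doneWaiting(*iException);
      }
    }) | chain::runLast(std::move(iHolder));  // the final holder is signaled at the end
  }
}  // namespace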
1830 
1831  void EventProcessor::continueLumiAsync(WaitingTaskHolder iHolder) {
1832  chain::first([this](auto nextTask) {
1833  //all streams are sharing the same status at the moment
1834  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1836 
1838  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1841  }
1842  }) | chain::then([this](auto nextTask) mutable {
1843  unsigned int streamIndex = 0;
1844  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1845  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1846  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1847  }
1848  nextTask.group()->run(
1849  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1850  }) | chain::runLast(std::move(iHolder));
1851  }
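continueLumiAsync fans the per-stream work out through TBB: all but one stream are enqueued into a task arena and the last one runs through the task group that owns the holder. A standalone sketch of that fan-out shape using plain oneTBB; nStreams, work, and the std::latch bookkeeping are illustrative stand-ins for the framework's waiting-task accounting.

#include <cstdio>
#include <latch>
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/task_group.h>

int main() {
  constexpr unsigned int nStreams = 4;
  std::latch done{nStreams};  // stands in for the framework's waiting-task bookkeeping
  oneapi::tbb::task_group group;
  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};

  auto work = [&done](unsigned int stream) {
    std::printf("handle next event for stream %u\n", stream);
    done.count_down();
  };

  unsigned int stream = 0;
  for (; stream < nStreams - 1; ++stream) {
    arena.enqueue([&work, stream] { work(stream); });  // all but the last stream: fire-and-forget enqueue
  }
  group.run([&work, stream] { work(stream); });  // the last stream reuses the caller's task group
  group.wait();
  done.wait();  // make sure the enqueued tasks finished before returning
  return 0;
}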
1852 
1853  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1854  if (holder.taskHasFailed()) {
1855  setExceptionMessageLumis();
1856  } else {
1857  WaitingTaskHolder tmp(holder);
1858  tmp.doneWaiting(iException);
1859  }
1860  }
1861 
1862  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1863  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1864  // Get some needed info out of the status object before moving
1865  // it into finalTaskForThisLumi.
1866  auto& lp = *(iLumiStatus->lumiPrincipal());
1867  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1868  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1869  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1870  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1871 
1872  using namespace edm::waiting_task::chain;
1873  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1874  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1875 
1876  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1877  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1878  endGlobalTransitionAsync<Traits>(
1879  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1880  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1881  //Only call writeLumi if beginLumi succeeded
1882  if (didGlobalBeginSucceed) {
1883  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1884  }
1885  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1886  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1887  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1888  //any thrown exception auto propagates to nextTask via the chain
1889  ServiceRegistry::Operate operate(serviceToken_);
1890  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1891  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1892  if (iException) {
1893  handleEndLumiExceptions(*iException, nextTask);
1894  }
1896 
1897  std::exception_ptr ptr;
1898 
1899  // Try hard to clean up resources so the
1900  // process can terminate in a controlled
1901  // fashion even after exceptions have occurred.
1902  // Caught exception is passed to handleEndLumiExceptions()
1903  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1904  if (not ptr) {
1905  ptr = std::current_exception();
1906  }
1907  }
1908  // Caught exception is passed to handleEndLumiExceptions()
1909  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1910  if (not ptr) {
1911  ptr = std::current_exception();
1912  }
1913  }
1914  // Caught exception is passed to handleEndLumiExceptions()
1915  CMS_SA_ALLOW try {
1916  status->resetResources();
1917  status->globalEndRunHolderDoneWaiting();
1918  status.reset();
1919  } catch (...) {
1920  if (not ptr) {
1921  ptr = std::current_exception();
1922  }
1923  }
1924 
1925  if (ptr && !iException) {
1926  handleEndLumiExceptions(ptr, nextTask);
1927  }
1928  }) | runLast(std::move(iTask));
1929  }
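The tail of the global end-lumi chain keeps cleaning up even after a failure and remembers only the first exception it sees. A standalone sketch of that pattern, with placeholder cleanup steps:

#include <cstdio>
#include <exception>
#include <stdexcept>

// Run every cleanup step even if some of them throw; remember only the first failure.
void cleanupAll() {
  std::exception_ptr firstFailure;

  auto tryStep = [&firstFailure](auto&& step) {
    try {
      step();
    } catch (...) {
      if (not firstFailure) {
        firstFailure = std::current_exception();
      }
    }
  };

  tryStep([] { throw std::runtime_error("clearing the principal failed"); });
  tryStep([] { std::puts("resuming the paused queue anyway"); });
  tryStep([] { std::puts("releasing per-lumi resources anyway"); });

  if (firstFailure) {
    std::rethrow_exception(firstFailure);  // the caller sees only the first failure
  }
}

int main() {
  try {
    cleanupAll();
  } catch (std::exception const& e) {
    std::printf("reported: %s\n", e.what());
  }
  return 0;
}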
1930 
1931  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1932  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1933  auto status = streamLumiStatus_[iStreamIndex];
1934  if (iException) {
1935  handleEndLumiExceptions(*iException, iTask);
1936  }
1937 
1938  // reset status before releasing queue else get race condition
1939  streamLumiStatus_[iStreamIndex].reset();
1940  --streamLumiActive_;
1941  streamQueues_[iStreamIndex].resume();
1942 
1943  //are we the last one?
1944  if (status->streamFinishedLumi()) {
1945  globalEndLumiAsync(iTask, std::move(status));
1946  }
1947  });
1948 
1949  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1950 
1951  // Need to be sure the lumi status is released before lumiDoneTask can ever be called,
1952  // therefore we do not want to hold the shared_ptr
1953  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1954  lumiStatus->setEndTime();
1955 
1956  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1957  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1958  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1959 
1960  if (lumiStatus->didGlobalBeginSucceed()) {
1961  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1962  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1963  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1964  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1965  *schedule_,
1966  iStreamIndex,
1967  transitionInfo,
1968  serviceToken_,
1969  subProcesses_,
1970  cleaningUpAfterException);
1971  }
1972  }
1973 
1974  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1975  if (streamRunActive_ == 0) {
1976  assert(streamLumiActive_ == 0);
1977  } else {
1979  if (streamLumiActive_ > 0) {
1980  FinalWaitingTask globalWaitTask{taskGroup_};
1982  streamLumiStatus_[0]->noMoreEventsInLumi();
1983  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1984  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1985  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1986  }
1987  globalWaitTask.wait();
1988  }
1989  }
1990  }
1991 
1992  void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1993  SendSourceTerminationSignalIfException sentry(actReg_.get());
1994  input_->readProcessBlock(processBlockPrincipal);
1995  sentry.completedSuccessfully();
1996  }
1997 
1998  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1999  auto rp = principalCache_.getAvailableRunPrincipalPtr();
2000  assert(rp);
2001  rp->setAux(*input_->runAuxiliary());
2002  {
2003  SendSourceTerminationSignalIfException sentry(actReg_.get());
2004  input_->readRun(*rp, *historyAppender_);
2005  sentry.completedSuccessfully();
2006  }
2007  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2008  return rp;
2009  }
2010 
2011  void EventProcessor::readAndMergeRun(RunProcessingStatus& iStatus) {
2012  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
2013 
2014  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
2015  assert(runOK);
2016  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2017  {
2018  SendSourceTerminationSignalIfException sentry(actReg_.get());
2019  input_->readAndMergeRun(runPrincipal);
2020  sentry.completedSuccessfully();
2021  }
2022  }
2023 
2024  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
2025  auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2026  assert(lbp);
2027  lbp->setAux(*input_->luminosityBlockAuxiliary());
2028  {
2029  SendSourceTerminationSignalIfException sentry(actReg_.get());
2030  input_->readLuminosityBlock(*lbp, *historyAppender_);
2031  sentry.completedSuccessfully();
2032  }
2033  lbp->setRunPrincipal(std::move(rp));
2034  return lbp;
2035  }
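The read methods above guard each InputSource call with a sentry that fires an early-termination signal if the call throws and is disarmed on success. A standalone sketch of that sentry shape; TerminationSignalSentry is a hypothetical stand-in, not the framework class, and the callback stands in for the ActivityRegistry signal.

#include <cstdio>
#include <functional>
#include <stdexcept>
#include <utility>

// Fires onFailure from the destructor unless completedSuccessfully() was called first.
class TerminationSignalSentry {
public:
  explicit TerminationSignalSentry(std::function<void()> onFailure) : onFailure_(std::move(onFailure)) {}
  ~TerminationSignalSentry() {
    if (onFailure_) {
      onFailure_();
    }
  }
  void completedSuccessfully() { onFailure_ = nullptr; }

private:
  std::function<void()> onFailure_;
};

void readSomething(bool fail) {
  TerminationSignalSentry sentry([] { std::puts("early termination signal"); });
  if (fail) {
    throw std::runtime_error("source failed");  // destructor fires the signal during unwinding
  }
  sentry.completedSuccessfully();  // normal path: disarm the sentry
}

int main() {
  readSomething(false);
  try {
    readSomething(true);
  } catch (std::exception const& e) {
    std::printf("caught: %s\n", e.what());
  }
  return 0;
}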
2036 
2037  void EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
2038  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2039  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2040  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2041  input_->processHistoryRegistry().reducedProcessHistoryID(
2042  input_->luminosityBlockAuxiliary()->processHistoryID()));
2043  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2044  assert(lumiOK);
2045  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2046  {
2047  SendSourceTerminationSignalIfException sentry(actReg_.get());
2048  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2049  sentry.completedSuccessfully();
2050  }
2051  }
2052 
2053  void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
2054  using namespace edm::waiting_task;
2055  chain::first([&](auto nextTask) {
2056  ServiceRegistry::Operate op(serviceToken_);
2057  schedule_->writeProcessBlockAsync(
2058  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2059  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2060  ServiceRegistry::Operate op(serviceToken_);
2061  for (auto& s : subProcesses_) {
2062  s.writeProcessBlockAsync(nextTask, processBlockType);
2063  }
2064  }) | chain::runLast(std::move(task));
2065  }
2066 
2067  void EventProcessor::writeRunAsync(WaitingTaskHolder task,
2068  RunPrincipal const& runPrincipal,
2069  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2070  using namespace edm::waiting_task;
2071  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2072  chain::first([&](auto nextTask) {
2073  ServiceRegistry::Operate op(serviceToken_);
2074  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2075  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2076  ServiceRegistry::Operate op(serviceToken_);
2077  for (auto& s : subProcesses_) {
2078  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2079  }
2080  }) | chain::runLast(std::move(task));
2081  }
2082  }
2083 
2084  void EventProcessor::clearRunPrincipal(RunProcessingStatus& iStatus) {
2085  for (auto& s : subProcesses_) {
2086  s.clearRunPrincipal(*iStatus.runPrincipal());
2087  }
2088  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2089  iStatus.runPrincipal()->clearPrincipal();
2090  }
2091 
2092  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
2093  using namespace edm::waiting_task;
2094  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2095  chain::first([&](auto nextTask) {
2096  ServiceRegistry::Operate op(serviceToken_);
2097 
2098  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2099  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2100  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2101  ServiceRegistry::Operate op(serviceToken_);
2102  for (auto& s : subProcesses_) {
2103  s.writeLumiAsync(nextTask, lumiPrincipal);
2104  }
2105  }) | chain::runLast(std::move(task));
2106  }
2107  }
2108 
2109  void EventProcessor::clearLumiPrincipal(LuminosityBlockProcessingStatus& iStatus) {
2110  for (auto& s : subProcesses_) {
2111  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2112  }
2113  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2114  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2115  iStatus.lumiPrincipal()->clearPrincipal();
2116  }
2117 
2118  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2119  WaitingTaskHolder iHolder) {
2120  auto group = iHolder.group();
2121  sourceResourcesAcquirer_.serialQueueChain().push(
2122  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2123  CMS_SA_ALLOW try {
2124  ServiceRegistry::Operate operate(serviceToken_);
2125 
2126  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2127 
2128  nextTransitionType();
2129  setNeedToCallNext(false);
2130 
2131  while (lastTransitionType() == InputSource::ItemType::IsRun and
2132  status->runPrincipal()->run() == input_->run() and
2133  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2134  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2135  status->setStopBeforeProcessingRun(true);
2136  return;
2137  }
2140  setNeedToCallNext(true);
2141  return;
2142  }
2144  }
2145  } catch (...) {
2146  status->setStopBeforeProcessingRun(true);
2147  holder.doneWaiting(std::current_exception());
2148  }
2149  });
2150  }
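Both read-and-merge helpers funnel their work through the source's serial task queue chain so that only one functor touches the InputSource at a time. A deliberately simplified, standalone stand-in for that idea (this is not edm::SerialTaskQueue; it only illustrates the "pushed tasks run one at a time, in push order" contract):

#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class TinySerialQueue {
public:
  void push(std::function<void()> task) {
    std::unique_lock<std::mutex> lock(mutex_);
    tasks_.push_back(std::move(task));
    if (running_) {
      return;  // the thread currently draining the queue will pick this task up
    }
    running_ = true;
    while (!tasks_.empty()) {
      auto next = std::move(tasks_.front());
      tasks_.pop_front();
      lock.unlock();  // run the task without holding the lock
      next();
      lock.lock();
    }
    running_ = false;
  }

private:
  std::mutex mutex_;
  std::deque<std::function<void()>> tasks_;
  bool running_ = false;
};

int main() {
  TinySerialQueue queue;
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&queue, i] {
      queue.push([i] { std::printf("source access %d runs serialized\n", i); });
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}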
2151 
2152  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2153  WaitingTaskHolder iHolder) {
2154  auto group = iHolder.group();
2155  sourceResourcesAcquirer_.serialQueueChain().push(
2156  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2157  CMS_SA_ALLOW try {
2158  ServiceRegistry::Operate operate(serviceToken_);
2159 
2160  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2161 
2162  nextTransitionType();
2163  setNeedToCallNext(false);
2164 
2165  while (lastTransitionType() == InputSource::ItemType::IsLumi and
2166  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2167  readAndMergeLumi(*iLumiStatus);
2169  setNeedToCallNext(true);
2170  return;
2171  }
2173  }
2174  } catch (...) {
2175  holder.doneWaiting(std::current_exception());
2176  }
2177  });
2178  }
2179 
2180  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2181  WaitingTaskHolder iHolder) {
2182  chain::first([this, iRunStatus](auto nextTask) mutable {
2183  if (needToCallNext()) {
2184  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2185  }
2186  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2188  if (iException) {
2189  WaitingTaskHolder copyHolder(nextTask);
2190  copyHolder.doneWaiting(*iException);
2191  }
2193  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2194  return;
2195  }
2196  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2197  CMS_SA_ALLOW try {
2198  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2199  input_->luminosityBlockAuxiliary()->beginTime()),
2200  iRunStatus,
2201  nextTask);
2202  return;
2203  } catch (...) {
2204  WaitingTaskHolder copyHolder(nextTask);
2205  copyHolder.doneWaiting(std::current_exception());
2206  }
2207  }
2208  // Note that endRunAsync will call beginRunAsync for the following run
2209  // if appropriate.
2210  endRunAsync(iRunStatus, std::move(nextTask));
2211  }) | chain::runLast(std::move(iHolder));
2212  }
2213 
2214  bool EventProcessor::readNextEventForStream(WaitingTaskHolder const& iTask,
2215  unsigned int iStreamIndex,
2216  LuminosityBlockProcessingStatus& iStatus) {
2217  // This function returns true if it successfully reads an event for the stream, which
2218  // requires both that an event is next and that there are no problems or requests to stop.
2219 
2220  if (iTask.taskHasFailed()) {
2221  // We want all streams to stop or all streams to pause. If we are already in the
2222  // middle of pausing streams, then finish pausing all of them and the lumi will be
2223  // ended later. Otherwise, just end it now.
2226  }
2227  return false;
2228  }
2229 
2230  // Did another stream already stop or pause this lumi?
2232  return false;
2233  }
2234 
2235  // Are output modules or the looper requesting we stop?
2236  if (shouldWeStop()) {
2239  return false;
2240  }
2241 
2242  ServiceRegistry::Operate operate(serviceToken_);
2243 
2244  // need to use lock in addition to the serial task queue because
2245  // of delayed provenance reading and reading data in response to
2246  // edm::Refs etc
2247  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2248 
2249  // If we didn't already call nextTransitionType while merging lumis, call it here.
2250  // This asks the input source what is next and also checks for signals.
2251 
2252  auto itemType = needToCallNext() ? nextTransitionType() : lastTransitionType();
2253  setNeedToCallNext(true);
2254 
2255  if (InputSource::ItemType::IsEvent != itemType) {
2256  // IsFile may continue processing the lumi and
2257  // looper_ can cause the input source to declare a new IsRun which is actually
2258  // just a continuation of the previous run
2259  if (itemType == InputSource::ItemType::IsStop or itemType == InputSource::ItemType::IsLumi or
2260  (InputSource::ItemType::IsRun == itemType and
2261  (iStatus.lumiPrincipal()->run() != input_->run() or
2262  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2263  if (itemType == InputSource::ItemType::IsLumi &&
2264  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2265  throw Exception(errors::LogicError)
2266  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2267  << "but the next lumi entry has the same lumi number.\n"
2268  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2269  }
2271  } else {
2273  }
2274  return false;
2275  }
2276  readEvent(iStreamIndex);
2277  return true;
2278  }
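The comment above about needing a lock in addition to the serial task queue refers to re-entrant access: delayed provenance reading and edm::Ref resolution can call back into the source from the same thread that already holds it. A standalone sketch of why a std::recursive_mutex tolerates that re-entry while a plain std::mutex would not (re-locking a std::mutex on the same thread is undefined behavior, typically a deadlock):

#include <cstdio>
#include <mutex>

std::recursive_mutex sourceMutex;

void resolveRef() {
  // Called while the event is already being read: re-locks on the same thread.
  std::lock_guard<std::recursive_mutex> guard(sourceMutex);
  std::puts("resolving a Ref needs the source again");
}

void readEvent() {
  std::lock_guard<std::recursive_mutex> guard(sourceMutex);
  std::puts("reading event");
  resolveRef();  // safe with a recursive_mutex; UB/deadlock with std::mutex
}

int main() {
  readEvent();
  return 0;
}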
2279 
2280  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2281  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2282  streamEndLumiAsync(iTask, iStreamIndex);
2283  return;
2284  }
2285  auto group = iTask.group();
2286  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2287  CMS_SA_ALLOW try {
2288  auto status = streamLumiStatus_[iStreamIndex].get();
2289  ServiceRegistry::Operate operate(serviceToken_);
2290 
2291  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2292  auto recursionTask =
2293  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2294  if (iEventException) {
2295  WaitingTaskHolder copyHolder(iTask);
2296  copyHolder.doneWaiting(*iEventException);
2297  // Intentionally, we don't return here. The recursive call to
2298  // handleNextEvent takes care of immediately ending the run properly
2299  // using the same code it uses to end the run in other situations.
2300  }
2301  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2302  });
2303 
2304  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2305  } else {
2306  // the stream will stop processing this lumi now
2308  if (not status->haveStartedNextLumiOrEndedRun()) {
2309  status->noMoreEventsInLumi();
2310  status->startNextLumiOrEndRun();
2311  if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2312  CMS_SA_ALLOW try {
2313  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2314  input_->luminosityBlockAuxiliary()->beginTime()),
2315  streamRunStatus_[iStreamIndex],
2316  iTask);
2317  } catch (...) {
2318  WaitingTaskHolder copyHolder(iTask);
2319  copyHolder.doneWaiting(std::current_exception());
2320  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2321  }
2322  } else {
2323  // If appropriate, this will also start the next run.
2324  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2325  }
2326  }
2327  streamEndLumiAsync(iTask, iStreamIndex);
2328  } else {
2329  assert(status->eventProcessingState() ==
2330  LuminosityBlockProcessingStatus::EventProcessingState::kPauseForFileTransition);
2331  auto runStatus = streamRunStatus_[iStreamIndex].get();
2332 
2333  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2334  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2335  }
2336  }
2337  }
2338  } catch (...) {
2339  WaitingTaskHolder copyHolder(iTask);
2340  copyHolder.doneWaiting(std::current_exception());
2341  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2342  }
2343  });
2344  }
2345 
2346  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2347  //TODO this will have to become per stream
2348  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2349  StreamContext streamContext(event.streamID(), &processContext_);
2350 
2351  SendSourceTerminationSignalIfException sentry(actReg_.get());
2352  input_->readEvent(event, streamContext);
2353 
2354  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2355  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2356  sentry.completedSuccessfully();
2357 
2358  FDEBUG(1) << "\treadEvent\n";
2359  }
2360 
2361  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2362  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2363  }
2364 
2365  namespace {
2366  struct ClearEventGuard {
2367  ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
2368  : act_(iReg), context_(iContext) {
2369  iReg.preClearEventSignal_.emit(iContext);
2370  }
2371  ~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
2372  edm::ActivityRegistry& act_;
2373  edm::StreamContext const& context_;
2374  };
2375  } // namespace
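ClearEventGuard above follows the usual pre/post signal pattern: the pre signal fires in the constructor and the post signal in the destructor, so the post signal is emitted even if the guarded work throws. A standalone sketch of that shape, with plain callables standing in for the ActivityRegistry signals:

#include <cstdio>
#include <functional>
#include <utility>

class PrePostGuard {
public:
  PrePostGuard(std::function<void()> pre, std::function<void()> post) : post_(std::move(post)) { pre(); }
  ~PrePostGuard() { post_(); }  // runs on normal exit and during stack unwinding

private:
  std::function<void()> post_;
};

int main() {
  PrePostGuard guard([] { std::puts("preClearEvent"); }, [] { std::puts("postClearEvent"); });
  std::puts("clearing the event principal");  // the guarded work
  return 0;
}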
2376 
2377  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2378  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2379 
2380  ServiceRegistry::Operate operate(serviceToken_);
2381  Service<RandomNumberGenerator> rng;
2382  if (rng.isAvailable()) {
2383  Event ev(*pep, ModuleDescription(), nullptr);
2384  rng->postEventRead(ev);
2385  }
2386 
2387  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2388  using namespace edm::waiting_task::chain;
2389  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2390  EventTransitionInfo info(*pep, es);
2391  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2392  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2393  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2394  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2395  }
2396  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2397  //NOTE: behavior change. Previously the looper was still called if an exception happened; now it will not be called.
2398  ServiceRegistry::Operate operateLooper(serviceToken_);
2399  processEventWithLooper(*pep, iStreamIndex);
2400  }) | then([this, pep](auto nextTask) {
2401  FDEBUG(1) << "\tprocessEvent\n";
2402  StreamContext streamContext(pep->streamID(),
2403  StreamContext::Transition::kEvent,
2404  pep->id(),
2405  pep->runPrincipal().index(),
2406  pep->luminosityBlockPrincipal().index(),
2407  pep->time(),
2408  &processContext_);
2409  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2410  pep->clearEventPrincipal();
2411  }) | runLast(iHolder);
2412  }
2413 
2414  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2415  bool randomAccess = input_->randomAccess();
2416  ProcessingController::ForwardState forwardState = input_->forwardState();
2417  ProcessingController::ReverseState reverseState = input_->reverseState();
2418  ProcessingController pc(forwardState, reverseState, randomAccess);
2419 
2420  EDLooperBase::Status status = EDLooperBase::kContinue;
2421  do {
2422  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2423  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2424  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2425 
2426  bool succeeded = true;
2427  if (randomAccess) {
2428  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2429  input_->skipEvents(-2);
2430  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2431  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2432  }
2433  }
2434  pc.setLastOperationSucceeded(succeeded);
2435  } while (!pc.lastOperationSucceeded());
2436  if (status != EDLooperBase::kContinue) {
2437  shouldWeStop_ = true;
2438  }
2439  }
2440 
2441  bool EventProcessor::shouldWeStop() const {
2442  FDEBUG(1) << "\tshouldWeStop\n";
2443  if (shouldWeStop_)
2444  return true;
2445  if (!subProcesses_.empty()) {
2446  for (auto const& subProcess : subProcesses_) {
2447  if (subProcess.terminate()) {
2448  return true;
2449  }
2450  }
2451  return false;
2452  }
2453  return schedule_->terminate();
2454  }
2455 
2456  void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2457 
2458  void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
2459 
2460  void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2461 
2462  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2463  bool expected = false;
2464  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2465  deferredExceptionPtr_ = iException;
2466  return true;
2467  }
2468  return false;
2469  }
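setDeferredException relies on an atomic flag so that when several streams fail concurrently only the first exception is kept for rethrowing at the end of the job. A standalone sketch of that first-writer-wins latch, mirroring the members used above with illustrative names:

#include <atomic>
#include <cstdio>
#include <exception>
#include <stdexcept>

std::atomic<bool> deferredExceptionIsSet{false};
std::exception_ptr deferredException;

// Returns true only for the first caller; later exceptions are dropped.
bool setDeferredExceptionSketch(std::exception_ptr e) {
  bool expected = false;
  if (deferredExceptionIsSet.compare_exchange_strong(expected, true)) {
    deferredException = e;
    return true;
  }
  return false;
}

int main() {
  auto makeError = [](char const* what) {
    try {
      throw std::runtime_error(what);
    } catch (...) {
      return std::current_exception();
    }
  };
  std::printf("first stored: %d\n", setDeferredExceptionSketch(makeError("stream 0 failed")));
  std::printf("second stored: %d\n", setDeferredExceptionSketch(makeError("stream 1 failed")));
  return 0;
}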
2470 
2471  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2472  cms::Exception ex("ModulesSynchingOnLumis");
2473  ex << "The framework is configured to use at least two streams, but the following modules\n"
2474  << "require synchronizing on LuminosityBlock boundaries:";
2475  bool found = false;
2476  for (auto worker : schedule_->allWorkers()) {
2477  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2478  found = true;
2479  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2480  }
2481  }
2482  if (found) {
2483  ex << "\n\nThe situation can be fixed by either\n"
2484  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2485  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2486  throw ex;
2487  }
2488  }
2489 
2490  void EventProcessor::warnAboutModulesRequiringRunSynchronization() const {
2491  std::unique_ptr<LogSystem> s;
2492  for (auto worker : schedule_->allWorkers()) {
2493  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2494  if (not s) {
2495  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2496  (*s) << "The following modules require synchronizing on Run boundaries:";
2497  }
2498  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2499  }
2500  }
2501  }
2502 } // namespace edm