CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
8 
44 
46 
54 
59 
64 
77 
78 #include "MessageForSource.h"
79 #include "MessageForParent.h"
81 #include "RunProcessingStatus.h"
82 
83 #include "boost/range/adaptor/reversed.hpp"
84 
85 #include <cassert>
86 #include <exception>
87 #include <iomanip>
88 #include <iostream>
89 #include <utility>
90 #include <sstream>
91 
92 #include <sys/ipc.h>
93 #include <sys/msg.h>
94 
95 #include "oneapi/tbb/task.h"
96 
97 //Used for CPU affinity
98 #ifndef __APPLE__
99 #include <sched.h>
100 #endif
101 
102 namespace {
103  class PauseQueueSentry {
104  public:
105  PauseQueueSentry(edm::SerialTaskQueue& queue) : queue_(queue) { queue_.pause(); }
106  ~PauseQueueSentry() { queue_.resume(); }
107 
108  private:
109  edm::SerialTaskQueue& queue_;
110  };
111 } // namespace
112 
113 namespace edm {
114 
115  namespace chain = waiting_task::chain;
116 
117  // ---------------------------------------------------------------
118  std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
120  CommonParams const& common,
121  std::shared_ptr<ProductRegistry> preg,
122  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
123  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
124  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125  std::shared_ptr<ActivityRegistry> areg,
126  std::shared_ptr<ProcessConfiguration const> processConfiguration,
127  PreallocationConfiguration const& allocations) {
128  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
129  if (main_input == nullptr) {
131  << "There must be exactly one source in the configuration.\n"
132  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133  }
134 
135  std::string modtype(main_input->getParameter<std::string>("@module_type"));
136 
137  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
139  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
140  filler->fill(descriptions);
141 
142  try {
143  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
144  } catch (cms::Exception& iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
163  moduleIndex);
164 
165  InputSourceDescription isdesc(md,
166  preg,
167  branchIDListHelper,
168  processBlockHelper,
169  thinnedAssociationsHelper,
170  areg,
171  common.maxEventsInput_,
172  common.maxLumisInput_,
173  common.maxSecondsUntilRampdown_,
174  allocations);
175 
176  areg->preSourceConstructionSignal_(md);
177  std::unique_ptr<InputSource> input;
178  try {
179  //even if we have an exception, send the signal
180  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
181  convertException::wrap([&]() {
182  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
183  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
184  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
185  });
186  } catch (cms::Exception& iException) {
187  std::ostringstream ost;
188  ost << "Constructing input source of type " << modtype;
189  iException.addContext(ost.str());
190  throw;
191  }
192  return input;
193  }
194 
195  // ---------------------------------------------------------------
196  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
199  std::vector<std::string> const& loopers) {
200  std::shared_ptr<EDLooperBase> vLooper;
201 
202  assert(1 == loopers.size());
203 
204  for (auto const& looperName : loopers) {
205  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
206  // Unlikely we would ever need the ModuleTypeResolver in Looper
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet, nullptr);
208  }
209  return vLooper;
210  }
211 
212  // ---------------------------------------------------------------
213  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
214  ServiceToken const& iToken,
216  std::vector<std::string> const& defaultServices,
217  std::vector<std::string> const& forcedServices)
218  : actReg_(),
219  preg_(),
220  branchIDListHelper_(),
221  serviceToken_(),
222  input_(),
223  espController_(new eventsetup::EventSetupsController),
224  esp_(),
225  act_table_(),
226  processConfiguration_(),
227  schedule_(),
228  subProcesses_(),
229  historyAppender_(new HistoryAppender),
230  fb_(),
231  looper_(),
232  deferredExceptionPtrIsSet_(false),
233  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
234  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
235  principalCache_(),
236  beginJobCalled_(false),
237  shouldWeStop_(false),
238  fileModeNoMerge_(false),
239  exceptionMessageFiles_(),
240  exceptionMessageRuns_(false),
241  exceptionMessageLumis_(false),
242  forceLooperToEnd_(false),
243  looperBeginJobRun_(false),
244  forceESCacheClearOnNewRun_(false),
245  eventSetupDataToExcludeFromPrefetching_() {
246  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
247  processDesc->addServices(defaultServices, forcedServices);
248  init(processDesc, iToken, iLegacy);
249  }
250 
251  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
252  std::vector<std::string> const& defaultServices,
253  std::vector<std::string> const& forcedServices)
254  : actReg_(),
255  preg_(),
256  branchIDListHelper_(),
257  serviceToken_(),
258  input_(),
259  espController_(new eventsetup::EventSetupsController),
260  esp_(),
261  act_table_(),
262  processConfiguration_(),
263  schedule_(),
264  subProcesses_(),
265  historyAppender_(new HistoryAppender),
266  fb_(),
267  looper_(),
268  deferredExceptionPtrIsSet_(false),
269  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
270  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
271  principalCache_(),
272  beginJobCalled_(false),
273  shouldWeStop_(false),
274  fileModeNoMerge_(false),
275  exceptionMessageFiles_(),
276  exceptionMessageRuns_(false),
277  exceptionMessageLumis_(false),
278  forceLooperToEnd_(false),
279  looperBeginJobRun_(false),
280  forceESCacheClearOnNewRun_(false),
281  eventSetupDataToExcludeFromPrefetching_() {
282  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
283  processDesc->addServices(defaultServices, forcedServices);
285  }
286 
287  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
288  ServiceToken const& token,
290  : actReg_(),
291  preg_(),
292  branchIDListHelper_(),
293  serviceToken_(),
294  input_(),
295  espController_(new eventsetup::EventSetupsController),
296  esp_(),
297  act_table_(),
298  processConfiguration_(),
299  schedule_(),
300  subProcesses_(),
301  historyAppender_(new HistoryAppender),
302  fb_(),
303  looper_(),
304  deferredExceptionPtrIsSet_(false),
305  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
306  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
307  principalCache_(),
308  beginJobCalled_(false),
309  shouldWeStop_(false),
310  fileModeNoMerge_(false),
311  exceptionMessageFiles_(),
312  exceptionMessageRuns_(false),
313  exceptionMessageLumis_(false),
314  forceLooperToEnd_(false),
315  looperBeginJobRun_(false),
316  forceESCacheClearOnNewRun_(false),
317  eventSetupDataToExcludeFromPrefetching_() {
318  init(processDesc, token, legacy);
319  }
320 
321  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
322  ServiceToken const& iToken,
324  //std::cerr << processDesc->dump() << std::endl;
325 
326  // register the empty parentage vector , once and for all
328 
329  // register the empty parameter set, once and for all.
331 
332  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
333 
334  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
335  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
336  bool const hasSubProcesses = !subProcessVParameterSet.empty();
337 
338  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
339  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
340  // set in here if the parameters were not explicitly set.
342 
343  // Now set some parameters specific to the main process.
344  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
345  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
346  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
347  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
348  << fileMode << ".\n"
349  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
350  } else {
351  fileModeNoMerge_ = (fileMode == "NOMERGE");
352  }
353  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
355 
356  //threading
357  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
358 
359  // Even if numberOfThreads was set to zero in the Python configuration, the code
360  // in cmsRun.cpp should have reset it to something else.
361  assert(nThreads != 0);
362 
363  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
364  if (nStreams == 0) {
365  nStreams = nThreads;
366  }
367  unsigned int nConcurrentLumis =
368  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
369  if (nConcurrentLumis == 0) {
370  nConcurrentLumis = 2;
371  }
372  if (nConcurrentLumis > nStreams) {
373  nConcurrentLumis = nStreams;
374  }
375  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
376  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
377  nConcurrentRuns = nConcurrentLumis;
378  }
379  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
380  if (!loopers.empty()) {
381  //For now loopers make us run only 1 transition at a time
382  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
383  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
384  "of concurrent runs, and the number of concurrent lumis "
385  "are all being reset to 1. Loopers cannot currently support "
386  "values greater than 1.";
387  nStreams = 1;
388  nConcurrentLumis = 1;
389  nConcurrentRuns = 1;
390  }
391  }
392  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
393  if (dumpOptions) {
394  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
395  } else {
396  if (nThreads > 1 or nStreams > 1) {
397  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
398  }
399  }
400 
401  // The number of concurrent IOVs is configured individually for each record in
402  // the class NumberOfConcurrentIOVs to values less than or equal to this.
403  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
404  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
405  // concurrent run past the first in use cases where IOVs change within a run.
406  unsigned int maxConcurrentIOVs =
407  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
408 
409  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
410 
411  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
413  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
414  //for now, if have a subProcess, don't allow early delete
415  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
416  if (not hasSubProcesses) {
417  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
418  }
419  if (not branchesToDeleteEarly_.empty()) {
420  auto referencePSets =
421  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
422  for (auto const& pset : referencePSets) {
423  auto product = pset.getParameter<std::string>("product");
424  auto references = pset.getParameter<std::vector<std::string>>("references");
425  for (auto const& ref : references) {
426  referencesToBranches_.emplace(product, ref);
427  }
428  }
430  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
431  }
432 
433  // Now do general initialization
435 
436  //initialize the services
437  auto& serviceSets = processDesc->getServicesPSets();
438  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
439  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
440 
441  //make the services available
443 
444  CMS_SA_ALLOW try {
445  if (nThreads > 1) {
447  handler->willBeUsingThreads();
448  }
449 
450  // intialize miscellaneous items
451  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
452 
453  // intialize the event setup provider
454  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
455  esp_ = espController_->makeProvider(
456  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
457 
458  // initialize the looper, if any
459  if (!loopers.empty()) {
461  looper_->setActionTable(items.act_table_.get());
462  looper_->attachTo(*items.actReg_);
463 
464  // in presence of looper do not delete modules
466  }
467 
468  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
469 
470  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
471  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
472  streamQueues_.resize(nStreams);
473  streamRunStatus_.resize(nStreams);
474  streamLumiStatus_.resize(nStreams);
475 
476  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
477 
478  {
479  std::optional<ScheduleItems::MadeModules> madeModules;
480 
481  //setup input and modules concurrently
482  tbb::task_group group;
483 
484  // initialize the input source
485  auto tempReg = std::make_shared<ProductRegistry>();
486  auto sourceID = ModuleDescription::getUniqueID();
487 
488  group.run([&, this]() {
489  // initialize the Schedule
492  madeModules = items.initModules(*parameterSet, tns, preallocations_, &processContext_);
493  });
494 
495  group.run([&, this, tempReg]() {
497  input_ = makeInput(sourceID,
498  *parameterSet,
499  *common,
500  /*items.preg(),*/ tempReg,
501  items.branchIDListHelper(),
503  items.thinnedAssociationsHelper(),
504  items.actReg_,
505  items.processConfiguration(),
507  });
508 
509  group.wait();
510  items.preg()->addFromInput(*tempReg);
511  input_->switchTo(items.preg());
512 
513  {
515  schedule_ = items.finishSchedule(std::move(*madeModules),
516  *parameterSet,
517  tns,
518  hasSubProcesses,
522  }
523  }
524 
525  // set the data members
526  act_table_ = std::move(items.act_table_);
527  actReg_ = items.actReg_;
528  preg_ = items.preg();
530  branchIDListHelper_ = items.branchIDListHelper();
531  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
532  processConfiguration_ = items.processConfiguration();
534 
535  FDEBUG(2) << parameterSet << std::endl;
536 
538  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
539  // Reusable event principal
540  auto ep = std::make_shared<EventPrincipal>(preg(),
544  historyAppender_.get(),
545  index,
546  true /*primary process*/,
549  }
550 
551  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
552  auto rp = std::make_unique<RunPrincipal>(
555  }
556 
557  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
558  auto lp =
559  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
561  }
562 
563  {
564  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
566 
567  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
569  }
570 
571  // fill the subprocesses, if there are any
572  subProcesses_.reserve(subProcessVParameterSet.size());
573  for (auto& subProcessPSet : subProcessVParameterSet) {
574  subProcesses_.emplace_back(subProcessPSet,
575  *parameterSet,
576  preg(),
582  *actReg_,
583  token,
586  &processContext_);
587  }
588  } catch (...) {
589  //in case of an exception, make sure Services are available
590  // during the following destructors
591  espController_ = nullptr;
592  esp_ = nullptr;
593  schedule_ = nullptr;
594  input_ = nullptr;
595  looper_ = nullptr;
596  actReg_ = nullptr;
597  throw;
598  }
599  }
600 
602  // Make the services available while everything is being deleted.
605 
606  // manually destroy all these thing that may need the services around
607  // propagate_const<T> has no reset() function
608  espController_ = nullptr;
609  esp_ = nullptr;
610  schedule_ = nullptr;
611  input_ = nullptr;
612  looper_ = nullptr;
613  actReg_ = nullptr;
614 
617  }
618 
622  task.waitNoThrow();
623  assert(task.done());
624  }
625 
627  if (beginJobCalled_)
628  return;
629  beginJobCalled_ = true;
630  bk::beginJob();
631 
632  // StateSentry toerror(this); // should we add this ?
633  //make the services available
635 
640  actReg_->preallocateSignal_(bounds);
641  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
643 
644  std::vector<ModuleProcessName> consumedBySubProcesses;
646  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
647  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
648  if (consumedBySubProcesses.empty()) {
649  consumedBySubProcesses = std::move(c);
650  } else if (not c.empty()) {
651  std::vector<ModuleProcessName> tmp;
652  tmp.reserve(consumedBySubProcesses.size() + c.size());
653  std::merge(consumedBySubProcesses.begin(),
654  consumedBySubProcesses.end(),
655  c.begin(),
656  c.end(),
657  std::back_inserter(tmp));
658  std::swap(consumedBySubProcesses, tmp);
659  }
660  });
661 
662  // Note: all these may throw
665  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
666  not unusedModules.empty()) {
668 
669  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
670  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
671  "and "
672  "therefore they are deleted before beginJob transition.";
673  for (auto const& description : unusedModules) {
674  l << "\n " << description->moduleLabel();
675  }
676  });
677  for (auto const& description : unusedModules) {
678  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
679  }
680  }
681  }
682  // Initialize after the deletion of non-consumed unscheduled
683  // modules to avoid non-consumed non-run modules to keep the
684  // products unnecessarily alive
685  if (not branchesToDeleteEarly_.empty()) {
686  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
687  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
688  auto referencesToBranches = std::move(referencesToBranches_);
689  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
690  }
691 
692  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
693 
696  }
697  if (preallocations_.numberOfRuns() > 1) {
699  }
701 
702  //NOTE: This implementation assumes 'Job' means one call
703  // the EventProcessor::run
704  // If it really means once per 'application' then this code will
705  // have to be changed.
706  // Also have to deal with case where have 'run' then new Module
707  // added and do 'run'
708  // again. In that case the newly added Module needs its 'beginJob'
709  // to be called.
710 
711  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
712  // For now we delay calling beginOfJob until first beginOfRun
713  //if(looper_) {
714  // looper_->beginOfJob(es);
715  //}
716  try {
717  convertException::wrap([&]() { input_->doBeginJob(); });
718  } catch (cms::Exception& ex) {
719  ex.addContext("Calling beginJob for the source");
720  throw;
721  }
722  espController_->finishConfiguration();
723  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
724  if (looper_) {
725  constexpr bool mustPrefetchMayGet = true;
726  auto const processBlockLookup = preg_->productLookup(InProcess);
727  auto const runLookup = preg_->productLookup(InRun);
728  auto const lumiLookup = preg_->productLookup(InLumi);
729  auto const eventLookup = preg_->productLookup(InEvent);
730  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
731  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
732  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
733  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
734  looper_->updateLookup(esp_->recordsToProxyIndices());
735  }
736  // toerror.succeeded(); // should we add this?
737  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
738  actReg_->postBeginJobSignal_();
739 
740  oneapi::tbb::task_group group;
742  using namespace edm::waiting_task::chain;
743  first([this](auto nextTask) {
744  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
745  first([i, this](auto nextTask) {
747  schedule_->beginStream(i);
748  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
750  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
751  }) | lastTask(nextTask);
752  }
754  last.wait();
755  }
756 
758  // Collects exceptions, so we don't throw before all operations are performed.
760  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
761 
762  //make the services available
764 
765  using namespace edm::waiting_task::chain;
766 
767  oneapi::tbb::task_group group;
768  edm::FinalWaitingTask waitTask{group};
769 
770  {
771  //handle endStream transitions
772  edm::WaitingTaskHolder taskHolder(group, &waitTask);
773  std::mutex collectorMutex;
774  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
775  first([this, i, &c, &collectorMutex](auto nextTask) {
776  std::exception_ptr ep;
777  try {
779  this->schedule_->endStream(i);
780  } catch (...) {
781  ep = std::current_exception();
782  }
783  if (ep) {
784  std::lock_guard<std::mutex> l(collectorMutex);
785  c.call([&ep]() { std::rethrow_exception(ep); });
786  }
787  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
788  for (auto& subProcess : subProcesses_) {
789  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
790  std::exception_ptr ep;
791  try {
793  subProcess.doEndStream(i);
794  } catch (...) {
795  ep = std::current_exception();
796  }
797  if (ep) {
798  std::lock_guard<std::mutex> l(collectorMutex);
799  c.call([&ep]() { std::rethrow_exception(ep); });
800  }
801  }) | lastTask(nextTask);
802  }
803  }) | lastTask(taskHolder);
804  }
805  }
806  waitTask.waitNoThrow();
807 
808  auto actReg = actReg_.get();
809  c.call([actReg]() { actReg->preEndJobSignal_(); });
810  schedule_->endJob(c);
811  for (auto& subProcess : subProcesses_) {
812  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
813  }
814  c.call(std::bind(&InputSource::doEndJob, input_.get()));
815  if (looper_) {
816  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
817  }
818  c.call([actReg]() { actReg->postEndJobSignal_(); });
819  if (c.hasThrown()) {
820  c.rethrow();
821  }
822  }
823 
825 
826  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
827  return schedule_->getAllModuleDescriptions();
828  }
829 
830  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
831 
832  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
833 
834  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
835 
836  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
837 
838  namespace {
839 #include "TransitionProcessors.icc"
840  }
841 
843  bool returnValue = false;
844 
845  // Look for a shutdown signal
846  if (shutdown_flag.load(std::memory_order_acquire)) {
847  returnValue = true;
849  }
850  return returnValue;
851  }
852 
855  InputSource::ItemType itemType;
856  //For now, do nothing with InputSource::IsSynchronize
857  do {
858  itemType = input_->nextItemType();
859  } while (itemType == InputSource::IsSynchronize);
860 
861  lastSourceTransition_ = itemType;
862  sentry.completedSuccessfully();
863 
865 
867  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
869  }
870 
871  return lastSourceTransition_;
872  }
873 
875  beginJob(); //make sure this was called
876 
877  // make the services available
879 
880  try {
881  FilesProcessor fp(fileModeNoMerge_);
882 
883  convertException::wrap([&]() {
884  bool firstTime = true;
885  do {
886  if (not firstTime) {
888  rewindInput();
889  } else {
890  firstTime = false;
891  }
892  startingNewLoop();
893 
894  auto trans = fp.processFiles(*this);
895 
896  fp.normalEnd();
897 
898  if (deferredExceptionPtrIsSet_.load()) {
899  std::rethrow_exception(deferredExceptionPtr_);
900  }
901  if (trans != InputSource::IsStop) {
902  //problem with the source
903  doErrorStuff();
904 
905  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
906  }
907  } while (not endOfLoop());
908  }); // convertException::wrap
909 
910  } // Try block
911  catch (cms::Exception& e) {
913  std::string message(
914  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
915  e.addAdditionalInfo(message);
916  if (e.alreadyPrinted()) {
917  LogAbsolute("Additional Exceptions") << message;
918  }
919  }
920  if (exceptionMessageRuns_) {
921  std::string message(
922  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
923  e.addAdditionalInfo(message);
924  if (e.alreadyPrinted()) {
925  LogAbsolute("Additional Exceptions") << message;
926  }
927  }
928  if (!exceptionMessageFiles_.empty()) {
929  e.addAdditionalInfo(exceptionMessageFiles_);
930  if (e.alreadyPrinted()) {
931  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
932  }
933  }
934  throw;
935  }
936  return epSuccess;
937  }
938 
940  FDEBUG(1) << " \treadFile\n";
941  size_t size = preg_->size();
943 
944  if (streamRunActive_ > 0) {
945  streamRunStatus_[0]->runPrincipal()->preReadFile();
946  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
947  }
948 
949  if (streamLumiActive_ > 0) {
950  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
951  }
952 
953  fb_ = input_->readFile();
954  if (size < preg_->size()) {
956  }
959  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
960  }
961  sentry.completedSuccessfully();
962  }
963 
964  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
965  if (fileBlockValid()) {
967  input_->closeFile(fb_.get(), cleaningUpAfterException);
968  sentry.completedSuccessfully();
969  }
970  FDEBUG(1) << "\tcloseInputFile\n";
971  }
972 
974  if (fileBlockValid()) {
975  schedule_->openOutputFiles(*fb_);
976  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
977  }
978  FDEBUG(1) << "\topenOutputFiles\n";
979  }
980 
982  schedule_->closeOutputFiles();
983  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
984  processBlockHelper_->clearAfterOutputFilesClose();
985  FDEBUG(1) << "\tcloseOutputFiles\n";
986  }
987 
989  if (fileBlockValid()) {
991  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
992  schedule_->respondToOpenInputFile(*fb_);
993  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
994  }
995  FDEBUG(1) << "\trespondToOpenInputFile\n";
996  }
997 
999  if (fileBlockValid()) {
1000  schedule_->respondToCloseInputFile(*fb_);
1001  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1002  }
1003  FDEBUG(1) << "\trespondToCloseInputFile\n";
1004  }
1005 
1007  shouldWeStop_ = false;
1008  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1009  // until after we've called beginOfJob
1010  if (looper_ && looperBeginJobRun_) {
1011  looper_->doStartingNewLoop();
1012  }
1013  FDEBUG(1) << "\tstartingNewLoop\n";
1014  }
1015 
1017  if (looper_) {
1018  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1019  looper_->setModuleChanger(&changer);
1020  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1021  looper_->setModuleChanger(nullptr);
1023  return true;
1024  else
1025  return false;
1026  }
1027  FDEBUG(1) << "\tendOfLoop\n";
1028  return true;
1029  }
1030 
1032  input_->repeat();
1033  input_->rewind();
1034  FDEBUG(1) << "\trewind\n";
1035  }
1036 
1038  looper_->prepareForNextLoop(esp_.get());
1039  FDEBUG(1) << "\tprepareForNextLoop\n";
1040  }
1041 
1043  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1044  if (!subProcesses_.empty()) {
1045  for (auto const& subProcess : subProcesses_) {
1046  if (subProcess.shouldWeCloseOutput()) {
1047  return true;
1048  }
1049  }
1050  return false;
1051  }
1052  return schedule_->shouldWeCloseOutput();
1053  }
1054 
1056  FDEBUG(1) << "\tdoErrorStuff\n";
1057  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1058  << "and went to the error state\n"
1059  << "Will attempt to terminate processing normally\n"
1060  << "(IF using the looper the next loop will be attempted)\n"
1061  << "This likely indicates a bug in an input module or corrupted input or both\n";
1062  }
1063 
1064  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1065  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1066  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1067 
1069  FinalWaitingTask globalWaitTask{taskGroup_};
1070 
1071  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1072  beginGlobalTransitionAsync<Traits>(
1073  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1074 
1075  globalWaitTask.wait();
1076  beginProcessBlockSucceeded = true;
1077  }
1078 
1080  input_->fillProcessBlockHelper();
1082  while (input_->nextProcessBlock(processBlockPrincipal)) {
1083  readProcessBlock(processBlockPrincipal);
1084 
1086  FinalWaitingTask globalWaitTask{taskGroup_};
1087 
1088  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1089  beginGlobalTransitionAsync<Traits>(
1090  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1091 
1092  globalWaitTask.wait();
1093 
1094  FinalWaitingTask writeWaitTask{taskGroup_};
1096  writeWaitTask.wait();
1097 
1098  processBlockPrincipal.clearPrincipal();
1099  for (auto& s : subProcesses_) {
1100  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1101  }
1102  }
1103  }
1104 
1105  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1106  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1107 
1109  FinalWaitingTask globalWaitTask{taskGroup_};
1110 
1111  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1112  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1113  *schedule_,
1114  transitionInfo,
1115  serviceToken_,
1116  subProcesses_,
1117  cleaningUpAfterException);
1118  globalWaitTask.wait();
1119 
1120  if (beginProcessBlockSucceeded) {
1121  FinalWaitingTask writeWaitTask{taskGroup_};
1123  writeWaitTask.wait();
1124  }
1125 
1126  processBlockPrincipal.clearPrincipal();
1127  for (auto& s : subProcesses_) {
1128  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1129  }
1130  }
1131 
1133  FinalWaitingTask waitTask{taskGroup_};
1135  if (streamRunActive_ == 0) {
1136  assert(streamLumiActive_ == 0);
1137 
1138  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1139  WaitingTaskHolder{taskGroup_, &waitTask});
1140  } else {
1142 
1143  auto runStatus = streamRunStatus_[0];
1144 
1145  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1146  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1147  readAndMergeRun(*runStatus);
1149  }
1150 
1151  WaitingTaskHolder holder{taskGroup_, &waitTask};
1152  runStatus->setHolderOfTaskInProcessRuns(holder);
1153  if (streamLumiActive_ > 0) {
1155  continueLumiAsync(std::move(holder));
1156  } else {
1158  }
1159  }
1160  waitTask.wait();
1161  return lastTransitionType();
1162  }
1163 
1165  if (iHolder.taskHasFailed()) {
1166  return;
1167  }
1168 
1169  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1170 
1171  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1172 
1173  chain::first([this, &status, &iSync](auto nextTask) {
1174  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1175  nextTask,
1176  status->endIOVWaitingTasks(),
1177  status->eventSetupImpls(),
1179  actReg_.get(),
1180  serviceToken_,
1182  }) | chain::then([this, status, iSync](std::exception_ptr const* iException, auto nextTask) {
1183  CMS_SA_ALLOW try {
1184  if (iException) {
1185  WaitingTaskHolder copyHolder(nextTask);
1186  copyHolder.doneWaiting(*iException);
1187  // Finish handling the exception in the task pushed to runQueue_
1188  }
1190  actReg_->postESSyncIOVSignal_.emit(iSync);
1191 
1192  runQueue_->pushAndPause(
1193  *nextTask.group(),
1194  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1195  CMS_SA_ALLOW try {
1196  if (postRunQueueTask.taskHasFailed()) {
1197  status->resetBeginResources();
1199  return;
1200  }
1201 
1202  status->setResumer(std::move(iResumer));
1203 
1205  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1206  CMS_SA_ALLOW try {
1208 
1209  if (postSourceTask.taskHasFailed()) {
1210  status->resetBeginResources();
1212  status->resumeGlobalRunQueue();
1213  return;
1214  }
1215 
1216  status->setRunPrincipal(readRun());
1217 
1218  RunPrincipal& runPrincipal = *status->runPrincipal();
1219  {
1221  input_->doBeginRun(runPrincipal, &processContext_);
1222  sentry.completedSuccessfully();
1223  }
1224 
1225  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1226  if (looper_ && looperBeginJobRun_ == false) {
1227  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1228 
1229  oneapi::tbb::task_group group;
1230  FinalWaitingTask waitTask{group};
1231  using namespace edm::waiting_task::chain;
1232  chain::first([this, &es](auto nextTask) {
1233  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1234  }) | then([this, &es](auto nextTask) mutable {
1235  looper_->beginOfJob(es);
1236  looperBeginJobRun_ = true;
1237  looper_->doStartingNewLoop();
1238  }) | runLast(WaitingTaskHolder(group, &waitTask));
1239  waitTask.wait();
1240  }
1241 
1242  using namespace edm::waiting_task::chain;
1243  chain::first([this, status](auto nextTask) mutable {
1244  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1245  status->setStopBeforeProcessingRun(true);
1246  nextTask.doneWaiting(std::current_exception());
1247  }
1248  }) | then([this, status, &es](auto nextTask) {
1249  if (status->stopBeforeProcessingRun()) {
1250  return;
1251  }
1252  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1254  beginGlobalTransitionAsync<Traits>(
1255  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1256  }) | then([status](auto nextTask) mutable {
1257  if (status->stopBeforeProcessingRun()) {
1258  return;
1259  }
1260  status->globalBeginDidSucceed();
1261  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1262  if (status->stopBeforeProcessingRun()) {
1263  return;
1264  }
1265  looper_->prefetchAsync(
1266  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1267  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1268  if (status->stopBeforeProcessingRun()) {
1269  return;
1270  }
1271  ServiceRegistry::Operate operateLooper(serviceToken_);
1272  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1273  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1274  bool precedingTasksSucceeded = true;
1275  if (iException) {
1276  precedingTasksSucceeded = false;
1277  WaitingTaskHolder copyHolder(holder);
1278  copyHolder.doneWaiting(*iException);
1279  }
1280 
1281  if (status->stopBeforeProcessingRun()) {
1282  // We just quit now if there was a failure when merging runs
1283  status->resetBeginResources();
1285  status->resumeGlobalRunQueue();
1286  return;
1287  }
1288  CMS_SA_ALLOW try {
1289  // Under normal circumstances, this task runs after endRun has completed for all streams
1290  // and global endLumi has completed for all lumis contained in this run
1291  auto globalEndRunTask =
1292  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1293  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1294  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1296  });
1297  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1298  } catch (...) {
1299  status->resetBeginResources();
1301  status->resumeGlobalRunQueue();
1302  holder.doneWaiting(std::current_exception());
1303  return;
1304  }
1305 
1306  // After this point we are committed to end the run via endRunAsync
1307 
1309 
1310  // The only purpose of the pause is to cause stream begin run to execute before
1311  // global begin lumi in the single threaded case (maintains consistency with
1312  // the order that existed before concurrent runs were implemented).
1313  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1314 
1315  CMS_SA_ALLOW try {
1317  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1318  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1319  CMS_SA_ALLOW try {
1320  streamQueues_[i].push(
1321  *holder.group(),
1322  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1324  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1325  });
1326  } catch (...) {
1327  if (status->streamFinishedBeginRun()) {
1328  WaitingTaskHolder copyHolder(holder);
1329  copyHolder.doneWaiting(std::current_exception());
1330  status->resetBeginResources();
1333  }
1334  }
1335  }
1336  });
1337  } catch (...) {
1338  WaitingTaskHolder copyHolder(holder);
1339  copyHolder.doneWaiting(std::current_exception());
1340  status->resetBeginResources();
1343  }
1345  }) | runLast(postSourceTask);
1346  } catch (...) {
1347  status->resetBeginResources();
1349  status->resumeGlobalRunQueue();
1350  postSourceTask.doneWaiting(std::current_exception());
1351  }
1352  }); // task in sourceResourcesAcquirer
1353  } catch (...) {
1354  status->resetBeginResources();
1356  status->resumeGlobalRunQueue();
1357  postRunQueueTask.doneWaiting(std::current_exception());
1358  }
1359  }); // task in runQueue
1360  } catch (...) {
1361  status->resetBeginResources();
1363  nextTask.doneWaiting(std::current_exception());
1364  }
1365  }) | chain::runLast(std::move(iHolder));
1366  }
1367 
1368  void EventProcessor::streamBeginRunAsync(unsigned int iStream,
1369  std::shared_ptr<RunProcessingStatus> status,
1370  bool precedingTasksSucceeded,
1371  WaitingTaskHolder iHolder) {
1372  // These shouldn't throw
1373  streamQueues_[iStream].pause();
1374  ++streamRunActive_;
1375  streamRunStatus_[iStream] = std::move(status);
1376 
1377  CMS_SA_ALLOW try {
1378  using namespace edm::waiting_task::chain;
1379  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1380  if (precedingTasksSucceeded) {
1381  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1382  RunTransitionInfo transitionInfo(
1383  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1385  beginStreamTransitionAsync<Traits>(
1386  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1387  }
1388  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1389  if (exceptionFromBeginStreamRun) {
1390  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1391  }
1392  releaseBeginRunResources(iStream);
1393  }) | runLast(iHolder);
1394  } catch (...) {
1395  releaseBeginRunResources(iStream);
1396  iHolder.doneWaiting(std::current_exception());
1397  }
1398  }
1399 
1400  void EventProcessor::releaseBeginRunResources(unsigned int iStream) {
1401  auto& status = streamRunStatus_[iStream];
1402  if (status->streamFinishedBeginRun()) {
1403  status->resetBeginResources();
1405  }
1406  streamQueues_[iStream].resume();
1407  }
1408 
1409  void EventProcessor::endRunAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder iHolder) {
1410  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1411  iRunStatus->setEndTime();
1412  IOVSyncValue ts(
1414  runPrincipal.endTime());
1415  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1416  WaitingTaskHolder copyHolder(iHolder);
1417  copyHolder.doneWaiting(std::current_exception());
1418  }
1419 
1420  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1421  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1422  nextTask,
1423  iRunStatus->endIOVWaitingTasksEndRun(),
1424  iRunStatus->eventSetupImplsEndRun(),
1426  actReg_.get(),
1427  serviceToken_);
1428  }) | chain::then([this, iRunStatus, ts](std::exception_ptr const* iException, auto nextTask) {
1429  if (iException) {
1430  iRunStatus->setEndingEventSetupSucceeded(false);
1431  handleEndRunExceptions(*iException, nextTask);
1432  }
1434  CMS_SA_ALLOW try { actReg_->postESSyncIOVSignal_.emit(ts); } catch (...) {
1435  WaitingTaskHolder copyHolder(nextTask);
1436  copyHolder.doneWaiting(std::current_exception());
1437  }
1438 
1439  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1440  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1441  CMS_SA_ALLOW try {
1442  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1443  streamQueues_[i].pause();
1444  streamEndRunAsync(std::move(nextTask), i);
1445  });
1446  } catch (...) {
1447  WaitingTaskHolder copyHolder(nextTask);
1448  copyHolder.doneWaiting(std::current_exception());
1449  }
1450  }
1451  });
1452 
1454  CMS_SA_ALLOW try {
1455  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1456  } catch (...) {
1457  WaitingTaskHolder copyHolder(nextTask);
1458  copyHolder.doneWaiting(std::current_exception());
1459  }
1460  }
1461  }) | chain::runLast(std::move(iHolder));
1462  }
1463 
1464  void EventProcessor::handleEndRunExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1465  if (holder.taskHasFailed()) {
1467  } else {
1468  WaitingTaskHolder tmp(holder);
1469  tmp.doneWaiting(iException);
1470  }
1471  }
1472 
1473  void EventProcessor::globalEndRunAsync(WaitingTaskHolder iTask, std::shared_ptr<RunProcessingStatus> iRunStatus) {
1474  auto& runPrincipal = *(iRunStatus->runPrincipal());
1475  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1476  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1477  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1478  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1479  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1480 
1481  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1482  using namespace edm::waiting_task::chain;
1483  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1484  auto nextTask) {
1485  if (endingEventSetupSucceeded) {
1486  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1488  endGlobalTransitionAsync<Traits>(
1489  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1490  }
1491  }) |
1492  ifThen(looper_ && endingEventSetupSucceeded,
1493  [this, &runPrincipal, &es](auto nextTask) {
1494  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1495  }) |
1496  ifThen(looper_ && endingEventSetupSucceeded,
1497  [this, &runPrincipal, &es](auto nextTask) {
1499  looper_->doEndRun(runPrincipal, es, &processContext_);
1500  }) |
1501  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1502  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1503  mergeableRunProductMetadata->preWriteRun();
1504  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1505  }) |
1506  then([status = std::move(iRunStatus),
1507  this,
1508  didGlobalBeginSucceed,
1509  mergeableRunProductMetadata,
1510  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1511  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1512  mergeableRunProductMetadata->postWriteRun();
1513  }
1514  if (iException) {
1515  handleEndRunExceptions(*iException, nextTask);
1516  }
1518 
1519  std::exception_ptr ptr;
1520 
1521  // Try hard to clean up resources so the
1522  // process can terminate in a controlled
1523  // fashion even after exceptions have occurred.
1524  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1525  if (not ptr) {
1526  ptr = std::current_exception();
1527  }
1528  }
1529  CMS_SA_ALLOW try {
1530  status->resumeGlobalRunQueue();
1532  } catch (...) {
1533  if (not ptr) {
1534  ptr = std::current_exception();
1535  }
1536  }
1537  CMS_SA_ALLOW try {
1538  status->resetEndResources();
1539  status.reset();
1540  } catch (...) {
1541  if (not ptr) {
1542  ptr = std::current_exception();
1543  }
1544  }
1545 
1546  if (ptr && !iException) {
1547  handleEndRunExceptions(ptr, nextTask);
1548  }
1549  }) |
1550  runLast(std::move(iTask));
1551  }
1552 
1553  void EventProcessor::streamEndRunAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1554  CMS_SA_ALLOW try {
1555  if (!streamRunStatus_[iStreamIndex]) {
1556  if (exceptionRunStatus_->streamFinishedRun()) {
1557  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1558  exceptionRunStatus_.reset();
1559  }
1560  return;
1561  }
1562 
1563  auto runDoneTask =
1564  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1565  if (iException) {
1566  handleEndRunExceptions(*iException, iTask);
1567  }
1568 
1569  auto runStatus = streamRunStatus_[iStreamIndex];
1570 
1571  //reset status before releasing queue else get race condition
1572  if (runStatus->streamFinishedRun()) {
1573  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1574  }
1575  streamRunStatus_[iStreamIndex].reset();
1576  --streamRunActive_;
1577  streamQueues_[iStreamIndex].resume();
1578  });
1579 
1580  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1581 
1582  auto runStatus = streamRunStatus_[iStreamIndex].get();
1583 
1584  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1585  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1586  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1587  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1588 
1589  auto& runPrincipal = *runStatus->runPrincipal();
1591  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1592  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1593  *schedule_,
1594  iStreamIndex,
1595  transitionInfo,
1596  serviceToken_,
1597  subProcesses_,
1598  cleaningUpAfterException);
1599  }
1600  } catch (...) {
1601  handleEndRunExceptions(std::current_exception(), iTask);
1602  }
1603  }
1604 
1605  void EventProcessor::endUnfinishedRun(bool cleaningUpAfterException) {
1606  if (streamRunActive_ > 0) {
1607  FinalWaitingTask waitTask{taskGroup_};
1608 
1609  auto runStatus = streamRunStatus_[0].get();
1610  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1611  WaitingTaskHolder holder{taskGroup_, &waitTask};
1612  runStatus->setHolderOfTaskInProcessRuns(holder);
1614  endRunAsync(streamRunStatus_[0], std::move(holder));
1615  waitTask.wait();
1616  }
1617  }
1618 
1620  std::shared_ptr<RunProcessingStatus> iRunStatus,
1621  edm::WaitingTaskHolder iHolder) {
1622  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1623 
1624  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1625  chain::first([this, &iSync, &status](auto nextTask) {
1626  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1627  nextTask,
1628  status->endIOVWaitingTasks(),
1629  status->eventSetupImpls(),
1631  actReg_.get(),
1632  serviceToken_);
1633  }) | chain::then([this, status, iRunStatus, iSync](std::exception_ptr const* iException, auto nextTask) {
1634  CMS_SA_ALLOW try {
1635  //the call to doneWaiting will cause the count to decrement
1636  if (iException) {
1637  WaitingTaskHolder copyHolder(nextTask);
1638  copyHolder.doneWaiting(*iException);
1639  }
1640 
1642  actReg_->postESSyncIOVSignal_.emit(iSync);
1643 
1644  lumiQueue_->pushAndPause(
1645  *nextTask.group(),
1646  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1647  CMS_SA_ALLOW try {
1648  if (postLumiQueueTask.taskHasFailed()) {
1649  status->resetResources();
1651  endRunAsync(iRunStatus, postLumiQueueTask);
1652  return;
1653  }
1654 
1655  status->setResumer(std::move(iResumer));
1656 
1658  *postLumiQueueTask.group(),
1659  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1660  CMS_SA_ALLOW try {
1662 
1663  if (postSourceTask.taskHasFailed()) {
1664  status->resetResources();
1666  endRunAsync(iRunStatus, postSourceTask);
1667  return;
1668  }
1669 
1670  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1671 
1672  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1673  {
1675  input_->doBeginLumi(lumiPrincipal, &processContext_);
1676  sentry.completedSuccessfully();
1677  }
1678 
1680  if (rng.isAvailable()) {
1681  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1682  rng->preBeginLumi(lb);
1683  }
1684 
1685  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1686 
1687  using namespace edm::waiting_task::chain;
1688  chain::first([this, status](auto nextTask) mutable {
1690  firstItemAfterLumiMerge_ = true;
1691  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1692  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1694  beginGlobalTransitionAsync<Traits>(
1695  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1696  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1697  looper_->prefetchAsync(
1698  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1699  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1700  status->globalBeginDidSucceed();
1701  ServiceRegistry::Operate operateLooper(serviceToken_);
1702  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1703  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1704  if (iException) {
1705  status->resetResources();
1707  WaitingTaskHolder copyHolder(holder);
1708  copyHolder.doneWaiting(*iException);
1709  endRunAsync(iRunStatus, holder);
1710  } else {
1711  if (not looper_) {
1712  status->globalBeginDidSucceed();
1713  }
1714 
1715  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1716 
1717  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1719 
1720  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1721  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1722  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1723  streamQueues_[i].pause();
1724 
1725  auto& event = principalCache_.eventPrincipal(i);
1726  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1727  // held by the container as this lambda may not finish executing before all the tasks it
1728  // spawns have already started to run.
1729  auto eventSetupImpls = &status->eventSetupImpls();
1730  auto lp = status->lumiPrincipal().get();
1733  event.setLuminosityBlockPrincipal(lp);
1734  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1735  using namespace edm::waiting_task::chain;
1736  chain::first([this, i, &transitionInfo](auto nextTask) {
1737  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1738  *schedule_,
1739  i,
1740  transitionInfo,
1741  serviceToken_,
1742  subProcesses_);
1743  }) |
1744  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1745  auto nextTask) {
1746  if (exceptionFromBeginStreamLumi) {
1747  WaitingTaskHolder copyHolder(nextTask);
1748  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1749  }
1751  }) |
1752  runLast(std::move(holder));
1753  });
1754  } // end for loop over streams
1755  });
1756  }
1757  }) | runLast(postSourceTask);
1758  } catch (...) {
1759  status->resetResources();
1761  WaitingTaskHolder copyHolder(postSourceTask);
1762  copyHolder.doneWaiting(std::current_exception());
1763  endRunAsync(iRunStatus, postSourceTask);
1764  }
1765  }); // task in sourceResourcesAcquirer
1766  } catch (...) {
1767  status->resetResources();
1769  WaitingTaskHolder copyHolder(postLumiQueueTask);
1770  copyHolder.doneWaiting(std::current_exception());
1771  endRunAsync(iRunStatus, postLumiQueueTask);
1772  }
1773  }); // task in lumiQueue
1774  } catch (...) {
1775  status->resetResources();
1777  WaitingTaskHolder copyHolder(nextTask);
1778  copyHolder.doneWaiting(std::current_exception());
1779  endRunAsync(iRunStatus, nextTask);
1780  }
1781  }) | chain::runLast(std::move(iHolder));
1782  }
1783 
1785  chain::first([this](auto nextTask) {
1786  //all streams are sharing the same status at the moment
1787  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1789 
1790  while (lastTransitionType() == InputSource::IsLumi and
1791  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1794  }
1795  firstItemAfterLumiMerge_ = true;
1796  }) | chain::then([this](auto nextTask) mutable {
1797  unsigned int streamIndex = 0;
1798  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1799  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1800  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1801  }
1802  nextTask.group()->run(
1803  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1804  }) | chain::runLast(std::move(iHolder));
1805  }
1806 
1807  void EventProcessor::handleEndLumiExceptions(std::exception_ptr iException, WaitingTaskHolder const& holder) {
1808  if (holder.taskHasFailed()) {
1810  } else {
1811  WaitingTaskHolder tmp(holder);
1812  tmp.doneWaiting(iException);
1813  }
1814  }
1815 
1817  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1818  // Get some needed info out of the status object before moving
1819  // it into finalTaskForThisLumi.
1820  auto& lp = *(iLumiStatus->lumiPrincipal());
1821  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1822  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1823  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1824  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1825 
1826  using namespace edm::waiting_task::chain;
1827  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1828  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1829 
1830  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1832  endGlobalTransitionAsync<Traits>(
1833  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1834  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1835  //Only call writeLumi if beginLumi succeeded
1836  if (didGlobalBeginSucceed) {
1837  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1838  }
1839  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1840  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1841  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1842  //any thrown exception auto propagates to nextTask via the chain
1844  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1845  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1846  if (iException) {
1847  handleEndLumiExceptions(*iException, nextTask);
1848  }
1850 
1851  std::exception_ptr ptr;
1852 
1853  // Try hard to clean up resources so the
1854  // process can terminate in a controlled
1855  // fashion even after exceptions have occurred.
1856  // Caught exception is passed to handleEndLumiExceptions()
1857  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1858  if (not ptr) {
1859  ptr = std::current_exception();
1860  }
1861  }
1862  // Caught exception is passed to handleEndLumiExceptions()
1863  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1864  if (not ptr) {
1865  ptr = std::current_exception();
1866  }
1867  }
1868  // Caught exception is passed to handleEndLumiExceptions()
1869  CMS_SA_ALLOW try {
1870  status->resetResources();
1871  status->globalEndRunHolderDoneWaiting();
1872  status.reset();
1873  } catch (...) {
1874  if (not ptr) {
1875  ptr = std::current_exception();
1876  }
1877  }
1878 
1879  if (ptr && !iException) {
1880  handleEndLumiExceptions(ptr, nextTask);
1881  }
1882  }) | runLast(std::move(iTask));
1883  }
1884 
1885  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1886  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1887  auto status = streamLumiStatus_[iStreamIndex];
1888  if (iException) {
1889  handleEndLumiExceptions(*iException, iTask);
1890  }
1891 
1892  // reset status before releasing queue else get race condition
1893  streamLumiStatus_[iStreamIndex].reset();
1895  streamQueues_[iStreamIndex].resume();
1896 
1897  //are we the last one?
1898  if (status->streamFinishedLumi()) {
1900  }
1901  });
1902 
1903  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1904 
1905  // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1906  // therefore we do not want to hold the shared_ptr
1907  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1908  lumiStatus->setEndTime();
1909 
1910  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1911  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1912  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1913 
1914  if (lumiStatus->didGlobalBeginSucceed()) {
1915  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1917  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1918  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1919  *schedule_,
1920  iStreamIndex,
1921  transitionInfo,
1922  serviceToken_,
1923  subProcesses_,
1924  cleaningUpAfterException);
1925  }
1926  }
1927 
1928  void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
1929  if (streamRunActive_ == 0) {
1930  assert(streamLumiActive_ == 0);
1931  } else {
1933  if (streamLumiActive_ > 0) {
1934  FinalWaitingTask globalWaitTask{taskGroup_};
1936  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1937  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1938  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1939  }
1940  globalWaitTask.wait();
1941  }
1942  }
1943  }
1944 
1947  input_->readProcessBlock(processBlockPrincipal);
1948  sentry.completedSuccessfully();
1949  }
1950 
1951  std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
1953  assert(rp);
1954  rp->setAux(*input_->runAuxiliary());
1955  {
1957  input_->readRun(*rp, *historyAppender_);
1958  sentry.completedSuccessfully();
1959  }
1960  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1961  return rp;
1962  }
1963 
1965  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1966 
1967  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1968  assert(runOK);
1969  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1970  {
1972  input_->readAndMergeRun(runPrincipal);
1973  sentry.completedSuccessfully();
1974  }
1975  }
1976 
1977  std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
1979  assert(lbp);
1980  lbp->setAux(*input_->luminosityBlockAuxiliary());
1981  {
1983  input_->readLuminosityBlock(*lbp, *historyAppender_);
1984  sentry.completedSuccessfully();
1985  }
1986  lbp->setRunPrincipal(std::move(rp));
1987  return lbp;
1988  }
1989 
1991  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1992  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1993  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1994  input_->processHistoryRegistry().reducedProcessHistoryID(
1995  input_->luminosityBlockAuxiliary()->processHistoryID()));
1996  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1997  assert(lumiOK);
1998  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1999  {
2001  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2002  sentry.completedSuccessfully();
2003  }
2004  }
2005 
2007  using namespace edm::waiting_task;
2008  chain::first([&](auto nextTask) {
2010  schedule_->writeProcessBlockAsync(
2011  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2012  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2014  for (auto& s : subProcesses_) {
2015  s.writeProcessBlockAsync(nextTask, processBlockType);
2016  }
2017  }) | chain::runLast(std::move(task));
2018  }
2019 
2021  RunPrincipal const& runPrincipal,
2022  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
2023  using namespace edm::waiting_task;
2024  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2025  chain::first([&](auto nextTask) {
2027  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2028  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2030  for (auto& s : subProcesses_) {
2031  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2032  }
2033  }) | chain::runLast(std::move(task));
2034  }
2035  }
2036 
2038  for (auto& s : subProcesses_) {
2039  s.clearRunPrincipal(*iStatus.runPrincipal());
2040  }
2041  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2042  iStatus.runPrincipal()->clearPrincipal();
2043  }
2044 
2046  using namespace edm::waiting_task;
2047  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2048  chain::first([&](auto nextTask) {
2050 
2051  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2052  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2053  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2055  for (auto& s : subProcesses_) {
2056  s.writeLumiAsync(nextTask, lumiPrincipal);
2057  }
2059  }
2060  }
2061 
2063  for (auto& s : subProcesses_) {
2064  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2065  }
2066  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2067  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2068  iStatus.lumiPrincipal()->clearPrincipal();
2069  }
2070 
2071  void EventProcessor::readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus> iRunStatus,
2072  WaitingTaskHolder iHolder) {
2073  auto group = iHolder.group();
2075  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2076  CMS_SA_ALLOW try {
2078 
2079  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2080 
2082  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2083  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2084  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2085  status->setStopBeforeProcessingRun(true);
2086  return;
2087  }
2090  }
2091  } catch (...) {
2092  status->setStopBeforeProcessingRun(true);
2093  holder.doneWaiting(std::current_exception());
2094  }
2095  });
2096  }
2097 
2098  void EventProcessor::readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus,
2099  WaitingTaskHolder iHolder) {
2100  auto group = iHolder.group();
2102  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2103  CMS_SA_ALLOW try {
2105 
2106  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2107 
2109  while (lastTransitionType() == InputSource::IsLumi and
2110  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2111  readAndMergeLumi(*iLumiStatus);
2113  }
2114  } catch (...) {
2115  holder.doneWaiting(std::current_exception());
2116  }
2117  });
2118  }
2119 
2120  void EventProcessor::handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus> iRunStatus,
2121  WaitingTaskHolder iHolder) {
2123  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2124  iHolder.doneWaiting(std::exception_ptr{});
2125  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2126  CMS_SA_ALLOW try {
2127  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2128  input_->luminosityBlockAuxiliary()->beginTime()),
2129  iRunStatus,
2130  iHolder);
2131  } catch (...) {
2132  WaitingTaskHolder copyHolder(iHolder);
2133  iHolder.doneWaiting(std::current_exception());
2134  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2135  }
2136  } else {
2137  // Note that endRunAsync will call beginRunAsync for the following run
2138  // if appropriate.
2139  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2140  }
2141  }
2142 
2144  unsigned int iStreamIndex,
2146  // This function returns true if it successfully reads an event for the stream and that
2147  // requires both that an event is next and there are no problems or requests to stop.
2148 
2149  if (iTask.taskHasFailed()) {
2150  // We want all streams to stop or all streams to pause. If we are already in the
2151  // middle of pausing streams, then finish pausing all of them and the lumi will be
2152  // ended later. Otherwise, just end it now.
2155  }
2156  return false;
2157  }
2158 
2159  // Did another stream already stop or pause this lumi?
2161  return false;
2162  }
2163 
2164  // Are output modules or the looper requesting we stop?
2165  if (shouldWeStop()) {
2168  return false;
2169  }
2170 
2172 
2173  // need to use lock in addition to the serial task queue because
2174  // of delayed provenance reading and reading data in response to
2175  // edm::Refs etc
2176  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2177 
2178  // If we didn't already call nextTransitionType while merging lumis, call it here.
2179  // This asks the input source what is next and also checks for signals.
2180 
2182  firstItemAfterLumiMerge_ = false;
2183 
2184  if (InputSource::IsEvent != itemType) {
2185  // IsFile may continue processing the lumi and
2186  // looper_ can cause the input source to declare a new IsRun which is actually
2187  // just a continuation of the previous run
2188  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2189  (InputSource::IsRun == itemType and
2190  (iStatus.lumiPrincipal()->run() != input_->run() or
2191  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2193  } else {
2195  }
2196  return false;
2197  }
2198  readEvent(iStreamIndex);
2199  return true;
2200  }
2201 
2202  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
2203  auto group = iTask.group();
2204  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2205  CMS_SA_ALLOW try {
2206  auto status = streamLumiStatus_[iStreamIndex].get();
2208 
2209  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2210  auto recursionTask =
2211  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2212  if (iEventException) {
2213  WaitingTaskHolder copyHolder(iTask);
2214  copyHolder.doneWaiting(*iEventException);
2215  // Intentionally, we don't return here. The recursive call to
2216  // handleNextEvent takes care of immediately ending the run properly
2217  // using the same code it uses to end the run in other situations.
2218  }
2219  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2220  });
2221 
2222  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2223  } else {
2224  // the stream will stop processing this lumi now
2226  if (not status->haveStartedNextLumiOrEndedRun()) {
2227  status->startNextLumiOrEndRun();
2228  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2229  CMS_SA_ALLOW try {
2230  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2231  input_->luminosityBlockAuxiliary()->beginTime()),
2232  streamRunStatus_[iStreamIndex],
2233  iTask);
2234  } catch (...) {
2235  WaitingTaskHolder copyHolder(iTask);
2236  copyHolder.doneWaiting(std::current_exception());
2237  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2238  }
2239  } else {
2240  // If appropriate, this will also start the next run.
2241  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2242  }
2243  }
2244  streamEndLumiAsync(iTask, iStreamIndex);
2245  } else {
2246  assert(status->eventProcessingState() ==
2248  auto runStatus = streamRunStatus_[iStreamIndex].get();
2249 
2250  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2251  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2252  }
2253  }
2254  }
2255  } catch (...) {
2256  WaitingTaskHolder copyHolder(iTask);
2257  copyHolder.doneWaiting(std::current_exception());
2258  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2259  }
2260  });
2261  }
2262 
2263  void EventProcessor::readEvent(unsigned int iStreamIndex) {
2264  //TODO this will have to become per stream
2265  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2266  StreamContext streamContext(event.streamID(), &processContext_);
2267 
2269  input_->readEvent(event, streamContext);
2270 
2271  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2272  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2273  sentry.completedSuccessfully();
2274 
2275  FDEBUG(1) << "\treadEvent\n";
2276  }
2277 
2278  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2279  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2280  }
2281 
2282  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
2283  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2284 
2287  if (rng.isAvailable()) {
2288  Event ev(*pep, ModuleDescription(), nullptr);
2289  rng->postEventRead(ev);
2290  }
2291 
2292  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2293  using namespace edm::waiting_task::chain;
2294  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2295  EventTransitionInfo info(*pep, es);
2296  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2297  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2298  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2299  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2300  }
2301  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2302  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2303  ServiceRegistry::Operate operateLooper(serviceToken_);
2304  processEventWithLooper(*pep, iStreamIndex);
2305  }) | then([pep](auto nextTask) {
2306  FDEBUG(1) << "\tprocessEvent\n";
2307  pep->clearEventPrincipal();
2308  }) | runLast(iHolder);
2309  }
2310 
2311  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
2312  bool randomAccess = input_->randomAccess();
2313  ProcessingController::ForwardState forwardState = input_->forwardState();
2314  ProcessingController::ReverseState reverseState = input_->reverseState();
2315  ProcessingController pc(forwardState, reverseState, randomAccess);
2316 
2318  do {
2319  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2320  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2321  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2322 
2323  bool succeeded = true;
2324  if (randomAccess) {
2326  input_->skipEvents(-2);
2328  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2329  }
2330  }
2332  } while (!pc.lastOperationSucceeded());
2334  shouldWeStop_ = true;
2335  }
2336  }
2337 
2339  FDEBUG(1) << "\tshouldWeStop\n";
2340  if (shouldWeStop_)
2341  return true;
2342  if (!subProcesses_.empty()) {
2343  for (auto const& subProcess : subProcesses_) {
2344  if (subProcess.terminate()) {
2345  return true;
2346  }
2347  }
2348  return false;
2349  }
2350  return schedule_->terminate();
2351  }
2352 
2354 
2356 
2358 
2359  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2360  bool expected = false;
2361  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2362  deferredExceptionPtr_ = iException;
2363  return true;
2364  }
2365  return false;
2366  }
2367 
2369  cms::Exception ex("ModulesSynchingOnLumis");
2370  ex << "The framework is configured to use at least two streams, but the following modules\n"
2371  << "require synchronizing on LuminosityBlock boundaries:";
2372  bool found = false;
2373  for (auto worker : schedule_->allWorkers()) {
2374  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2375  found = true;
2376  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2377  }
2378  }
2379  if (found) {
2380  ex << "\n\nThe situation can be fixed by either\n"
2381  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2382  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2383  throw ex;
2384  }
2385  }
2386 
2388  std::unique_ptr<LogSystem> s;
2389  for (auto worker : schedule_->allWorkers()) {
2390  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2391  if (not s) {
2392  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2393  (*s) << "The following modules require synchronizing on Run boundaries:";
2394  }
2395  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2396  }
2397  }
2398  }
2399 
2401  std::unique_ptr<LogSystem> s;
2402  for (auto worker : schedule_->allWorkers()) {
2403  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2404  if (not s) {
2405  s = std::make_unique<LogSystem>("LegacyModules");
2406  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2407  "is going to end soon. These modules need to be converted to have type\n"
2408  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2409  }
2410  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2411  }
2412  }
2413  }
2414 } // namespace edm
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< bool > exceptionMessageLumis_
size
Write out results.
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void clearPrincipal()
Definition: Principal.cc:390
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void clear()
Not thread safe.
static InputSourceFactory const * get()
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
static const TGPicture * info(bool iBackgroundIsBlack)
void clearRunPrincipal(RunProcessingStatus &)
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void setExceptionMessageFiles(std::string &message)
RunPrincipal const & runPrincipal() const
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
static std::mutex mutex
Definition: Proxy.cc:8
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
RunNumber_t run() const
Definition: RunPrincipal.h:61
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< EDLooperBase const > looper() const
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
InputSource::ItemType lastTransitionType() const
std::shared_ptr< RunPrincipal > & runPrincipal()
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
StreamID streamID() const
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
std::unique_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
std::atomic< bool > exceptionMessageRuns_
void mergeAuxiliary(RunAuxiliary const &aux)
Definition: RunPrincipal.h:73
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:50
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ProcessBlockPrincipal & processBlockPrincipal() const
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
std::multimap< std::string, std::string > referencesToBranches_
fileMode
Definition: DMR_cfg.py:72
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
ParameterSetID id() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
ParameterSet const & registerIt()
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ShouldWriteRun shouldWriteRun() const
Definition: RunPrincipal.h:86
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
SerialTaskQueueChain & serialQueueChain() const
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setLastOperationSucceeded(bool value)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
static ServiceRegistry & instance()
void clear()
Not thread safe.
Definition: Registry.cc:40
StatusCode runToCompletion()
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
virtual void endOfJob()
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void releaseBeginRunResources(unsigned int iStream)
void writeLumi(LuminosityBlockNumber_t lumi)
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
void readAndMergeRun(RunProcessingStatus &)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:798
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
EventSetupImpl const & eventSetupImpl(unsigned subProcessIndex) const
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemType processRuns()
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
void addContext(std::string const &context)
Definition: Exception.cc:165
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
ShouldWriteLumi shouldWriteLumi() const
edm::EventID specifiedEventTransition() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
std::vector< std::shared_ptr< const EventSetupImpl > > & eventSetupImpls()
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::string > branchesToDeleteEarly_
HLT enums.
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:319
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
Transition requestedTransition() const
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
Log< level::System, true > LogAbsolute
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
bool isAvailable() const
Definition: Service.h:40
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
#define get
std::atomic< unsigned int > streamLumiActive_
Log< level::Warning, false > LogWarning
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
static ParentageRegistry * instance()
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void setEventProcessingState(EventProcessingState val)
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)