CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
EventProcessor.cc
Go to the documentation of this file.
8 
42 
44 
52 
57 
61 
74 
75 #include "MessageForSource.h"
76 #include "MessageForParent.h"
78 
79 #include "boost/range/adaptor/reversed.hpp"
80 
81 #include <cassert>
82 #include <exception>
83 #include <iomanip>
84 #include <iostream>
85 #include <utility>
86 #include <sstream>
87 
88 #include <sys/ipc.h>
89 #include <sys/msg.h>
90 
91 #include "oneapi/tbb/task.h"
92 
93 //Used for CPU affinity
94 #ifndef __APPLE__
95 #include <sched.h>
96 #endif
97 
98 namespace {
99  //Sentry class to only send a signal if an
100  // exception occurs. An exception is identified
101  // by the destructor being called without first
102  // calling completedSuccessfully().
103  class SendSourceTerminationSignalIfException {
104  public:
105  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
106  ~SendSourceTerminationSignalIfException() {
107  if (reg_) {
108  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
109  }
110  }
111  void completedSuccessfully() { reg_ = nullptr; }
112 
113  private:
114  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
115  };
116 } // namespace
117 
118 namespace edm {
119 
120  namespace chain = waiting_task::chain;
121 
122  // ---------------------------------------------------------------
// Construct the single input source ("@main_input") described in the process
// ParameterSet: validate its configuration, register it, build its
// ModuleDescription (label is always "source"), and create it via the
// InputSourceFactory, wiring its pre/post event-read signals to the registry.
// NOTE(review): this is a Doxygen-extracted listing; original lines 134, 142,
// 167 and 177 were dropped by extraction (the Exception construction, the
// description-filler factory call, and two argument lines), so the code below
// is incomplete as rendered — consult the original file before editing.
123  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
124  CommonParams const& common,
125  std::shared_ptr<ProductRegistry> preg,
126  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
127  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
128  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
129  std::shared_ptr<ActivityRegistry> areg,
130  std::shared_ptr<ProcessConfiguration const> processConfiguration,
131  PreallocationConfiguration const& allocations) {
132  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
133  if (main_input == nullptr) {
// NOTE(review): the `throw Exception(...)` line (orig. 134) is missing here.
135  << "There must be exactly one source in the configuration.\n"
136  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137  }
138 
139  std::string modtype(main_input->getParameter<std::string>("@module_type"));
140 
141  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
// NOTE(review): the plugin-factory create call (orig. 142) is missing here.
143  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
144  filler->fill(descriptions);
145 
// Validate the source's configuration, adding context to any exception.
146  try {
147  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
148  } catch (cms::Exception& iException) {
149  std::ostringstream ost;
150  ost << "Validating configuration of input source of type " << modtype;
151  iException.addContext(ost.str());
152  throw;
153  }
154 
155  main_input->registerIt();
156 
157  // Fill in "ModuleDescription", in case the input source produces
158  // any EDProducts, which would be registered in the ProductRegistry.
159  // Also fill in the process history item for this process.
160  // There is no module label for the unnamed input source, so
161  // just use "source".
162  // Only the tracked parameters belong in the process configuration.
163  ModuleDescription md(main_input->id(),
164  main_input->getParameter<std::string>("@module_type"),
165  "source",
166  processConfiguration.get(),
168 
169  InputSourceDescription isdesc(md,
170  preg,
171  branchIDListHelper,
172  processBlockHelper,
173  thinnedAssociationsHelper,
174  areg,
175  common.maxEventsInput_,
176  common.maxLumisInput_,
178  allocations);
179 
180  areg->preSourceConstructionSignal_(md);
181  std::unique_ptr<InputSource> input;
182  try {
183  //even if we have an exception, send the signal
184  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
185  convertException::wrap([&]() {
186  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
187  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
188  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
189  });
190  } catch (cms::Exception& iException) {
191  std::ostringstream ost;
192  ost << "Constructing input source of type " << modtype;
193  iException.addContext(ost.str());
194  throw;
195  }
196  return input;
197  }
198 
199  // ---------------------------------------------------------------
// Validate the configuration of an EDLooper against its plugin-provided
// ParameterSetDescription, adding the looper type/label as exception context.
// NOTE(review): the function signature line (orig. 200, presumably
// `void validateLooper(ParameterSet& pset)`) was dropped by extraction.
201  auto const modtype = pset.getParameter<std::string>("@module_type");
202  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
203  auto filler = ParameterSetDescriptionFillerPluginFactory::get()->create(modtype);
204  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
205  filler->fill(descriptions);
206  try {
207  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
208  } catch (cms::Exception& iException) {
209  iException.addContext(
210  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
211  throw;
212  }
213  }
214 
// Create the (single) configured EDLooper: validate and register its
// ParameterSet, then build it via the LooperFactory. Exactly one looper is
// supported, enforced by the assert below.
// NOTE(review): two parameter lines (orig. 216-217, providing `params` and
// `cp` used in the body) were dropped by extraction.
215  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
218  std::vector<std::string> const& loopers) {
219  std::shared_ptr<EDLooperBase> vLooper;
220 
221  assert(1 == loopers.size());
222 
223  for (auto const& looperName : loopers) {
224  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
225  validateLooper(*providerPSet);
226  providerPSet->registerIt();
227  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
228  }
229  return vLooper;
230  }
231 
232  // ---------------------------------------------------------------
// Constructor taking an explicit ServiceToken: wraps the ParameterSet in a
// ProcessDesc, adds default/forced services, and delegates to init().
// NOTE(review): the legacy-mode parameter line (orig. 235, providing
// `iLegacy` used below) was dropped by extraction.
233  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
234  ServiceToken const& iToken,
236  std::vector<std::string> const& defaultServices,
237  std::vector<std::string> const& forcedServices)
238  : actReg_(),
239  preg_(),
240  branchIDListHelper_(),
241  serviceToken_(),
242  input_(),
243  espController_(new eventsetup::EventSetupsController),
244  esp_(),
245  act_table_(),
246  processConfiguration_(),
247  schedule_(),
248  subProcesses_(),
249  historyAppender_(new HistoryAppender),
250  fb_(),
251  looper_(),
252  deferredExceptionPtrIsSet_(false),
253  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
254  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
255  principalCache_(),
256  beginJobCalled_(false),
257  shouldWeStop_(false),
258  fileModeNoMerge_(false),
259  exceptionMessageFiles_(),
260  exceptionMessageRuns_(),
261  exceptionMessageLumis_(false),
262  forceLooperToEnd_(false),
263  looperBeginJobRun_(false),
264  forceESCacheClearOnNewRun_(false),
265  eventSetupDataToExcludeFromPrefetching_() {
266  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
270 
// Convenience constructor without a ServiceToken: same as above but the
// init() call (orig. 305, dropped by extraction) presumably uses a default
// token/legacy mode — TODO confirm against the original file.
271  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
272  std::vector<std::string> const& defaultServices,
273  std::vector<std::string> const& forcedServices)
274  : actReg_(),
275  preg_(),
276  branchIDListHelper_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
282  processConfiguration_(),
283  schedule_(),
284  subProcesses_(),
285  historyAppender_(new HistoryAppender),
286  fb_(),
287  looper_(),
288  deferredExceptionPtrIsSet_(false),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
292  beginJobCalled_(false),
293  shouldWeStop_(false),
294  fileModeNoMerge_(false),
295  exceptionMessageFiles_(),
296  exceptionMessageRuns_(),
297  exceptionMessageLumis_(false),
298  forceLooperToEnd_(false),
299  looperBeginJobRun_(false),
300  forceESCacheClearOnNewRun_(false),
301  asyncStopRequestedWhileProcessingEvents_(false),
302  eventSetupDataToExcludeFromPrefetching_() {
303  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
304  processDesc->addServices(defaultServices, forcedServices);
306  }
307 
// Constructor taking a pre-built ProcessDesc directly; delegates to init().
// NOTE(review): the legacy-mode parameter line (orig. 310, providing
// `legacy` used below) was dropped by extraction.
308  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
309  ServiceToken const& token,
311  : actReg_(),
312  preg_(),
313  branchIDListHelper_(),
314  serviceToken_(),
315  input_(),
316  espController_(new eventsetup::EventSetupsController),
317  esp_(),
318  act_table_(),
319  processConfiguration_(),
320  schedule_(),
321  subProcesses_(),
322  historyAppender_(new HistoryAppender),
323  fb_(),
324  looper_(),
325  deferredExceptionPtrIsSet_(false),
326  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
327  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
328  principalCache_(),
329  beginJobCalled_(false),
330  shouldWeStop_(false),
331  fileModeNoMerge_(false),
332  exceptionMessageFiles_(),
333  exceptionMessageRuns_(),
334  exceptionMessageLumis_(false),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
337  forceESCacheClearOnNewRun_(false),
338  asyncStopRequestedWhileProcessingEvents_(false),
339  eventSetupDataToExcludeFromPrefetching_() {
340  init(processDesc, token, legacy);
341  }
342 
// Main initialization: reads the process ParameterSet, validates top-level
// options, configures threading/stream/lumi concurrency, initializes the
// service system, EventSetup provider, optional looper, input source,
// Schedule, principal caches, and any SubProcesses.
// NOTE(review): Doxygen extraction dropped multiple original lines in this
// function (e.g. 345, 349, 352, 442, 451, 459, 462, 497-498, 501, 508,
// 511-515, 520, 524-526, 530-531, 537, 542, 545, 554-558, 561-562); the
// code below is incomplete as rendered — consult the original file.
343  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
344  ServiceToken const& iToken,
346  //std::cerr << processDesc->dump() << std::endl;
347 
348  // register the empty parentage vector , once and for all
350 
351  // register the empty parameter set, once and for all.
353 
354  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
355 
356  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
357  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
358  bool const hasSubProcesses = !subProcessVParameterSet.empty();
359 
360  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
361  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
362  // set in here if the parameters were not explicitly set.
363  validateTopLevelParameterSets(parameterSet.get());
364 
365  // Now set some parameters specific to the main process.
366  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
367  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
368  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
369  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
370  << fileMode << ".\n"
371  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
372  } else {
373  fileModeNoMerge_ = (fileMode == "NOMERGE");
374  }
375  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
376  ensureAvailableAccelerators(*parameterSet);
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
390  if (nConcurrentRuns != 1) {
391  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
392  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
393  }
394  unsigned int nConcurrentLumis =
395  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
396  if (nConcurrentLumis == 0) {
397  nConcurrentLumis = 2;
398  }
399  if (nConcurrentLumis > nStreams) {
400  nConcurrentLumis = nStreams;
401  }
402  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
403  if (!loopers.empty()) {
404  //For now loopers make us run only 1 transition at a time
405  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
407  "of concurrent runs, and the number of concurrent lumis "
408  "are all being reset to 1. Loopers cannot currently support "
409  "values greater than 1.";
410  nStreams = 1;
411  nConcurrentLumis = 1;
412  nConcurrentRuns = 1;
413  }
414  }
415  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
416  if (dumpOptions) {
417  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
418  } else {
419  if (nThreads > 1 or nStreams > 1) {
420  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
421  }
422  }
423  // The number of concurrent IOVs is configured individually for each record in
424  // the class NumberOfConcurrentIOVs to values less than or equal to this.
425  unsigned int maxConcurrentIOVs = nConcurrentLumis;
426 
427  //Check that relationships between threading parameters makes sense
428  /*
429  if(nThreads<nStreams) {
430  //bad
431  }
432  if(nConcurrentRuns>nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nConcurrentLumis) {
436  //bad
437  }
438  */
439  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
440 
441  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
// NOTE(review): the assignment target for the line below (orig. 442) is
// missing; presumably `deleteNonConsumedUnscheduledModules_ =` — confirm.
443  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
444  //for now, if have a subProcess, don't allow early delete
445  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
446  if (not hasSubProcesses) {
447  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
448  }
449 
450  // Now do general initialization
452 
453  //initialize the services
454  auto& serviceSets = processDesc->getServicesPSets();
455  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
456  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
457 
458  //make the services available
460 
461  if (nStreams > 1) {
463  handler->willBeUsingThreads();
464  }
465 
466  // intialize miscellaneous items
467  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
468 
469  // intialize the event setup provider
470  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
471  esp_ = espController_->makeProvider(
472  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
473 
474  // initialize the looper, if any
475  if (!loopers.empty()) {
476  looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
477  looper_->setActionTable(items.act_table_.get());
478  looper_->attachTo(*items.actReg_);
479 
480  // in presence of looper do not delete modules
481  deleteNonConsumedUnscheduledModules_ = false;
482  }
483 
484  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
485 
486  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
487  streamQueues_.resize(nStreams);
488  streamLumiStatus_.resize(nStreams);
489 
490  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
491 
492  // initialize the input source
493  input_ = makeInput(*parameterSet,
494  *common,
495  items.preg(),
496  items.branchIDListHelper(),
499  items.actReg_,
500  items.processConfiguration(),
502 
503  // initialize the Schedule
504  schedule_ =
505  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
506 
507  // set the data members
509  actReg_ = items.actReg_;
510  preg_ = items.preg();
516  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
517 
518  FDEBUG(2) << parameterSet << std::endl;
519 
521  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
522  // Reusable event principal
523  auto ep = std::make_shared<EventPrincipal>(preg(),
527  historyAppender_.get(),
528  index,
529  true /*primary process*/,
532  }
533 
534  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
535  auto lp =
536  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
538  }
539 
540  {
541  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
543 
544  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
546  }
547 
548  // fill the subprocesses, if there are any
549  subProcesses_.reserve(subProcessVParameterSet.size());
550  for (auto& subProcessPSet : subProcessVParameterSet) {
551  subProcesses_.emplace_back(subProcessPSet,
552  *parameterSet,
553  preg(),
559  *actReg_,
560  token,
563  &processContext_);
564  }
565  }
566 
// Destructor body (signature line, orig. ~567, dropped by extraction):
// re-enters the service context, then tears down members that may need
// services during destruction, in a deliberate order.
568  // Make the services available while everything is being deleted.
570  ServiceRegistry::Operate op(token);
571 
572  // manually destroy all these thing that may need the services around
573  // propagate_const<T> has no reset() function
574  espController_ = nullptr;
575  esp_ = nullptr;
576  schedule_ = nullptr;
577  input_ = nullptr;
578  looper_ = nullptr;
579  actReg_ = nullptr;
580 
583  }
584 
// Helper ending EventSetup IOVs asynchronously and waiting for completion.
// NOTE(review): its signature line (orig. 585-586) was dropped by extraction.
587  espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
588  taskGroup_.wait();
589  assert(task.done());
590  }
591 
// beginJob body (signature line, orig. 592, dropped by extraction).
// Idempotent via beginJobCalled_. Performs: preallocate signal, merging of
// consumed-module lists from SubProcesses, deletion of non-consumed
// unscheduled modules, early-delete setup, source/schedule/looper beginJob,
// and finally beginStream for every stream via a TBB task chain.
// NOTE(review): several original lines (600-605, 608, 611, 629-630, 633,
// 653, 658-661, 701, 707, 710) were dropped by extraction.
593  if (beginJobCalled_)
594  return;
595  beginJobCalled_ = true;
596  bk::beginJob();
597 
598  // StateSentry toerror(this); // should we add this ?
599  //make the services available
601 
606  actReg_->preallocateSignal_(bounds);
607  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
609 
// Merge the sorted per-SubProcess consumed-module lists into one sorted list.
610  std::vector<ModuleProcessName> consumedBySubProcesses;
612  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
613  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
614  if (consumedBySubProcesses.empty()) {
615  consumedBySubProcesses = std::move(c);
616  } else if (not c.empty()) {
617  std::vector<ModuleProcessName> tmp;
618  tmp.reserve(consumedBySubProcesses.size() + c.size());
619  std::merge(consumedBySubProcesses.begin(),
620  consumedBySubProcesses.end(),
621  c.begin(),
622  c.end(),
623  std::back_inserter(tmp));
624  std::swap(consumedBySubProcesses, tmp);
625  }
626  });
627 
628  // Note: all these may throw
631  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
632  not unusedModules.empty()) {
634 
635  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
636  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
637  "and "
638  "therefore they are deleted before beginJob transition.";
639  for (auto const& description : unusedModules) {
640  l << "\n " << description->moduleLabel();
641  }
642  });
643  for (auto const& description : unusedModules) {
644  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
645  }
646  }
647  }
648  // Initialize after the deletion of non-consumed unscheduled
649  // modules to avoid non-consumed non-run modules to keep the
650  // products unnecessarily alive
651  if (not branchesToDeleteEarly_.empty()) {
652  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
654  }
655 
656  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
657 
660  }
662 
663  //NOTE: This implementation assumes 'Job' means one call
664  // the EventProcessor::run
665  // If it really means once per 'application' then this code will
666  // have to be changed.
667  // Also have to deal with case where have 'run' then new Module
668  // added and do 'run'
669  // again. In that case the newly added Module needs its 'beginJob'
670  // to be called.
671 
672  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
673  // For now we delay calling beginOfJob until first beginOfRun
674  //if(looper_) {
675  // looper_->beginOfJob(es);
676  //}
677  try {
678  convertException::wrap([&]() { input_->doBeginJob(); });
679  } catch (cms::Exception& ex) {
680  ex.addContext("Calling beginJob for the source");
681  throw;
682  }
683  espController_->finishConfiguration();
684  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
685  if (looper_) {
686  constexpr bool mustPrefetchMayGet = true;
687  auto const processBlockLookup = preg_->productLookup(InProcess);
688  auto const runLookup = preg_->productLookup(InRun);
689  auto const lumiLookup = preg_->productLookup(InLumi);
690  auto const eventLookup = preg_->productLookup(InEvent);
691  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
692  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
693  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
694  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
695  looper_->updateLookup(esp_->recordsToProxyIndices());
696  }
697  // toerror.succeeded(); // should we add this?
698  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
699  actReg_->postBeginJobSignal_();
700 
// Run beginStream for each stream (and each SubProcess stream) as a chained
// task graph; block until done and rethrow any captured exception.
702  oneapi::tbb::task_group group;
703  using namespace edm::waiting_task::chain;
704  first([this](auto nextTask) {
705  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
706  first([i, this](auto nextTask) {
708  schedule_->beginStream(i);
709  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
711  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
712  }) | lastTask(nextTask);
713  }
714  }) | runLast(WaitingTaskHolder(group, &last));
715  group.wait();
716  if (last.exceptionPtr()) {
717  std::rethrow_exception(*last.exceptionPtr());
718  }
719  }
720 
// endJob body (signature line, orig. 721, dropped by extraction).
// Runs endStream for all streams/SubProcesses concurrently, collecting all
// exceptions into the collector `c` instead of throwing early, then runs
// endJob for schedule, SubProcesses, source and looper, rethrowing at the end.
// NOTE(review): the declaration of the ExceptionCollector `c` (orig. 723)
// was dropped by extraction.
722  // Collects exceptions, so we don't throw before all operations are performed.
724  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
725 
726  //make the services available
728 
729  using namespace edm::waiting_task::chain;
730 
731  edm::FinalWaitingTask waitTask;
732  oneapi::tbb::task_group group;
733 
734  {
735  //handle endStream transitions
736  edm::WaitingTaskHolder taskHolder(group, &waitTask);
// collectorMutex serializes access to the shared exception collector from
// concurrently running stream tasks.
737  std::mutex collectorMutex;
738  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
739  first([this, i, &c, &collectorMutex](auto nextTask) {
740  std::exception_ptr ep;
741  try {
743  this->schedule_->endStream(i);
744  } catch (...) {
745  ep = std::current_exception();
746  }
747  if (ep) {
748  std::lock_guard<std::mutex> l(collectorMutex);
749  c.call([&ep]() { std::rethrow_exception(ep); });
750  }
751  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
752  for (auto& subProcess : subProcesses_) {
753  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
754  std::exception_ptr ep;
755  try {
757  subProcess.doEndStream(i);
758  } catch (...) {
759  ep = std::current_exception();
760  }
761  if (ep) {
762  std::lock_guard<std::mutex> l(collectorMutex);
763  c.call([&ep]() { std::rethrow_exception(ep); });
764  }
765  }) | lastTask(nextTask);
766  }
767  }) | lastTask(taskHolder);
768  }
769  }
770  group.wait();
771 
772  auto actReg = actReg_.get();
773  c.call([actReg]() { actReg->preEndJobSignal_(); });
774  schedule_->endJob(c);
775  for (auto& subProcess : subProcesses_) {
776  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
777  }
778  c.call(std::bind(&InputSource::doEndJob, input_.get()));
779  if (looper_) {
780  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
781  }
782  c.call([actReg]() { actReg->postEndJobSignal_(); });
783  if (c.hasThrown()) {
784  c.rethrow();
785  }
786  }
787 
789 
790  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
791  return schedule_->getAllModuleDescriptions();
792  }
793 
// Simple delegators: event counters and counter reset are owned by Schedule.
794  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
795 
796  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
797 
798  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
799 
800  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
801 
// The state-machine helpers (FilesProcessor etc.) are textually included
// into an anonymous namespace so they stay local to this translation unit.
802  namespace {
803 #include "TransitionProcessors.icc"
804  }
805 
// checkForAsyncStopRequest body (signature line, orig. 806, dropped by
// extraction): reports whether an external shutdown signal was raised,
// setting the status code to epSignal if so.
807  bool returnValue = false;
808 
809  // Look for a shutdown signal
810  if (shutdown_flag.load(std::memory_order_acquire)) {
811  returnValue = true;
812  returnCode = epSignal;
813  }
814  return returnValue;
815  }
816 
// nextTransitionType body (signature line, orig. 817, dropped by
// extraction): asks the source for the next item type, skipping
// IsSynchronize, and converts deferred exceptions or async stop requests
// into a stop transition. Several lines (orig. 819, 833, 837) are missing.
818  if (deferredExceptionPtrIsSet_.load()) {
820  return InputSource::IsStop;
821  }
822 
823  SendSourceTerminationSignalIfException sentry(actReg_.get());
824  InputSource::ItemType itemType;
825  //For now, do nothing with InputSource::IsSynchronize
826  do {
827  itemType = input_->nextItemType();
828  } while (itemType == InputSource::IsSynchronize);
829 
830  lastSourceTransition_ = itemType;
831  sentry.completedSuccessfully();
832 
834 
835  if (checkForAsyncStopRequest(returnCode)) {
836  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
838  }
839 
840  return lastSourceTransition_;
841  }
842 
843  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
844  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
845  }
846 
848 
// runToCompletion body (signature and returnCode setup, orig. 849-851,
// dropped by extraction): drives the file-processing state machine in a loop
// until the looper (if any) declares the job done, handling rewinds between
// loops and annotating exceptions with any pending cleanup messages.
// NOTE(review): orig. lines 856, 858, 866, 891, 900, 906 are missing.
852  {
853  beginJob(); //make sure this was called
854 
855  // make the services available
857 
859  try {
860  FilesProcessor fp(fileModeNoMerge_);
861 
862  convertException::wrap([&]() {
863  bool firstTime = true;
864  do {
865  if (not firstTime) {
867  rewindInput();
868  } else {
869  firstTime = false;
870  }
871  startingNewLoop();
872 
873  auto trans = fp.processFiles(*this);
874 
875  fp.normalEnd();
876 
877  if (deferredExceptionPtrIsSet_.load()) {
878  std::rethrow_exception(deferredExceptionPtr_);
879  }
880  if (trans != InputSource::IsStop) {
881  //problem with the source
882  doErrorStuff();
883 
884  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
885  }
886  } while (not endOfLoop());
887  }); // convertException::wrap
888 
889  } // Try block
890  catch (cms::Exception& e) {
892  std::string message(
893  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
894  e.addAdditionalInfo(message);
895  if (e.alreadyPrinted()) {
896  LogAbsolute("Additional Exceptions") << message;
897  }
898  }
899  if (!exceptionMessageRuns_.empty()) {
901  if (e.alreadyPrinted()) {
902  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
903  }
904  }
905  if (!exceptionMessageFiles_.empty()) {
907  if (e.alreadyPrinted()) {
908  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
909  }
910  }
911  throw;
912  }
913  }
914 
915  return returnCode;
916  }
917 
// readFile body (signature line, orig. 918, dropped by extraction): asks the
// source to open/read the next file, guarded by the termination sentry; the
// dropped lines (orig. 923, 927, 929-930) presumably handled product-registry
// growth and the multi-process fast-clone condition — confirm in original.
919  FDEBUG(1) << " \treadFile\n";
920  size_t size = preg_->size();
921  SendSourceTerminationSignalIfException sentry(actReg_.get());
922 
924 
925  fb_ = input_->readFile();
926  if (size < preg_->size()) {
928  }
931  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
932  }
933  sentry.completedSuccessfully();
934  }
935 
936  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
937  if (fileBlockValid()) {
938  SendSourceTerminationSignalIfException sentry(actReg_.get());
939  input_->closeFile(fb_.get(), cleaningUpAfterException);
940  sentry.completedSuccessfully();
941  }
942  FDEBUG(1) << "\tcloseInputFile\n";
943  }
944 
// openOutputFiles body (signature line, orig. 945, dropped by extraction):
// propagate the open-output-files transition to the Schedule and SubProcesses.
946  if (fileBlockValid()) {
947  schedule_->openOutputFiles(*fb_);
948  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
949  }
950  FDEBUG(1) << "\topenOutputFiles\n";
951  }
952 
// closeOutputFiles body (signature line, orig. 953, dropped by extraction).
954  schedule_->closeOutputFiles();
955  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
956  processBlockHelper_->clearAfterOutputFilesClose();
957  FDEBUG(1) << "\tcloseOutputFiles\n";
958  }
959 
// respondToOpenInputFile body (signature line, orig. 960, and the for_all
// opening line, orig. 962, dropped by extraction).
961  if (fileBlockValid()) {
963  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
964  schedule_->respondToOpenInputFile(*fb_);
965  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
966  }
967  FDEBUG(1) << "\trespondToOpenInputFile\n";
968  }
969 
// respondToCloseInputFile body (signature line, orig. 970, dropped).
971  if (fileBlockValid()) {
972  schedule_->respondToCloseInputFile(*fb_);
973  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
974  }
975  FDEBUG(1) << "\trespondToCloseInputFile\n";
976  }
977 
// startingNewLoop body (signature line, orig. 978, dropped by extraction):
// reset the stop flag and notify the looper of a new loop (only after its
// beginOfJob has run, see NOTE below).
979  shouldWeStop_ = false;
980  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
981  // until after we've called beginOfJob
982  if (looper_ && looperBeginJobRun_) {
983  looper_->doStartingNewLoop();
984  }
985  FDEBUG(1) << "\tstartingNewLoop\n";
986  }
987 
// endOfLoop body (signature line, orig. 988, dropped): ask the looper
// whether to stop; the condition line (orig. 994, presumably testing the
// looper status / forceLooperToEnd_) is also missing. Without a looper the
// job always ends after one loop.
989  if (looper_) {
990  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
991  looper_->setModuleChanger(&changer);
992  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
993  looper_->setModuleChanger(nullptr);
995  return true;
996  else
997  return false;
998  }
999  FDEBUG(1) << "\tendOfLoop\n";
1000  return true;
1001  }
1002 
// rewindInput body (signature line, orig. 1003, dropped): rewind the source
// so the next loop re-reads from the beginning.
1004  input_->repeat();
1005  input_->rewind();
1006  FDEBUG(1) << "\trewind\n";
1007  }
1008 
// prepareForNextLoop body (signature line, orig. 1009, dropped).
1010  looper_->prepareForNextLoop(esp_.get());
1011  FDEBUG(1) << "\tprepareForNextLoop\n";
1012  }
1013 
// shouldWeCloseOutput body (signature line, orig. 1014, dropped): with
// SubProcesses, any one of them requesting closure wins; otherwise defer to
// the Schedule.
1015  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1016  if (!subProcesses_.empty()) {
1017  for (auto const& subProcess : subProcesses_) {
1018  if (subProcess.shouldWeCloseOutput()) {
1019  return true;
1020  }
1021  }
1022  return false;
1023  }
1024  return schedule_->shouldWeCloseOutput();
1025  }
1026 
// doErrorStuff body (signature line, orig. 1027, dropped): log that the
// state machine hit an unexpected transition before attempting normal
// termination.
1028  FDEBUG(1) << "\tdoErrorStuff\n";
1029  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1030  << "and went to the error state\n"
1031  << "Will attempt to terminate processing normally\n"
1032  << "(IF using the looper the next loop will be attempted)\n"
1033  << "This likely indicates a bug in an input module or corrupted input or both\n";
1034  }
1035 
// Run the BeginProcessBlock global transition for this process's own
// ProcessBlock, waiting synchronously for the async task graph to finish and
// rethrowing any exception it captured. Sets the success flag only on
// completion so the caller knows whether to run the matching end transition.
// NOTE(review): the `using Traits = ...` alias line (orig. 1040) was dropped
// by extraction.
1036  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1037  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1038  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1039 
1041  FinalWaitingTask globalWaitTask;
1042 
1043  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1044  beginGlobalTransitionAsync<Traits>(
1045  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1046 
// Busy-wait on the task group until the final task has completed.
1047  do {
1048  taskGroup_.wait();
1049  } while (not globalWaitTask.done());
1050 
1051  if (globalWaitTask.exceptionPtr() != nullptr) {
1052  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1053  }
1054  beginProcessBlockSucceeded = true;
1055  }
1056 
// inputProcessBlocks body (signature line, orig. 1056-1057, dropped by
// extraction): for each ProcessBlock provided by the input source, read it,
// run the access-input-process-block transition, write it out, and clear the
// principal. NOTE(review): orig. lines 1059, 1063 and 1078 (principal lookup,
// Traits alias, and the write call) were dropped by extraction.
1058  input_->fillProcessBlockHelper();
1060  while (input_->nextProcessBlock(processBlockPrincipal)) {
1061  readProcessBlock(processBlockPrincipal);
1062 
1064  FinalWaitingTask globalWaitTask;
1065 
1066  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1067  beginGlobalTransitionAsync<Traits>(
1068  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1069 
1070  do {
1071  taskGroup_.wait();
1072  } while (not globalWaitTask.done());
1073  if (globalWaitTask.exceptionPtr() != nullptr) {
1074  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1075  }
1076 
1077  FinalWaitingTask writeWaitTask;
1079  do {
1080  taskGroup_.wait();
1081  } while (not writeWaitTask.done());
1082  if (writeWaitTask.exceptionPtr()) {
1083  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1084  }
1085 
// Release products from this input ProcessBlock before reading the next one.
1086  processBlockPrincipal.clearPrincipal();
1087  for (auto& s : subProcesses_) {
1088  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1089  }
1090  }
1091  }
1092 
  // Run the global "end ProcessBlock" transition, wait for the ProcessBlock
  // write to finish (only if the begin transition had succeeded), then clear
  // the principal here and in every SubProcess so it can be reused.
  // cleaningUpAfterException is forwarded to the transition so modules can
  // distinguish a normal end from error cleanup.
  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
    ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();

    FinalWaitingTask globalWaitTask;

    ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
    endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
                                     *schedule_,
                                     transitionInfo,
                                     serviceToken_,
                                     subProcesses_,
                                     cleaningUpAfterException);
    do {
      taskGroup_.wait();
    } while (not globalWaitTask.done());
    if (globalWaitTask.exceptionPtr() != nullptr) {
      std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
    }

    // The write is only waited on when beginProcessBlock reported success.
    if (beginProcessBlockSucceeded) {
      FinalWaitingTask writeWaitTask;
      do {
        taskGroup_.wait();
      } while (not writeWaitTask.done());
      if (writeWaitTask.exceptionPtr()) {
        std::rethrow_exception(*writeWaitTask.exceptionPtr());
      }
    }

    processBlockPrincipal.clearPrincipal();
    for (auto& s : subProcesses_) {
      s.clearProcessBlockPrincipal(ProcessBlockType::New);
    }
  }
1129 
1131  RunNumber_t run,
1132  bool& globalBeginSucceeded,
1133  bool& eventSetupForInstanceSucceeded) {
1134  globalBeginSucceeded = false;
1135  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1136  {
1137  SendSourceTerminationSignalIfException sentry(actReg_.get());
1138 
1139  input_->doBeginRun(runPrincipal, &processContext_);
1140  sentry.completedSuccessfully();
1141  }
1142 
1143  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1145  espController_->forceCacheClear();
1146  }
1147  {
1148  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150  eventSetupForInstanceSucceeded = true;
1151  sentry.completedSuccessfully();
1152  }
1153  auto const& es = esp_->eventSetupImpl();
1154  if (looper_ && looperBeginJobRun_ == false) {
1155  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1156 
1157  FinalWaitingTask waitTask;
1158  using namespace edm::waiting_task::chain;
1159  chain::first([this, &es](auto nextTask) {
1160  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1161  }) | then([this, &es](auto nextTask) mutable {
1162  looper_->beginOfJob(es);
1163  looperBeginJobRun_ = true;
1164  looper_->doStartingNewLoop();
1165  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1166 
1167  do {
1168  taskGroup_.wait();
1169  } while (not waitTask.done());
1170  if (waitTask.exceptionPtr() != nullptr) {
1171  std::rethrow_exception(*(waitTask.exceptionPtr()));
1172  }
1173  }
1174  {
1176  FinalWaitingTask globalWaitTask;
1177 
1178  using namespace edm::waiting_task::chain;
1179  chain::first([&runPrincipal, &es, this](auto waitTask) {
1180  RunTransitionInfo transitionInfo(runPrincipal, es);
1181  beginGlobalTransitionAsync<Traits>(
1182  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1183  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1184  globalBeginSucceeded = true;
1185  FDEBUG(1) << "\tbeginRun " << run << "\n";
1186  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1187  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1188  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1189  looper_->doBeginRun(runPrincipal, es, &processContext_);
1190  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1191 
1192  do {
1193  taskGroup_.wait();
1194  } while (not globalWaitTask.done());
1195  if (globalWaitTask.exceptionPtr() != nullptr) {
1196  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1197  }
1198  }
1199  {
1200  //To wait, the ref count has to be 1+#streams
1201  FinalWaitingTask streamLoopWaitTask;
1202 
1204 
1205  RunTransitionInfo transitionInfo(runPrincipal, es);
1206  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1207  *schedule_,
1209  transitionInfo,
1210  serviceToken_,
1211  subProcesses_);
1212  do {
1213  taskGroup_.wait();
1214  } while (not streamLoopWaitTask.done());
1215  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1216  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1217  }
1218  }
1219  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1220  if (looper_) {
1221  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1222  }
1223  }
1224 
1226  RunNumber_t run,
1227  bool globalBeginSucceeded,
1228  bool cleaningUpAfterException,
1229  bool eventSetupForInstanceSucceeded) {
1230  if (eventSetupForInstanceSucceeded) {
1231  //If we skip empty runs, this would be called conditionally
1232  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1233 
1234  if (globalBeginSucceeded) {
1236  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1237  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1238  mergeableRunProductMetadata->preWriteRun();
1239  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1240  do {
1241  taskGroup_.wait();
1242  } while (not t.done());
1243  mergeableRunProductMetadata->postWriteRun();
1244  if (t.exceptionPtr()) {
1245  std::rethrow_exception(*t.exceptionPtr());
1246  }
1247  }
1248  }
1249  deleteRunFromCache(phid, run);
1250  }
1251 
1253  RunNumber_t run,
1254  bool globalBeginSucceeded,
1255  bool cleaningUpAfterException) {
1256  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1257  runPrincipal.setEndTime(input_->timestamp());
1258 
1259  IOVSyncValue ts(
1261  runPrincipal.endTime());
1262  {
1263  SendSourceTerminationSignalIfException sentry(actReg_.get());
1265  sentry.completedSuccessfully();
1266  }
1267  auto const& es = esp_->eventSetupImpl();
1268  if (globalBeginSucceeded) {
1269  //To wait, the ref count has to be 1+#streams
1270  FinalWaitingTask streamLoopWaitTask;
1271 
1273 
1274  RunTransitionInfo transitionInfo(runPrincipal, es);
1275  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1276  *schedule_,
1278  transitionInfo,
1279  serviceToken_,
1280  subProcesses_,
1281  cleaningUpAfterException);
1282  do {
1283  taskGroup_.wait();
1284  } while (not streamLoopWaitTask.done());
1285  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1286  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1287  }
1288  }
1289  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1290  if (looper_) {
1291  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1292  }
1293  {
1294  FinalWaitingTask globalWaitTask;
1295 
1296  using namespace edm::waiting_task::chain;
1297  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1298  RunTransitionInfo transitionInfo(runPrincipal, es);
1300  endGlobalTransitionAsync<Traits>(
1301  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1302  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1303  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1304  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1305  looper_->doEndRun(runPrincipal, es, &processContext_);
1306  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1307 
1308  do {
1309  taskGroup_.wait();
1310  } while (not globalWaitTask.done());
1311  if (globalWaitTask.exceptionPtr() != nullptr) {
1312  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1313  }
1314  }
1315  FDEBUG(1) << "\tendRun " << run << "\n";
1316  }
1317 
  // Drive luminosity-block processing: either resume the lumi already active
  // on the streams, or start a new one at the input's current run/lumi sync
  // value. Blocks until the lumi work chain signals completion, re-throwing
  // any captured exception, and returns the last transition type seen so the
  // caller can decide the next state-machine step.
  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
    FinalWaitingTask waitTask;
    if (streamLumiActive_ > 0) {
      // A lumi is already active on the streams.
      // Continue after opening a new input file
    } else {
      beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
                                  input_->luminosityBlockAuxiliary()->beginTime()),
                     iRunResource,
                     WaitingTaskHolder{taskGroup_, &waitTask});
    }
    // Drain the task group until all lumi processing has finished.
    do {
      taskGroup_.wait();
    } while (not waitTask.done());

    if (waitTask.exceptionPtr() != nullptr) {
      std::rethrow_exception(*(waitTask.exceptionPtr()));
    }
    return lastTransitionType();
  }
1339 
1341  std::shared_ptr<void> const& iRunResource,
1342  edm::WaitingTaskHolder iHolder) {
1343  if (iHolder.taskHasFailed()) {
1344  return;
1345  }
1346 
1347  // We must be careful with the status object here and in code this function calls. IF we want
1348  // endRun to be called, then we must call resetResources before the things waiting on
1349  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1350  // endRun to be called much later than it should be, because status is holding iRunResource).
1351  // Note that this must be done explicitly. Relying on the destructor does not work well
1352  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1353  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1354  // destroyed inside this function and lumiWork.
1355  auto status =
1356  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1357  chain::first([&](auto nextTask) {
1358  auto asyncEventSetup = [](ActivityRegistry* actReg,
1359  auto* espController,
1360  auto& queue,
1361  WaitingTaskHolder task,
1362  auto& status,
1363  IOVSyncValue const& iSync) {
1364  queue.pause();
1365  CMS_SA_ALLOW try {
1366  SendSourceTerminationSignalIfException sentry(actReg);
1367  // Pass in iSync to let the EventSetup system know which run and lumi
1368  // need to be processed and prepare IOVs for it.
1369  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1370  // lumi is done and no longer needs its EventSetup IOVs.
1371  espController->eventSetupForInstanceAsync(
1372  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1373  sentry.completedSuccessfully();
1374  } catch (...) {
1375  task.doneWaiting(std::current_exception());
1376  }
1377  };
1378  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1379  // We only get here inside this block if there is an EventSetup
1380  // module not able to handle concurrent IOVs (usually an ESSource)
1381  // and the new sync value is outside the current IOV of that module.
1382  auto group = nextTask.group();
1384  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1385  asyncEventSetup(
1387  });
1388  } else {
1389  asyncEventSetup(
1391  }
1392  }) | chain::then([this, status](std::exception_ptr const* iPtr, auto nextTask) {
1393  //the call to doneWaiting will cause the count to decrement
1394  auto copyTask = nextTask;
1395  if (iPtr) {
1396  nextTask.doneWaiting(*iPtr);
1397  }
1398  auto group = copyTask.group();
1399  lumiQueue_->pushAndPause(
1400  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1401  if (task.taskHasFailed()) {
1402  status->resetResources();
1403  return;
1404  }
1405 
1406  status->setResumer(std::move(iResumer));
1407 
1408  auto group = task.group();
1410  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1411  //make the services available
1413  // Caught exception is propagated via WaitingTaskHolder
1414  CMS_SA_ALLOW try {
1416 
1417  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1418  {
1419  SendSourceTerminationSignalIfException sentry(actReg_.get());
1420 
1421  input_->doBeginLumi(lumiPrincipal, &processContext_);
1422  sentry.completedSuccessfully();
1423  }
1424 
1426  if (rng.isAvailable()) {
1427  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1428  rng->preBeginLumi(lb);
1429  }
1430 
1431  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1432 
1433  using namespace edm::waiting_task::chain;
1434  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1435  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1436  {
1437  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1439  beginGlobalTransitionAsync<Traits>(
1440  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1441  }
1442  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1443  looper_->prefetchAsync(
1444  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1445  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1446  status->globalBeginDidSucceed();
1447  //make the services available
1448  ServiceRegistry::Operate operateLooper(serviceToken_);
1449  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1450  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1451  if (iPtr) {
1452  status->resetResources();
1453  holder.doneWaiting(*iPtr);
1454  } else {
1455  if (not looper_) {
1456  status->globalBeginDidSucceed();
1457  }
1458  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1460 
1461  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1462  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1463  streamQueues_[i].pause();
1464 
1465  auto& event = principalCache_.eventPrincipal(i);
1466  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1467  // held by the container as this lambda may not finish executing before all the tasks it
1468  // spawns have already started to run.
1469  auto eventSetupImpls = &status->eventSetupImpls();
1470  auto lp = status->lumiPrincipal().get();
1471  streamLumiStatus_[i] = std::move(status);
1473  event.setLuminosityBlockPrincipal(lp);
1474  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1475  using namespace edm::waiting_task::chain;
1476  chain::first([this, i, &transitionInfo](auto nextTask) {
1477  beginStreamTransitionAsync<Traits>(
1478  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1479  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1480  if (exceptionFromBeginStreamLumi) {
1481  WaitingTaskHolder tmp(nextTask);
1482  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1483  streamEndLumiAsync(nextTask, i);
1484  } else {
1486  }
1487  }) | runLast(holder);
1488  });
1489  }
1490  }
1491  }) | runLast(postQueueTask);
1492 
1493  } catch (...) {
1494  status->resetResources();
1495  postQueueTask.doneWaiting(std::current_exception());
1496  }
1497  }); // task in sourceResourcesAcquirer
1498  });
1499  }) | chain::runLast(std::move(iHolder));
1500  }
1501 
1503  {
1504  //all streams are sharing the same status at the moment
1505  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1506  status->needToContinueLumi();
1507  status->startProcessingEvents();
1508  }
1509 
1510  unsigned int streamIndex = 0;
1511  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1512  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1513  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1514  }
1515  iHolder.group()->run(
1516  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1517  }
1518 
  // Record an exception raised while ending a luminosity block.
  // Precondition: iPtr must be non-null — it is dereferenced unconditionally.
  // When setDeferredException reports that this exception was accepted as the
  // primary deferred exception, it is also propagated to the waiting task so
  // downstream waiters fail with this error.
  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
    if (setDeferredException(*iPtr)) {
      WaitingTaskHolder tmp(holder);
      tmp.doneWaiting(*iPtr);
    } else {
    }
  }
1527 
1529  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1530  // Get some needed info out of the status object before moving
1531  // it into finalTaskForThisLumi.
1532  auto& lp = *(iLumiStatus->lumiPrincipal());
1533  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1534  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1535  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1536  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1537 
1538  using namespace edm::waiting_task::chain;
1539  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1540  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1541 
1542  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1544  endGlobalTransitionAsync<Traits>(
1545  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1546  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1547  //Only call writeLumi if beginLumi succeeded
1548  if (didGlobalBeginSucceed) {
1549  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1550  }
1551  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1552  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1553  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1554  //any thrown exception auto propagates to nextTask via the chain
1556  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1557  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1558  std::exception_ptr ptr;
1559  if (iPtr) {
1560  ptr = *iPtr;
1561  }
1563 
1564  // Try hard to clean up resources so the
1565  // process can terminate in a controlled
1566  // fashion even after exceptions have occurred.
1567  // Caught exception is passed to handleEndLumiExceptions()
1568  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1569  if (not ptr) {
1570  ptr = std::current_exception();
1571  }
1572  }
1573  // Caught exception is passed to handleEndLumiExceptions()
1574  CMS_SA_ALLOW try {
1575  status->resumeGlobalLumiQueue();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582  // Caught exception is passed to handleEndLumiExceptions()
1583  CMS_SA_ALLOW try {
1584  // This call to status.resetResources() must occur before iTask is destroyed.
1585  // Otherwise there will be a data race which could result in endRun
1586  // being delayed until it is too late to successfully call it.
1587  status->resetResources();
1588  status.reset();
1589  } catch (...) {
1590  if (not ptr) {
1591  ptr = std::current_exception();
1592  }
1593  }
1594 
1595  if (ptr) {
1596  handleEndLumiExceptions(&ptr, nextTask);
1597  }
1598  }) | runLast(std::move(iTask));
1599  }
1600 
  // Asynchronously end the luminosity block on one stream. Builds a
  // done-callback that releases this stream's lumi status and resumes the
  // stream queue, then (if the global begin succeeded) launches the stream
  // end-lumi transition with that callback as continuation. Exceptions from
  // the transition are routed through handleEndLumiExceptions.
  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
    auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
      if (iPtr) {
        handleEndLumiExceptions(iPtr, iTask);
      }
      auto status = streamLumiStatus_[iStreamIndex];
      //reset status before releasing queue else get race condition
      streamLumiStatus_[iStreamIndex].reset();
      streamQueues_[iStreamIndex].resume();

      //are we the last one?
      if (status->streamFinishedLumi()) {
        // NOTE(review): last-stream handling elided in this listing — verify against full source.
      }
    });

    edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};

    //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
    // therefore we do not want to hold the shared_ptr
    auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
    lumiStatus->setEndTime();

    EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());

    bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
    auto eventSetupImpls = &lumiStatus->eventSetupImpls();

    if (lumiStatus->didGlobalBeginSucceed()) {
      auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
      // Sync value at the very end of the lumi (max event number).
      IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
                      lumiPrincipal.endTime());
      LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
      endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
                                       *schedule_,
                                       iStreamIndex,
                                       transitionInfo,
                                       serviceToken_,
                                       subProcesses_,
                                       cleaningUpAfterException);
    }
  }
1645 
1647  if (streamLumiActive_.load() > 0) {
1648  FinalWaitingTask globalWaitTask;
1649  {
1650  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1651  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1652  if (streamLumiStatus_[i]) {
1653  streamEndLumiAsync(globalTaskHolder, i);
1654  }
1655  }
1656  }
1657  do {
1658  taskGroup_.wait();
1659  } while (not globalWaitTask.done());
1660  if (globalWaitTask.exceptionPtr() != nullptr) {
1661  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1662  }
1663  }
1664  }
1665 
1667  SendSourceTerminationSignalIfException sentry(actReg_.get());
1668  input_->readProcessBlock(processBlockPrincipal);
1669  sentry.completedSuccessfully();
1670  }
1671 
1672  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1674  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1675  << "Illegal attempt to insert run into cache\n"
1676  << "Contact a Framework Developer\n";
1677  }
1678  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1679  preg(),
1681  historyAppender_.get(),
1682  0,
1683  true,
1685  {
1686  SendSourceTerminationSignalIfException sentry(actReg_.get());
1687  input_->readRun(*rp, *historyAppender_);
1688  sentry.completedSuccessfully();
1689  }
1690  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1691  principalCache_.insert(rp);
1692  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1693  }
1694 
1695  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1696  principalCache_.merge(input_->runAuxiliary(), preg());
1697  auto runPrincipal = principalCache_.runPrincipalPtr();
1698  {
1699  SendSourceTerminationSignalIfException sentry(actReg_.get());
1700  input_->readAndMergeRun(*runPrincipal);
1701  sentry.completedSuccessfully();
1702  }
1703  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1704  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1705  }
1706 
1709  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1710  << "Illegal attempt to insert lumi into cache\n"
1711  << "Run is invalid\n"
1712  << "Contact a Framework Developer\n";
1713  }
1715  assert(lbp);
1716  lbp->setAux(*input_->luminosityBlockAuxiliary());
1717  {
1718  SendSourceTerminationSignalIfException sentry(actReg_.get());
1719  input_->readLuminosityBlock(*lbp, *historyAppender_);
1720  sentry.completedSuccessfully();
1721  }
1722  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1723  iStatus.lumiPrincipal() = std::move(lbp);
1724  }
1725 
1727  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1728  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1729  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1730  input_->processHistoryRegistry().reducedProcessHistoryID(
1731  input_->luminosityBlockAuxiliary()->processHistoryID()));
1732  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1733  assert(lumiOK);
1734  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1735  {
1736  SendSourceTerminationSignalIfException sentry(actReg_.get());
1737  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1738  sentry.completedSuccessfully();
1739  }
1740  return input_->luminosityBlock();
1741  }
1742 
1744  using namespace edm::waiting_task;
1745  chain::first([&](auto nextTask) {
1747  schedule_->writeProcessBlockAsync(
1748  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1749  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1751  for (auto& s : subProcesses_) {
1752  s.writeProcessBlockAsync(nextTask, processBlockType);
1753  }
1754  }) | chain::runLast(std::move(task));
1755  }
1756 
1758  ProcessHistoryID const& phid,
1759  RunNumber_t run,
1760  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1761  using namespace edm::waiting_task;
1762  chain::first([&](auto nextTask) {
1764  schedule_->writeRunAsync(nextTask,
1765  principalCache_.runPrincipal(phid, run),
1766  &processContext_,
1767  actReg_.get(),
1768  mergeableRunProductMetadata);
1769  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1771  for (auto& s : subProcesses_) {
1772  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1773  }
1774  }) | chain::runLast(std::move(task));
1775  }
1776 
1778  principalCache_.deleteRun(phid, run);
1779  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1780  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1781  }
1782 
1784  using namespace edm::waiting_task;
1785  if (not lumiPrincipal.willBeContinued()) {
1786  chain::first([&](auto nextTask) {
1788 
1789  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1790  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1791  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1793  for (auto& s : subProcesses_) {
1794  s.writeLumiAsync(nextTask, lumiPrincipal);
1795  }
1796  }) | chain::lastTask(std::move(task));
1797  }
1798  }
1799 
1801  for (auto& s : subProcesses_) {
1802  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1803  }
1804  iStatus.lumiPrincipal()->clearPrincipal();
1805  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1806  }
1807 
1809  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1810  iStatus.endLumi();
1811  return false;
1812  }
1813 
1814  if (iStatus.wasEventProcessingStopped()) {
1815  return false;
1816  }
1817 
1818  if (shouldWeStop()) {
1820  iStatus.stopProcessingEvents();
1821  iStatus.endLumi();
1822  return false;
1823  }
1824 
1826  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1827  CMS_SA_ALLOW try {
1828  //need to use lock in addition to the serial task queue because
1829  // of delayed provenance reading and reading data in response to
1830  // edm::Refs etc
1831  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1832 
1833  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1834  if (InputSource::IsLumi == itemType) {
1835  iStatus.haveContinuedLumi();
1836  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1837  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1838  readAndMergeLumi(iStatus);
1839  itemType = nextTransitionType();
1840  }
1841  if (InputSource::IsLumi == itemType) {
1842  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1843  input_->luminosityBlockAuxiliary()->beginTime()));
1844  }
1845  }
1846  if (InputSource::IsEvent != itemType) {
1847  iStatus.stopProcessingEvents();
1848 
1849  //IsFile may continue processing the lumi and
1850  // looper_ can cause the input source to declare a new IsRun which is actually
1851  // just a continuation of the previous run
1852  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1853  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1854  iStatus.endLumi();
1855  }
1856  return false;
1857  }
1858  readEvent(iStreamIndex);
1859  } catch (...) {
1860  bool expected = false;
1861  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1862  deferredExceptionPtr_ = std::current_exception();
1863  iStatus.endLumi();
1864  }
1865  return false;
1866  }
1867  return true;
1868  }
1869 
  // Per-stream event loop step: serialized through the source-resources
  // queue, read the next event and recursively schedule processing of the
  // following one; when no event is available, end the lumi on this stream
  // (possibly starting the next lumi first). Exceptions are recorded in
  // deferredExceptionPtr_ (first one wins) and propagated to iTask.
  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
    sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
      //we do not want to extend the lifetime of the shared_ptr to the end of this function
      // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
      auto status = streamLumiStatus_[iStreamIndex].get();
      // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
      CMS_SA_ALLOW try {
        if (readNextEventForStream(iStreamIndex, *status)) {
          // An event was read: after it is processed, this callback decides
          // whether to continue the loop or shut the stream down.
          auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
            if (iPtr) {
              // Try to end the stream properly even if an exception was
              // thrown on an event.
              bool expected = false;
              if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
                // This is the case where the exception in iPtr is the primary
                // exception and we want to see its message.
                deferredExceptionPtr_ = *iPtr;
                WaitingTaskHolder tempHolder(iTask);
                tempHolder.doneWaiting(*iPtr);
              }
              streamEndLumiAsync(std::move(iTask), iStreamIndex);
              //the stream will stop now
              return;
            }
            handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
          });

          processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
        } else {
          //the stream will stop now
          if (status->isLumiEnding()) {
            // If the source already announced the next lumi, kick it off
            // exactly once before ending this stream's current lumi.
            if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
              status->startNextLumi();
              beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
            }
            streamEndLumiAsync(std::move(iTask), iStreamIndex);
          } else {
            iTask.doneWaiting(std::exception_ptr{});
          }
        }
      } catch (...) {
        // It is unlikely we will ever get in here ...
        // But if we do try to clean up and propagate the exception
        if (streamLumiStatus_[iStreamIndex]) {
          streamEndLumiAsync(iTask, iStreamIndex);
        }
        bool expected = false;
        if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
          auto e = std::current_exception();
          iTask.doneWaiting(e);
        }
      }
    });
  }
1926 
1927  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1928  //TODO this will have to become per stream
1929  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1930  StreamContext streamContext(event.streamID(), &processContext_);
1931 
1932  SendSourceTerminationSignalIfException sentry(actReg_.get());
1933  input_->readEvent(event, streamContext);
1934 
1935  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1936  sentry.completedSuccessfully();
1937 
1938  FDEBUG(1) << "\treadEvent\n";
1939  }
1940 
1941  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1942  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1943  }
1944 
// Runs the full per-event chain for one stream: notify the random number
// service that an event was read, run the Schedule, then any SubProcesses,
// then the looper (if configured), and finally clear the EventPrincipal.
// iHolder is completed when the whole chain finishes (runLast).
// NOTE(review): the declaration of `rng` (Doxygen lines 1948-1949, presumably
// a Service<RandomNumberGenerator> handle) appears to have been dropped by
// the documentation extraction — confirm against the original source file.
1945  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1946  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1947
// Give the random number service a chance to restore per-event state.
1950  if (rng.isAvailable()) {
1951  Event ev(*pep, ModuleDescription(), nullptr);
1952  rng->postEventRead(ev);
1953  }
1954
1955  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1956  using namespace edm::waiting_task::chain;
// Stage 1: run all paths/modules of this process on the event.
1957  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1958  EventTransitionInfo info(*pep, es);
1959  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
// Stage 2 (only if SubProcesses exist): hand the event to each SubProcess,
// iterating in reverse order.
1960  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1961  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1962  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1963  }
// Stage 3 (only if a looper is configured): run it synchronously under the
// service registry of this job.
1964  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1965  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1966  ServiceRegistry::Operate operateLooper(serviceToken_);
1967  processEventWithLooper(*pep, iStreamIndex);
// Stage 4: release the event's products so the principal can be reused.
1968  }) | then([pep](auto nextTask) {
1969  FDEBUG(1) << "\tprocessEvent\n";
1970  pep->clearEventPrincipal();
1971  }) | runLast(iHolder);
1972  }
1973 
// Hands the current event to the EDLooper and honors its navigation request:
// the do/while keeps retrying source repositioning (skip back / go to a
// specified event) until the operation succeeds. If the looper returns
// anything other than kContinue, event processing is flagged to stop.
// NOTE(review): the documentation extraction dropped several lines here —
// Doxygen line 1980 (presumably the declaration/initialization of `status`),
// lines 1988 and 1990 (presumably the ProcessingController transition-kind
// conditions guarding skipEvents/goToEvent), and line 1998. The braces do
// not balance as shown; confirm against the original source file.
1974  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1975  bool randomAccess = input_->randomAccess();
1976  ProcessingController::ForwardState forwardState = input_->forwardState();
1977  ProcessingController::ReverseState reverseState = input_->reverseState();
1978  ProcessingController pc(forwardState, reverseState, randomAccess);
1979
1981  do {
1982  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1983  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
// Let the looper see the event; it reports via `status` and may ask the
// ProcessingController for a navigation action.
1984  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1985
1986  bool succeeded = true;
// Source repositioning is only possible on random-access sources.
1987  if (randomAccess) {
1989  input_->skipEvents(-2);
1991  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1992  }
1993  }
1994  pc.setLastOperationSucceeded(succeeded);
1995  } while (!pc.lastOperationSucceeded());
// Any looper verdict other than kContinue terminates the event loop.
1996  if (status != EDLooperBase::kContinue) {
1997  shouldWeStop_ = true;
1999  }
2000  }
2001 
// Body of EventProcessor::shouldWeStop(): returns true when event processing
// should end — either something already set shouldWeStop_ (e.g. the looper),
// or a SubProcess/the top-level Schedule reports a termination condition
// (such as reaching a maxEvents output limit).
// NOTE(review): the signature line (Doxygen line 2002, presumably
// `bool EventProcessor::shouldWeStop() const {`) appears to have been dropped
// by the documentation extraction — confirm against the original source.
2003  FDEBUG(1) << "\tshouldWeStop\n";
2004  if (shouldWeStop_)
2005  return true;
// When SubProcesses exist, only their terminate() flags are consulted and
// schedule_->terminate() is never reached — confirm this is intended.
2006  if (!subProcesses_.empty()) {
2007  for (auto const& subProcess : subProcesses_) {
2008  if (subProcess.terminate()) {
2009  return true;
2010  }
2011  }
2012  return false;
2013  }
2014  return schedule_->terminate();
2015  }
2016 
2018 
2020 
2022 
2023  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2024  bool expected = false;
2025  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2026  deferredExceptionPtr_ = iException;
2027  return true;
2028  }
2029  return false;
2030  }
2031 
// Body that builds and (conditionally) throws a "ModulesSynchingOnLumis"
// exception listing every module whose worker both wants global
// LuminosityBlocks and owns a global LuminosityBlock queue — i.e. modules
// that would serialize multi-stream running at lumi boundaries. Throws only
// when at least one such module is found.
// NOTE(review): the signature line (Doxygen line 2032, presumably
// `void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {`)
// appears to have been dropped by the documentation extraction.
2033  cms::Exception ex("ModulesSynchingOnLumis");
2034  ex << "The framework is configured to use at least two streams, but the following modules\n"
2035  << "require synchronizing on LuminosityBlock boundaries:";
2036  bool found = false;
2037  for (auto worker : schedule_->allWorkers()) {
2038  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2039  found = true;
2040  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2041  }
2042  }
// Only throw when an offending module was actually collected.
2043  if (found) {
2044  ex << "\n\nThe situation can be fixed by either\n"
2045  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2046  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2047  throw ex;
2048  }
2049  }
2050 
// Body that emits a single LogSystem("LegacyModules") message listing every
// configured module whose worker reports kLegacy concurrency, warning that
// support for legacy module types is ending. The LogSystem is created lazily
// so nothing at all is printed when no legacy modules are configured (the
// message is flushed when `s` is destroyed at scope exit).
// NOTE(review): the signature line (Doxygen line 2051, presumably
// `void EventProcessor::warnAboutLegacyModules() const {`) appears to have
// been dropped by the documentation extraction.
2052  std::unique_ptr<LogSystem> s;
2053  for (auto worker : schedule_->allWorkers()) {
2054  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
// First legacy module found: create the log message header once.
2055  if (not s) {
2056  s = std::make_unique<LogSystem>("LegacyModules");
2057  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2058  "is going to end soon. These modules need to be converted to have type\n"
2059  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2060  }
2061  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2062  }
2063  }
2064  }
2065 } // namespace edm
std::atomic< bool > exceptionMessageLumis_
RunPrincipal const & runPrincipal() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:77
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *, ProcessBlockHelperBase &processBlockHelper)
void readEvent(unsigned int iStreamIndex)
void clearPrincipal()
Definition: Principal.cc:382
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
static const TGPicture * info(bool iBackgroundIsBlack)
const edm::EventSetup & c
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:82
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:63
static PFTauRenderPlugin instance
ParameterSetID id() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool willBeContinued() const
The source is replaying overlapping LuminosityBlocks and this is not the last part for this Lumiosity...
static std::mutex mutex
Definition: Proxy.cc:8
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
list status
Definition: mps_update.py:107
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
processConfiguration
Definition: Schedule.cc:687
bool hasRunPrincipal() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
RunNumber_t run() const
Definition: RunPrincipal.h:61
Log< level::Error, false > LogError
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
assert(be >=bs)
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
bool done() const
Definition: WaitingTask.h:82
bool alreadyPrinted() const
Definition: Exception.cc:177
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:47
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
oneapi::tbb::task_group * group() const noexcept
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
void warnAboutLegacyModules() const
static InputSourceFactory const * get()
std::unique_ptr< InputSource > makeInputSource(ParameterSet const &, InputSourceDescription const &) const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void setExceptionMessageRuns(std::string &message)
void validateLooper(ParameterSet &pset)
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
bool taskHasFailed() const noexcept
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
def move
Definition: eostools.py:511
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:40
void clear()
Not thread safe.
Definition: Registry.cc:40
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
virtual void endOfJob()
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
int totalEvents() const
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
areg
Definition: Schedule.cc:687
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void writeLumi(LuminosityBlockNumber_t lumi)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Log< level::Info, false > LogInfo
tuple group
Definition: watchdog.py:82
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:72
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
oneapi::tbb::task_group taskGroup_
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:165
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
constexpr element_type const * get() const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
void removeModules(std::vector< ModuleDescription const * > const &modules)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
static ComponentFactory< T > const * get()
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:57
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Log< level::System, true > LogAbsolute
void setNextSyncValue(IOVSyncValue const &iValue)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
#define get
std::atomic< unsigned int > streamLumiActive_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:59
tuple last
Definition: dqmdumpme.py:56
Log< level::Warning, false > LogWarning
void beginProcessBlock(bool &beginProcessBlockSucceeded)
preg
Definition: Schedule.cc:687
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
tmp
align.sh
Definition: createJobs.py:716
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
ParameterSet const & registerIt()
tuple size
Write out results.
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool insertMapped(value_type const &v)
Transition requestedTransition() const
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:22