
EventProcessor.cc
8 
42 
44 
52 
57 
61 
74 
75 #include "MessageForSource.h"
76 #include "MessageForParent.h"
78 
79 #include "boost/range/adaptor/reversed.hpp"
80 
81 #include <cassert>
82 #include <exception>
83 #include <iomanip>
84 #include <iostream>
85 #include <utility>
86 #include <sstream>
87 
88 #include <sys/ipc.h>
89 #include <sys/msg.h>
90 
91 #include "oneapi/tbb/task.h"
92 
93 //Used for CPU affinity
94 #ifndef __APPLE__
95 #include <sched.h>
96 #endif
97 
98 namespace {
99  //Sentry class to only send a signal if an
100  // exception occurs. An exception is identified
101  // by the destructor being called without first
102  // calling completedSuccessfully().
103  class SendSourceTerminationSignalIfException {
104  public:
105  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
106  ~SendSourceTerminationSignalIfException() {
107  if (reg_) {
108  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
109  }
110  }
111  void completedSuccessfully() { reg_ = nullptr; }
112 
113  private:
114  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
115  };
116 } // namespace
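 // A minimal usage sketch of the sentry above, mirroring how it is used around
 // the InputSource calls later in this file (e.g. readFile(), readRun()):
 //
 //   SendSourceTerminationSignalIfException sentry(actReg_.get());
 //   fb_ = input_->readFile();          // any escaping exception fires the signal
 //   sentry.completedSuccessfully();    // reached only if readFile() did not throw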
117 
118 namespace edm {
119 
120  namespace chain = waiting_task::chain;
121 
122  // ---------------------------------------------------------------
123  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
124  CommonParams const& common,
125  std::shared_ptr<ProductRegistry> preg,
126  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
127  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
128  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
129  std::shared_ptr<ActivityRegistry> areg,
130  std::shared_ptr<ProcessConfiguration const> processConfiguration,
131  PreallocationConfiguration const& allocations) {
132  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
133  if (main_input == nullptr) {
135  << "There must be exactly one source in the configuration.\n"
136  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
137  }
138 
139  std::string modtype(main_input->getParameter<std::string>("@module_type"));
140 
141  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
143  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
144  filler->fill(descriptions);
145 
146  try {
147  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
148  } catch (cms::Exception& iException) {
149  std::ostringstream ost;
150  ost << "Validating configuration of input source of type " << modtype;
151  iException.addContext(ost.str());
152  throw;
153  }
154 
155  main_input->registerIt();
156 
157  // Fill in "ModuleDescription", in case the input source produces
158  // any EDProducts, which would be registered in the ProductRegistry.
159  // Also fill in the process history item for this process.
160  // There is no module label for the unnamed input source, so
161  // just use "source".
162  // Only the tracked parameters belong in the process configuration.
163  ModuleDescription md(main_input->id(),
164  main_input->getParameter<std::string>("@module_type"),
165  "source",
166  processConfiguration.get(),
168 
169  InputSourceDescription isdesc(md,
170  preg,
171  branchIDListHelper,
172  processBlockHelper,
173  thinnedAssociationsHelper,
174  areg,
175  common.maxEventsInput_,
176  common.maxLumisInput_,
177  common.maxSecondsUntilRampdown_,
178  allocations);
179 
180  areg->preSourceConstructionSignal_(md);
181  std::unique_ptr<InputSource> input;
182  try {
183  //even if we have an exception, send the signal
184  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
185  convertException::wrap([&]() {
186  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
187  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
188  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
189  });
190  } catch (cms::Exception& iException) {
191  std::ostringstream ost;
192  ost << "Constructing input source of type " << modtype;
193  iException.addContext(ost.str());
194  throw;
195  }
196  return input;
197  }
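 // makeInput() also shows the error-reporting idiom used for most construction
 // steps in this file: run the step inside convertException::wrap() so that
 // non-framework exceptions become cms::Exception, then append context before
 // rethrowing. Condensed sketch (the lambda body stands in for any step that
 // may throw):
 //
 //   try {
 //     convertException::wrap([&]() { /* construct or validate something */ });
 //   } catch (cms::Exception& iException) {
 //     iException.addContext("Constructing input source of type " + modtype);
 //     throw;
 //   }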
198 
199  // ---------------------------------------------------------------
201  auto const modtype = pset.getParameter<std::string>("@module_type");
202  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
204  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
205  filler->fill(descriptions);
206  try {
207  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
208  } catch (cms::Exception& iException) {
209  iException.addContext(
210  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
211  throw;
212  }
213  }
214 
215  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
218  std::vector<std::string> const& loopers) {
219  std::shared_ptr<EDLooperBase> vLooper;
220 
221  assert(1 == loopers.size());
222 
223  for (auto const& looperName : loopers) {
224  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
225  validateLooper(*providerPSet);
226  providerPSet->registerIt();
227  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
228  }
229  return vLooper;
230  }
231 
232  // ---------------------------------------------------------------
233  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
234  ServiceToken const& iToken,
236  std::vector<std::string> const& defaultServices,
237  std::vector<std::string> const& forcedServices)
238  : actReg_(),
239  preg_(),
240  branchIDListHelper_(),
241  serviceToken_(),
242  input_(),
243  espController_(new eventsetup::EventSetupsController),
244  esp_(),
245  act_table_(),
246  processConfiguration_(),
247  schedule_(),
248  subProcesses_(),
249  historyAppender_(new HistoryAppender),
250  fb_(),
251  looper_(),
252  deferredExceptionPtrIsSet_(false),
253  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
254  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
255  principalCache_(),
256  beginJobCalled_(false),
257  shouldWeStop_(false),
258  fileModeNoMerge_(false),
259  exceptionMessageFiles_(),
260  exceptionMessageRuns_(),
261  exceptionMessageLumis_(false),
262  forceLooperToEnd_(false),
263  looperBeginJobRun_(false),
264  forceESCacheClearOnNewRun_(false),
265  eventSetupDataToExcludeFromPrefetching_() {
266  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
270 
271  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
272  std::vector<std::string> const& defaultServices,
273  std::vector<std::string> const& forcedServices)
274  : actReg_(),
275  preg_(),
276  branchIDListHelper_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
282  processConfiguration_(),
283  schedule_(),
284  subProcesses_(),
285  historyAppender_(new HistoryAppender),
286  fb_(),
287  looper_(),
288  deferredExceptionPtrIsSet_(false),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
292  beginJobCalled_(false),
293  shouldWeStop_(false),
294  fileModeNoMerge_(false),
295  exceptionMessageFiles_(),
296  exceptionMessageRuns_(),
297  exceptionMessageLumis_(false),
298  forceLooperToEnd_(false),
299  looperBeginJobRun_(false),
300  forceESCacheClearOnNewRun_(false),
301  asyncStopRequestedWhileProcessingEvents_(false),
302  eventSetupDataToExcludeFromPrefetching_() {
303  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
304  processDesc->addServices(defaultServices, forcedServices);
306  }
307 
308  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
309  ServiceToken const& token,
311  : actReg_(),
312  preg_(),
313  branchIDListHelper_(),
314  serviceToken_(),
315  input_(),
316  espController_(new eventsetup::EventSetupsController),
317  esp_(),
318  act_table_(),
319  processConfiguration_(),
320  schedule_(),
321  subProcesses_(),
322  historyAppender_(new HistoryAppender),
323  fb_(),
324  looper_(),
325  deferredExceptionPtrIsSet_(false),
326  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
327  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
328  principalCache_(),
329  beginJobCalled_(false),
330  shouldWeStop_(false),
331  fileModeNoMerge_(false),
332  exceptionMessageFiles_(),
333  exceptionMessageRuns_(),
334  exceptionMessageLumis_(false),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
337  forceESCacheClearOnNewRun_(false),
338  asyncStopRequestedWhileProcessingEvents_(false),
339  eventSetupDataToExcludeFromPrefetching_() {
340  init(processDesc, token, legacy);
341  }
342 
343  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
344  ServiceToken const& iToken,
346  //std::cerr << processDesc->dump() << std::endl;
347 
348  // register the empty parentage vector, once and for all
350 
351  // register the empty parameter set, once and for all.
353 
354  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
355 
356  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
357  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
358  bool const hasSubProcesses = !subProcessVParameterSet.empty();
359 
360  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
361  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
362  // set in here if the parameters were not explicitly set.
364 
365  // Now set some parameters specific to the main process.
366  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
367  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
368  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
369  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
370  << fileMode << ".\n"
371  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
372  } else {
373  fileModeNoMerge_ = (fileMode == "NOMERGE");
374  }
375  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
390  if (nConcurrentRuns != 1) {
391  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
392  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
393  }
394  unsigned int nConcurrentLumis =
395  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
396  if (nConcurrentLumis == 0) {
397  nConcurrentLumis = 2;
398  }
399  if (nConcurrentLumis > nStreams) {
400  nConcurrentLumis = nStreams;
401  }
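 // Worked example of the defaulting above, for a job configured with
 // numberOfThreads = 8, numberOfStreams = 0, numberOfConcurrentRuns = 1 and
 // numberOfConcurrentLuminosityBlocks = 0:
 //   nThreads         = 8
 //   nStreams         = 0 -> defaults to nThreads              -> 8
 //   nConcurrentRuns  = 1 (any other value is rejected above)  -> 1
 //   nConcurrentLumis = 0 -> defaults to 2, capped at nStreams -> 2
 // (If a looper is configured, the block just below resets streams, concurrent
 //  runs and concurrent lumis to 1.)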
402  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
403  if (!loopers.empty()) {
404  //For now loopers make us run only 1 transition at a time
405  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
407  "of concurrent runs, and the number of concurrent lumis "
408  "are all being reset to 1. Loopers cannot currently support "
409  "values greater than 1.";
410  nStreams = 1;
411  nConcurrentLumis = 1;
412  nConcurrentRuns = 1;
413  }
414  }
415  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
416  if (dumpOptions) {
417  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
418  } else {
419  if (nThreads > 1 or nStreams > 1) {
420  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
421  }
422  }
423  // The number of concurrent IOVs is configured individually for each record in
424  // the class NumberOfConcurrentIOVs to values less than or equal to this.
425  unsigned int maxConcurrentIOVs = nConcurrentLumis;
426 
427  //Check that relationships between threading parameters make sense
428  /*
429  if(nThreads<nStreams) {
430  //bad
431  }
432  if(nConcurrentRuns>nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nConcurrentLumis) {
436  //bad
437  }
438  */
439  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
440 
441  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
443  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
444  //for now, if we have a SubProcess, don't allow early delete
445  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
446  if (not hasSubProcesses) {
447  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
448  }
449 
450  // Now do general initialization
452 
453  //initialize the services
454  auto& serviceSets = processDesc->getServicesPSets();
455  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
456  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
457 
458  //make the services available
460 
461  CMS_SA_ALLOW try {
462  if (nStreams > 1) {
464  handler->willBeUsingThreads();
465  }
466 
467  // initialize miscellaneous items
468  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
469 
470  // initialize the event setup provider
471  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
472  esp_ = espController_->makeProvider(
473  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
474 
475  // initialize the looper, if any
476  if (!loopers.empty()) {
478  looper_->setActionTable(items.act_table_.get());
479  looper_->attachTo(*items.actReg_);
480 
481  // in the presence of a looper, do not delete modules
483  }
484 
485  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
486 
487  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
488  streamQueues_.resize(nStreams);
489  streamLumiStatus_.resize(nStreams);
490 
491  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
492 
493  // initialize the input source
495  *common,
496  items.preg(),
497  items.branchIDListHelper(),
499  items.thinnedAssociationsHelper(),
500  items.actReg_,
501  items.processConfiguration(),
503 
504  // initialize the Schedule
505  schedule_ =
506  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
507 
508  // set the data members
509  act_table_ = std::move(items.act_table_);
510  actReg_ = items.actReg_;
511  preg_ = items.preg();
513  branchIDListHelper_ = items.branchIDListHelper();
514  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
515  processConfiguration_ = items.processConfiguration();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(),
528  historyAppender_.get(),
529  index,
530  true /*primary process*/,
533  }
534 
535  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
536  auto lp =
537  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
539  }
540 
541  {
542  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
544 
545  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
547  }
548 
549  // fill the subprocesses, if there are any
550  subProcesses_.reserve(subProcessVParameterSet.size());
551  for (auto& subProcessPSet : subProcessVParameterSet) {
552  subProcesses_.emplace_back(subProcessPSet,
553  *parameterSet,
554  preg(),
560  *actReg_,
561  token,
564  &processContext_);
565  }
566  } catch (...) {
567  //in case of an exception, make sure Services are available
568  // during the following destructors
569  espController_ = nullptr;
570  esp_ = nullptr;
571  schedule_ = nullptr;
572  input_ = nullptr;
573  looper_ = nullptr;
574  actReg_ = nullptr;
575  throw;
576  }
577  }
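 // A rough sketch of how a driver such as cmsRun is expected to exercise this
 // class, using member functions defined in this file (the actual driver wraps
 // each step in additional signal and exception handling):
 //
 //   edm::EventProcessor proc(std::move(parameterSet), defaultServices, forcedServices);
 //   proc.beginJob();
 //   auto status = proc.runToCompletion();  // loops over files, runs, lumis and events
 //   proc.endJob();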
578 
580  // Make the services available while everything is being deleted.
583 
584  // manually destroy all these things that may need the services around
585  // propagate_const<T> has no reset() function
586  espController_ = nullptr;
587  esp_ = nullptr;
588  schedule_ = nullptr;
589  input_ = nullptr;
590  looper_ = nullptr;
591  actReg_ = nullptr;
592 
595  }
596 
600  taskGroup_.wait();
601  assert(task.done());
602  }
603 
605  if (beginJobCalled_)
606  return;
607  beginJobCalled_ = true;
608  bk::beginJob();
609 
610  // StateSentry toerror(this); // should we add this ?
611  //make the services available
613 
618  actReg_->preallocateSignal_(bounds);
619  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
621 
622  std::vector<ModuleProcessName> consumedBySubProcesses;
624  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
625  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
626  if (consumedBySubProcesses.empty()) {
627  consumedBySubProcesses = std::move(c);
628  } else if (not c.empty()) {
629  std::vector<ModuleProcessName> tmp;
630  tmp.reserve(consumedBySubProcesses.size() + c.size());
631  std::merge(consumedBySubProcesses.begin(),
632  consumedBySubProcesses.end(),
633  c.begin(),
634  c.end(),
635  std::back_inserter(tmp));
636  std::swap(consumedBySubProcesses, tmp);
637  }
638  });
639 
640  // Note: all these may throw
643  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
644  not unusedModules.empty()) {
646 
647  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
648  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
649  "and "
650  "therefore they are deleted before beginJob transition.";
651  for (auto const& description : unusedModules) {
652  l << "\n " << description->moduleLabel();
653  }
654  });
655  for (auto const& description : unusedModules) {
656  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
657  }
658  }
659  }
660  // Initialize after the deletion of non-consumed unscheduled
661  // modules, to avoid non-consumed, never-run modules keeping the
662  // products unnecessarily alive
663  if (not branchesToDeleteEarly_.empty()) {
664  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
666  }
667 
668  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
669 
672  }
674 
675  //NOTE: This implementation assumes 'Job' means one call
676  // to EventProcessor::run
677  // If it really means once per 'application' then this code will
678  // have to be changed.
679  // Also have to deal with the case where we have 'run', then a new Module is
680  // added, and we do 'run'
681  // again. In that case the newly added Module needs its 'beginJob'
682  // to be called.
683 
684  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
685  // For now we delay calling beginOfJob until first beginOfRun
686  //if(looper_) {
687  // looper_->beginOfJob(es);
688  //}
689  try {
690  convertException::wrap([&]() { input_->doBeginJob(); });
691  } catch (cms::Exception& ex) {
692  ex.addContext("Calling beginJob for the source");
693  throw;
694  }
695  espController_->finishConfiguration();
696  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
697  if (looper_) {
698  constexpr bool mustPrefetchMayGet = true;
699  auto const processBlockLookup = preg_->productLookup(InProcess);
700  auto const runLookup = preg_->productLookup(InRun);
701  auto const lumiLookup = preg_->productLookup(InLumi);
702  auto const eventLookup = preg_->productLookup(InEvent);
703  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
704  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
705  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
706  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
707  looper_->updateLookup(esp_->recordsToProxyIndices());
708  }
709  // toerror.succeeded(); // should we add this?
710  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
711  actReg_->postBeginJobSignal_();
712 
714  oneapi::tbb::task_group group;
715  using namespace edm::waiting_task::chain;
716  first([this](auto nextTask) {
717  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
718  first([i, this](auto nextTask) {
720  schedule_->beginStream(i);
721  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
723  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
724  }) | lastTask(nextTask);
725  }
727  group.wait();
728  if (last.exceptionPtr()) {
729  std::rethrow_exception(*last.exceptionPtr());
730  }
731  }
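 // The edm::waiting_task::chain helpers used above appear throughout this
 // file; a condensed sketch of the recurring composition pattern, with
 // placeholder lambdas and condition standing in for the real work:
 //
 //   FinalWaitingTask finalTask;
 //   oneapi::tbb::task_group group;
 //   using namespace edm::waiting_task::chain;
 //   first([](auto nextTask) { /* step 1 */ })
 //     | then([](auto nextTask) { /* step 2, runs after step 1 finishes */ })
 //     | ifThen(someCondition, [](auto nextTask) { /* optional step */ })
 //     | runLast(WaitingTaskHolder(group, &finalTask));
 //   group.wait();
 //   if (finalTask.exceptionPtr()) {
 //     std::rethrow_exception(*finalTask.exceptionPtr());
 //   }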
732 
734  // Collects exceptions, so we don't throw before all operations are performed.
736  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
737 
738  //make the services available
740 
741  using namespace edm::waiting_task::chain;
742 
743  edm::FinalWaitingTask waitTask;
744  oneapi::tbb::task_group group;
745 
746  {
747  //handle endStream transitions
748  edm::WaitingTaskHolder taskHolder(group, &waitTask);
749  std::mutex collectorMutex;
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([this, i, &c, &collectorMutex](auto nextTask) {
752  std::exception_ptr ep;
753  try {
755  this->schedule_->endStream(i);
756  } catch (...) {
757  ep = std::current_exception();
758  }
759  if (ep) {
760  std::lock_guard<std::mutex> l(collectorMutex);
761  c.call([&ep]() { std::rethrow_exception(ep); });
762  }
763  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
764  for (auto& subProcess : subProcesses_) {
765  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
766  std::exception_ptr ep;
767  try {
769  subProcess.doEndStream(i);
770  } catch (...) {
771  ep = std::current_exception();
772  }
773  if (ep) {
774  std::lock_guard<std::mutex> l(collectorMutex);
775  c.call([&ep]() { std::rethrow_exception(ep); });
776  }
777  }) | lastTask(nextTask);
778  }
779  }) | lastTask(taskHolder);
780  }
781  }
782  group.wait();
783 
784  auto actReg = actReg_.get();
785  c.call([actReg]() { actReg->preEndJobSignal_(); });
786  schedule_->endJob(c);
787  for (auto& subProcess : subProcesses_) {
788  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
789  }
790  c.call(std::bind(&InputSource::doEndJob, input_.get()));
791  if (looper_) {
792  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
793  }
794  c.call([actReg]() { actReg->postEndJobSignal_(); });
795  if (c.hasThrown()) {
796  c.rethrow();
797  }
798  }
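 // The collector 'c' used in endJob() lets every cleanup step run even if an
 // earlier one throws: each c.call(...) records an escaping exception instead
 // of propagating it, and everything recorded is rethrown together at the end.
 // Condensed sketch (step1/step2 stand in for the endStream/endJob calls above):
 //
 //   c.call([&]() { step1(); });  // exception recorded, not propagated
 //   c.call([&]() { step2(); });  // still runs even if step1 threw
 //   if (c.hasThrown()) {
 //     c.rethrow();               // reports all recorded exceptions
 //   }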
799 
801 
802  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
803  return schedule_->getAllModuleDescriptions();
804  }
805 
806  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
807 
808  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
809 
810  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
811 
812  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
813 
814  namespace {
815 #include "TransitionProcessors.icc"
816  }
817 
819  bool returnValue = false;
820 
821  // Look for a shutdown signal
822  if (shutdown_flag.load(std::memory_order_acquire)) {
823  returnValue = true;
825  }
826  return returnValue;
827  }
828 
830  if (deferredExceptionPtrIsSet_.load()) {
832  return InputSource::IsStop;
833  }
834 
835  SendSourceTerminationSignalIfException sentry(actReg_.get());
836  InputSource::ItemType itemType;
837  //For now, do nothing with InputSource::IsSynchronize
838  do {
839  itemType = input_->nextItemType();
840  } while (itemType == InputSource::IsSynchronize);
841 
842  lastSourceTransition_ = itemType;
843  sentry.completedSuccessfully();
844 
846 
848  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
850  }
851 
852  return lastSourceTransition_;
853  }
854 
855  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
856  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
857  }
858 
860 
864  {
865  beginJob(); //make sure this was called
866 
867  // make the services available
869 
871  try {
872  FilesProcessor fp(fileModeNoMerge_);
873 
874  convertException::wrap([&]() {
875  bool firstTime = true;
876  do {
877  if (not firstTime) {
879  rewindInput();
880  } else {
881  firstTime = false;
882  }
883  startingNewLoop();
884 
885  auto trans = fp.processFiles(*this);
886 
887  fp.normalEnd();
888 
889  if (deferredExceptionPtrIsSet_.load()) {
890  std::rethrow_exception(deferredExceptionPtr_);
891  }
892  if (trans != InputSource::IsStop) {
893  //problem with the source
894  doErrorStuff();
895 
896  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
897  }
898  } while (not endOfLoop());
899  }); // convertException::wrap
900 
901  } // Try block
902  catch (cms::Exception& e) {
904  std::string message(
905  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
906  e.addAdditionalInfo(message);
907  if (e.alreadyPrinted()) {
908  LogAbsolute("Additional Exceptions") << message;
909  }
910  }
911  if (!exceptionMessageRuns_.empty()) {
912  e.addAdditionalInfo(exceptionMessageRuns_);
913  if (e.alreadyPrinted()) {
914  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
915  }
916  }
917  if (!exceptionMessageFiles_.empty()) {
918  e.addAdditionalInfo(exceptionMessageFiles_);
919  if (e.alreadyPrinted()) {
920  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
921  }
922  }
923  throw;
924  }
925  }
926 
927  return returnCode;
928  }
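 // The FilesProcessor used above is defined in TransitionProcessors.icc,
 // included earlier inside an anonymous namespace. Roughly, it drives the
 // outer loop: it keeps asking this EventProcessor for the next transition and
 // dispatches to readFile(), respondToOpenInputFile() and the other members
 // below until the source signals InputSource::IsStop.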
929 
931  FDEBUG(1) << " \treadFile\n";
932  size_t size = preg_->size();
933  SendSourceTerminationSignalIfException sentry(actReg_.get());
934 
936 
937  fb_ = input_->readFile();
938  if (size < preg_->size()) {
940  }
943  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
944  }
945  sentry.completedSuccessfully();
946  }
947 
948  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
949  if (fileBlockValid()) {
950  SendSourceTerminationSignalIfException sentry(actReg_.get());
951  input_->closeFile(fb_.get(), cleaningUpAfterException);
952  sentry.completedSuccessfully();
953  }
954  FDEBUG(1) << "\tcloseInputFile\n";
955  }
956 
958  if (fileBlockValid()) {
959  schedule_->openOutputFiles(*fb_);
960  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
961  }
962  FDEBUG(1) << "\topenOutputFiles\n";
963  }
964 
966  schedule_->closeOutputFiles();
967  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
968  processBlockHelper_->clearAfterOutputFilesClose();
969  FDEBUG(1) << "\tcloseOutputFiles\n";
970  }
971 
973  if (fileBlockValid()) {
975  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
976  schedule_->respondToOpenInputFile(*fb_);
977  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
978  }
979  FDEBUG(1) << "\trespondToOpenInputFile\n";
980  }
981 
983  if (fileBlockValid()) {
984  schedule_->respondToCloseInputFile(*fb_);
985  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
986  }
987  FDEBUG(1) << "\trespondToCloseInputFile\n";
988  }
989 
991  shouldWeStop_ = false;
992  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
993  // until after we've called beginOfJob
994  if (looper_ && looperBeginJobRun_) {
995  looper_->doStartingNewLoop();
996  }
997  FDEBUG(1) << "\tstartingNewLoop\n";
998  }
999 
1001  if (looper_) {
1002  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1003  looper_->setModuleChanger(&changer);
1004  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1005  looper_->setModuleChanger(nullptr);
1007  return true;
1008  else
1009  return false;
1010  }
1011  FDEBUG(1) << "\tendOfLoop\n";
1012  return true;
1013  }
1014 
1016  input_->repeat();
1017  input_->rewind();
1018  FDEBUG(1) << "\trewind\n";
1019  }
1020 
1022  looper_->prepareForNextLoop(esp_.get());
1023  FDEBUG(1) << "\tprepareForNextLoop\n";
1024  }
1025 
1027  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1028  if (!subProcesses_.empty()) {
1029  for (auto const& subProcess : subProcesses_) {
1030  if (subProcess.shouldWeCloseOutput()) {
1031  return true;
1032  }
1033  }
1034  return false;
1035  }
1036  return schedule_->shouldWeCloseOutput();
1037  }
1038 
1040  FDEBUG(1) << "\tdoErrorStuff\n";
1041  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1042  << "and went to the error state\n"
1043  << "Will attempt to terminate processing normally\n"
1044  << "(IF using the looper the next loop will be attempted)\n"
1045  << "This likely indicates a bug in an input module or corrupted input or both\n";
1046  }
1047 
1048  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1049  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1050  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1051 
1053  FinalWaitingTask globalWaitTask;
1054 
1055  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1056  beginGlobalTransitionAsync<Traits>(
1057  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1058 
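 // Wait until the global transition has finished: taskGroup_.wait() is retried
 // until the FinalWaitingTask reports done, and any stored exception is
 // rethrown afterwards. The same drain pattern recurs in the transition
 // methods below.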
1059  do {
1060  taskGroup_.wait();
1061  } while (not globalWaitTask.done());
1062 
1063  if (globalWaitTask.exceptionPtr() != nullptr) {
1064  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1065  }
1066  beginProcessBlockSucceeded = true;
1067  }
1068 
1070  input_->fillProcessBlockHelper();
1072  while (input_->nextProcessBlock(processBlockPrincipal)) {
1073  readProcessBlock(processBlockPrincipal);
1074 
1076  FinalWaitingTask globalWaitTask;
1077 
1078  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1079  beginGlobalTransitionAsync<Traits>(
1080  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1081 
1082  do {
1083  taskGroup_.wait();
1084  } while (not globalWaitTask.done());
1085  if (globalWaitTask.exceptionPtr() != nullptr) {
1086  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1087  }
1088 
1089  FinalWaitingTask writeWaitTask;
1091  do {
1092  taskGroup_.wait();
1093  } while (not writeWaitTask.done());
1094  if (writeWaitTask.exceptionPtr()) {
1095  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1096  }
1097 
1098  processBlockPrincipal.clearPrincipal();
1099  for (auto& s : subProcesses_) {
1100  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1101  }
1102  }
1103  }
1104 
1105  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1106  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1107 
1109  FinalWaitingTask globalWaitTask;
1110 
1111  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1112  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1113  *schedule_,
1114  transitionInfo,
1115  serviceToken_,
1116  subProcesses_,
1117  cleaningUpAfterException);
1118  do {
1119  taskGroup_.wait();
1120  } while (not globalWaitTask.done());
1121  if (globalWaitTask.exceptionPtr() != nullptr) {
1122  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1123  }
1124 
1125  if (beginProcessBlockSucceeded) {
1126  FinalWaitingTask writeWaitTask;
1128  do {
1129  taskGroup_.wait();
1130  } while (not writeWaitTask.done());
1131  if (writeWaitTask.exceptionPtr()) {
1132  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1133  }
1134  }
1135 
1136  processBlockPrincipal.clearPrincipal();
1137  for (auto& s : subProcesses_) {
1138  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1139  }
1140  }
1141 
1143  RunNumber_t run,
1144  bool& globalBeginSucceeded,
1145  bool& eventSetupForInstanceSucceeded) {
1146  globalBeginSucceeded = false;
1147  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginRun(runPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1155  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1157  espController_->forceCacheClear();
1158  }
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1161  actReg_->preESSyncIOVSignal_.emit(ts);
1163  actReg_->postESSyncIOVSignal_.emit(ts);
1164  eventSetupForInstanceSucceeded = true;
1165  sentry.completedSuccessfully();
1166  }
1167  auto const& es = esp_->eventSetupImpl();
1168  if (looper_ && looperBeginJobRun_ == false) {
1169  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1170 
1171  FinalWaitingTask waitTask;
1172  using namespace edm::waiting_task::chain;
1173  chain::first([this, &es](auto nextTask) {
1174  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1175  }) | then([this, &es](auto nextTask) mutable {
1176  looper_->beginOfJob(es);
1177  looperBeginJobRun_ = true;
1178  looper_->doStartingNewLoop();
1179  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1180 
1181  do {
1182  taskGroup_.wait();
1183  } while (not waitTask.done());
1184  if (waitTask.exceptionPtr() != nullptr) {
1185  std::rethrow_exception(*(waitTask.exceptionPtr()));
1186  }
1187  }
1188  {
1190  FinalWaitingTask globalWaitTask;
1191 
1192  using namespace edm::waiting_task::chain;
1193  chain::first([&runPrincipal, &es, this](auto waitTask) {
1194  RunTransitionInfo transitionInfo(runPrincipal, es);
1195  beginGlobalTransitionAsync<Traits>(
1196  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1197  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1198  globalBeginSucceeded = true;
1199  FDEBUG(1) << "\tbeginRun " << run << "\n";
1200  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1201  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1202  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1203  looper_->doBeginRun(runPrincipal, es, &processContext_);
1204  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1205 
1206  do {
1207  taskGroup_.wait();
1208  } while (not globalWaitTask.done());
1209  if (globalWaitTask.exceptionPtr() != nullptr) {
1210  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1211  }
1212  }
1213  {
1214  //To wait, the ref count has to be 1+#streams
1215  FinalWaitingTask streamLoopWaitTask;
1216 
1218 
1219  RunTransitionInfo transitionInfo(runPrincipal, es);
1220  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1221  *schedule_,
1223  transitionInfo,
1224  serviceToken_,
1225  subProcesses_);
1226  do {
1227  taskGroup_.wait();
1228  } while (not streamLoopWaitTask.done());
1229  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1230  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1231  }
1232  }
1233  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1234  if (looper_) {
1235  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1236  }
1237  }
1238 
1240  RunNumber_t run,
1241  bool globalBeginSucceeded,
1242  bool cleaningUpAfterException,
1243  bool eventSetupForInstanceSucceeded) {
1244  if (eventSetupForInstanceSucceeded) {
1245  //If we skip empty runs, this would be called conditionally
1246  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1247 
1248  if (globalBeginSucceeded) {
1249  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1250  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1252  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1253  mergeableRunProductMetadata->preWriteRun();
1254  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1255  do {
1256  taskGroup_.wait();
1257  } while (not t.done());
1258  mergeableRunProductMetadata->postWriteRun();
1259  if (t.exceptionPtr()) {
1260  std::rethrow_exception(*t.exceptionPtr());
1261  }
1262  }
1263  }
1264  }
1265  deleteRunFromCache(phid, run);
1266  }
1267 
1269  RunNumber_t run,
1270  bool globalBeginSucceeded,
1271  bool cleaningUpAfterException) {
1272  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1273  runPrincipal.setEndTime(input_->timestamp());
1274 
1275  IOVSyncValue ts(
1277  runPrincipal.endTime());
1278  {
1279  SendSourceTerminationSignalIfException sentry(actReg_.get());
1280  actReg_->preESSyncIOVSignal_.emit(ts);
1282  actReg_->postESSyncIOVSignal_.emit(ts);
1283  sentry.completedSuccessfully();
1284  }
1285  auto const& es = esp_->eventSetupImpl();
1286  if (globalBeginSucceeded) {
1287  //To wait, the ref count has to be 1+#streams
1288  FinalWaitingTask streamLoopWaitTask;
1289 
1291 
1292  RunTransitionInfo transitionInfo(runPrincipal, es);
1293  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1294  *schedule_,
1296  transitionInfo,
1297  serviceToken_,
1298  subProcesses_,
1299  cleaningUpAfterException);
1300  do {
1301  taskGroup_.wait();
1302  } while (not streamLoopWaitTask.done());
1303  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1304  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1305  }
1306  }
1307  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1308  if (looper_) {
1309  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1310  }
1311  {
1312  FinalWaitingTask globalWaitTask;
1313 
1314  using namespace edm::waiting_task::chain;
1315  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1316  RunTransitionInfo transitionInfo(runPrincipal, es);
1318  endGlobalTransitionAsync<Traits>(
1319  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1320  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1321  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1322  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1323  looper_->doEndRun(runPrincipal, es, &processContext_);
1324  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1325 
1326  do {
1327  taskGroup_.wait();
1328  } while (not globalWaitTask.done());
1329  if (globalWaitTask.exceptionPtr() != nullptr) {
1330  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1331  }
1332  }
1333  FDEBUG(1) << "\tendRun " << run << "\n";
1334  }
1335 
1336  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1337  FinalWaitingTask waitTask;
1338  if (streamLumiActive_ > 0) {
1340  // Continue after opening a new input file
1342  } else {
1343  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1344  input_->luminosityBlockAuxiliary()->beginTime()),
1345  iRunResource,
1346  WaitingTaskHolder{taskGroup_, &waitTask});
1347  }
1348  do {
1349  taskGroup_.wait();
1350  } while (not waitTask.done());
1351 
1352  if (waitTask.exceptionPtr() != nullptr) {
1353  std::rethrow_exception(*(waitTask.exceptionPtr()));
1354  }
1355  return lastTransitionType();
1356  }
1357 
1359  std::shared_ptr<void> const& iRunResource,
1360  edm::WaitingTaskHolder iHolder) {
1361  if (iHolder.taskHasFailed()) {
1362  return;
1363  }
1364 
1365  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1366  // We must be careful with the status object here and in the code this function calls. If we want
1367  // endRun to be called, then we must call resetResources before the things waiting on
1368  // iHolder are allowed to proceed. Otherwise, there will be a race condition (possibly causing
1369  // endRun to be called much later than it should be, because status is holding iRunResource).
1370  // Note that this must be done explicitly. Relying on the destructor does not work well
1371  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1372  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1373  // destroyed inside this function and lumiWork.
1374  auto status =
1375  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1376  chain::first([&](auto nextTask) {
1377  auto asyncEventSetup = [](ActivityRegistry* actReg,
1378  auto* espController,
1379  auto& queue,
1381  auto& status,
1382  IOVSyncValue const& iSync) {
1383  queue.pause();
1384  CMS_SA_ALLOW try {
1385  SendSourceTerminationSignalIfException sentry(actReg);
1386  // Pass in iSync to let the EventSetup system know which run and lumi
1387  // need to be processed and prepare IOVs for it.
1388  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1389  // lumi is done and no longer needs its EventSetup IOVs.
1390  actReg->preESSyncIOVSignal_.emit(iSync);
1391  espController->eventSetupForInstanceAsync(
1392  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1393  sentry.completedSuccessfully();
1394  } catch (...) {
1395  task.doneWaiting(std::current_exception());
1396  }
1397  };
1398  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1399  // We only get here inside this block if there is an EventSetup
1400  // module not able to handle concurrent IOVs (usually an ESSource)
1401  // and the new sync value is outside the current IOV of that module.
1402  auto group = nextTask.group();
1404  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1405  asyncEventSetup(
1407  });
1408  } else {
1409  asyncEventSetup(
1410  actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1411  }
1412  }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1413  actReg_->postESSyncIOVSignal_.emit(iSync);
1414  //the call to doneWaiting will cause the count to decrement
1415  auto copyTask = nextTask;
1416  if (iPtr) {
1417  nextTask.doneWaiting(*iPtr);
1418  }
1419  auto group = copyTask.group();
1420  lumiQueue_->pushAndPause(
1421  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1422  if (task.taskHasFailed()) {
1423  status->resetResources();
1424  return;
1425  }
1426 
1427  status->setResumer(std::move(iResumer));
1428 
1429  auto group = task.group();
1431  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1432  //make the services available
1434  // Caught exception is propagated via WaitingTaskHolder
1435  CMS_SA_ALLOW try {
1437 
1438  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1439  {
1440  SendSourceTerminationSignalIfException sentry(actReg_.get());
1441 
1442  input_->doBeginLumi(lumiPrincipal, &processContext_);
1443  sentry.completedSuccessfully();
1444  }
1445 
1447  if (rng.isAvailable()) {
1448  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1449  rng->preBeginLumi(lb);
1450  }
1451 
1452  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1453 
1454  using namespace edm::waiting_task::chain;
1455  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1456  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1457  {
1458  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1460  beginGlobalTransitionAsync<Traits>(
1461  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1462  }
1463  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1464  looper_->prefetchAsync(
1465  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1466  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1467  status->globalBeginDidSucceed();
1468  //make the services available
1469  ServiceRegistry::Operate operateLooper(serviceToken_);
1470  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1471  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1472  if (iPtr) {
1473  status->resetResources();
1474  holder.doneWaiting(*iPtr);
1475  } else {
1476  if (not looper_) {
1477  status->globalBeginDidSucceed();
1478  }
1479  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1481 
1482  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1483  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1484  streamQueues_[i].pause();
1485 
1486  auto& event = principalCache_.eventPrincipal(i);
1487  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1488  // held by the container as this lambda may not finish executing before all the tasks it
1489  // spawns have already started to run.
1490  auto eventSetupImpls = &status->eventSetupImpls();
1491  auto lp = status->lumiPrincipal().get();
1494  event.setLuminosityBlockPrincipal(lp);
1495  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1496  using namespace edm::waiting_task::chain;
1497  chain::first([this, i, &transitionInfo](auto nextTask) {
1498  beginStreamTransitionAsync<Traits>(
1499  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1500  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1501  if (exceptionFromBeginStreamLumi) {
1502  WaitingTaskHolder tmp(nextTask);
1503  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1504  streamEndLumiAsync(nextTask, i);
1505  } else {
1507  }
1508  }) | runLast(holder);
1509  });
1510  }
1511  }
1512  }) | runLast(postQueueTask);
1513 
1514  } catch (...) {
1515  status->resetResources();
1516  postQueueTask.doneWaiting(std::current_exception());
1517  }
1518  }); // task in sourceResourcesAcquirer
1519  });
1520  }) | chain::runLast(std::move(iHolder));
1521  }
1522 
1524  {
1525  //all streams are sharing the same status at the moment
1526  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1527  status->needToContinueLumi();
1528  status->startProcessingEvents();
1529  }
1530 
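 //All but the last stream are enqueued on the attached TBB arena; the last one
 // is run via the holder's task_group and takes ownership of iHolder by move.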
1531  unsigned int streamIndex = 0;
1532  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1533  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1534  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1535  }
1536  iHolder.group()->run(
1537  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1538  }
1539 
1540  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1541  if (setDeferredException(*iPtr)) {
1542  WaitingTaskHolder tmp(holder);
1543  tmp.doneWaiting(*iPtr);
1544  } else {
1546  }
1547  }
1548 
1550  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1551  // Get some needed info out of the status object before moving
1552  // it into finalTaskForThisLumi.
1553  auto& lp = *(iLumiStatus->lumiPrincipal());
1554  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1555  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1556  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1557  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1558 
1559  using namespace edm::waiting_task::chain;
1560  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1561  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1562 
1563  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1565  endGlobalTransitionAsync<Traits>(
1566  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1567  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1568  //Only call writeLumi if beginLumi succeeded
1569  if (didGlobalBeginSucceed) {
1570  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1571  }
1572  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1573  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1574  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1575  //any thrown exception auto propagates to nextTask via the chain
1577  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1578  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1579  std::exception_ptr ptr;
1580  if (iPtr) {
1581  ptr = *iPtr;
1582  }
1584 
1585  // Try hard to clean up resources so the
1586  // process can terminate in a controlled
1587  // fashion even after exceptions have occurred.
1588  // Caught exception is passed to handleEndLumiExceptions()
1589  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1590  if (not ptr) {
1591  ptr = std::current_exception();
1592  }
1593  }
1594  // Caught exception is passed to handleEndLumiExceptions()
1595  CMS_SA_ALLOW try {
1596  status->resumeGlobalLumiQueue();
1598  } catch (...) {
1599  if (not ptr) {
1600  ptr = std::current_exception();
1601  }
1602  }
1603  // Caught exception is passed to handleEndLumiExceptions()
1604  CMS_SA_ALLOW try {
1605  // This call to status.resetResources() must occur before iTask is destroyed.
1606  // Otherwise there will be a data race which could result in endRun
1607  // being delayed until it is too late to successfully call it.
1608  status->resetResources();
1609  status.reset();
1610  } catch (...) {
1611  if (not ptr) {
1612  ptr = std::current_exception();
1613  }
1614  }
1615 
1616  if (ptr) {
1617  handleEndLumiExceptions(&ptr, nextTask);
1618  }
1619  }) | runLast(std::move(iTask));
1620  }
1621 
1622  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1623  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1624  if (iPtr) {
1625  handleEndLumiExceptions(iPtr, iTask);
1626  }
1627  auto status = streamLumiStatus_[iStreamIndex];
1628  //reset status before releasing the queue, else we get a race condition
1629  streamLumiStatus_[iStreamIndex].reset();
1631  streamQueues_[iStreamIndex].resume();
1632 
1633  //are we the last one?
1634  if (status->streamFinishedLumi()) {
1636  }
1637  });
1638 
1639  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1640 
1641  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1642  // therefore we do not want to hold the shared_ptr
1643  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1644  lumiStatus->setEndTime();
1645 
1646  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1647 
1648  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1649  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1650 
1651  if (lumiStatus->didGlobalBeginSucceed()) {
1652  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1653  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1654  lumiPrincipal.endTime());
1656  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1657  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1658  *schedule_,
1659  iStreamIndex,
1660  transitionInfo,
1661  serviceToken_,
1662  subProcesses_,
1663  cleaningUpAfterException);
1664  }
1665  }
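 // Pause/resume discipline for the per-stream queues used by the lumi
 // transitions: beginLumiAsync() pauses streamQueues_[i] when stream i picks up
 // a lumi, and streamEndLumiAsync() resumes it once that stream has finished,
 // so later transitions queued for the stream wait for the active lumi.
 // Condensed from the two functions above:
 //
 //   streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
 //     streamQueues_[i].pause();   // hold back later work for this stream
 //     /* ... begin the lumi on stream i and start handling its events ... */
 //   });
 //   // later, when stream iStreamIndex finishes the lumi:
 //   streamLumiStatus_[iStreamIndex].reset();
 //   streamQueues_[iStreamIndex].resume();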
1666 
1668  if (streamLumiActive_.load() > 0) {
1669  FinalWaitingTask globalWaitTask;
1670  {
1671  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1672  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1673  if (streamLumiStatus_[i]) {
1674  streamEndLumiAsync(globalTaskHolder, i);
1675  }
1676  }
1677  }
1678  do {
1679  taskGroup_.wait();
1680  } while (not globalWaitTask.done());
1681  if (globalWaitTask.exceptionPtr() != nullptr) {
1682  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1683  }
1684  }
1685  }
1686 
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689  input_->readProcessBlock(processBlockPrincipal);
1690  sentry.completedSuccessfully();
1691  }
1692 
1693  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1695  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1696  << "Illegal attempt to insert run into cache\n"
1697  << "Contact a Framework Developer\n";
1698  }
1699  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1700  preg(),
1702  historyAppender_.get(),
1703  0,
1704  true,
1706  {
1707  SendSourceTerminationSignalIfException sentry(actReg_.get());
1708  input_->readRun(*rp, *historyAppender_);
1709  sentry.completedSuccessfully();
1710  }
1711  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1712  principalCache_.insert(rp);
1713  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1714  }
1715 
1716  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1717  principalCache_.merge(input_->runAuxiliary(), preg());
1718  auto runPrincipal = principalCache_.runPrincipalPtr();
1719  {
1720  SendSourceTerminationSignalIfException sentry(actReg_.get());
1721  input_->readAndMergeRun(*runPrincipal);
1722  sentry.completedSuccessfully();
1723  }
1724  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1725  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1726  }
1727 
1730  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1731  << "Illegal attempt to insert lumi into cache\n"
1732  << "Run is invalid\n"
1733  << "Contact a Framework Developer\n";
1734  }
1736  assert(lbp);
1737  lbp->setAux(*input_->luminosityBlockAuxiliary());
1738  {
1739  SendSourceTerminationSignalIfException sentry(actReg_.get());
1740  input_->readLuminosityBlock(*lbp, *historyAppender_);
1741  sentry.completedSuccessfully();
1742  }
1743  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1744  iStatus.lumiPrincipal() = std::move(lbp);
1745  }
1746 
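  // Merge the next luminosity block from the source into the current LuminosityBlockPrincipal;
  // returns the luminosity block number.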
1747  int EventProcessor::readAndMergeLumi(LuminosityBlockProcessingStatus& iStatus) {
1748  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1749  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1750  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1751  input_->processHistoryRegistry().reducedProcessHistoryID(
1752  input_->luminosityBlockAuxiliary()->processHistoryID()));
1753  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1754  assert(lumiOK);
1755  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1756  {
1757  SendSourceTerminationSignalIfException sentry(actReg_.get());
1758  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1759  sentry.completedSuccessfully();
1760  }
1761  return input_->luminosityBlock();
1762  }
1763 
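  // Queue asynchronous writing of the ProcessBlock products by the Schedule and, if present, by each SubProcess.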
1764  void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1765  using namespace edm::waiting_task;
1766  chain::first([&](auto nextTask) {
1767  ServiceRegistry::Operate op(serviceToken_);
1768  schedule_->writeProcessBlockAsync(
1769  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1770  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1771  ServiceRegistry::Operate op(serviceToken_);
1772  for (auto& s : subProcesses_) {
1773  s.writeProcessBlockAsync(nextTask, processBlockType);
1774  }
1775  }) | chain::runLast(std::move(task));
1776  }
1777 
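  // Queue asynchronous writing of the run's products by the Schedule and each SubProcess.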
1778  void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1779  ProcessHistoryID const& phid,
1780  RunNumber_t run,
1781  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1782  using namespace edm::waiting_task;
1783  chain::first([&](auto nextTask) {
1784  ServiceRegistry::Operate op(serviceToken_);
1785  schedule_->writeRunAsync(nextTask,
1786  principalCache_.runPrincipal(phid, run),
1787  &processContext_,
1788  actReg_.get(),
1789  mergeableRunProductMetadata);
1790  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1791  ServiceRegistry::Operate op(serviceToken_);
1792  for (auto& s : subProcesses_) {
1793  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1794  }
1795  }) | chain::runLast(std::move(task));
1796  }
1797 
1798  void EventProcessor::deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) {
1799  principalCache_.deleteRun(phid, run);
1800  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1801  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1802  }
1803 
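  // Queue asynchronous writing of the luminosity block's products unless the lumi is marked as not to be written.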
1804  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
1805  using namespace edm::waiting_task;
1806  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1807  chain::first([&](auto nextTask) {
1808  ServiceRegistry::Operate op(serviceToken_);
1809 
1810  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1811  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1812  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1813  ServiceRegistry::Operate op(serviceToken_);
1814  for (auto& s : subProcesses_) {
1815  s.writeLumiAsync(nextTask, lumiPrincipal);
1816  }
1817  }) | chain::lastTask(std::move(task));
1818  }
1819  }
1820 
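  // Release the stream's LuminosityBlockPrincipal (and the SubProcess copies) so it can be reused
  // for a later luminosity block.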
1821  void EventProcessor::deleteLumiFromCache(LuminosityBlockProcessingStatus& iStatus) {
1822  for (auto& s : subProcesses_) {
1823  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1824  }
1825  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1826  iStatus.lumiPrincipal()->clearPrincipal();
1827  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1828  }
1829 
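  // Try to read the next event for this stream. Returns false when the stream must stop processing
  // events (deferred exception, stop request, or end of the current luminosity block).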
1830  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus) {
1831  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1832  iStatus.endLumi();
1833  return false;
1834  }
1835 
1836  if (iStatus.wasEventProcessingStopped()) {
1837  return false;
1838  }
1839 
1840  if (shouldWeStop()) {
1842  iStatus.stopProcessingEvents();
1843  iStatus.endLumi();
1844  return false;
1845  }
1846 
1848  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1849  CMS_SA_ALLOW try {
1850  //need to use lock in addition to the serial task queue because
1851  // of delayed provenance reading and reading data in response to
1852  // edm::Refs etc
1853  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1854 
1855  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1856  if (InputSource::IsLumi == itemType) {
1857  iStatus.haveContinuedLumi();
1858  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1859  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1860  readAndMergeLumi(iStatus);
1861  itemType = nextTransitionType();
1862  }
1863  if (InputSource::IsLumi == itemType) {
1864  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1865  input_->luminosityBlockAuxiliary()->beginTime()));
1866  }
1867  }
1868  if (InputSource::IsEvent != itemType) {
1869  iStatus.stopProcessingEvents();
1870 
1871  //IsFile may continue processing the lumi and
1872  // looper_ can cause the input source to declare a new IsRun which is actually
1873  // just a continuation of the previous run
1874  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1875  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1876  iStatus.endLumi();
1877  }
1878  return false;
1879  }
1880  readEvent(iStreamIndex);
1881  } catch (...) {
1882  bool expected = false;
1883  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1884  deferredExceptionPtr_ = std::current_exception();
1885  iStatus.endLumi();
1886  }
1887  return false;
1888  }
1889  return true;
1890  }
1891 
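  // Per-stream event loop: reads the next event through the source's serial queue, processes it
  // asynchronously, and reschedules itself until the luminosity block ends or processing stops.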
1892  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1893  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1894  ServiceRegistry::Operate operate(serviceToken_);
1895  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1896  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1897  auto status = streamLumiStatus_[iStreamIndex].get();
1898  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1899  CMS_SA_ALLOW try {
1900  if (readNextEventForStream(iStreamIndex, *status)) {
1901  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1902  if (iPtr) {
1903  // Try to end the stream properly even if an exception was
1904  // thrown on an event.
1905  bool expected = false;
1906  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1907  // This is the case where the exception in iPtr is the primary
1908  // exception and we want to see its message.
1909  deferredExceptionPtr_ = *iPtr;
1910  WaitingTaskHolder tempHolder(iTask);
1911  tempHolder.doneWaiting(*iPtr);
1912  }
1913  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1914  //the stream will stop now
1915  return;
1916  }
1917  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1918  });
1919 
1920  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1921  } else {
1922  //the stream will stop now
1923  if (status->isLumiEnding()) {
1924  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1925  status->startNextLumi();
1926  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1927  }
1928  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1929  } else {
1930  iTask.doneWaiting(std::exception_ptr{});
1931  }
1932  }
1933  } catch (...) {
1934  // It is unlikely we will ever get in here ...
1935  // But if we do try to clean up and propagate the exception
1936  if (streamLumiStatus_[iStreamIndex]) {
1937  streamEndLumiAsync(iTask, iStreamIndex);
1938  }
1939  bool expected = false;
1940  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1941  auto e = std::current_exception();
1942  deferredExceptionPtr_ = e;
1943  iTask.doneWaiting(e);
1944  }
1945  }
1946  });
1947  }
1948 
1949  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1950  //TODO this will have to become per stream
1951  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1952  StreamContext streamContext(event.streamID(), &processContext_);
1953 
1954  SendSourceTerminationSignalIfException sentry(actReg_.get());
1955  input_->readEvent(event, streamContext);
1956 
1957  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1958  sentry.completedSuccessfully();
1959 
1960  FDEBUG(1) << "\treadEvent\n";
1961  }
1962 
1963  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1964  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1965  }
1966 
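  // Notify the RandomNumberGenerator service of the new event, then chain the Schedule, any SubProcesses,
  // and (if configured) the looper before clearing the EventPrincipal.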
1967  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1968  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1969 
1970  ServiceRegistry::Operate operate(serviceToken_);
1971  Service<RandomNumberGenerator> rng;
1972  if (rng.isAvailable()) {
1973  Event ev(*pep, ModuleDescription(), nullptr);
1974  rng->postEventRead(ev);
1975  }
1976 
1977  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1978  using namespace edm::waiting_task::chain;
1979  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1980  EventTransitionInfo info(*pep, es);
1981  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1982  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1983  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1984  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1985  }
1986  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1987  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1988  ServiceRegistry::Operate operateLooper(serviceToken_);
1989  processEventWithLooper(*pep, iStreamIndex);
1990  }) | then([pep](auto nextTask) {
1991  FDEBUG(1) << "\tprocessEvent\n";
1992  pep->clearEventPrincipal();
1993  }) | runLast(iHolder);
1994  }
1995 
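  // Pass the event to the EDLooper; with a random-access source, reposition the input according to
  // the transition requested by the looper, and stop processing if the looper asks to.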
1996  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1997  bool randomAccess = input_->randomAccess();
1998  ProcessingController::ForwardState forwardState = input_->forwardState();
1999  ProcessingController::ReverseState reverseState = input_->reverseState();
2000  ProcessingController pc(forwardState, reverseState, randomAccess);
2001 
2002  EDLooperBase::Status status = EDLooperBase::kContinue;
2003  do {
2004  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2005  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2006  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2007 
2008  bool succeeded = true;
2009  if (randomAccess) {
2010  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2011  input_->skipEvents(-2);
2012  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2013  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2014  }
2015  }
2016  pc.setLastOperationSucceeded(succeeded);
2017  } while (!pc.lastOperationSucceeded());
2018  if (status != EDLooperBase::kContinue) {
2019  shouldWeStop_ = true;
2020  lastSourceTransition_ = InputSource::IsStop;
2021  }
2022  }
2023 
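  // True when event processing should stop: a stop has been requested, a SubProcess wants to
  // terminate, or the Schedule reports that it should terminate.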
2024  bool EventProcessor::shouldWeStop() const {
2025  FDEBUG(1) << "\tshouldWeStop\n";
2026  if (shouldWeStop_)
2027  return true;
2028  if (!subProcesses_.empty()) {
2029  for (auto const& subProcess : subProcesses_) {
2030  if (subProcess.terminate()) {
2031  return true;
2032  }
2033  }
2034  return false;
2035  }
2036  return schedule_->terminate();
2037  }
2038 
2039  void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
2040 
2041  void EventProcessor::setExceptionMessageRuns(std::string& message) { exceptionMessageRuns_ = message; }
2042 
2043  void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
2044 
2045  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
2046  bool expected = false;
2047  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2048  deferredExceptionPtr_ = iException;
2049  return true;
2050  }
2051  return false;
2052  }
2053 
2054  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2055  cms::Exception ex("ModulesSynchingOnLumis");
2056  ex << "The framework is configured to use at least two streams, but the following modules\n"
2057  << "require synchronizing on LuminosityBlock boundaries:";
2058  bool found = false;
2059  for (auto worker : schedule_->allWorkers()) {
2060  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2061  found = true;
2062  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2063  }
2064  }
2065  if (found) {
2066  ex << "\n\nThe situation can be fixed by either\n"
2067  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2068  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2069  throw ex;
2070  }
2071  }
2072 
2073  void EventProcessor::warnAboutLegacyModules() const {
2074  std::unique_ptr<LogSystem> s;
2075  for (auto worker : schedule_->allWorkers()) {
2076  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2077  if (not s) {
2078  s = std::make_unique<LogSystem>("LegacyModules");
2079  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2080  "is going to end soon. These modules need to be converted to have type\n"
2081  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2082  }
2083  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2084  }
2085  }
2086  }
2087 } // namespace edm