EventProcessor.cc
8 
41 
43 
51 
56 
60 
73 
74 #include "MessageForSource.h"
75 #include "MessageForParent.h"
77 
78 #include "boost/range/adaptor/reversed.hpp"
79 
80 #include <cassert>
81 #include <exception>
82 #include <iomanip>
83 #include <iostream>
84 #include <utility>
85 #include <sstream>
86 
87 #include <sys/ipc.h>
88 #include <sys/msg.h>
89 
90 #include "tbb/task.h"
91 
92 //Used for CPU affinity
93 #ifndef __APPLE__
94 #include <sched.h>
95 #endif
96 
97 namespace {
98  //Sentry class to only send a signal if an
99  // exception occurs. An exception is identified
100  // by the destructor being called without first
101  // calling completedSuccessfully().
102  class SendSourceTerminationSignalIfException {
103  public:
104  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
105  ~SendSourceTerminationSignalIfException() {
106  if (reg_) {
107  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
108  }
109  }
110  void completedSuccessfully() { reg_ = nullptr; }
111 
112  private:
113  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
114  };
115 } // namespace
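// [Editor's note] The sentry above is a small RAII guard: the early-termination
// signal is emitted only if the enclosing scope unwinds before
// completedSuccessfully() is called. A minimal usage sketch, mirroring how it is
// used throughout this file:
//
//   SendSourceTerminationSignalIfException sentry(actReg_.get());
//   itemType = input_->nextItemType();  // may throw
//   sentry.completedSuccessfully();     // reached only on success; disarms the guard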
116 
117 namespace edm {
118 
119  namespace chain = waiting_task::chain;
120 
121  // ---------------------------------------------------------------
122  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
123  CommonParams const& common,
124  std::shared_ptr<ProductRegistry> preg,
125  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
126  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
127  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
128  std::shared_ptr<ActivityRegistry> areg,
129  std::shared_ptr<ProcessConfiguration const> processConfiguration,
130  PreallocationConfiguration const& allocations) {
131  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
132  if (main_input == nullptr) {
133  throw Exception(errors::Configuration)
134  << "There must be exactly one source in the configuration.\n"
135  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
136  }
137 
138  std::string modtype(main_input->getParameter<std::string>("@module_type"));
139 
140  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
141  ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
142  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
143  filler->fill(descriptions);
144 
145  try {
146  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
147  } catch (cms::Exception& iException) {
148  std::ostringstream ost;
149  ost << "Validating configuration of input source of type " << modtype;
150  iException.addContext(ost.str());
151  throw;
152  }
153 
154  main_input->registerIt();
155 
156  // Fill in "ModuleDescription", in case the input source produces
157  // any EDProducts, which would be registered in the ProductRegistry.
158  // Also fill in the process history item for this process.
159  // There is no module label for the unnamed input source, so
160  // just use "source".
161  // Only the tracked parameters belong in the process configuration.
162  ModuleDescription md(main_input->id(),
163  main_input->getParameter<std::string>("@module_type"),
164  "source",
165  processConfiguration.get(),
166  ModuleDescription::getUniqueID());
167 
168  InputSourceDescription isdesc(md,
169  preg,
170  branchIDListHelper,
171  processBlockHelper,
172  thinnedAssociationsHelper,
173  areg,
174  common.maxEventsInput_,
175  common.maxLumisInput_,
177  allocations);
178 
179  areg->preSourceConstructionSignal_(md);
180  std::unique_ptr<InputSource> input;
181  try {
182  //even if we have an exception, send the signal
183  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
184  convertException::wrap([&]() {
185  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
186  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
187  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188  });
189  } catch (cms::Exception& iException) {
190  std::ostringstream ost;
191  ost << "Constructing input source of type " << modtype;
192  iException.addContext(ost.str());
193  throw;
194  }
195  return input;
196  }
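// [Editor's note] makeInput() follows the same validate-then-construct pattern used
// for the looper below: look up the plugin's ParameterSetDescription, validate the
// PSet, then build the plugin inside convertException::wrap() so any failure is
// converted into a cms::Exception carrying extra context. The pattern in isolation,
// as a sketch built only from the names above:
//
//   try {
//     convertException::wrap([&]() { descriptions.validate(*main_input, "source"); });
//   } catch (cms::Exception& iException) {
//     iException.addContext("Validating configuration of input source of type " + modtype);
//     throw;
//   }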
197 
198  // ---------------------------------------------------------------
199  void validateLooper(ParameterSet& pset) {
200  auto const modtype = pset.getParameter<std::string>("@module_type");
201  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
202  auto filler = ParameterSetDescriptionFillerPluginFactory::get()->create(modtype);
203  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
204  filler->fill(descriptions);
205  try {
206  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
207  } catch (cms::Exception& iException) {
208  iException.addContext(
209  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
210  throw;
211  }
212  }
213 
214  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
215  eventsetup::EventSetupProvider& cp,
216  ParameterSet& params,
217  std::vector<std::string> const& loopers) {
218  std::shared_ptr<EDLooperBase> vLooper;
219 
220  assert(1 == loopers.size());
221 
222  for (auto const& looperName : loopers) {
223  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
224  validateLooper(*providerPSet);
225  providerPSet->registerIt();
226  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
227  }
228  return vLooper;
229  }
230 
231  // ---------------------------------------------------------------
232  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
233  ServiceToken const& iToken,
234  serviceregistry::ServiceLegacy iLegacy,
235  std::vector<std::string> const& defaultServices,
236  std::vector<std::string> const& forcedServices)
237  : actReg_(),
238  preg_(),
239  branchIDListHelper_(),
240  serviceToken_(),
241  input_(),
242  espController_(new eventsetup::EventSetupsController),
243  esp_(),
244  act_table_(),
245  processConfiguration_(),
246  schedule_(),
247  subProcesses_(),
248  historyAppender_(new HistoryAppender),
249  fb_(),
250  looper_(),
251  deferredExceptionPtrIsSet_(false),
252  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
253  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
257  fileModeNoMerge_(false),
258  exceptionMessageFiles_(),
259  exceptionMessageRuns_(),
260  exceptionMessageLumis_(false),
261  forceLooperToEnd_(false),
262  looperBeginJobRun_(false),
263  forceESCacheClearOnNewRun_(false),
264  eventSetupDataToExcludeFromPrefetching_() {
265  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
266  processDesc->addServices(defaultServices, forcedServices);
267  init(processDesc, iToken, iLegacy);
268  }
269 
270  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
271  std::vector<std::string> const& defaultServices,
272  std::vector<std::string> const& forcedServices)
273  : actReg_(),
274  preg_(),
275  branchIDListHelper_(),
276  serviceToken_(),
277  input_(),
278  espController_(new eventsetup::EventSetupsController),
279  esp_(),
280  act_table_(),
281  processConfiguration_(),
282  schedule_(),
283  subProcesses_(),
284  historyAppender_(new HistoryAppender),
285  fb_(),
286  looper_(),
287  deferredExceptionPtrIsSet_(false),
288  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
289  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
290  principalCache_(),
291  beginJobCalled_(false),
292  shouldWeStop_(false),
293  fileModeNoMerge_(false),
294  exceptionMessageFiles_(),
295  exceptionMessageRuns_(),
296  exceptionMessageLumis_(false),
297  forceLooperToEnd_(false),
298  looperBeginJobRun_(false),
299  forceESCacheClearOnNewRun_(false),
300  asyncStopRequestedWhileProcessingEvents_(false),
301  eventSetupDataToExcludeFromPrefetching_() {
302  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
303  processDesc->addServices(defaultServices, forcedServices);
304  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
305  }
306 
307  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
308  ServiceToken const& token,
309  serviceregistry::ServiceLegacy legacy)
310  : actReg_(),
311  preg_(),
312  branchIDListHelper_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
318  processConfiguration_(),
319  schedule_(),
320  subProcesses_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
324  deferredExceptionPtrIsSet_(false),
325  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
326  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
327  principalCache_(),
328  beginJobCalled_(false),
329  shouldWeStop_(false),
330  fileModeNoMerge_(false),
331  exceptionMessageFiles_(),
332  exceptionMessageRuns_(),
333  exceptionMessageLumis_(false),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
336  forceESCacheClearOnNewRun_(false),
337  asyncStopRequestedWhileProcessingEvents_(false),
338  eventSetupDataToExcludeFromPrefetching_() {
339  init(processDesc, token, legacy);
340  }
341 
342  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
343  ServiceToken const& iToken,
344  serviceregistry::ServiceLegacy iLegacy) {
345  //std::cerr << processDesc->dump() << std::endl;
346 
347  // register the empty parentage vector , once and for all
349 
350  // register the empty parameter set, once and for all.
352 
353  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
354 
355  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
356  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
357  bool const hasSubProcesses = !subProcessVParameterSet.empty();
358 
359  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
360  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
361  // set in here if the parameters were not explicitly set.
362  validateTopLevelParameterSets(parameterSet.get());
363 
364  // Now set some parameters specific to the main process.
365  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
366  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
367  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
368  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
369  << fileMode << ".\n"
370  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
371  } else {
372  fileModeNoMerge_ = (fileMode == "NOMERGE");
373  }
374  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
375 
376  //threading
377  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
378 
379  // Even if numberOfThreads was set to zero in the Python configuration, the code
380  // in cmsRun.cpp should have reset it to something else.
381  assert(nThreads != 0);
382 
383  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
384  if (nStreams == 0) {
385  nStreams = nThreads;
386  }
387  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
388  if (nConcurrentRuns != 1) {
389  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
390  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
391  }
392  unsigned int nConcurrentLumis =
393  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
394  if (nConcurrentLumis == 0) {
395  nConcurrentLumis = 2;
396  }
397  if (nConcurrentLumis > nStreams) {
398  nConcurrentLumis = nStreams;
399  }
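// [Editor's note] Worked example of the defaults above: with numberOfThreads=8 and
// numberOfStreams=0, the stream count falls back to the thread count (nStreams=8);
// with numberOfConcurrentLuminosityBlocks=0, the lumi concurrency defaults to 2 and
// is then clamped to nStreams, so such a job runs 8 streams over at most 2
// luminosity blocks at a time (and a configured looper, handled just below, forces
// streams, concurrent runs, and concurrent lumis all back to 1).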
400  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
401  if (!loopers.empty()) {
402  //For now loopers make us run only 1 transition at a time
403  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
405  "of concurrent runs, and the number of concurrent lumis "
406  "are all being reset to 1. Loopers cannot currently support "
407  "values greater than 1.";
408  nStreams = 1;
409  nConcurrentLumis = 1;
410  nConcurrentRuns = 1;
411  }
412  }
413  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
414  if (dumpOptions) {
415  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
416  } else {
417  if (nThreads > 1 or nStreams > 1) {
418  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
419  }
420  }
421  // The number of concurrent IOVs is configured individually for each record in
422  // the class NumberOfConcurrentIOVs to values less than or equal to this.
423  unsigned int maxConcurrentIOVs = nConcurrentLumis;
424 
425  //Check that relationships between threading parameters make sense
426  /*
427  if(nThreads<nStreams) {
428  //bad
429  }
430  if(nConcurrentRuns>nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nConcurrentLumis) {
434  //bad
435  }
436  */
437  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
438 
439  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
440  deleteNonConsumedUnscheduledModules_ =
441  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
442 
443  // Now do general initialization
444  ScheduleItems items;
445 
446  //initialize the services
447  auto& serviceSets = processDesc->getServicesPSets();
448  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
449  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
450 
451  //make the services available
452  ServiceRegistry::Operate operate(serviceToken_);
453 
454  if (nStreams > 1) {
455  edm::Service<RootHandlers> handler;
456  handler->willBeUsingThreads();
457  }
458 
459  // initialize miscellaneous items
460  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
461 
462  // initialize the event setup provider
463  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
464  esp_ = espController_->makeProvider(
465  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
466 
467  // initialize the looper, if any
468  if (!loopers.empty()) {
469  looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
470  looper_->setActionTable(items.act_table_.get());
471  looper_->attachTo(*items.actReg_);
472 
473  // in presence of looper do not delete modules
474  deleteNonConsumedUnscheduledModules_ = false;
475  }
476 
477  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
478 
479  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
480  streamQueues_.resize(nStreams);
481  streamLumiStatus_.resize(nStreams);
482 
483  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
484 
485  // initialize the input source
486  input_ = makeInput(*parameterSet,
487  *common,
488  items.preg(),
489  items.branchIDListHelper(),
490  processBlockHelper_,
491  items.thinnedAssociationsHelper(),
492  items.actReg_,
493  items.processConfiguration(),
494  preallocations_);
495 
496  // initialize the Schedule
497  schedule_ =
498  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_ = items.preg();
505  branchIDListHelper_ = items.branchIDListHelper();
506  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
507  processConfiguration_ = items.processConfiguration();
508  processContext_.setProcessConfiguration(processConfiguration_.get());
509  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
510 
511  FDEBUG(2) << parameterSet << std::endl;
512 
514  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
515  // Reusable event principal
516  auto ep = std::make_shared<EventPrincipal>(preg(),
520  historyAppender_.get(),
521  index,
522  true /*primary process*/,
525  }
526 
527  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
528  auto lp =
529  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
530  principalCache_.insert(std::move(lp));
531  }
532 
533  {
534  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
535  principalCache_.insert(std::move(pb));
536 
537  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
538  principalCache_.insertForInput(std::move(pbForInput));
539  }
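// [Editor's note] The loops above pre-allocate the reusable principals: one
// EventPrincipal per stream, one LuminosityBlockPrincipal per concurrent luminosity
// block, and the two ProcessBlockPrincipals. With nStreams=4 and nConcurrentLumis=2,
// for example, the cache holds 4 event principals and 2 lumi principals that are
// recycled for the whole job rather than re-created per event or per lumi.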
540 
541  // fill the subprocesses, if there are any
542  subProcesses_.reserve(subProcessVParameterSet.size());
543  for (auto& subProcessPSet : subProcessVParameterSet) {
544  subProcesses_.emplace_back(subProcessPSet,
545  *parameterSet,
546  preg(),
552  *actReg_,
553  token,
556  &processContext_);
557  }
558  }
559 
561  // Make the services available while everything is being deleted.
563  ServiceRegistry::Operate op(token);
564 
565  // manually destroy all these things that may need the services around
566  // propagate_const<T> has no reset() function
567  espController_ = nullptr;
568  esp_ = nullptr;
569  schedule_ = nullptr;
570  input_ = nullptr;
571  looper_ = nullptr;
572  actReg_ = nullptr;
573 
576  }
577 
578  void EventProcessor::taskCleanup() {
579  edm::FinalWaitingTask task;
580  espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
581  taskGroup_.wait();
582  assert(task.done());
583  }
584 
585  void EventProcessor::beginJob() {
586  if (beginJobCalled_)
587  return;
588  beginJobCalled_ = true;
589  bk::beginJob();
590 
591  // StateSentry toerror(this); // should we add this ?
592  //make the services available
593  ServiceRegistry::Operate operate(serviceToken_);
594 
595  service::SystemBounds bounds(preallocations_.numberOfStreams(),
596  preallocations_.numberOfLuminosityBlocks(),
597  preallocations_.numberOfRuns(),
598  preallocations_.numberOfThreads());
599  actReg_->preallocateSignal_(bounds);
600  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
602 
603  std::vector<ModuleProcessName> consumedBySubProcesses;
604  for_all(subProcesses_,
605  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
606  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607  if (consumedBySubProcesses.empty()) {
608  consumedBySubProcesses = std::move(c);
609  } else if (not c.empty()) {
610  std::vector<ModuleProcessName> tmp;
611  tmp.reserve(consumedBySubProcesses.size() + c.size());
612  std::merge(consumedBySubProcesses.begin(),
613  consumedBySubProcesses.end(),
614  c.begin(),
615  c.end(),
616  std::back_inserter(tmp));
617  std::swap(consumedBySubProcesses, tmp);
618  }
619  });
620 
621  // Note: all these may throw
624  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
625  not unusedModules.empty()) {
627 
628  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
629  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
630  "and "
631  "therefore they are deleted before beginJob transition.";
632  for (auto const& description : unusedModules) {
633  l << "\n " << description->moduleLabel();
634  }
635  });
636  for (auto const& description : unusedModules) {
637  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
638  }
639  }
640  }
641 
642  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
643 
646  }
648 
649  //NOTE: This implementation assumes 'Job' means one call
650  // to EventProcessor::run
651  // If it really means once per 'application' then this code will
652  // have to be changed.
653  // Also have to deal with case where have 'run' then new Module
654  // added and do 'run'
655  // again. In that case the newly added Module needs its 'beginJob'
656  // to be called.
657 
658  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
659  // For now we delay calling beginOfJob until first beginOfRun
660  //if(looper_) {
661  // looper_->beginOfJob(es);
662  //}
663  try {
664  convertException::wrap([&]() { input_->doBeginJob(); });
665  } catch (cms::Exception& ex) {
666  ex.addContext("Calling beginJob for the source");
667  throw;
668  }
669  espController_->finishConfiguration();
670  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
671  if (looper_) {
672  constexpr bool mustPrefetchMayGet = true;
673  auto const processBlockLookup = preg_->productLookup(InProcess);
674  auto const runLookup = preg_->productLookup(InRun);
675  auto const lumiLookup = preg_->productLookup(InLumi);
676  auto const eventLookup = preg_->productLookup(InEvent);
677  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
678  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
679  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
680  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
681  looper_->updateLookup(esp_->recordsToProxyIndices());
682  }
683  // toerror.succeeded(); // should we add this?
684  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
685  actReg_->postBeginJobSignal_();
686 
687  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
688  schedule_->beginStream(i);
689  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
690  }
691  }
692 
693  void EventProcessor::endJob() {
694  // Collects exceptions, so we don't throw before all operations are performed.
695  ExceptionCollector c(
696  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
697 
698  //make the services available
699  ServiceRegistry::Operate operate(serviceToken_);
700 
701  using namespace edm::waiting_task::chain;
702 
703  edm::FinalWaitingTask waitTask;
704  tbb::task_group group;
705 
706  {
707  //handle endStream transitions
708  edm::WaitingTaskHolder taskHolder(group, &waitTask);
709  std::mutex collectorMutex;
710  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
711  first([this, i, &c, &collectorMutex](auto nextTask) {
712  std::exception_ptr ep;
713  try {
715  this->schedule_->endStream(i);
716  } catch (...) {
717  ep = std::current_exception();
718  }
719  if (ep) {
720  std::lock_guard<std::mutex> l(collectorMutex);
721  c.call([&ep]() { std::rethrow_exception(ep); });
722  }
723  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
724  for (auto& subProcess : subProcesses_) {
725  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
726  std::exception_ptr ep;
727  try {
729  subProcess.doEndStream(i);
730  } catch (...) {
731  ep = std::current_exception();
732  }
733  if (ep) {
734  std::lock_guard<std::mutex> l(collectorMutex);
735  c.call([&ep]() { std::rethrow_exception(ep); });
736  }
737  }) | lastTask(nextTask);
738  }
739  }) | lastTask(taskHolder);
740  }
741  }
742  group.wait();
743 
744  auto actReg = actReg_.get();
745  c.call([actReg]() { actReg->preEndJobSignal_(); });
746  schedule_->endJob(c);
747  for (auto& subProcess : subProcesses_) {
748  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
749  }
750  c.call(std::bind(&InputSource::doEndJob, input_.get()));
751  if (looper_) {
752  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
753  }
754  c.call([actReg]() { actReg->postEndJobSignal_(); });
755  if (c.hasThrown()) {
756  c.rethrow();
757  }
758  }
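// [Editor's note] endJob() deliberately keeps going after failures: each step is
// routed through the ExceptionCollector `c` and nothing is rethrown until the end,
// so one misbehaving module cannot keep the others from finishing their endStream /
// endJob work. The same idea in plain C++, as a simplified sketch that keeps only
// the first exception (the collector above aggregates a message for each):
//
//   std::exception_ptr first;
//   auto call = [&first](auto&& f) {
//     try { f(); } catch (...) { if (not first) first = std::current_exception(); }
//   };
//   call([&] { stepA(); });
//   call([&] { stepB(); });  // still runs even if stepA threw
//   if (first) std::rethrow_exception(first);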
759 
761 
762  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
763  return schedule_->getAllModuleDescriptions();
764  }
765 
766  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
767 
768  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
769 
770  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
771 
772  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
773 
774  namespace {
775 #include "TransitionProcessors.icc"
776  }
777 
778  bool EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) {
779  bool returnValue = false;
780 
781  // Look for a shutdown signal
782  if (shutdown_flag.load(std::memory_order_acquire)) {
783  returnValue = true;
784  returnCode = epSignal;
785  }
786  return returnValue;
787  }
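// [Editor's note] shutdown_flag is an atomic flag polled between transitions; it is
// presumably set from a Unix signal handler installed elsewhere in the framework.
// A minimal sketch of that producer side (illustrative only, not the framework's
// actual handler):
//
//   #include <atomic>
//   #include <csignal>
//   std::atomic<bool> shutdown_flag{false};
//   extern "C" void requestShutdown(int) {
//     shutdown_flag.store(true, std::memory_order_release);  // pairs with the acquire load above
//   }
//   // std::signal(SIGTERM, requestShutdown);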
788 
789  InputSource::ItemType EventProcessor::nextTransitionType() {
790  if (deferredExceptionPtrIsSet_.load()) {
791  lastSourceTransition_ = InputSource::IsStop;
792  return InputSource::IsStop;
793  }
794 
795  SendSourceTerminationSignalIfException sentry(actReg_.get());
796  InputSource::ItemType itemType;
797  //For now, do nothing with InputSource::IsSynchronize
798  do {
799  itemType = input_->nextItemType();
800  } while (itemType == InputSource::IsSynchronize);
801 
802  lastSourceTransition_ = itemType;
803  sentry.completedSuccessfully();
804 
805  StatusCode returnCode = epSuccess;
806 
807  if (checkForAsyncStopRequest(returnCode)) {
808  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
809  lastSourceTransition_ = InputSource::IsStop;
810  }
811 
812  return lastSourceTransition_;
813  }
814 
815  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
816  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
817  }
818 
819  EventProcessor::StatusCode EventProcessor::runToCompletion() {
820 
821  StatusCode returnCode = epSuccess;
824  {
825  beginJob(); //make sure this was called
826 
827  // make the services available
828  ServiceRegistry::Operate operate(serviceToken_);
829 
831  try {
832  FilesProcessor fp(fileModeNoMerge_);
833 
834  convertException::wrap([&]() {
835  bool firstTime = true;
836  do {
837  if (not firstTime) {
838  prepareForNextLoop();
839  rewindInput();
840  } else {
841  firstTime = false;
842  }
843  startingNewLoop();
844 
845  auto trans = fp.processFiles(*this);
846 
847  fp.normalEnd();
848 
849  if (deferredExceptionPtrIsSet_.load()) {
850  std::rethrow_exception(deferredExceptionPtr_);
851  }
852  if (trans != InputSource::IsStop) {
853  //problem with the source
854  doErrorStuff();
855 
856  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
857  }
858  } while (not endOfLoop());
859  }); // convertException::wrap
860 
861  } // Try block
862  catch (cms::Exception& e) {
863  if (exceptionMessageLumis_) {
864  std::string message(
865  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
866  e.addAdditionalInfo(message);
867  if (e.alreadyPrinted()) {
868  LogAbsolute("Additional Exceptions") << message;
869  }
870  }
871  if (!exceptionMessageRuns_.empty()) {
872  e.addAdditionalInfo(exceptionMessageRuns_);
873  if (e.alreadyPrinted()) {
874  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
875  }
876  }
877  if (!exceptionMessageFiles_.empty()) {
878  e.addAdditionalInfo(exceptionMessageFiles_);
879  if (e.alreadyPrinted()) {
880  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
881  }
882  }
883  throw;
884  }
885  }
886 
887  return returnCode;
888  }
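// [Editor's note] Typical call sequence seen from outside this class, as a
// simplified sketch of what a cmsRun-like driver does (error handling omitted):
//
//   edm::EventProcessor proc(std::move(parameterSet), defaultServices, forcedServices);
//   proc.beginJob();                       // runToCompletion() also calls this; it is guarded by beginJobCalled_
//   auto status = proc.runToCompletion();  // loops over files / runs / lumis / events
//   proc.endJob();
//   // proc.totalEvents(), proc.totalEventsPassed(), ... are then available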
889 
890  void EventProcessor::readFile() {
891  FDEBUG(1) << " \treadFile\n";
892  size_t size = preg_->size();
893  SendSourceTerminationSignalIfException sentry(actReg_.get());
894 
896 
897  fb_ = input_->readFile();
898  if (size < preg_->size()) {
900  }
903  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
904  }
905  sentry.completedSuccessfully();
906  }
907 
908  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
909  if (fileBlockValid()) {
910  SendSourceTerminationSignalIfException sentry(actReg_.get());
911  input_->closeFile(fb_.get(), cleaningUpAfterException);
912  sentry.completedSuccessfully();
913  }
914  FDEBUG(1) << "\tcloseInputFile\n";
915  }
916 
917  void EventProcessor::openOutputFiles() {
918  if (fileBlockValid()) {
919  schedule_->openOutputFiles(*fb_);
920  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
921  }
922  FDEBUG(1) << "\topenOutputFiles\n";
923  }
924 
925  void EventProcessor::closeOutputFiles() {
926  schedule_->closeOutputFiles();
927  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
928  processBlockHelper_->clearAfterOutputFilesClose();
929  FDEBUG(1) << "\tcloseOutputFiles\n";
930  }
931 
932  void EventProcessor::respondToOpenInputFile() {
933  if (fileBlockValid()) {
934  for_all(subProcesses_,
935  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
936  schedule_->respondToOpenInputFile(*fb_);
937  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
938  }
939  FDEBUG(1) << "\trespondToOpenInputFile\n";
940  }
941 
942  void EventProcessor::respondToCloseInputFile() {
943  if (fileBlockValid()) {
944  schedule_->respondToCloseInputFile(*fb_);
945  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
946  }
947  FDEBUG(1) << "\trespondToCloseInputFile\n";
948  }
949 
950  void EventProcessor::startingNewLoop() {
951  shouldWeStop_ = false;
952  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
953  // until after we've called beginOfJob
954  if (looper_ && looperBeginJobRun_) {
955  looper_->doStartingNewLoop();
956  }
957  FDEBUG(1) << "\tstartingNewLoop\n";
958  }
959 
960  bool EventProcessor::endOfLoop() {
961  if (looper_) {
962  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
963  looper_->setModuleChanger(&changer);
964  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
965  looper_->setModuleChanger(nullptr);
966  if (status != EDLooperBase::kContinue || forceLooperToEnd_)
967  return true;
968  else
969  return false;
970  }
971  FDEBUG(1) << "\tendOfLoop\n";
972  return true;
973  }
974 
975  void EventProcessor::rewindInput() {
976  input_->repeat();
977  input_->rewind();
978  FDEBUG(1) << "\trewind\n";
979  }
980 
981  void EventProcessor::prepareForNextLoop() {
982  looper_->prepareForNextLoop(esp_.get());
983  FDEBUG(1) << "\tprepareForNextLoop\n";
984  }
985 
986  bool EventProcessor::shouldWeCloseOutput() const {
987  FDEBUG(1) << "\tshouldWeCloseOutput\n";
988  if (!subProcesses_.empty()) {
989  for (auto const& subProcess : subProcesses_) {
990  if (subProcess.shouldWeCloseOutput()) {
991  return true;
992  }
993  }
994  return false;
995  }
996  return schedule_->shouldWeCloseOutput();
997  }
998 
999  void EventProcessor::doErrorStuff() {
1000  FDEBUG(1) << "\tdoErrorStuff\n";
1001  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1002  << "and went to the error state\n"
1003  << "Will attempt to terminate processing normally\n"
1004  << "(IF using the looper the next loop will be attempted)\n"
1005  << "This likely indicates a bug in an input module or corrupted input or both\n";
1006  }
1007 
1008  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
1009  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1010  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1011 
1013  FinalWaitingTask globalWaitTask;
1014 
1015  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1016  beginGlobalTransitionAsync<Traits>(
1017  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1018 
1019  do {
1020  taskGroup_.wait();
1021  } while (not globalWaitTask.done());
1022 
1023  if (globalWaitTask.exceptionPtr() != nullptr) {
1024  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1025  }
1026  beginProcessBlockSucceeded = true;
1027  }
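// [Editor's note] The do { taskGroup_.wait(); } while (not done()) idiom above
// recurs throughout this file, presumably because more tasks can still be added to
// the group after an individual wait() returns. A small helper expressing the same
// steps, as a sketch (the helper name is ours; FinalWaitingTask and tbb::task_group
// are the types already used above):
//
//   void waitForTaskAndRethrow(tbb::task_group& group, edm::FinalWaitingTask& task) {
//     do {
//       group.wait();
//     } while (not task.done());
//     if (task.exceptionPtr() != nullptr) {
//       std::rethrow_exception(*(task.exceptionPtr()));
//     }
//   }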
1028 
1029  void EventProcessor::inputProcessBlocks() {
1030  input_->fillProcessBlockHelper();
1032  while (input_->nextProcessBlock(processBlockPrincipal)) {
1033  readProcessBlock(processBlockPrincipal);
1034 
1036  FinalWaitingTask globalWaitTask;
1037 
1038  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1039  beginGlobalTransitionAsync<Traits>(
1040  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1041 
1042  do {
1043  taskGroup_.wait();
1044  } while (not globalWaitTask.done());
1045  if (globalWaitTask.exceptionPtr() != nullptr) {
1046  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1047  }
1048 
1049  FinalWaitingTask writeWaitTask;
1051  do {
1052  taskGroup_.wait();
1053  } while (not writeWaitTask.done());
1054  if (writeWaitTask.exceptionPtr()) {
1055  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1056  }
1057 
1058  processBlockPrincipal.clearPrincipal();
1059  for (auto& s : subProcesses_) {
1060  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1061  }
1062  }
1063  }
1064 
1065  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1066  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1067 
1069  FinalWaitingTask globalWaitTask;
1070 
1071  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1072  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1073  *schedule_,
1074  transitionInfo,
1075  serviceToken_,
1076  subProcesses_,
1077  cleaningUpAfterException);
1078  do {
1079  taskGroup_.wait();
1080  } while (not globalWaitTask.done());
1081  if (globalWaitTask.exceptionPtr() != nullptr) {
1082  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1083  }
1084 
1085  if (beginProcessBlockSucceeded) {
1086  FinalWaitingTask writeWaitTask;
1088  do {
1089  taskGroup_.wait();
1090  } while (not writeWaitTask.done());
1091  if (writeWaitTask.exceptionPtr()) {
1092  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1093  }
1094  }
1095 
1096  processBlockPrincipal.clearPrincipal();
1097  for (auto& s : subProcesses_) {
1098  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1099  }
1100  }
1101 
1102  void EventProcessor::beginRun(ProcessHistoryID const& phid,
1103  RunNumber_t run,
1104  bool& globalBeginSucceeded,
1105  bool& eventSetupForInstanceSucceeded) {
1106  globalBeginSucceeded = false;
1107  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1108  {
1109  SendSourceTerminationSignalIfException sentry(actReg_.get());
1110 
1111  input_->doBeginRun(runPrincipal, &processContext_);
1112  sentry.completedSuccessfully();
1113  }
1114 
1115  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1117  espController_->forceCacheClear();
1118  }
1119  {
1120  SendSourceTerminationSignalIfException sentry(actReg_.get());
1122  eventSetupForInstanceSucceeded = true;
1123  sentry.completedSuccessfully();
1124  }
1125  auto const& es = esp_->eventSetupImpl();
1126  if (looper_ && looperBeginJobRun_ == false) {
1127  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1128 
1129  FinalWaitingTask waitTask;
1130  using namespace edm::waiting_task::chain;
1131  chain::first([this, &es](auto nextTask) {
1132  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1133  }) | then([this, &es](auto nextTask) mutable {
1134  looper_->beginOfJob(es);
1135  looperBeginJobRun_ = true;
1136  looper_->doStartingNewLoop();
1137  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1138 
1139  do {
1140  taskGroup_.wait();
1141  } while (not waitTask.done());
1142  if (waitTask.exceptionPtr() != nullptr) {
1143  std::rethrow_exception(*(waitTask.exceptionPtr()));
1144  }
1145  }
1146  {
1148  FinalWaitingTask globalWaitTask;
1149 
1150  using namespace edm::waiting_task::chain;
1151  chain::first([&runPrincipal, &es, this](auto waitTask) {
1152  RunTransitionInfo transitionInfo(runPrincipal, es);
1153  beginGlobalTransitionAsync<Traits>(
1154  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1155  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1156  globalBeginSucceeded = true;
1157  FDEBUG(1) << "\tbeginRun " << run << "\n";
1158  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1159  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1160  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1161  looper_->doBeginRun(runPrincipal, es, &processContext_);
1162  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1163 
1164  do {
1165  taskGroup_.wait();
1166  } while (not globalWaitTask.done());
1167  if (globalWaitTask.exceptionPtr() != nullptr) {
1168  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1169  }
1170  }
1171  {
1172  //To wait, the ref count has to be 1+#streams
1173  FinalWaitingTask streamLoopWaitTask;
1174 
1176 
1177  RunTransitionInfo transitionInfo(runPrincipal, es);
1178  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1179  *schedule_,
1181  transitionInfo,
1182  serviceToken_,
1183  subProcesses_);
1184  do {
1185  taskGroup_.wait();
1186  } while (not streamLoopWaitTask.done());
1187  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1188  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1189  }
1190  }
1191  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1192  if (looper_) {
1193  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1194  }
1195  }
1196 
1197  void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid,
1198  RunNumber_t run,
1199  bool globalBeginSucceeded,
1200  bool cleaningUpAfterException,
1201  bool eventSetupForInstanceSucceeded) {
1202  if (eventSetupForInstanceSucceeded) {
1203  //If we skip empty runs, this would be called conditionally
1204  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1205 
1206  if (globalBeginSucceeded) {
1208  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1209  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1210  mergeableRunProductMetadata->preWriteRun();
1211  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1212  do {
1213  taskGroup_.wait();
1214  } while (not t.done());
1215  mergeableRunProductMetadata->postWriteRun();
1216  if (t.exceptionPtr()) {
1217  std::rethrow_exception(*t.exceptionPtr());
1218  }
1219  }
1220  }
1221  deleteRunFromCache(phid, run);
1222  }
1223 
1224  void EventProcessor::endRun(ProcessHistoryID const& phid,
1225  RunNumber_t run,
1226  bool globalBeginSucceeded,
1227  bool cleaningUpAfterException) {
1228  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1229  runPrincipal.setEndTime(input_->timestamp());
1230 
1231  IOVSyncValue ts(
1233  runPrincipal.endTime());
1234  {
1235  SendSourceTerminationSignalIfException sentry(actReg_.get());
1237  sentry.completedSuccessfully();
1238  }
1239  auto const& es = esp_->eventSetupImpl();
1240  if (globalBeginSucceeded) {
1241  //To wait, the ref count has to be 1+#streams
1242  FinalWaitingTask streamLoopWaitTask;
1243 
1245 
1246  RunTransitionInfo transitionInfo(runPrincipal, es);
1247  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1248  *schedule_,
1250  transitionInfo,
1251  serviceToken_,
1252  subProcesses_,
1253  cleaningUpAfterException);
1254  do {
1255  taskGroup_.wait();
1256  } while (not streamLoopWaitTask.done());
1257  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1258  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1259  }
1260  }
1261  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1262  if (looper_) {
1263  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1264  }
1265  {
1266  FinalWaitingTask globalWaitTask;
1267 
1268  using namespace edm::waiting_task::chain;
1269  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1270  RunTransitionInfo transitionInfo(runPrincipal, es);
1272  endGlobalTransitionAsync<Traits>(
1273  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1274  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1275  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1276  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1277  looper_->doEndRun(runPrincipal, es, &processContext_);
1278  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1279 
1280  do {
1281  taskGroup_.wait();
1282  } while (not globalWaitTask.done());
1283  if (globalWaitTask.exceptionPtr() != nullptr) {
1284  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1285  }
1286  }
1287  FDEBUG(1) << "\tendRun " << run << "\n";
1288  }
1289 
1290  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1291  FinalWaitingTask waitTask;
1292  if (streamLumiActive_ > 0) {
1294  // Continue after opening a new input file
1296  } else {
1297  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1298  input_->luminosityBlockAuxiliary()->beginTime()),
1299  iRunResource,
1300  WaitingTaskHolder{taskGroup_, &waitTask});
1301  }
1302  do {
1303  taskGroup_.wait();
1304  } while (not waitTask.done());
1305 
1306  if (waitTask.exceptionPtr() != nullptr) {
1307  std::rethrow_exception(*(waitTask.exceptionPtr()));
1308  }
1309  return lastTransitionType();
1310  }
1311 
1312  void EventProcessor::beginLumiAsync(IOVSyncValue const& iSync,
1313  std::shared_ptr<void> const& iRunResource,
1314  edm::WaitingTaskHolder iHolder) {
1315  if (iHolder.taskHasFailed()) {
1316  return;
1317  }
1318 
1319  // We must be careful with the status object here and in code this function calls. IF we want
1320  // endRun to be called, then we must call resetResources before the things waiting on
1321  // iHolder are allowed to proceed. Otherwise, there will be a race condition (possibly causing
1322  // endRun to be called much later than it should be, because status is holding iRunResource).
1323  // Note that this must be done explicitly. Relying on the destructor does not work well
1324  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1325  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1326  // destroyed inside this function and lumiWork.
1327  auto status =
1328  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1329  chain::first([&](auto nextTask) {
1330  auto asyncEventSetup = [](ActivityRegistry* actReg,
1331  auto* espController,
1332  auto& queue,
1333  WaitingTaskHolder task,
1334  auto& status,
1335  IOVSyncValue const& iSync) {
1336  queue.pause();
1337  CMS_SA_ALLOW try {
1338  SendSourceTerminationSignalIfException sentry(actReg);
1339  // Pass in iSync to let the EventSetup system know which run and lumi
1340  // need to be processed and prepare IOVs for it.
1341  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1342  // lumi is done and no longer needs its EventSetup IOVs.
1343  espController->eventSetupForInstanceAsync(
1344  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1345  sentry.completedSuccessfully();
1346  } catch (...) {
1347  task.doneWaiting(std::current_exception());
1348  }
1349  };
1350  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1351  // We only get here inside this block if there is an EventSetup
1352  // module not able to handle concurrent IOVs (usually an ESSource)
1353  // and the new sync value is outside the current IOV of that module.
1354  auto group = nextTask.group();
1356  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1357  asyncEventSetup(
1359  });
1360  } else {
1361  asyncEventSetup(
1363  }
1364  }) | chain::then([this, status](std::exception_ptr const* iPtr, auto nextTask) {
1365  //the call to doneWaiting will cause the count to decrement
1366  auto copyTask = nextTask;
1367  if (iPtr) {
1368  nextTask.doneWaiting(*iPtr);
1369  }
1370  auto group = copyTask.group();
1371  lumiQueue_->pushAndPause(
1372  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1373  if (task.taskHasFailed()) {
1374  status->resetResources();
1375  return;
1376  }
1377 
1378  status->setResumer(std::move(iResumer));
1379 
1380  auto group = task.group();
1382  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1383  //make the services available
1385  // Caught exception is propagated via WaitingTaskHolder
1386  CMS_SA_ALLOW try {
1388 
1389  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1390  {
1391  SendSourceTerminationSignalIfException sentry(actReg_.get());
1392 
1393  input_->doBeginLumi(lumiPrincipal, &processContext_);
1394  sentry.completedSuccessfully();
1395  }
1396 
1397  Service<RandomNumberGenerator> rng;
1398  if (rng.isAvailable()) {
1399  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1400  rng->preBeginLumi(lb);
1401  }
1402 
1403  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1404 
1405  using namespace edm::waiting_task::chain;
1406  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1407  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1408  {
1409  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1411  beginGlobalTransitionAsync<Traits>(
1412  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1413  }
1414  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1415  looper_->prefetchAsync(
1416  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1417  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1418  status->globalBeginDidSucceed();
1419  //make the services available
1420  ServiceRegistry::Operate operateLooper(serviceToken_);
1421  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1422  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1423  if (iPtr) {
1424  status->resetResources();
1425  holder.doneWaiting(*iPtr);
1426  } else {
1427  if (not looper_) {
1428  status->globalBeginDidSucceed();
1429  }
1430  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1432 
1433  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1434  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1435  streamQueues_[i].pause();
1436 
1437  auto& event = principalCache_.eventPrincipal(i);
1438  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1439  // held by the container as this lambda may not finish executing before all the tasks it
1440  // spawns have already started to run.
1441  auto eventSetupImpls = &status->eventSetupImpls();
1442  auto lp = status->lumiPrincipal().get();
1443  streamLumiStatus_[i] = std::move(status);
1445  event.setLuminosityBlockPrincipal(lp);
1446  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1447  using namespace edm::waiting_task::chain;
1448  chain::first([this, i, &transitionInfo](auto nextTask) {
1449  beginStreamTransitionAsync<Traits>(
1450  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1451  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1452  if (exceptionFromBeginStreamLumi) {
1453  WaitingTaskHolder tmp(nextTask);
1454  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1455  streamEndLumiAsync(nextTask, i);
1456  } else {
1458  }
1459  }) | runLast(holder);
1460  });
1461  }
1462  }
1463  }) | runLast(postQueueTask);
1464 
1465  } catch (...) {
1466  status->resetResources();
1467  postQueueTask.doneWaiting(std::current_exception());
1468  }
1469  }); // task in sourceResourcesAcquirer
1470  });
1471  }) | chain::runLast(std::move(iHolder));
1472  }
1473 
1474  void EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) {
1475  {
1476  //all streams are sharing the same status at the moment
1477  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1478  status->needToContinueLumi();
1479  status->startProcessingEvents();
1480  }
1481 
1482  unsigned int streamIndex = 0;
1483  tbb::task_arena arena{tbb::task_arena::attach()};
1484  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1485  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1486  }
1487  iHolder.group()->run(
1488  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1489  }
1490 
1491  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1492  if (setDeferredException(*iPtr)) {
1493  WaitingTaskHolder tmp(holder);
1494  tmp.doneWaiting(*iPtr);
1495  } else {
1497  }
1498  }
1499 
1500  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask,
1501  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1502  // Get some needed info out of the status object before moving
1503  // it into finalTaskForThisLumi.
1504  auto& lp = *(iLumiStatus->lumiPrincipal());
1505  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1506  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1507  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1508  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1509 
1510  using namespace edm::waiting_task::chain;
1511  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1512  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1513 
1514  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1516  endGlobalTransitionAsync<Traits>(
1517  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1518  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1519  //Only call writeLumi if beginLumi succeeded
1520  if (didGlobalBeginSucceed) {
1521  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1522  }
1523  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1524  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1525  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1526  //any thrown exception auto propagates to nextTask via the chain
1528  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1529  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1530  std::exception_ptr ptr;
1531  if (iPtr) {
1532  ptr = *iPtr;
1533  }
1535 
1536  // Try hard to clean up resources so the
1537  // process can terminate in a controlled
1538  // fashion even after exceptions have occurred.
1539  // Caught exception is passed to handleEndLumiExceptions()
1540  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1541  if (not ptr) {
1542  ptr = std::current_exception();
1543  }
1544  }
1545  // Caught exception is passed to handleEndLumiExceptions()
1546  CMS_SA_ALLOW try {
1547  status->resumeGlobalLumiQueue();
1549  } catch (...) {
1550  if (not ptr) {
1551  ptr = std::current_exception();
1552  }
1553  }
1554  // Caught exception is passed to handleEndLumiExceptions()
1555  CMS_SA_ALLOW try {
1556  // This call to status.resetResources() must occur before iTask is destroyed.
1557  // Otherwise there will be a data race which could result in endRun
1558  // being delayed until it is too late to successfully call it.
1559  status->resetResources();
1560  status.reset();
1561  } catch (...) {
1562  if (not ptr) {
1563  ptr = std::current_exception();
1564  }
1565  }
1566 
1567  if (ptr) {
1568  handleEndLumiExceptions(&ptr, nextTask);
1569  }
1570  }) | runLast(std::move(iTask));
1571  }
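// [Editor's note] The CMS_SA_ALLOW try/catch blocks above implement a "remember the
// first exception, keep cleaning up" policy, so cache deletion, queue resumption and
// resource release all run even if one of them throws. The same policy in isolation,
// as a sketch:
//
//   std::exception_ptr ptr;
//   auto keepFirst = [&ptr](auto&& f) {
//     try { f(); } catch (...) { if (not ptr) ptr = std::current_exception(); }
//   };
//   keepFirst([&] { cleanupStepA(); });
//   keepFirst([&] { cleanupStepB(); });
//   if (ptr) { handleEndLumiExceptions(&ptr, nextTask); }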
1572 
1573  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1574  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1575  if (iPtr) {
1576  handleEndLumiExceptions(iPtr, iTask);
1577  }
1578  auto status = streamLumiStatus_[iStreamIndex];
1579  //reset status before releasing queue else get race condition
1580  streamLumiStatus_[iStreamIndex].reset();
1582  streamQueues_[iStreamIndex].resume();
1583 
1584  //are we the last one?
1585  if (status->streamFinishedLumi()) {
1586  globalEndLumiAsync(iTask, std::move(status));
1587  }
1588  });
1589 
1590  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1591 
1592  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1593  // therefore we do not want to hold the shared_ptr
1594  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1595  lumiStatus->setEndTime();
1596 
1597  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1598 
1599  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1600  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1601 
1602  if (lumiStatus->didGlobalBeginSucceed()) {
1603  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1604  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1605  lumiPrincipal.endTime());
1607  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1608  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1609  *schedule_,
1610  iStreamIndex,
1611  transitionInfo,
1612  serviceToken_,
1613  subProcesses_,
1614  cleaningUpAfterException);
1615  }
1616  }
1617 
1618  void EventProcessor::endUnfinishedLumis() {
1619  if (streamLumiActive_.load() > 0) {
1620  FinalWaitingTask globalWaitTask;
1621  {
1622  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1623  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1624  if (streamLumiStatus_[i]) {
1625  streamEndLumiAsync(globalTaskHolder, i);
1626  }
1627  }
1628  }
1629  do {
1630  taskGroup_.wait();
1631  } while (not globalWaitTask.done());
1632  if (globalWaitTask.exceptionPtr() != nullptr) {
1633  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1634  }
1635  }
1636  }
1637 
1638  void EventProcessor::readProcessBlock(ProcessBlockPrincipal& processBlockPrincipal) {
1639  SendSourceTerminationSignalIfException sentry(actReg_.get());
1640  input_->readProcessBlock(processBlockPrincipal);
1641  sentry.completedSuccessfully();
1642  }
1643 
1644  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1645  if (principalCache_.hasRunPrincipal()) {
1646  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1647  << "Illegal attempt to insert run into cache\n"
1648  << "Contact a Framework Developer\n";
1649  }
1650  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1651  preg(),
1653  historyAppender_.get(),
1654  0,
1655  true,
1657  {
1658  SendSourceTerminationSignalIfException sentry(actReg_.get());
1659  input_->readRun(*rp, *historyAppender_);
1660  sentry.completedSuccessfully();
1661  }
1662  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1663  principalCache_.insert(rp);
1664  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1665  }
1666 
1667  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1668  principalCache_.merge(input_->runAuxiliary(), preg());
1669  auto runPrincipal = principalCache_.runPrincipalPtr();
1670  {
1671  SendSourceTerminationSignalIfException sentry(actReg_.get());
1672  input_->readAndMergeRun(*runPrincipal);
1673  sentry.completedSuccessfully();
1674  }
1675  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1676  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1677  }
1678 
1679  void EventProcessor::readLuminosityBlock(LuminosityBlockProcessingStatus& iStatus) {
1680  if (not principalCache_.hasRunPrincipal()) {
1681  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1682  << "Illegal attempt to insert lumi into cache\n"
1683  << "Run is invalid\n"
1684  << "Contact a Framework Developer\n";
1685  }
1687  assert(lbp);
1688  lbp->setAux(*input_->luminosityBlockAuxiliary());
1689  {
1690  SendSourceTerminationSignalIfException sentry(actReg_.get());
1691  input_->readLuminosityBlock(*lbp, *historyAppender_);
1692  sentry.completedSuccessfully();
1693  }
1694  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1695  iStatus.lumiPrincipal() = std::move(lbp);
1696  }
1697 
1699  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1700  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1701  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1702  input_->processHistoryRegistry().reducedProcessHistoryID(
1703  input_->luminosityBlockAuxiliary()->processHistoryID()));
1704  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1705  assert(lumiOK);
1706  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1707  {
1708  SendSourceTerminationSignalIfException sentry(actReg_.get());
1709  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1710  sentry.completedSuccessfully();
1711  }
1712  return input_->luminosityBlock();
1713  }
1714 
1715  void EventProcessor::writeProcessBlockAsync(WaitingTaskHolder task, ProcessBlockType processBlockType) {
1716  using namespace edm::waiting_task;
1717  chain::first([&](auto nextTask) {
1719  schedule_->writeProcessBlockAsync(
1720  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1721  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1723  for (auto& s : subProcesses_) {
1724  s.writeProcessBlockAsync(nextTask, processBlockType);
1725  }
1726  }) | chain::runLast(std::move(task));
1727  }
1728 
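The write*Async() methods here all share the same shape: chain::first(...) | chain::ifThen(...) | chain::runLast(...) from edm::waiting_task::chain runs the schedule's write step, forwards to sub-processes only when any exist, and finally releases the caller's WaitingTaskHolder. The stand-alone sketch below is not the edm implementation; it is a simplified synchronous analogue (the Chain type and its members are invented for illustration) that only shows how the three stages compose.

// Simplified, synchronous analogue of a first/ifThen/runLast composition.
// Chain and Step are illustrative names, not part of the edm API.
#include <functional>
#include <iostream>
#include <vector>

using Step = std::function<void()>;

struct Chain {
  std::vector<Step> steps;

  // Append a step that always runs (analogue of chain::first / chain::then).
  Chain& then(Step s) {
    steps.push_back(std::move(s));
    return *this;
  }
  // Append a step only if the condition is already known to be true at
  // composition time, mirroring chain::ifThen(cond, ...).
  Chain& ifThen(bool cond, Step s) {
    if (cond) {
      steps.push_back(std::move(s));
    }
    return *this;
  }
  // Run the accumulated steps in order, then hand control to the final task.
  void runLast(Step last) {
    for (auto& s : steps) {
      s();
    }
    last();
  }
};

int main() {
  bool haveSubProcesses = false;  // analogous to "not subProcesses_.empty()"
  Chain{}
      .then([] { std::cout << "write product for this process\n"; })
      .ifThen(haveSubProcesses, [] { std::cout << "forward to sub-processes\n"; })
      .runLast([] { std::cout << "notify the waiting task\n"; });
  return 0;
}

The real chain hands a WaitingTaskHolder from stage to stage and propagates any std::exception_ptr to the final task instead of running synchronously.
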
1729  void EventProcessor::writeRunAsync(WaitingTaskHolder task,
1730  ProcessHistoryID const& phid,
1731  RunNumber_t run,
1732  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1733  using namespace edm::waiting_task;
1734  chain::first([&](auto nextTask) {
1736  schedule_->writeRunAsync(nextTask,
1737  principalCache_.runPrincipal(phid, run),
1738  &processContext_,
1739  actReg_.get(),
1740  mergeableRunProductMetadata);
1741  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1743  for (auto& s : subProcesses_) {
1744  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1745  }
1746  }) | chain::runLast(std::move(task));
1747  }
1748 
1749  void EventProcessor::deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) {
1750  principalCache_.deleteRun(phid, run);
1751  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1752  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1753  }
1754 
1755  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
1756  using namespace edm::waiting_task;
1757  if (not lumiPrincipal.willBeContinued()) {
1758  chain::first([&](auto nextTask) {
1760 
1761  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1762  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1763  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1765  for (auto& s : subProcesses_) {
1766  s.writeLumiAsync(nextTask, lumiPrincipal);
1767  }
1768  }) | chain::lastTask(std::move(task));
1769  }
1770  }
1771 
1772  void EventProcessor::deleteLumiFromCache(LuminosityBlockProcessingStatus& iStatus) {
1773  for (auto& s : subProcesses_) {
1774  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1775  }
1776  iStatus.lumiPrincipal()->clearPrincipal();
1777  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1778  }
1779 
1780  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus) {
1781  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1782  iStatus.endLumi();
1783  return false;
1784  }
1785 
1786  if (iStatus.wasEventProcessingStopped()) {
1787  return false;
1788  }
1789 
1790  if (shouldWeStop()) {
1791  lastSourceTransition_ = InputSource::IsStop;
1792  iStatus.stopProcessingEvents();
1793  iStatus.endLumi();
1794  return false;
1795  }
1796 
1798  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1799  CMS_SA_ALLOW try {
1800  //need to use lock in addition to the serial task queue because
1801  // of delayed provenance reading and reading data in response to
1802  // edm::Refs etc
1803  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1804 
1805  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1806  if (InputSource::IsLumi == itemType) {
1807  iStatus.haveContinuedLumi();
1808  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1809  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1810  readAndMergeLumi(iStatus);
1811  itemType = nextTransitionType();
1812  }
1813  if (InputSource::IsLumi == itemType) {
1814  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1815  input_->luminosityBlockAuxiliary()->beginTime()));
1816  }
1817  }
1818  if (InputSource::IsEvent != itemType) {
1819  iStatus.stopProcessingEvents();
1820 
1821  //IsFile may continue processing the lumi and
1822  // looper_ can cause the input source to declare a new IsRun which is actually
1823  // just a continuation of the previous run
1824  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1825  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1826  iStatus.endLumi();
1827  }
1828  return false;
1829  }
1830  readEvent(iStreamIndex);
1831  } catch (...) {
1832  bool expected = false;
1833  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1834  deferredExceptionPtr_ = std::current_exception();
1835  iStatus.endLumi();
1836  }
1837  return false;
1838  }
1839  return true;
1840  }
1841 
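readNextEventForStream() above, like setDeferredException() further down, records only the first exception raised by any stream: an atomic flag is claimed with compare_exchange_strong and the corresponding std::exception_ptr is kept for rethrow once the event loop drains. Below is a minimal stand-alone sketch of that idiom, with illustrative names (errorIsSet, firstError, recordError) rather than the EventProcessor members.

// Stand-alone sketch of the "first exception wins" idiom used with
// deferredExceptionPtrIsSet_ / deferredExceptionPtr_. Names are illustrative.
#include <atomic>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

std::atomic<bool> errorIsSet{false};
std::exception_ptr firstError;  // written only by the thread that wins the CAS

void recordError(std::exception_ptr e) {
  bool expected = false;
  // Only the first caller flips the flag and stores its exception;
  // later exceptions are dropped.
  if (errorIsSet.compare_exchange_strong(expected, true)) {
    firstError = e;
  }
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([i] {
      try {
        throw std::runtime_error("failure in worker " + std::to_string(i));
      } catch (...) {
        recordError(std::current_exception());
      }
    });
  }
  for (auto& t : workers) {
    t.join();
  }

  // Rethrow the single recorded exception, as runToCompletion() would.
  if (errorIsSet.load(std::memory_order_acquire)) {
    try {
      std::rethrow_exception(firstError);
    } catch (std::exception const& ex) {
      std::cout << "deferred exception: " << ex.what() << "\n";
    }
  }
  return 0;
}

Because only the thread that wins the compare-exchange ever writes the exception_ptr, no additional lock is needed around it.
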
1842  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1843  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1845  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1846  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1847  auto status = streamLumiStatus_[iStreamIndex].get();
1848  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1849  CMS_SA_ALLOW try {
1850  if (readNextEventForStream(iStreamIndex, *status)) {
1851  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1852  if (iPtr) {
1853  // Try to end the stream properly even if an exception was
1854  // thrown on an event.
1855  bool expected = false;
1856  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1857  // This is the case where the exception in iPtr is the primary
1858  // exception and we want to see its message.
1859  deferredExceptionPtr_ = *iPtr;
1860  WaitingTaskHolder tempHolder(iTask);
1861  tempHolder.doneWaiting(*iPtr);
1862  }
1863  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1864  //the stream will stop now
1865  return;
1866  }
1867  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1868  });
1869 
1870  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1871  } else {
1872  //the stream will stop now
1873  if (status->isLumiEnding()) {
1874  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1875  status->startNextLumi();
1876  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1877  }
1878  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1879  } else {
1880  iTask.doneWaiting(std::exception_ptr{});
1881  }
1882  }
1883  } catch (...) {
1884  // It is unlikely we will ever get in here ...
1885  // But if we do, try to clean up and propagate the exception
1886  if (streamLumiStatus_[iStreamIndex]) {
1887  streamEndLumiAsync(iTask, iStreamIndex);
1888  }
1889  bool expected = false;
1890  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1891  auto e = std::current_exception();
1892  deferredExceptionPtr_ = e;
1893  iTask.doneWaiting(e);
1894  }
1895  }
1896  });
1897  }
1898 
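handleNextEventForStreamAsync() funnels every read of the input source through sourceResourcesAcquirer_.serialQueueChain(), so many streams can request events concurrently while the source itself is only ever touched by one task at a time (the recursive sourceMutex_ additionally covers delayed provenance reads triggered outside the queue). The sketch below is a generic serial work queue, not edm::SerialTaskQueueChain; the SerialQueue class is invented purely to illustrate the serialization idea.

// Generic serial work queue: many producers push, one consumer runs the
// functors strictly one at a time and in push order. Illustrative only.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class SerialQueue {
public:
  void push(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> lk(m_);
      q_.push(std::move(f));
    }
    cv_.notify_one();
  }
  void runUntilStopped() {
    for (;;) {
      std::function<void()> f;
      {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return stop_ || !q_.empty(); });
        if (q_.empty()) return;  // stop requested and no work left
        f = std::move(q_.front());
        q_.pop();
      }
      f();  // tasks run strictly one at a time
    }
  }
  void stop() {
    {
      std::lock_guard<std::mutex> lk(m_);
      stop_ = true;
    }
    cv_.notify_one();
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> q_;
  bool stop_ = false;
};

int main() {
  SerialQueue sourceQueue;
  std::thread consumer([&] { sourceQueue.runUntilStopped(); });

  // Four "streams" each ask for the next event; the reads are serialized.
  std::vector<std::thread> streams;
  for (int s = 0; s < 4; ++s) {
    streams.emplace_back(
        [&, s] { sourceQueue.push([s] { std::cout << "stream " << s << " reads one event\n"; }); });
  }
  for (auto& t : streams) {
    t.join();
  }
  sourceQueue.stop();
  consumer.join();
  return 0;
}
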
1899  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1900  //TODO this will have to become per stream
1901  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1902  StreamContext streamContext(event.streamID(), &processContext_);
1903 
1904  SendSourceTerminationSignalIfException sentry(actReg_.get());
1905  input_->readEvent(event, streamContext);
1906 
1907  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1908  sentry.completedSuccessfully();
1909 
1910  FDEBUG(1) << "\treadEvent\n";
1911  }
1912 
1913  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1914  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1915  }
1916 
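processEventAsync() above does nothing more than hand processEventAsyncImpl() to the tbb::task_group owned by the WaitingTaskHolder. The following minimal tbb::task_group example, independent of the edm types, shows the run()/wait() pair this relies on (the framework itself does not call wait() here; completion is signalled through the WaitingTaskHolder instead).

// Minimal tbb::task_group usage: offload a functor and wait for it.
// Requires linking against TBB.
#include <iostream>
#include "tbb/task_group.h"

int main() {
  tbb::task_group group;
  int result = 0;
  // run() enqueues the functor for asynchronous execution by the TBB scheduler.
  group.run([&result] { result = 6 * 7; });
  // wait() blocks until every task spawned on this group has finished.
  group.wait();
  std::cout << "result = " << result << "\n";
  return 0;
}
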
1917  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1918  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1919 
1920  ServiceRegistry::Operate operate(serviceToken_);
1921  Service<RandomNumberGenerator> rng;
1922  if (rng.isAvailable()) {
1923  Event ev(*pep, ModuleDescription(), nullptr);
1924  rng->postEventRead(ev);
1925  }
1926 
1927  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1928  using namespace edm::waiting_task::chain;
1929  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1930  EventTransitionInfo info(*pep, es);
1931  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1932  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1933  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1934  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1935  }
1936  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1937  //NOTE: behavior change. Previously the looper was still called if an exception happened; now it will not be called.
1938  ServiceRegistry::Operate operateLooper(serviceToken_);
1939  processEventWithLooper(*pep, iStreamIndex);
1940  }) | then([pep](auto nextTask) {
1941  FDEBUG(1) << "\tprocessEvent\n";
1942  pep->clearEventPrincipal();
1943  }) | runLast(iHolder);
1944  }
1945 
1946  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1947  bool randomAccess = input_->randomAccess();
1948  ProcessingController::ForwardState forwardState = input_->forwardState();
1949  ProcessingController::ReverseState reverseState = input_->reverseState();
1950  ProcessingController pc(forwardState, reverseState, randomAccess);
1951 
1952  EDLooperBase::Status status = EDLooperBase::kContinue;
1953  do {
1954  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1955  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1956  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1957 
1958  bool succeeded = true;
1959  if (randomAccess) {
1960  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1961  input_->skipEvents(-2);
1962  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1963  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1964  }
1965  }
1966  pc.setLastOperationSucceeded(succeeded);
1967  } while (!pc.lastOperationSucceeded());
1968  if (status != EDLooperBase::kContinue) {
1969  shouldWeStop_ = true;
1970  lastSourceTransition_ = InputSource::IsStop;
1971  }
1972  }
1973 
1974  bool EventProcessor::shouldWeStop() const {
1975  FDEBUG(1) << "\tshouldWeStop\n";
1976  if (shouldWeStop_)
1977  return true;
1978  if (!subProcesses_.empty()) {
1979  for (auto const& subProcess : subProcesses_) {
1980  if (subProcess.terminate()) {
1981  return true;
1982  }
1983  }
1984  return false;
1985  }
1986  return schedule_->terminate();
1987  }
1988 
1989  void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }
1990 
1991  void EventProcessor::setExceptionMessageRuns(std::string& message) { exceptionMessageRuns_ = message; }
1992 
1993  void EventProcessor::setExceptionMessageLumis() { exceptionMessageLumis_ = true; }
1994 
1995  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1996  bool expected = false;
1997  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1998  deferredExceptionPtr_ = iException;
1999  return true;
2000  }
2001  return false;
2002  }
2003 
2004  void EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization() const {
2005  cms::Exception ex("ModulesSynchingOnLumis");
2006  ex << "The framework is configured to use at least two streams, but the following modules\n"
2007  << "require synchronizing on LuminosityBlock boundaries:";
2008  bool found = false;
2009  for (auto worker : schedule_->allWorkers()) {
2010  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2011  found = true;
2012  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2013  }
2014  }
2015  if (found) {
2016  ex << "\n\nThe situation can be fixed by either\n"
2017  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2018  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2019  throw ex;
2020  }
2021  }
2022 
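throwAboutModulesRequiringLuminosityBlockSynchronization() above, and warnAboutLegacyModules() below, both scan schedule_->allWorkers(), accumulate every offending module into one streamed message, and only throw or log if something was found, so the user sees the complete list in a single report. A generic stand-alone sketch of that pattern, using an invented Module struct and std::runtime_error in place of cms::Exception:

// Collect every offender first, then report once. Module and the predicate
// are illustrative, not CMSSW classes.
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

struct Module {
  std::string name;
  bool needsLumiSync;
};

void throwIfAnyNeedSync(std::vector<Module> const& modules) {
  std::ostringstream msg;
  msg << "The following modules require synchronizing on LuminosityBlock boundaries:";
  bool found = false;
  for (auto const& m : modules) {
    if (m.needsLumiSync) {
      found = true;
      msg << "\n  " << m.name;  // keep scanning so the report lists all offenders
    }
  }
  if (found) {
    throw std::runtime_error(msg.str());  // one exception naming every offender
  }
}

int main() {
  try {
    throwIfAnyNeedSync({{"producerA", false}, {"analyzerB", true}, {"filterC", true}});
  } catch (std::exception const& ex) {
    std::cout << ex.what() << "\n";  // prints both analyzerB and filterC
  }
  return 0;
}

Reporting all offenders at once avoids a fix-one, rerun, find-the-next cycle for the user.
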
2023  void EventProcessor::warnAboutLegacyModules() const {
2024  std::unique_ptr<LogSystem> s;
2025  for (auto worker : schedule_->allWorkers()) {
2026  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2027  if (not s) {
2028  s = std::make_unique<LogSystem>("LegacyModules");
2029  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2030  "is going to end soon. These modules need to be converted to have type\n"
2031  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2032  }
2033  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2034  }
2035  }
2036  }
2037 } // namespace edm