CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
8 
41 
43 
51 
56 
60 
73 
74 #include "MessageForSource.h"
75 #include "MessageForParent.h"
77 
78 #include "boost/range/adaptor/reversed.hpp"
79 
80 #include <cassert>
81 #include <exception>
82 #include <iomanip>
83 #include <iostream>
84 #include <utility>
85 #include <sstream>
86 
87 #include <sys/ipc.h>
88 #include <sys/msg.h>
89 
90 #include "tbb/task.h"
91 
92 //Used for CPU affinity
93 #ifndef __APPLE__
94 #include <sched.h>
95 #endif
96 
97 namespace {
98  //Sentry class to only send a signal if an
99  // exception occurs. An exception is identified
100  // by the destructor being called without first
101  // calling completedSuccessfully().
102  class SendSourceTerminationSignalIfException {
103  public:
104  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
105  ~SendSourceTerminationSignalIfException() {
106  if (reg_) {
107  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
108  }
109  }
110  void completedSuccessfully() { reg_ = nullptr; }
111 
112  private:
113  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
114  };
115 } // namespace
116 
117 namespace edm {
118 
119  namespace chain = waiting_task::chain;
120 
121  // ---------------------------------------------------------------
122  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
123  CommonParams const& common,
124  std::shared_ptr<ProductRegistry> preg,
125  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
126  std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
127  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
128  std::shared_ptr<ActivityRegistry> areg,
129  std::shared_ptr<ProcessConfiguration const> processConfiguration,
130  PreallocationConfiguration const& allocations) {
131  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
132  if (main_input == nullptr) {
134  << "There must be exactly one source in the configuration.\n"
135  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
136  }
137 
138  std::string modtype(main_input->getParameter<std::string>("@module_type"));
139 
140  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
142  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
143  filler->fill(descriptions);
144 
145  try {
146  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
147  } catch (cms::Exception& iException) {
148  std::ostringstream ost;
149  ost << "Validating configuration of input source of type " << modtype;
150  iException.addContext(ost.str());
151  throw;
152  }
153 
154  main_input->registerIt();
155 
156  // Fill in "ModuleDescription", in case the input source produces
157  // any EDProducts, which would be registered in the ProductRegistry.
158  // Also fill in the process history item for this process.
159  // There is no module label for the unnamed input source, so
160  // just use "source".
161  // Only the tracked parameters belong in the process configuration.
162  ModuleDescription md(main_input->id(),
163  main_input->getParameter<std::string>("@module_type"),
164  "source",
165  processConfiguration.get(),
167 
168  InputSourceDescription isdesc(md,
169  preg,
170  branchIDListHelper,
171  processBlockHelper,
172  thinnedAssociationsHelper,
173  areg,
174  common.maxEventsInput_,
175  common.maxLumisInput_,
176  common.maxSecondsUntilRampdown_,
177  allocations);
178 
179  areg->preSourceConstructionSignal_(md);
180  std::unique_ptr<InputSource> input;
181  try {
182  //even if we have an exception, send the signal
183  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
184  convertException::wrap([&]() {
185  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
186  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
187  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
188  });
189  } catch (cms::Exception& iException) {
190  std::ostringstream ost;
191  ost << "Constructing input source of type " << modtype;
192  iException.addContext(ost.str());
193  throw;
194  }
195  return input;
196  }
197 
198  // ---------------------------------------------------------------
200  auto const modtype = pset.getParameter<std::string>("@module_type");
201  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
203  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
204  filler->fill(descriptions);
205  try {
206  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
207  } catch (cms::Exception& iException) {
208  iException.addContext(
209  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
210  throw;
211  }
212  }
213 
214  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
217  std::vector<std::string> const& loopers) {
218  std::shared_ptr<EDLooperBase> vLooper;
219 
220  assert(1 == loopers.size());
221 
222  for (auto const& looperName : loopers) {
223  ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
224  validateLooper(*providerPSet);
225  providerPSet->registerIt();
226  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
227  }
228  return vLooper;
229  }
230 
231  // ---------------------------------------------------------------
232  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
233  ServiceToken const& iToken,
235  std::vector<std::string> const& defaultServices,
236  std::vector<std::string> const& forcedServices)
237  : actReg_(),
238  preg_(),
239  branchIDListHelper_(),
240  serviceToken_(),
241  input_(),
242  espController_(new eventsetup::EventSetupsController),
243  esp_(),
244  act_table_(),
245  processConfiguration_(),
246  schedule_(),
247  subProcesses_(),
248  historyAppender_(new HistoryAppender),
249  fb_(),
250  looper_(),
251  deferredExceptionPtrIsSet_(false),
252  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
253  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
257  fileModeNoMerge_(false),
258  exceptionMessageFiles_(),
259  exceptionMessageRuns_(),
260  exceptionMessageLumis_(false),
261  forceLooperToEnd_(false),
262  looperBeginJobRun_(false),
263  forceESCacheClearOnNewRun_(false),
264  eventSetupDataToExcludeFromPrefetching_() {
265  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
266  processDesc->addServices(defaultServices, forcedServices);
267  init(processDesc, iToken, iLegacy);
268  }
269 
270  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
271  std::vector<std::string> const& defaultServices,
272  std::vector<std::string> const& forcedServices)
273  : actReg_(),
274  preg_(),
275  branchIDListHelper_(),
276  serviceToken_(),
277  input_(),
278  espController_(new eventsetup::EventSetupsController),
279  esp_(),
280  act_table_(),
281  processConfiguration_(),
282  schedule_(),
283  subProcesses_(),
284  historyAppender_(new HistoryAppender),
285  fb_(),
286  looper_(),
287  deferredExceptionPtrIsSet_(false),
288  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
289  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
290  principalCache_(),
291  beginJobCalled_(false),
292  shouldWeStop_(false),
293  fileModeNoMerge_(false),
294  exceptionMessageFiles_(),
295  exceptionMessageRuns_(),
296  exceptionMessageLumis_(false),
297  forceLooperToEnd_(false),
298  looperBeginJobRun_(false),
299  forceESCacheClearOnNewRun_(false),
300  asyncStopRequestedWhileProcessingEvents_(false),
301  eventSetupDataToExcludeFromPrefetching_() {
302  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
303  processDesc->addServices(defaultServices, forcedServices);
305  }
306 
307  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
308  ServiceToken const& token,
310  : actReg_(),
311  preg_(),
312  branchIDListHelper_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
318  processConfiguration_(),
319  schedule_(),
320  subProcesses_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
324  deferredExceptionPtrIsSet_(false),
325  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
326  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
327  principalCache_(),
328  beginJobCalled_(false),
329  shouldWeStop_(false),
330  fileModeNoMerge_(false),
331  exceptionMessageFiles_(),
332  exceptionMessageRuns_(),
333  exceptionMessageLumis_(false),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
336  forceESCacheClearOnNewRun_(false),
337  asyncStopRequestedWhileProcessingEvents_(false),
338  eventSetupDataToExcludeFromPrefetching_() {
339  init(processDesc, token, legacy);
340  }
341 
342  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
343  ServiceToken const& iToken,
345  //std::cerr << processDesc->dump() << std::endl;
346 
347  // register the empty parentage vector , once and for all
349 
350  // register the empty parameter set, once and for all.
352 
353  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
354 
355  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
356  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
357  bool const hasSubProcesses = !subProcessVParameterSet.empty();
358 
359  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
360  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
361  // set in here if the parameters were not explicitly set.
363 
364  // Now set some parameters specific to the main process.
365  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
366  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
367  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
368  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
369  << fileMode << ".\n"
370  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
371  } else {
372  fileModeNoMerge_ = (fileMode == "NOMERGE");
373  }
374  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
375 
376  //threading
377  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
378 
379  // Even if numberOfThreads was set to zero in the Python configuration, the code
380  // in cmsRun.cpp should have reset it to something else.
381  assert(nThreads != 0);
382 
383  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
384  if (nStreams == 0) {
385  nStreams = nThreads;
386  }
387  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
388  if (nConcurrentRuns != 1) {
389  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
390  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
391  }
392  unsigned int nConcurrentLumis =
393  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
394  if (nConcurrentLumis == 0) {
395  nConcurrentLumis = 2;
396  }
397  if (nConcurrentLumis > nStreams) {
398  nConcurrentLumis = nStreams;
399  }
400  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
401  if (!loopers.empty()) {
402  //For now loopers make us run only 1 transition at a time
403  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
405  "of concurrent runs, and the number of concurrent lumis "
406  "are all being reset to 1. Loopers cannot currently support "
407  "values greater than 1.";
408  nStreams = 1;
409  nConcurrentLumis = 1;
410  nConcurrentRuns = 1;
411  }
412  }
413  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
414  if (dumpOptions) {
415  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
416  } else {
417  if (nThreads > 1 or nStreams > 1) {
418  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
419  }
420  }
421  // The number of concurrent IOVs is configured individually for each record in
422  // the class NumberOfConcurrentIOVs to values less than or equal to this.
423  unsigned int maxConcurrentIOVs = nConcurrentLumis;
424 
425  //Check that relationships between threading parameters makes sense
426  /*
427  if(nThreads<nStreams) {
428  //bad
429  }
430  if(nConcurrentRuns>nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nConcurrentLumis) {
434  //bad
435  }
436  */
437  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
438 
439  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
441  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
442 
443  // Now do general initialization
445 
446  //initialize the services
447  auto& serviceSets = processDesc->getServicesPSets();
448  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
449  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
450 
451  //make the services available
453 
454  if (nStreams > 1) {
456  handler->willBeUsingThreads();
457  }
458 
459  // intialize miscellaneous items
460  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
461 
462  // intialize the event setup provider
463  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
464  esp_ = espController_->makeProvider(
465  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
466 
467  // initialize the looper, if any
468  if (!loopers.empty()) {
470  looper_->setActionTable(items.act_table_.get());
471  looper_->attachTo(*items.actReg_);
472 
473  // in presence of looper do not delete modules
475  }
476 
477  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
478 
479  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
480  streamQueues_.resize(nStreams);
481  streamLumiStatus_.resize(nStreams);
482 
483  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
484 
485  // initialize the input source
487  *common,
488  items.preg(),
489  items.branchIDListHelper(),
491  items.thinnedAssociationsHelper(),
492  items.actReg_,
493  items.processConfiguration(),
495 
496  // initialize the Schedule
497  schedule_ =
498  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_ = items.preg();
505  branchIDListHelper_ = items.branchIDListHelper();
506  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
507  processConfiguration_ = items.processConfiguration();
509  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
510 
511  FDEBUG(2) << parameterSet << std::endl;
512 
514  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
515  // Reusable event principal
516  auto ep = std::make_shared<EventPrincipal>(preg(),
520  historyAppender_.get(),
521  index,
522  true /*primary process*/,
525  }
526 
527  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
528  auto lp =
529  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
531  }
532 
533  {
534  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
536 
537  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
539  }
540 
541  // fill the subprocesses, if there are any
542  subProcesses_.reserve(subProcessVParameterSet.size());
543  for (auto& subProcessPSet : subProcessVParameterSet) {
544  subProcesses_.emplace_back(subProcessPSet,
545  *parameterSet,
546  preg(),
552  *actReg_,
553  token,
556  &processContext_);
557  }
558  }
559 
561  // Make the services available while everything is being deleted.
564 
565  // manually destroy all these thing that may need the services around
566  // propagate_const<T> has no reset() function
567  espController_ = nullptr;
568  esp_ = nullptr;
569  schedule_ = nullptr;
570  input_ = nullptr;
571  looper_ = nullptr;
572  actReg_ = nullptr;
573 
576  }
577 
581  taskGroup_.wait();
582  assert(task.done());
583  }
584 
586  if (beginJobCalled_)
587  return;
588  beginJobCalled_ = true;
589  bk::beginJob();
590 
591  // StateSentry toerror(this); // should we add this ?
592  //make the services available
594 
599  actReg_->preallocateSignal_(bounds);
600  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
602 
603  std::vector<ModuleProcessName> consumedBySubProcesses;
605  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
606  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607  if (consumedBySubProcesses.empty()) {
608  consumedBySubProcesses = std::move(c);
609  } else if (not c.empty()) {
610  std::vector<ModuleProcessName> tmp;
611  tmp.reserve(consumedBySubProcesses.size() + c.size());
612  std::merge(consumedBySubProcesses.begin(),
613  consumedBySubProcesses.end(),
614  c.begin(),
615  c.end(),
616  std::back_inserter(tmp));
617  std::swap(consumedBySubProcesses, tmp);
618  }
619  });
620 
621  // Note: all these may throw
624  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
625  not unusedModules.empty()) {
627 
628  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
629  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
630  "and "
631  "therefore they are deleted before beginJob transition.";
632  for (auto const& description : unusedModules) {
633  l << "\n " << description->moduleLabel();
634  }
635  });
636  for (auto const& description : unusedModules) {
637  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
638  }
639  }
640  }
641 
642  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
643 
646  }
648 
649  //NOTE: This implementation assumes 'Job' means one call
650  // the EventProcessor::run
651  // If it really means once per 'application' then this code will
652  // have to be changed.
653  // Also have to deal with case where have 'run' then new Module
654  // added and do 'run'
655  // again. In that case the newly added Module needs its 'beginJob'
656  // to be called.
657 
658  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
659  // For now we delay calling beginOfJob until first beginOfRun
660  //if(looper_) {
661  // looper_->beginOfJob(es);
662  //}
663  try {
664  convertException::wrap([&]() { input_->doBeginJob(); });
665  } catch (cms::Exception& ex) {
666  ex.addContext("Calling beginJob for the source");
667  throw;
668  }
669  espController_->finishConfiguration();
670  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
671  if (looper_) {
672  constexpr bool mustPrefetchMayGet = true;
673  auto const processBlockLookup = preg_->productLookup(InProcess);
674  auto const runLookup = preg_->productLookup(InRun);
675  auto const lumiLookup = preg_->productLookup(InLumi);
676  auto const eventLookup = preg_->productLookup(InEvent);
677  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
678  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
679  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
680  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
681  looper_->updateLookup(esp_->recordsToProxyIndices());
682  }
683  // toerror.succeeded(); // should we add this?
684  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
685  actReg_->postBeginJobSignal_();
686 
687  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
688  schedule_->beginStream(i);
689  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
690  }
691  }
692 
694  // Collects exceptions, so we don't throw before all operations are performed.
696  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
697 
698  //make the services available
700 
701  //NOTE: this really should go elsewhere in the future
702  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
703  c.call([this, i]() { this->schedule_->endStream(i); });
704  for (auto& subProcess : subProcesses_) {
705  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
706  }
707  }
708  auto actReg = actReg_.get();
709  c.call([actReg]() { actReg->preEndJobSignal_(); });
710  schedule_->endJob(c);
711  for (auto& subProcess : subProcesses_) {
712  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
713  }
714  c.call(std::bind(&InputSource::doEndJob, input_.get()));
715  if (looper_) {
716  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
717  }
718  c.call([actReg]() { actReg->postEndJobSignal_(); });
719  if (c.hasThrown()) {
720  c.rethrow();
721  }
722  }
723 
725 
726  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
727  return schedule_->getAllModuleDescriptions();
728  }
729 
730  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
731 
732  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
733 
734  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
735 
736  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
737 
738  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
739 
740  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
741 
742  namespace {
743 #include "TransitionProcessors.icc"
744  }
745 
747  bool returnValue = false;
748 
749  // Look for a shutdown signal
750  if (shutdown_flag.load(std::memory_order_acquire)) {
751  returnValue = true;
753  }
754  return returnValue;
755  }
756 
758  if (deferredExceptionPtrIsSet_.load()) {
760  return InputSource::IsStop;
761  }
762 
763  SendSourceTerminationSignalIfException sentry(actReg_.get());
764  InputSource::ItemType itemType;
765  //For now, do nothing with InputSource::IsSynchronize
766  do {
767  itemType = input_->nextItemType();
768  } while (itemType == InputSource::IsSynchronize);
769 
770  lastSourceTransition_ = itemType;
771  sentry.completedSuccessfully();
772 
774 
776  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
778  }
779 
780  return lastSourceTransition_;
781  }
782 
783  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
784  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
785  }
786 
788 
792  {
793  beginJob(); //make sure this was called
794 
795  // make the services available
797 
799  try {
800  FilesProcessor fp(fileModeNoMerge_);
801 
802  convertException::wrap([&]() {
803  bool firstTime = true;
804  do {
805  if (not firstTime) {
807  rewindInput();
808  } else {
809  firstTime = false;
810  }
811  startingNewLoop();
812 
813  auto trans = fp.processFiles(*this);
814 
815  fp.normalEnd();
816 
817  if (deferredExceptionPtrIsSet_.load()) {
818  std::rethrow_exception(deferredExceptionPtr_);
819  }
820  if (trans != InputSource::IsStop) {
821  //problem with the source
822  doErrorStuff();
823 
824  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
825  }
826  } while (not endOfLoop());
827  }); // convertException::wrap
828 
829  } // Try block
830  catch (cms::Exception& e) {
832  std::string message(
833  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
834  e.addAdditionalInfo(message);
835  if (e.alreadyPrinted()) {
836  LogAbsolute("Additional Exceptions") << message;
837  }
838  }
839  if (!exceptionMessageRuns_.empty()) {
840  e.addAdditionalInfo(exceptionMessageRuns_);
841  if (e.alreadyPrinted()) {
842  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
843  }
844  }
845  if (!exceptionMessageFiles_.empty()) {
846  e.addAdditionalInfo(exceptionMessageFiles_);
847  if (e.alreadyPrinted()) {
848  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
849  }
850  }
851  throw;
852  }
853  }
854 
855  return returnCode;
856  }
857 
859  FDEBUG(1) << " \treadFile\n";
860  size_t size = preg_->size();
861  SendSourceTerminationSignalIfException sentry(actReg_.get());
862 
864 
865  fb_ = input_->readFile();
866  if (size < preg_->size()) {
868  }
871  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
872  }
873  sentry.completedSuccessfully();
874  }
875 
876  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
877  if (fileBlockValid()) {
878  SendSourceTerminationSignalIfException sentry(actReg_.get());
879  input_->closeFile(fb_.get(), cleaningUpAfterException);
880  sentry.completedSuccessfully();
881  }
882  FDEBUG(1) << "\tcloseInputFile\n";
883  }
884 
886  if (fileBlockValid()) {
887  schedule_->openOutputFiles(*fb_);
888  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
889  }
890  FDEBUG(1) << "\topenOutputFiles\n";
891  }
892 
894  schedule_->closeOutputFiles();
895  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
896  processBlockHelper_->clearAfterOutputFilesClose();
897  FDEBUG(1) << "\tcloseOutputFiles\n";
898  }
899 
901  if (fileBlockValid()) {
903  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
904  schedule_->respondToOpenInputFile(*fb_);
905  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
906  }
907  FDEBUG(1) << "\trespondToOpenInputFile\n";
908  }
909 
911  if (fileBlockValid()) {
912  schedule_->respondToCloseInputFile(*fb_);
913  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
914  }
915  FDEBUG(1) << "\trespondToCloseInputFile\n";
916  }
917 
919  shouldWeStop_ = false;
920  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
921  // until after we've called beginOfJob
922  if (looper_ && looperBeginJobRun_) {
923  looper_->doStartingNewLoop();
924  }
925  FDEBUG(1) << "\tstartingNewLoop\n";
926  }
927 
929  if (looper_) {
930  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
931  looper_->setModuleChanger(&changer);
932  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
933  looper_->setModuleChanger(nullptr);
935  return true;
936  else
937  return false;
938  }
939  FDEBUG(1) << "\tendOfLoop\n";
940  return true;
941  }
942 
944  input_->repeat();
945  input_->rewind();
946  FDEBUG(1) << "\trewind\n";
947  }
948 
950  looper_->prepareForNextLoop(esp_.get());
951  FDEBUG(1) << "\tprepareForNextLoop\n";
952  }
953 
955  FDEBUG(1) << "\tshouldWeCloseOutput\n";
956  if (!subProcesses_.empty()) {
957  for (auto const& subProcess : subProcesses_) {
958  if (subProcess.shouldWeCloseOutput()) {
959  return true;
960  }
961  }
962  return false;
963  }
964  return schedule_->shouldWeCloseOutput();
965  }
966 
968  FDEBUG(1) << "\tdoErrorStuff\n";
969  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
970  << "and went to the error state\n"
971  << "Will attempt to terminate processing normally\n"
972  << "(IF using the looper the next loop will be attempted)\n"
973  << "This likely indicates a bug in an input module or corrupted input or both\n";
974  }
975 
976  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
978  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
979 
981  FinalWaitingTask globalWaitTask;
982 
983  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
984  beginGlobalTransitionAsync<Traits>(
985  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
986 
987  do {
988  taskGroup_.wait();
989  } while (not globalWaitTask.done());
990 
991  if (globalWaitTask.exceptionPtr() != nullptr) {
992  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
993  }
994  beginProcessBlockSucceeded = true;
995  }
996 
998  input_->fillProcessBlockHelper();
1000  while (input_->nextProcessBlock(processBlockPrincipal)) {
1001  readProcessBlock(processBlockPrincipal);
1002 
1004  FinalWaitingTask globalWaitTask;
1005 
1006  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1007  beginGlobalTransitionAsync<Traits>(
1008  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1009 
1010  do {
1011  taskGroup_.wait();
1012  } while (not globalWaitTask.done());
1013  if (globalWaitTask.exceptionPtr() != nullptr) {
1014  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1015  }
1016 
1017  FinalWaitingTask writeWaitTask;
1019  do {
1020  taskGroup_.wait();
1021  } while (not writeWaitTask.done());
1022  if (writeWaitTask.exceptionPtr()) {
1023  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1024  }
1025 
1026  processBlockPrincipal.clearPrincipal();
1027  for (auto& s : subProcesses_) {
1028  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1029  }
1030  }
1031  }
1032 
1033  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1034  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1035 
1037  FinalWaitingTask globalWaitTask;
1038 
1039  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1040  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1041  *schedule_,
1042  transitionInfo,
1043  serviceToken_,
1044  subProcesses_,
1045  cleaningUpAfterException);
1046  do {
1047  taskGroup_.wait();
1048  } while (not globalWaitTask.done());
1049  if (globalWaitTask.exceptionPtr() != nullptr) {
1050  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1051  }
1052 
1053  if (beginProcessBlockSucceeded) {
1054  FinalWaitingTask writeWaitTask;
1056  do {
1057  taskGroup_.wait();
1058  } while (not writeWaitTask.done());
1059  if (writeWaitTask.exceptionPtr()) {
1060  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1061  }
1062  }
1063 
1064  processBlockPrincipal.clearPrincipal();
1065  for (auto& s : subProcesses_) {
1066  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1067  }
1068  }
1069 
1071  RunNumber_t run,
1072  bool& globalBeginSucceeded,
1073  bool& eventSetupForInstanceSucceeded) {
1074  globalBeginSucceeded = false;
1075  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1076  {
1077  SendSourceTerminationSignalIfException sentry(actReg_.get());
1078 
1079  input_->doBeginRun(runPrincipal, &processContext_);
1080  sentry.completedSuccessfully();
1081  }
1082 
1083  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1085  espController_->forceCacheClear();
1086  }
1087  {
1088  SendSourceTerminationSignalIfException sentry(actReg_.get());
1090  eventSetupForInstanceSucceeded = true;
1091  sentry.completedSuccessfully();
1092  }
1093  auto const& es = esp_->eventSetupImpl();
1094  if (looper_ && looperBeginJobRun_ == false) {
1095  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1096 
1097  FinalWaitingTask waitTask;
1098  using namespace edm::waiting_task::chain;
1099  chain::first([this, &es](auto nextTask) {
1100  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1101  }) | then([this, &es](auto nextTask) mutable {
1102  looper_->beginOfJob(es);
1103  looperBeginJobRun_ = true;
1104  looper_->doStartingNewLoop();
1105  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1106 
1107  do {
1108  taskGroup_.wait();
1109  } while (not waitTask.done());
1110  if (waitTask.exceptionPtr() != nullptr) {
1111  std::rethrow_exception(*(waitTask.exceptionPtr()));
1112  }
1113  }
1114  {
1116  FinalWaitingTask globalWaitTask;
1117 
1118  using namespace edm::waiting_task::chain;
1119  chain::first([&runPrincipal, &es, this](auto waitTask) {
1120  RunTransitionInfo transitionInfo(runPrincipal, es);
1121  beginGlobalTransitionAsync<Traits>(
1122  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1123  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1124  globalBeginSucceeded = true;
1125  FDEBUG(1) << "\tbeginRun " << run << "\n";
1126  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1127  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1128  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1129  looper_->doBeginRun(runPrincipal, es, &processContext_);
1130  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1131 
1132  do {
1133  taskGroup_.wait();
1134  } while (not globalWaitTask.done());
1135  if (globalWaitTask.exceptionPtr() != nullptr) {
1136  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1137  }
1138  }
1139  {
1140  //To wait, the ref count has to be 1+#streams
1141  FinalWaitingTask streamLoopWaitTask;
1142 
1144 
1145  RunTransitionInfo transitionInfo(runPrincipal, es);
1146  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1147  *schedule_,
1149  transitionInfo,
1150  serviceToken_,
1151  subProcesses_);
1152  do {
1153  taskGroup_.wait();
1154  } while (not streamLoopWaitTask.done());
1155  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1156  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1157  }
1158  }
1159  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1160  if (looper_) {
1161  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1162  }
1163  }
1164 
1166  RunNumber_t run,
1167  bool globalBeginSucceeded,
1168  bool cleaningUpAfterException,
1169  bool eventSetupForInstanceSucceeded) {
1170  if (eventSetupForInstanceSucceeded) {
1171  //If we skip empty runs, this would be called conditionally
1172  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1173 
1174  if (globalBeginSucceeded) {
1176  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1177  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1178  mergeableRunProductMetadata->preWriteRun();
1179  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1180  do {
1181  taskGroup_.wait();
1182  } while (not t.done());
1183  mergeableRunProductMetadata->postWriteRun();
1184  if (t.exceptionPtr()) {
1185  std::rethrow_exception(*t.exceptionPtr());
1186  }
1187  }
1188  }
1189  deleteRunFromCache(phid, run);
1190  }
1191 
1193  RunNumber_t run,
1194  bool globalBeginSucceeded,
1195  bool cleaningUpAfterException) {
1196  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1197  runPrincipal.setEndTime(input_->timestamp());
1198 
1199  IOVSyncValue ts(
1201  runPrincipal.endTime());
1202  {
1203  SendSourceTerminationSignalIfException sentry(actReg_.get());
1205  sentry.completedSuccessfully();
1206  }
1207  auto const& es = esp_->eventSetupImpl();
1208  if (globalBeginSucceeded) {
1209  //To wait, the ref count has to be 1+#streams
1210  FinalWaitingTask streamLoopWaitTask;
1211 
1213 
1214  RunTransitionInfo transitionInfo(runPrincipal, es);
1215  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1216  *schedule_,
1218  transitionInfo,
1219  serviceToken_,
1220  subProcesses_,
1221  cleaningUpAfterException);
1222  do {
1223  taskGroup_.wait();
1224  } while (not streamLoopWaitTask.done());
1225  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1226  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1227  }
1228  }
1229  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1230  if (looper_) {
1231  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1232  }
1233  {
1234  FinalWaitingTask globalWaitTask;
1235 
1236  using namespace edm::waiting_task::chain;
1237  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1238  RunTransitionInfo transitionInfo(runPrincipal, es);
1240  endGlobalTransitionAsync<Traits>(
1241  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1242  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1243  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1244  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1245  looper_->doEndRun(runPrincipal, es, &processContext_);
1246  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1247 
1248  do {
1249  taskGroup_.wait();
1250  } while (not globalWaitTask.done());
1251  if (globalWaitTask.exceptionPtr() != nullptr) {
1252  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1253  }
1254  }
1255  FDEBUG(1) << "\tendRun " << run << "\n";
1256  }
1257 
1258  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1259  FinalWaitingTask waitTask;
1260  if (streamLumiActive_ > 0) {
1262  // Continue after opening a new input file
1264  } else {
1265  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1266  input_->luminosityBlockAuxiliary()->beginTime()),
1267  iRunResource,
1268  WaitingTaskHolder{taskGroup_, &waitTask});
1269  }
1270  do {
1271  taskGroup_.wait();
1272  } while (not waitTask.done());
1273 
1274  if (waitTask.exceptionPtr() != nullptr) {
1275  std::rethrow_exception(*(waitTask.exceptionPtr()));
1276  }
1277  return lastTransitionType();
1278  }
1279 
1281  std::shared_ptr<void> const& iRunResource,
1282  edm::WaitingTaskHolder iHolder) {
1283  if (iHolder.taskHasFailed()) {
1284  return;
1285  }
1286 
1287  // We must be careful with the status object here and in code this function calls. IF we want
1288  // endRun to be called, then we must call resetResources before the things waiting on
1289  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1290  // endRun to be called much later than it should be, because status is holding iRunResource).
1291  // Note that this must be done explicitly. Relying on the destructor does not work well
1292  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1293  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1294  // destroyed inside this function and lumiWork.
1295  auto status =
1296  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1297 
1298  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1299  if (iHolder.taskHasFailed()) {
1300  status->resetResources();
1301  return;
1302  }
1303 
1304  status->setResumer(std::move(iResumer));
1305 
1307  *iHolder.group(), [this, iHolder, status = std::move(status)]() mutable {
1308  //make the services available
1310  // Caught exception is propagated via WaitingTaskHolder
1311  CMS_SA_ALLOW try {
1313 
1314  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 
1318  input_->doBeginLumi(lumiPrincipal, &processContext_);
1319  sentry.completedSuccessfully();
1320  }
1321 
1323  if (rng.isAvailable()) {
1324  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1325  rng->preBeginLumi(lb);
1326  }
1327 
1328  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1329 
1330  using namespace edm::waiting_task::chain;
1331  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1332  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1333  {
1334  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1336  beginGlobalTransitionAsync<Traits>(
1337  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1338  }
1339  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1340  looper_->prefetchAsync(
1341  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1342  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1343  status->globalBeginDidSucceed();
1344  //make the services available
1345  ServiceRegistry::Operate operateLooper(serviceToken_);
1346  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1347  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1348  if (iPtr) {
1349  status->resetResources();
1350  holder.doneWaiting(*iPtr);
1351  } else {
1352  if (not looper_) {
1353  status->globalBeginDidSucceed();
1354  }
1355  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1357 
1358  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1359  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1360  streamQueues_[i].pause();
1361 
1362  auto& event = principalCache_.eventPrincipal(i);
1363  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1364  // held by the container as this lambda may not finish executing before all the tasks it
1365  // spawns have already started to run.
1366  auto eventSetupImpls = &status->eventSetupImpls();
1367  auto lp = status->lumiPrincipal().get();
1370  event.setLuminosityBlockPrincipal(lp);
1371  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1372  using namespace edm::waiting_task::chain;
1373  chain::first([this, i, &transitionInfo](auto nextTask) {
1374  beginStreamTransitionAsync<Traits>(
1375  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1376  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1377  if (exceptionFromBeginStreamLumi) {
1378  WaitingTaskHolder tmp(nextTask);
1379  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1380  streamEndLumiAsync(nextTask, i);
1381  } else {
1383  }
1384  }) | runLast(holder);
1385  });
1386  }
1387  }
1388  }) | runLast(iHolder);
1389 
1390  } catch (...) {
1391  status->resetResources();
1392  iHolder.doneWaiting(std::current_exception());
1393  }
1394  }); // task in sourceResourcesAcquirer
1395  }; // end lumiWork
1396 
1397  auto queueLumiWorkTask = make_waiting_task(
1398  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1399  if (iPtr) {
1400  iHolder.doneWaiting(*iPtr);
1401  }
1402  lumiQueue_->pushAndPause(*iHolder.group(), std::move(lumiWorkLambda));
1403  });
1404 
1405  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1406  // We only get here inside this block if there is an EventSetup
1407  // module not able to handle concurrent IOVs (usually an ESSource)
1408  // and the new sync value is outside the current IOV of that module.
1409 
1410  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1411 
1412  queueWhichWaitsForIOVsToFinish_.push(*iHolder.group(), [this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1413  // Caught exception is propagated via WaitingTaskHolder
1414  CMS_SA_ALLOW try {
1415  SendSourceTerminationSignalIfException sentry(actReg_.get());
1416  // Pass in iSync to let the EventSetup system know which run and lumi
1417  // need to be processed and prepare IOVs for it.
1418  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1419  // lumi is done and no longer needs its EventSetup IOVs.
1420  espController_->eventSetupForInstanceAsync(
1421  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1422  sentry.completedSuccessfully();
1423  } catch (...) {
1424  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1425  }
1427  });
1428 
1429  } else {
1431 
1432  // This holder will be used to wait until the EventSetup IOVs are ready
1433  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1434  // Caught exception is propagated via WaitingTaskHolder
1435  CMS_SA_ALLOW try {
1436  SendSourceTerminationSignalIfException sentry(actReg_.get());
1437 
1438  // Pass in iSync to let the EventSetup system know which run and lumi
1439  // need to be processed and prepare IOVs for it.
1440  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1441  // lumi is done and no longer needs its EventSetup IOVs.
1442  espController_->eventSetupForInstanceAsync(
1443  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1444  sentry.completedSuccessfully();
1445 
1446  } catch (...) {
1447  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1448  }
1449  }
1450  }
1451 
1453  {
1454  //all streams are sharing the same status at the moment
1455  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1456  status->needToContinueLumi();
1457  status->startProcessingEvents();
1458  }
1459 
1460  unsigned int streamIndex = 0;
1461  tbb::task_arena arena{tbb::task_arena::attach()};
1462  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1463  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1464  }
1465  iHolder.group()->run(
1466  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1467  }
1468 
1469  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1470  if (setDeferredException(*iPtr)) {
1471  WaitingTaskHolder tmp(holder);
1472  tmp.doneWaiting(*iPtr);
1473  } else {
1475  }
1476  }
1477 
1479  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1480  // Get some needed info out of the status object before moving
1481  // it into finalTaskForThisLumi.
1482  auto& lp = *(iLumiStatus->lumiPrincipal());
1483  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1484  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1485  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1486  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1487 
1488  using namespace edm::waiting_task::chain;
1489  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1490  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1491 
1492  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1494  endGlobalTransitionAsync<Traits>(
1495  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1497  //Only call writeLumi if beginLumi succeeded
1498  if (didGlobalBeginSucceed) {
1499  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1500  }
1501  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1502  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1503  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1504  //any thrown exception auto propagates to nextTask via the chain
1506  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1507  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1508  std::exception_ptr ptr;
1509  if (iPtr) {
1510  ptr = *iPtr;
1511  }
1513 
1514  // Try hard to clean up resources so the
1515  // process can terminate in a controlled
1516  // fashion even after exceptions have occurred.
1517  // Caught exception is passed to handleEndLumiExceptions()
1518  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1519  if (not ptr) {
1520  ptr = std::current_exception();
1521  }
1522  }
1523  // Caught exception is passed to handleEndLumiExceptions()
1524  CMS_SA_ALLOW try {
1525  status->resumeGlobalLumiQueue();
1527  } catch (...) {
1528  if (not ptr) {
1529  ptr = std::current_exception();
1530  }
1531  }
1532  // Caught exception is passed to handleEndLumiExceptions()
1533  CMS_SA_ALLOW try {
1534  // This call to status.resetResources() must occur before iTask is destroyed.
1535  // Otherwise there will be a data race which could result in endRun
1536  // being delayed until it is too late to successfully call it.
1537  status->resetResources();
1538  status.reset();
1539  } catch (...) {
1540  if (not ptr) {
1541  ptr = std::current_exception();
1542  }
1543  }
1544 
1545  if (ptr) {
1546  handleEndLumiExceptions(&ptr, nextTask);
1547  }
1548  }) | runLast(std::move(iTask));
1549  }
1550 
1551  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1552  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1553  if (iPtr) {
1554  handleEndLumiExceptions(iPtr, iTask);
1555  }
1556  auto status = streamLumiStatus_[iStreamIndex];
1557  //reset status before releasing queue else get race condtion
1558  streamLumiStatus_[iStreamIndex].reset();
1560  streamQueues_[iStreamIndex].resume();
1561 
1562  //are we the last one?
1563  if (status->streamFinishedLumi()) {
1565  }
1566  });
1567 
1568  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1569 
1570  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1571  // therefore we do not want to hold the shared_ptr
1572  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1573  lumiStatus->setEndTime();
1574 
1575  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1576 
1577  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1578  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1579 
1580  if (lumiStatus->didGlobalBeginSucceed()) {
1581  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1582  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1583  lumiPrincipal.endTime());
1585  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1586  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1587  *schedule_,
1588  iStreamIndex,
1589  transitionInfo,
1590  serviceToken_,
1591  subProcesses_,
1592  cleaningUpAfterException);
1593  }
1594  }
1595 
1597  if (streamLumiActive_.load() > 0) {
1598  FinalWaitingTask globalWaitTask;
1599  {
1600  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1601  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1602  if (streamLumiStatus_[i]) {
1603  streamEndLumiAsync(globalTaskHolder, i);
1604  }
1605  }
1606  }
1607  do {
1608  taskGroup_.wait();
1609  } while (not globalWaitTask.done());
1610  if (globalWaitTask.exceptionPtr() != nullptr) {
1611  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1612  }
1613  }
1614  }
1615 
1617  SendSourceTerminationSignalIfException sentry(actReg_.get());
1618  input_->readProcessBlock(processBlockPrincipal);
1619  sentry.completedSuccessfully();
1620  }
1621 
1622  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1624  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1625  << "Illegal attempt to insert run into cache\n"
1626  << "Contact a Framework Developer\n";
1627  }
1628  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1629  preg(),
1631  historyAppender_.get(),
1632  0,
1633  true,
1635  {
1636  SendSourceTerminationSignalIfException sentry(actReg_.get());
1637  input_->readRun(*rp, *historyAppender_);
1638  sentry.completedSuccessfully();
1639  }
1640  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1641  principalCache_.insert(rp);
1642  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1643  }
1644 
1645  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1646  principalCache_.merge(input_->runAuxiliary(), preg());
1647  auto runPrincipal = principalCache_.runPrincipalPtr();
1648  {
1649  SendSourceTerminationSignalIfException sentry(actReg_.get());
1650  input_->readAndMergeRun(*runPrincipal);
1651  sentry.completedSuccessfully();
1652  }
1653  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1654  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1655  }
1656 
1659  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1660  << "Illegal attempt to insert lumi into cache\n"
1661  << "Run is invalid\n"
1662  << "Contact a Framework Developer\n";
1663  }
1665  assert(lbp);
1666  lbp->setAux(*input_->luminosityBlockAuxiliary());
1667  {
1668  SendSourceTerminationSignalIfException sentry(actReg_.get());
1669  input_->readLuminosityBlock(*lbp, *historyAppender_);
1670  sentry.completedSuccessfully();
1671  }
1672  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1673  iStatus.lumiPrincipal() = std::move(lbp);
1674  }
1675 
1677  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1678  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1679  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1680  input_->processHistoryRegistry().reducedProcessHistoryID(
1681  input_->luminosityBlockAuxiliary()->processHistoryID()));
1682  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1683  assert(lumiOK);
1684  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1685  {
1686  SendSourceTerminationSignalIfException sentry(actReg_.get());
1687  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1688  sentry.completedSuccessfully();
1689  }
1690  return input_->luminosityBlock();
1691  }
1692 
1694  auto subsT = edm::make_waiting_task([this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1695  if (iExcept) {
1696  task.doneWaiting(*iExcept);
1697  } else {
1699  for (auto& s : subProcesses_) {
1700  s.writeProcessBlockAsync(task, processBlockType);
1701  }
1702  }
1703  });
1705  schedule_->writeProcessBlockAsync(WaitingTaskHolder(*task.group(), subsT),
1706  principalCache_.processBlockPrincipal(processBlockType),
1707  &processContext_,
1708  actReg_.get());
1709  }
1710 
1712  ProcessHistoryID const& phid,
1713  RunNumber_t run,
1714  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1715  auto subsT = edm::make_waiting_task(
1716  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1717  if (iExcept) {
1718  task.doneWaiting(*iExcept);
1719  } else {
1721  for (auto& s : subProcesses_) {
1722  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1723  }
1724  }
1725  });
1727  schedule_->writeRunAsync(WaitingTaskHolder(*task.group(), subsT),
1729  &processContext_,
1730  actReg_.get(),
1731  mergeableRunProductMetadata);
1732  }
1733 
1735  principalCache_.deleteRun(phid, run);
1736  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1737  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1738  }
1739 
1741  auto subsT = edm::make_waiting_task([this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1742  if (iExcept) {
1743  task.doneWaiting(*iExcept);
1744  } else {
1746  for (auto& s : subProcesses_) {
1747  s.writeLumiAsync(task, lumiPrincipal);
1748  }
1749  }
1750  });
1752 
1753  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1754 
1755  schedule_->writeLumiAsync(WaitingTaskHolder{*task.group(), subsT}, lumiPrincipal, &processContext_, actReg_.get());
1756  }
1757 
1759  for (auto& s : subProcesses_) {
1760  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1761  }
1762  iStatus.lumiPrincipal()->clearPrincipal();
1763  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1764  }
1765 
1767  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1768  iStatus.endLumi();
1769  return false;
1770  }
1771 
1772  if (iStatus.wasEventProcessingStopped()) {
1773  return false;
1774  }
1775 
1776  if (shouldWeStop()) {
1778  iStatus.stopProcessingEvents();
1779  iStatus.endLumi();
1780  return false;
1781  }
1782 
1784  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1785  CMS_SA_ALLOW try {
1786  //need to use lock in addition to the serial task queue because
1787  // of delayed provenance reading and reading data in response to
1788  // edm::Refs etc
1789  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1790 
1791  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1792  if (InputSource::IsLumi == itemType) {
1793  iStatus.haveContinuedLumi();
1794  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1795  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1796  readAndMergeLumi(iStatus);
1797  itemType = nextTransitionType();
1798  }
1799  if (InputSource::IsLumi == itemType) {
1800  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1801  input_->luminosityBlockAuxiliary()->beginTime()));
1802  }
1803  }
1804  if (InputSource::IsEvent != itemType) {
1805  iStatus.stopProcessingEvents();
1806 
1807  //IsFile may continue processing the lumi and
1808  // looper_ can cause the input source to declare a new IsRun which is actually
1809  // just a continuation of the previous run
1810  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1811  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1812  iStatus.endLumi();
1813  }
1814  return false;
1815  }
1816  readEvent(iStreamIndex);
1817  } catch (...) {
1818  bool expected = false;
1819  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1820  deferredExceptionPtr_ = std::current_exception();
1821  iStatus.endLumi();
1822  }
1823  return false;
1824  }
1825  return true;
1826  }
1827 
1828  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1829  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1831  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1832  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1833  auto status = streamLumiStatus_[iStreamIndex].get();
1834  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1835  CMS_SA_ALLOW try {
1836  if (readNextEventForStream(iStreamIndex, *status)) {
1837  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1838  if (iPtr) {
1839  // Try to end the stream properly even if an exception was
1840  // thrown on an event.
1841  bool expected = false;
1842  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1843  // This is the case where the exception in iPtr is the primary
1844  // exception and we want to see its message.
1845  deferredExceptionPtr_ = *iPtr;
1846  WaitingTaskHolder tempHolder(iTask);
1847  tempHolder.doneWaiting(*iPtr);
1848  }
1849  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1850  //the stream will stop now
1851  return;
1852  }
1853  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1854  });
1855 
1856  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1857  } else {
1858  //the stream will stop now
1859  if (status->isLumiEnding()) {
1860  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1861  status->startNextLumi();
1862  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1863  }
1864  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1865  } else {
1866  iTask.doneWaiting(std::exception_ptr{});
1867  }
1868  }
1869  } catch (...) {
1870  // It is unlikely we will ever get in here ...
1871  // But if we do try to clean up and propagate the exception
1872  if (streamLumiStatus_[iStreamIndex]) {
1873  streamEndLumiAsync(iTask, iStreamIndex);
1874  }
1875  bool expected = false;
1876  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1877  auto e = std::current_exception();
1879  iTask.doneWaiting(e);
1880  }
1881  }
1882  });
1883  }
1884 
1885  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1886  //TODO this will have to become per stream
1887  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1888  StreamContext streamContext(event.streamID(), &processContext_);
1889 
1890  SendSourceTerminationSignalIfException sentry(actReg_.get());
1891  input_->readEvent(event, streamContext);
1892 
1893  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1894  sentry.completedSuccessfully();
1895 
1896  FDEBUG(1) << "\treadEvent\n";
1897  }
1898 
1899  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1900  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1901  }
1902 
1903  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1904  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1905 
1908  if (rng.isAvailable()) {
1909  Event ev(*pep, ModuleDescription(), nullptr);
1910  rng->postEventRead(ev);
1911  }
1912 
1913  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1914  using namespace edm::waiting_task::chain;
1915  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1916  EventTransitionInfo info(*pep, es);
1917  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1918  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1919  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1920  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1921  }
1922  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1923  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1924  ServiceRegistry::Operate operateLooper(serviceToken_);
1925  processEventWithLooper(*pep, iStreamIndex);
1926  }) | then([pep](auto nextTask) {
1927  FDEBUG(1) << "\tprocessEvent\n";
1928  pep->clearEventPrincipal();
1929  }) | runLast(iHolder);
1930  }
1931 
1932  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1933  bool randomAccess = input_->randomAccess();
1934  ProcessingController::ForwardState forwardState = input_->forwardState();
1935  ProcessingController::ReverseState reverseState = input_->reverseState();
1936  ProcessingController pc(forwardState, reverseState, randomAccess);
1937 
1939  do {
1940  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1941  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1942  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1943 
1944  bool succeeded = true;
1945  if (randomAccess) {
1947  input_->skipEvents(-2);
1949  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1950  }
1951  }
1953  } while (!pc.lastOperationSucceeded());
1955  shouldWeStop_ = true;
1957  }
1958  }
1959 
1961  FDEBUG(1) << "\tshouldWeStop\n";
1962  if (shouldWeStop_)
1963  return true;
1964  if (!subProcesses_.empty()) {
1965  for (auto const& subProcess : subProcesses_) {
1966  if (subProcess.terminate()) {
1967  return true;
1968  }
1969  }
1970  return false;
1971  }
1972  return schedule_->terminate();
1973  }
1974 
1976 
1978 
1980 
1981  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1982  bool expected = false;
1983  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1984  deferredExceptionPtr_ = iException;
1985  return true;
1986  }
1987  return false;
1988  }
1989 
1991  std::unique_ptr<LogSystem> s;
1992  for (auto worker : schedule_->allWorkers()) {
1993  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1994  if (not s) {
1995  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1996  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1997  }
1998  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
1999  }
2000  }
2001  }
2002 
2004  std::unique_ptr<LogSystem> s;
2005  for (auto worker : schedule_->allWorkers()) {
2006  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2007  if (not s) {
2008  s = std::make_unique<LogSystem>("LegacyModules");
2009  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2010  "is going to end soon. These modules need to be converted to have type\n"
2011  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2012  }
2013  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2014  }
2015  }
2016  }
2017 } // namespace edm
edm::ParameterSet::registerIt
ParameterSet const & registerIt()
Definition: ParameterSet.cc:113
ConfigurationDescriptions.h
edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:362
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
ParameterSetDescriptionFillerBase.h
IllegalParameters.h
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1734
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:82
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:343
edm::ProcessingController::lastOperationSucceeded
bool lastOperationSucceeded() const
Definition: ProcessingController.cc:86
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::readProcessBlock
void readProcessBlock(ProcessBlockPrincipal &)
Definition: EventProcessor.cc:1616
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:299
edm::RunPrincipal::endTime
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::LuminosityBlockPrincipal::runPrincipal
RunPrincipal const & runPrincipal() const
Definition: LuminosityBlockPrincipal.h:45
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:943
edm::PrincipalCache::ProcessBlockType::New
edm::EventProcessor::respondToCloseInputFile
void respondToCloseInputFile()
Definition: EventProcessor.cc:910
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::totalEventsPassed
int totalEventsPassed() const
Definition: EventProcessor.cc:732
edm::SubProcessParentageHelper
Definition: SubProcessParentageHelper.h:21
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:321
edm::MergeableRunProductMetadata::postWriteRun
void postWriteRun()
Definition: MergeableRunProductMetadata.cc:155
input
static const std::string input
Definition: EdmProvDump.cc:48
ServiceRegistry.h
edm::EventProcessor::readRun
std::pair< ProcessHistoryID, RunNumber_t > readRun()
Definition: EventProcessor.cc:1622
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:835
edm::EventProcessor::shouldWeCloseOutput
bool shouldWeCloseOutput() const
Definition: EventProcessor.cc:954
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1711
MessageLogger.h
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:76
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
funct::false
false
Definition: Factorize.h:29
edm::EventProcessor::processBlockHelper_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
Definition: EventProcessor.h:323
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:928
edm::EventProcessor::fb_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
Definition: EventProcessor.h:345
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:326
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:724
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:918
EventSetupProvider.h
ScheduleInfo.h
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
edm::EventProcessor::totalEvents
int totalEvents() const
Definition: EventProcessor.cc:730
propagate_const.h
edm::errors::LogicError
Definition: EDMException.h:37
edm::EventSetupImpl
Definition: EventSetupImpl.h:49
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:115
edm::LuminosityBlock
Definition: LuminosityBlock.h:50
BranchIDListHelper.h
mps_update.status
status
Definition: mps_update.py:68
CalibrationSummaryClient_cfi.params
params
Definition: CalibrationSummaryClient_cfi.py:14
WaitingTaskHolder.h
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
LuminosityBlock.h
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1758
edm
HLT enums.
Definition: AlignableModifier.h:19
RandomNumberGenerator.h
edm::EventProcessor::endJob
void endJob()
Definition: EventProcessor.cc:693
edm::EventProcessor::warnAboutLegacyModules
void warnAboutLegacyModules() const
Definition: EventProcessor.cc:2003
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edmLumisInFiles.description
description
Definition: edmLumisInFiles.py:11
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::eventsetup::synchronousEventSetupForInstance
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Definition: EventSetupsController.cc:412
edm::TerminationOrigin::ExternalSignal
MessageForSource.h
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1478
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:5
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:789
edm::EventProcessor::readAndMergeRun
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
Definition: EventProcessor.cc:1645
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:329
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:356
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1885
edm::EventProcessor::processLumis
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
Definition: EventProcessor.cc:1258
edm::InputSourceFactory::get
static InputSourceFactory const * get()
Definition: InputSourceFactory.cc:18
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
Algorithms.h
edm::EventProcessor::beginRun
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:1070
edm::ModuleDescription::getUniqueID
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
Definition: ModuleDescription.cc:87
edm::EventProcessor::enableEndPaths
void enableEndPaths(bool active)
Definition: EventProcessor.cc:736
edm::EventProcessor::endUnfinishedRun
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:1165
edm::ScheduleItems
Definition: ScheduleItems.h:30
edm::InputSourceDescription
Definition: InputSourceDescription.h:21
cms::cuda::assert
assert(be >=bs)
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:29
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:350
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
ProcessBlockPrincipal.h
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::dumpOptionsToLogFile
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
Definition: validateTopLevelParameterSets.cc:156
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:338
ProcessHistoryRegistry.h
hgcal_conditions::parameters
Definition: HGCConditions.h:86
edm::get_underlying_safe
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
Definition: get_underlying_safe.h:41
ESRecordsToProxyIndices.h
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:333
EventSetupsController.h
edm::ParameterSet::id
ParameterSetID id() const
Definition: ParameterSet.cc:189
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:327
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:334
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:358
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::LuminosityBlockNumber_t
unsigned int LuminosityBlockNumber_t
Definition: RunLumiEventNumber.h:13
edm::Worker::kLegacy
Definition: Worker.h:95
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:367
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
edm::InRun
Definition: BranchType.h:11
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::EventProcessor::totalEventsFailed
int totalEventsFailed() const
Definition: EventProcessor.cc:734
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:16
edm::ProcessingController
Definition: ProcessingController.h:29
edm::EventProcessor::setExceptionMessageRuns
void setExceptionMessageRuns(std::string &message)
Definition: EventProcessor.cc:1977
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:372
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::waiting_task::chain::runLast
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const noexcept
Definition: WaitingTaskHolder.h:71
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::eventsetup::ComponentFactory::get
static ComponentFactory< T > const * get()
EventSetupRecord.h
ParameterSetDescriptionFillerPluginFactory.h
edm::InputSource::IsRun
Definition: InputSource.h:54
edm::EventProcessor::setExceptionMessageFiles
void setExceptionMessageFiles(std::string &message)
Definition: EventProcessor.cc:1975
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ModuleDescription.h
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::parameterSet
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1903
edm::ProcessingController::requestedTransition
Transition requestedTransition() const
Definition: ProcessingController.cc:82
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::EventProcessor::endProcessBlock
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
Definition: EventProcessor.cc:1033
EDMException.h
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::ServiceToken
Definition: ServiceToken.h:42
edm::InProcess
Definition: BranchType.h:11
edm::EventProcessor::inputProcessBlocks
void inputProcessBlocks()
Definition: EventProcessor.cc:997
ParentageRegistry.h
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
edm::RunPrincipal::beginTime
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:360
ProcessBlockHelper.h
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::waiting_task::chain::then
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1676
edm::LuminosityBlockProcessingStatus::endLumi
void endLumi()
Definition: LuminosityBlockProcessingStatus.h:78
edm::WaitingTask::exceptionPtr
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
h
edm::EventPrincipal
Definition: EventPrincipal.h:48
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:330
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::EventProcessor::epSignal
Definition: EventProcessor.h:80
edm::StreamContext
Definition: StreamContext.h:31
edm::EventProcessor::EventProcessor
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
Definition: EventProcessor.cc:232
std::swap
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Definition: DataFrameContainer.h:209
edm::ExceptionCollector
Definition: ExceptionCollector.h:33
edm::eventsetup::EventSetupProvider
Definition: EventSetupProvider.h:50
edm::EventProcessor::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: EventProcessor.cc:726
edm::EventProcessor::endPathsEnabled
bool endPathsEnabled() const
Definition: EventProcessor.cc:738
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
EventPrincipal.h
Service.h
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:337
dqm-mbProfile.format
format
Definition: dqm-mbProfile.py:16
edm::EventProcessor::openOutputFiles
void openOutputFiles()
Definition: EventProcessor.cc:885
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1657
SubProcessParentageHelper.h
edm::EventProcessor::taskCleanup
void taskCleanup()
Definition: EventProcessor.cc:578
edm::InputSource::IsSynchronize
Definition: InputSource.h:54
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:253
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:353
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::EventProcessor::clearCounters
void clearCounters()
Clears counters used by trigger report.
Definition: EventProcessor.cc:740
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:355
edm::validateLooper
void validateLooper(ParameterSet &pset)
Definition: EventProcessor.cc:199
edm::MergeableRunProductMetadata
Definition: MergeableRunProductMetadata.h:52
CommonParams.h
edm::LuminosityBlockProcessingStatus
Definition: LuminosityBlockProcessingStatus.h:41
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:269
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:357
ProcessDesc.h
edm::Hash< ProcessHistoryType >
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1693
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1990
edm::InEvent
Definition: BranchType.h:11
edm::Transition::BeginLuminosityBlock
ConvertException.h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:371
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
ExceptionCollector.h
edm::LuminosityBlockProcessingStatus::lumiPrincipal
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
Definition: LuminosityBlockProcessingStatus.h:51
edm::CommonParams
Definition: CommonParams.h:14
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
InputSourceDescription.h
first
auto first
Definition: CAHitNtupletGeneratorKernelsImpl.h:125
edm::IOVSyncValue
Definition: IOVSyncValue.h:31
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:320
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:359
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:375
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:365
streamTransitionAsync.h
edm::Parentage
Definition: Parentage.h:25
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
LooperFactory.h
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:288
HistoryAppender.h
edm::waiting_task::chain
Definition: chain_first.h:254
UnixSignalHandlers.h
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:100
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:331
ProcessingController.h
edm::EventProcessor::deleteNonConsumedUnscheduledModules_
bool deleteNonConsumedUnscheduledModules_
Definition: EventProcessor.h:376
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1280
edm::ParameterSet
Definition: ParameterSet.h:47
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:328
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:339
edm::LuminosityBlockProcessingStatus::stopProcessingEvents
void stopProcessingEvents()
Definition: LuminosityBlockProcessingStatus.h:74
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
Event.h
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::InLumi
Definition: BranchType.h:11
EDLooperBase.h
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:54
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1551
edm::FinalWaitingTask::done
bool done() const
Definition: WaitingTask.h:82
LuminosityBlockProcessingStatus.h
trigObjTnPSource_cfi.filler
filler
Definition: trigObjTnPSource_cfi.py:21
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:325
thread_safety_macros.h
RunPrincipal.h
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:194
edm::RunPrincipal::run
RunNumber_t run() const
Definition: RunPrincipal.h:61
edm::LuminosityBlockProcessingStatus::haveContinuedLumi
void haveContinuedLumi()
Definition: LuminosityBlockProcessingStatus.h:81
Schedule.h
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
edm::Service
Definition: Service.h:30
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:324
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1766
edm::SharedResourcesRegistry
Definition: SharedResourcesRegistry.h:39
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:262
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:361
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:54
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
Definition: EventProcessor.cc:214
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:110
edm::HistoryAppender
Definition: HistoryAppender.h:13
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:82
edm::ScheduleInfo
Definition: ScheduleInfo.h:32
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:107
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
get
#define get
edm::eventsetup::EventSetupsController
Definition: EventSetupsController.h:80
edm::Principal::clearPrincipal
void clearPrincipal()
Definition: Principal.cc:382
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:340
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:335
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
LuminosityBlockPrincipal.h
edm::EventProcessor::endUnfinishedLumi
void endUnfinishedLumi()
Definition: EventProcessor.cc:1596
edm::FinalWaitingTask
Definition: WaitingTask.h:76
WaitingTask.h
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
edm::EventProcessor::readFile
void readFile()
Definition: EventProcessor.cc:858
SubProcess.h
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
edm::PrincipalCache::ProcessBlockType
ProcessBlockType
Definition: PrincipalCache.h:57
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:368
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:57
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:346
FileBlock.h
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:16
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:352
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1981
edm::LuminosityBlockPrincipal::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockPrincipal.h:61
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:54
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1452
Registry.h
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:967
ScheduleItems.h
OccurrenceTraits.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::EventProcessor::~EventProcessor
~EventProcessor()
Definition: EventProcessor.cc:560
edm::EventProcessor::beginProcessBlock
void beginProcessBlock(bool &beginProcessBlockSucceeded)
Definition: EventProcessor.cc:976
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:25
edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped
bool wasEventProcessingStopped() const
Definition: LuminosityBlockProcessingStatus.h:73
edm::Transition::EndLuminosityBlock
eostools.move
def move(src, dest)
Definition: eostools.py:511
writedatasetfile.run
run
Definition: writedatasetfile.py:27
SharedResourcesRegistry.h
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
edm::nonConsumedUnscheduledModules
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
Definition: PathsAndConsumesOfModules.cc:165
edm::EventProcessor::closeInputFile
void closeInputFile(bool cleaningUpAfterException)
Definition: EventProcessor.cc:876
edm::EventProcessor::taskGroup_
tbb::task_group taskGroup_
Definition: EventProcessor.h:318
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:354
MergeableRunProductMetadata.h
InputSourceFactory.h
edm::PathsAndConsumesOfModules::removeModules
void removeModules(std::vector< ModuleDescription const * > const &modules)
Definition: PathsAndConsumesOfModules.cc:53
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:295
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:349
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::Transition::BeginRun
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:77
chain_first.h
ev
bool ev
Definition: Hydjet2Hadronizer.cc:97
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:332
Exception
Definition: hltDiff.cc:245
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:381
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1469
edm::waiting_task::chain::ifThen
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false,...
Definition: chain_first.h:288
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1932
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
RootHandlers.h
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:30
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
ParameterSetID.h
edm::EventProcessor::fileBlockValid
bool fileBlockValid()
Definition: EventProcessor.h:204
globalTransitionAsync.h
DebugMacros.h
validateTopLevelParameterSets.h
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1192
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1979
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:305
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:787
edm::InputSource::IsEvent
Definition: InputSource.h:54
edm::EventProcessor::closeOutputFiles
void closeOutputFiles()
Definition: EventProcessor.cc:893
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:342
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:757
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:949
StreamContext.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:342
MessageForParent.h
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:56
edm::EventProcessor::respondToOpenInputFile
void respondToOpenInputFile()
Definition: EventProcessor.cc:900
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
EventProcessor.h
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1899
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
ModuleChanger.h
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::Event
Definition: Event.h:73
MatrixUtil.merge
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:201
edm::ProcessingController::specifiedEventTransition
edm::EventID specifiedEventTransition() const
Definition: ProcessingController.cc:84
edm::Transition::EndRun
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1960
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::LuminosityBlockProcessingStatus::continuingLumi
bool continuingLumi() const
Definition: LuminosityBlockProcessingStatus.h:80
IOVSyncValue.h
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::errors::Configuration
Definition: EDMException.h:36
SystemBounds.h
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:293
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:336
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:122
edm::LuminosityBlockProcessingStatus::setNextSyncValue
void setNextSyncValue(IOVSyncValue const &iValue)
Definition: LuminosityBlockProcessingStatus.h:101
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1740
edm::EventProcessor::nextRunID
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
Definition: EventProcessor.cc:783
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1828
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:322
edm::ProcessingController::setLastOperationSucceeded
void setLastOperationSucceeded(bool value)
Definition: ProcessingController.cc:71
Breakpoints.h
edm::ModuleChanger
Definition: ModuleChanger.h:36
trackingPlots.common
common
Definition: trackingPlots.py:205
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
edm::serviceregistry::ServiceLegacy
ServiceLegacy
Definition: ServiceLegacy.h:29
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:746
common
Definition: common.py:1
edm::MergeableRunProductMetadata::writeLumi
void writeLumi(LuminosityBlockNumber_t lumi)
Definition: MergeableRunProductMetadata.cc:118
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:363
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:585
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18