CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
41 
43 
51 
56 
59 
72 
73 #include "MessageForSource.h"
74 #include "MessageForParent.h"
76 
77 #include "boost/range/adaptor/reversed.hpp"
78 
79 #include <cassert>
80 #include <exception>
81 #include <iomanip>
82 #include <iostream>
83 #include <utility>
84 #include <sstream>
85 
86 #include <sys/ipc.h>
87 #include <sys/msg.h>
88 
89 #include "tbb/task.h"
90 
91 //Used for CPU affinity
92 #ifndef __APPLE__
93 #include <sched.h>
94 #endif
95 
96 namespace {
97  //Sentry class to only send a signal if an
98  // exception occurs. An exception is identified
99  // by the destructor being called without first
100  // calling completedSuccessfully().
101  class SendSourceTerminationSignalIfException {
102  public:
103  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
104  ~SendSourceTerminationSignalIfException() {
105  if (reg_) {
106  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
107  }
108  }
109  void completedSuccessfully() { reg_ = nullptr; }
110 
111  private:
112  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
113  };
114 
115 } // namespace
116 
117 namespace edm {
118 
119  // ---------------------------------------------------------------
120  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125  std::shared_ptr<ActivityRegistry> areg,
126  std::shared_ptr<ProcessConfiguration const> processConfiguration,
127  PreallocationConfiguration const& allocations) {
128  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
129  if (main_input == nullptr) {
131  << "There must be exactly one source in the configuration.\n"
132  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133  }
134 
135  std::string modtype(main_input->getParameter<std::string>("@module_type"));
136 
137  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
139  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
140  filler->fill(descriptions);
141 
142  try {
143  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
144  } catch (cms::Exception& iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
164 
165  InputSourceDescription isdesc(md,
166  preg,
167  branchIDListHelper,
168  thinnedAssociationsHelper,
169  areg,
170  common.maxEventsInput_,
171  common.maxLumisInput_,
172  common.maxSecondsUntilRampdown_,
173  allocations);
174 
175  areg->preSourceConstructionSignal_(md);
176  std::unique_ptr<InputSource> input;
177  try {
178  //even if we have an exception, send the signal
179  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
180  convertException::wrap([&]() {
181  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
182  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
183  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
184  });
185  } catch (cms::Exception& iException) {
186  std::ostringstream ost;
187  ost << "Constructing input source of type " << modtype;
188  iException.addContext(ost.str());
189  throw;
190  }
191  return input;
192  }
193 
194  // ---------------------------------------------------------------
196  auto const modtype = pset.getParameter<std::string>("@module_type");
197  auto const moduleLabel = pset.getParameter<std::string>("@module_label");
199  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
200  filler->fill(descriptions);
201  try {
202  edm::convertException::wrap([&]() { descriptions.validate(pset, moduleLabel); });
203  } catch (cms::Exception& iException) {
204  iException.addContext(
205  fmt::format("Validating configuration of EDLooper of type {} with label: '{}'", modtype, moduleLabel));
206  throw;
207  }
208  }
209 
210  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
212  ParameterSet& params) {
213  std::shared_ptr<EDLooperBase> vLooper;
214 
215  std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");
216 
217  if (loopers.empty()) {
218  return vLooper;
219  }
220 
221  assert(1 == loopers.size());
222 
223  for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
224  ++itName) {
225  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
226  validateLooper(*providerPSet);
227  providerPSet->registerIt();
228  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
229  }
230  return vLooper;
231  }
232 
233  // ---------------------------------------------------------------
234  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
235  ServiceToken const& iToken,
237  std::vector<std::string> const& defaultServices,
238  std::vector<std::string> const& forcedServices)
239  : actReg_(),
240  preg_(),
241  branchIDListHelper_(),
242  serviceToken_(),
243  input_(),
244  espController_(new eventsetup::EventSetupsController),
245  esp_(),
246  act_table_(),
247  processConfiguration_(),
248  schedule_(),
249  subProcesses_(),
250  historyAppender_(new HistoryAppender),
251  fb_(),
252  looper_(),
253  deferredExceptionPtrIsSet_(false),
254  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
255  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
256  principalCache_(),
257  beginJobCalled_(false),
258  shouldWeStop_(false),
259  fileModeNoMerge_(false),
260  exceptionMessageFiles_(),
261  exceptionMessageRuns_(),
262  exceptionMessageLumis_(false),
263  forceLooperToEnd_(false),
264  looperBeginJobRun_(false),
265  forceESCacheClearOnNewRun_(false),
266  eventSetupDataToExcludeFromPrefetching_() {
267  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
271 
272  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
273  std::vector<std::string> const& defaultServices,
274  std::vector<std::string> const& forcedServices)
275  : actReg_(),
276  preg_(),
277  branchIDListHelper_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
283  processConfiguration_(),
284  schedule_(),
285  subProcesses_(),
286  historyAppender_(new HistoryAppender),
287  fb_(),
288  looper_(),
289  deferredExceptionPtrIsSet_(false),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
293  beginJobCalled_(false),
294  shouldWeStop_(false),
295  fileModeNoMerge_(false),
296  exceptionMessageFiles_(),
297  exceptionMessageRuns_(),
298  exceptionMessageLumis_(false),
299  forceLooperToEnd_(false),
300  looperBeginJobRun_(false),
301  forceESCacheClearOnNewRun_(false),
302  asyncStopRequestedWhileProcessingEvents_(false),
303  eventSetupDataToExcludeFromPrefetching_() {
304  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
305  processDesc->addServices(defaultServices, forcedServices);
307  }
308 
309  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
310  ServiceToken const& token,
312  : actReg_(),
313  preg_(),
314  branchIDListHelper_(),
315  serviceToken_(),
316  input_(),
317  espController_(new eventsetup::EventSetupsController),
318  esp_(),
319  act_table_(),
320  processConfiguration_(),
321  schedule_(),
322  subProcesses_(),
323  historyAppender_(new HistoryAppender),
324  fb_(),
325  looper_(),
326  deferredExceptionPtrIsSet_(false),
327  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
328  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
329  principalCache_(),
330  beginJobCalled_(false),
331  shouldWeStop_(false),
332  fileModeNoMerge_(false),
333  exceptionMessageFiles_(),
334  exceptionMessageRuns_(),
335  exceptionMessageLumis_(false),
336  forceLooperToEnd_(false),
337  looperBeginJobRun_(false),
338  forceESCacheClearOnNewRun_(false),
339  asyncStopRequestedWhileProcessingEvents_(false),
340  eventSetupDataToExcludeFromPrefetching_() {
341  init(processDesc, token, legacy);
342  }
343 
344  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
345  ServiceToken const& iToken,
347  //std::cerr << processDesc->dump() << std::endl;
348 
349  // register the empty parentage vector , once and for all
351 
352  // register the empty parameter set, once and for all.
354 
355  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
356 
357  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
358  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
359  bool const hasSubProcesses = !subProcessVParameterSet.empty();
360 
361  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
362  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
363  // set in here if the parameters were not explicitly set.
365 
366  // Now set some parameters specific to the main process.
367  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
368  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
369  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
370  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
371  << fileMode << ".\n"
372  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
373  } else {
374  fileModeNoMerge_ = (fileMode == "NOMERGE");
375  }
376  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  if (nThreads > 1 or nStreams > 1) {
390  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
391  }
392  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
393  if (nConcurrentRuns != 1) {
394  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
395  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
396  }
397  unsigned int nConcurrentLumis =
398  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
399  if (nConcurrentLumis == 0) {
400  nConcurrentLumis = nConcurrentRuns;
401  }
402 
403  //Check that relationships between threading parameters makes sense
404  /*
405  if(nThreads<nStreams) {
406  //bad
407  }
408  if(nConcurrentRuns>nStreams) {
409  //bad
410  }
411  if(nConcurrentRuns>nConcurrentLumis) {
412  //bad
413  }
414  */
415  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
416 
417  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
419  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
420 
421  // Now do general initialization
423 
424  //initialize the services
425  auto& serviceSets = processDesc->getServicesPSets();
426  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
427  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
428 
429  //make the services available
431 
432  if (nStreams > 1) {
434  handler->willBeUsingThreads();
435  }
436 
437  // intialize miscellaneous items
438  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
439 
440  // intialize the event setup provider
441  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
442  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
443 
444  // initialize the looper, if any
446  if (looper_) {
447  looper_->setActionTable(items.act_table_.get());
448  looper_->attachTo(*items.actReg_);
449 
450  //For now loopers make us run only 1 transition at a time
451  nStreams = 1;
452  nConcurrentLumis = 1;
453  nConcurrentRuns = 1;
454  // in presence of looper do not delete modules
456  }
457  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
458 
459  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
460 
461  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
462  streamQueues_.resize(nStreams);
463  streamLumiStatus_.resize(nStreams);
464 
465  // initialize the input source
467  *common,
468  items.preg(),
469  items.branchIDListHelper(),
470  items.thinnedAssociationsHelper(),
471  items.actReg_,
472  items.processConfiguration(),
474 
475  // intialize the Schedule
476  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
477 
478  // set the data members
479  act_table_ = std::move(items.act_table_);
480  actReg_ = items.actReg_;
481  preg_ = items.preg();
483  branchIDListHelper_ = items.branchIDListHelper();
484  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
485  processConfiguration_ = items.processConfiguration();
487  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
488 
489  FDEBUG(2) << parameterSet << std::endl;
490 
492  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
493  // Reusable event principal
494  auto ep = std::make_shared<EventPrincipal>(preg(),
498  historyAppender_.get(),
499  index);
501  }
502 
503  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
504  auto lp =
505  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
507  }
508 
509  {
510  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
512 
513  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
515  }
516 
517  // fill the subprocesses, if there are any
518  subProcesses_.reserve(subProcessVParameterSet.size());
519  for (auto& subProcessPSet : subProcessVParameterSet) {
520  subProcesses_.emplace_back(subProcessPSet,
521  *parameterSet,
522  preg(),
527  *actReg_,
528  token,
531  &processContext_);
532  }
533  }
534 
536  // Make the services available while everything is being deleted.
539 
540  // manually destroy all these thing that may need the services around
541  // propagate_const<T> has no reset() function
542  espController_ = nullptr;
543  esp_ = nullptr;
544  schedule_ = nullptr;
545  input_ = nullptr;
546  looper_ = nullptr;
547  actReg_ = nullptr;
548 
551  }
552 
556  taskGroup_.wait();
557  assert(task.done());
558  }
559 
561  if (beginJobCalled_)
562  return;
563  beginJobCalled_ = true;
564  bk::beginJob();
565 
566  // StateSentry toerror(this); // should we add this ?
567  //make the services available
569 
574  actReg_->preallocateSignal_(bounds);
575  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
577 
578  std::vector<ModuleProcessName> consumedBySubProcesses;
580  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
581  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
582  if (consumedBySubProcesses.empty()) {
583  consumedBySubProcesses = std::move(c);
584  } else if (not c.empty()) {
585  std::vector<ModuleProcessName> tmp;
586  tmp.reserve(consumedBySubProcesses.size() + c.size());
587  std::merge(consumedBySubProcesses.begin(),
588  consumedBySubProcesses.end(),
589  c.begin(),
590  c.end(),
591  std::back_inserter(tmp));
592  std::swap(consumedBySubProcesses, tmp);
593  }
594  });
595 
596  // Note: all these may throw
599  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
600  not unusedModules.empty()) {
602 
603  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
604  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
605  "and "
606  "therefore they are deleted before beginJob transition.";
607  for (auto const& description : unusedModules) {
608  l << "\n " << description->moduleLabel();
609  }
610  });
611  for (auto const& description : unusedModules) {
612  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
613  }
614  }
615  }
616 
617  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
618 
621  }
622  //NOTE: This implementation assumes 'Job' means one call
623  // the EventProcessor::run
624  // If it really means once per 'application' then this code will
625  // have to be changed.
626  // Also have to deal with case where have 'run' then new Module
627  // added and do 'run'
628  // again. In that case the newly added Module needs its 'beginJob'
629  // to be called.
630 
631  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
632  // For now we delay calling beginOfJob until first beginOfRun
633  //if(looper_) {
634  // looper_->beginOfJob(es);
635  //}
636  try {
637  convertException::wrap([&]() { input_->doBeginJob(); });
638  } catch (cms::Exception& ex) {
639  ex.addContext("Calling beginJob for the source");
640  throw;
641  }
642  espController_->finishConfiguration();
643  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
644  if (looper_) {
645  constexpr bool mustPrefetchMayGet = true;
646  auto const processBlockLookup = preg_->productLookup(InProcess);
647  auto const runLookup = preg_->productLookup(InRun);
648  auto const lumiLookup = preg_->productLookup(InLumi);
649  auto const eventLookup = preg_->productLookup(InEvent);
650  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
651  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
652  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
653  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
654  looper_->updateLookup(esp_->recordsToProxyIndices());
655  }
656  // toerror.succeeded(); // should we add this?
657  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
658  actReg_->postBeginJobSignal_();
659 
660  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
661  schedule_->beginStream(i);
662  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
663  }
664  }
665 
667  // Collects exceptions, so we don't throw before all operations are performed.
669  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
670 
671  //make the services available
673 
674  //NOTE: this really should go elsewhere in the future
675  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
676  c.call([this, i]() { this->schedule_->endStream(i); });
677  for (auto& subProcess : subProcesses_) {
678  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
679  }
680  }
681  auto actReg = actReg_.get();
682  c.call([actReg]() { actReg->preEndJobSignal_(); });
683  schedule_->endJob(c);
684  for (auto& subProcess : subProcesses_) {
685  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
686  }
687  c.call(std::bind(&InputSource::doEndJob, input_.get()));
688  if (looper_) {
689  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
690  }
691  c.call([actReg]() { actReg->postEndJobSignal_(); });
692  if (c.hasThrown()) {
693  c.rethrow();
694  }
695  }
696 
698 
699  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
700  return schedule_->getAllModuleDescriptions();
701  }
702 
703  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
704 
705  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
706 
707  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
708 
709  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
710 
711  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
712 
713  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
714 
715  namespace {
716 #include "TransitionProcessors.icc"
717  }
718 
720  bool returnValue = false;
721 
722  // Look for a shutdown signal
723  if (shutdown_flag.load(std::memory_order_acquire)) {
724  returnValue = true;
726  }
727  return returnValue;
728  }
729 
731  if (deferredExceptionPtrIsSet_.load()) {
733  return InputSource::IsStop;
734  }
735 
736  SendSourceTerminationSignalIfException sentry(actReg_.get());
737  InputSource::ItemType itemType;
738  //For now, do nothing with InputSource::IsSynchronize
739  do {
740  itemType = input_->nextItemType();
741  } while (itemType == InputSource::IsSynchronize);
742 
743  lastSourceTransition_ = itemType;
744  sentry.completedSuccessfully();
745 
747 
749  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
751  }
752 
753  return lastSourceTransition_;
754  }
755 
756  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
757  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
758  }
759 
761 
765  {
766  beginJob(); //make sure this was called
767 
768  // make the services available
770 
772  try {
773  FilesProcessor fp(fileModeNoMerge_);
774 
775  convertException::wrap([&]() {
776  bool firstTime = true;
777  do {
778  if (not firstTime) {
780  rewindInput();
781  } else {
782  firstTime = false;
783  }
784  startingNewLoop();
785 
786  auto trans = fp.processFiles(*this);
787 
788  fp.normalEnd();
789 
790  if (deferredExceptionPtrIsSet_.load()) {
791  std::rethrow_exception(deferredExceptionPtr_);
792  }
793  if (trans != InputSource::IsStop) {
794  //problem with the source
795  doErrorStuff();
796 
797  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
798  }
799  } while (not endOfLoop());
800  }); // convertException::wrap
801 
802  } // Try block
803  catch (cms::Exception& e) {
805  std::string message(
806  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
807  e.addAdditionalInfo(message);
808  if (e.alreadyPrinted()) {
809  LogAbsolute("Additional Exceptions") << message;
810  }
811  }
812  if (!exceptionMessageRuns_.empty()) {
813  e.addAdditionalInfo(exceptionMessageRuns_);
814  if (e.alreadyPrinted()) {
815  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
816  }
817  }
818  if (!exceptionMessageFiles_.empty()) {
819  e.addAdditionalInfo(exceptionMessageFiles_);
820  if (e.alreadyPrinted()) {
821  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
822  }
823  }
824  throw;
825  }
826  }
827 
828  return returnCode;
829  }
830 
832  FDEBUG(1) << " \treadFile\n";
833  size_t size = preg_->size();
834  SendSourceTerminationSignalIfException sentry(actReg_.get());
835 
837 
838  fb_ = input_->readFile();
839  if (size < preg_->size()) {
841  }
844  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
845  }
846  sentry.completedSuccessfully();
847  }
848 
849  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
850  if (fileBlockValid()) {
851  SendSourceTerminationSignalIfException sentry(actReg_.get());
852  input_->closeFile(fb_.get(), cleaningUpAfterException);
853  sentry.completedSuccessfully();
854  }
855  FDEBUG(1) << "\tcloseInputFile\n";
856  }
857 
859  if (fileBlockValid()) {
860  schedule_->openOutputFiles(*fb_);
861  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
862  }
863  FDEBUG(1) << "\topenOutputFiles\n";
864  }
865 
867  schedule_->closeOutputFiles();
868  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
869 
870  FDEBUG(1) << "\tcloseOutputFiles\n";
871  }
872 
874  if (fileBlockValid()) {
876  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
877  schedule_->respondToOpenInputFile(*fb_);
878  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
879  }
880  FDEBUG(1) << "\trespondToOpenInputFile\n";
881  }
882 
884  if (fileBlockValid()) {
885  schedule_->respondToCloseInputFile(*fb_);
886  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
887  }
888  FDEBUG(1) << "\trespondToCloseInputFile\n";
889  }
890 
892  shouldWeStop_ = false;
893  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
894  // until after we've called beginOfJob
895  if (looper_ && looperBeginJobRun_) {
896  looper_->doStartingNewLoop();
897  }
898  FDEBUG(1) << "\tstartingNewLoop\n";
899  }
900 
902  if (looper_) {
903  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
904  looper_->setModuleChanger(&changer);
905  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
906  looper_->setModuleChanger(nullptr);
908  return true;
909  else
910  return false;
911  }
912  FDEBUG(1) << "\tendOfLoop\n";
913  return true;
914  }
915 
917  input_->repeat();
918  input_->rewind();
919  FDEBUG(1) << "\trewind\n";
920  }
921 
923  looper_->prepareForNextLoop(esp_.get());
924  FDEBUG(1) << "\tprepareForNextLoop\n";
925  }
926 
928  FDEBUG(1) << "\tshouldWeCloseOutput\n";
929  if (!subProcesses_.empty()) {
930  for (auto const& subProcess : subProcesses_) {
931  if (subProcess.shouldWeCloseOutput()) {
932  return true;
933  }
934  }
935  return false;
936  }
937  return schedule_->shouldWeCloseOutput();
938  }
939 
941  FDEBUG(1) << "\tdoErrorStuff\n";
942  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
943  << "and went to the error state\n"
944  << "Will attempt to terminate processing normally\n"
945  << "(IF using the looper the next loop will be attempted)\n"
946  << "This likely indicates a bug in an input module or corrupted input or both\n";
947  }
948 
949  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
951  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
952 
954  FinalWaitingTask globalWaitTask;
955 
956  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
957  beginGlobalTransitionAsync<Traits>(
958  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
959 
960  do {
961  taskGroup_.wait();
962  } while (not globalWaitTask.done());
963 
964  if (globalWaitTask.exceptionPtr() != nullptr) {
965  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
966  }
967  beginProcessBlockSucceeded = true;
968  }
969 
972  // For now the input source always returns false from readProcessBlock,
973  // so this does nothing at all.
974  // Eventually the ProcessBlockPrincipal needs to be properly filled
975  // and cleared. The delayed reader needs to be set. The correct process name
976  // needs to be supplied.
977  while (input_->readProcessBlock()) {
978  DelayedReader* reader = nullptr;
979  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName(), reader);
980 
982  FinalWaitingTask globalWaitTask;
983 
984  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
985  beginGlobalTransitionAsync<Traits>(
986  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
987 
988  do {
989  taskGroup_.wait();
990  } while (not globalWaitTask.done());
991  if (globalWaitTask.exceptionPtr() != nullptr) {
992  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
993  }
994 
995  FinalWaitingTask writeWaitTask;
997  do {
998  taskGroup_.wait();
999  } while (not writeWaitTask.done());
1000  if (writeWaitTask.exceptionPtr()) {
1001  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1002  }
1003 
1004  processBlockPrincipal.clearPrincipal();
1005  for (auto& s : subProcesses_) {
1006  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1007  }
1008  }
1009  }
1010 
1011  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
1012  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1013 
1015  FinalWaitingTask globalWaitTask;
1016 
1017  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1018  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1019  *schedule_,
1020  transitionInfo,
1021  serviceToken_,
1022  subProcesses_,
1023  cleaningUpAfterException);
1024  do {
1025  taskGroup_.wait();
1026  } while (not globalWaitTask.done());
1027  if (globalWaitTask.exceptionPtr() != nullptr) {
1028  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1029  }
1030 
1031  if (beginProcessBlockSucceeded) {
1032  FinalWaitingTask writeWaitTask;
1034  do {
1035  taskGroup_.wait();
1036  } while (not writeWaitTask.done());
1037  if (writeWaitTask.exceptionPtr()) {
1038  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1039  }
1040  }
1041 
1042  processBlockPrincipal.clearPrincipal();
1043  for (auto& s : subProcesses_) {
1044  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1045  }
1046  }
1047 
1049  RunNumber_t run,
1050  bool& globalBeginSucceeded,
1051  bool& eventSetupForInstanceSucceeded) {
1052  globalBeginSucceeded = false;
1053  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1054  {
1055  SendSourceTerminationSignalIfException sentry(actReg_.get());
1056 
1057  input_->doBeginRun(runPrincipal, &processContext_);
1058  sentry.completedSuccessfully();
1059  }
1060 
1061  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1063  espController_->forceCacheClear();
1064  }
1065  {
1066  SendSourceTerminationSignalIfException sentry(actReg_.get());
1068  eventSetupForInstanceSucceeded = true;
1069  sentry.completedSuccessfully();
1070  }
1071  auto const& es = esp_->eventSetupImpl();
1072  if (looper_ && looperBeginJobRun_ == false) {
1073  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1074  looper_->beginOfJob(es);
1075  looperBeginJobRun_ = true;
1076  looper_->doStartingNewLoop();
1077  }
1078  {
1080  FinalWaitingTask globalWaitTask;
1081  RunTransitionInfo transitionInfo(runPrincipal, es);
1082  beginGlobalTransitionAsync<Traits>(
1083  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1084  do {
1085  taskGroup_.wait();
1086  } while (not globalWaitTask.done());
1087  if (globalWaitTask.exceptionPtr() != nullptr) {
1088  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1089  }
1090  }
1091  globalBeginSucceeded = true;
1092  FDEBUG(1) << "\tbeginRun " << run << "\n";
1093  if (looper_) {
1094  looper_->doBeginRun(runPrincipal, es, &processContext_);
1095  }
1096  {
1097  //To wait, the ref count has to be 1+#streams
1098  FinalWaitingTask streamLoopWaitTask;
1099 
1101 
1102  RunTransitionInfo transitionInfo(runPrincipal, es);
1103  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1104  *schedule_,
1106  transitionInfo,
1107  serviceToken_,
1108  subProcesses_);
1109  do {
1110  taskGroup_.wait();
1111  } while (not streamLoopWaitTask.done());
1112  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1113  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1114  }
1115  }
1116  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1117  if (looper_) {
1118  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1119  }
1120  }
1121 
1123  RunNumber_t run,
1124  bool globalBeginSucceeded,
1125  bool cleaningUpAfterException,
1126  bool eventSetupForInstanceSucceeded) {
1127  if (eventSetupForInstanceSucceeded) {
1128  //If we skip empty runs, this would be called conditionally
1129  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1130 
1131  if (globalBeginSucceeded) {
1133  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1134  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1135  mergeableRunProductMetadata->preWriteRun();
1136  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1137  do {
1138  taskGroup_.wait();
1139  } while (not t.done());
1140  mergeableRunProductMetadata->postWriteRun();
1141  if (t.exceptionPtr()) {
1142  std::rethrow_exception(*t.exceptionPtr());
1143  }
1144  }
1145  }
1146  deleteRunFromCache(phid, run);
1147  }
1148 
1150  RunNumber_t run,
1151  bool globalBeginSucceeded,
1152  bool cleaningUpAfterException) {
1153  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1154  runPrincipal.setEndTime(input_->timestamp());
1155 
1156  IOVSyncValue ts(
1158  runPrincipal.endTime());
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1162  sentry.completedSuccessfully();
1163  }
1164  auto const& es = esp_->eventSetupImpl();
1165  if (globalBeginSucceeded) {
1166  //To wait, the ref count has to be 1+#streams
1167  FinalWaitingTask streamLoopWaitTask;
1168 
1170 
1171  RunTransitionInfo transitionInfo(runPrincipal, es);
1172  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1173  *schedule_,
1175  transitionInfo,
1176  serviceToken_,
1177  subProcesses_,
1178  cleaningUpAfterException);
1179  do {
1180  taskGroup_.wait();
1181  } while (not streamLoopWaitTask.done());
1182  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1183  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1184  }
1185  }
1186  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1187  if (looper_) {
1188  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1189  }
1190  {
1191  FinalWaitingTask globalWaitTask;
1192 
1193  RunTransitionInfo transitionInfo(runPrincipal, es);
1195  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1196  *schedule_,
1197  transitionInfo,
1198  serviceToken_,
1199  subProcesses_,
1200  cleaningUpAfterException);
1201  do {
1202  taskGroup_.wait();
1203  } while (not globalWaitTask.done());
1204  if (globalWaitTask.exceptionPtr() != nullptr) {
1205  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1206  }
1207  }
1208  FDEBUG(1) << "\tendRun " << run << "\n";
1209  if (looper_) {
1210  looper_->doEndRun(runPrincipal, es, &processContext_);
1211  }
1212  }
1213 
1214  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1215  FinalWaitingTask waitTask;
1216  if (streamLumiActive_ > 0) {
1218  // Continue after opening a new input file
1220  } else {
1221  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1222  input_->luminosityBlockAuxiliary()->beginTime()),
1223  iRunResource,
1224  WaitingTaskHolder{taskGroup_, &waitTask});
1225  }
1226  do {
1227  taskGroup_.wait();
1228  } while (not waitTask.done());
1229 
1230  if (waitTask.exceptionPtr() != nullptr) {
1231  std::rethrow_exception(*(waitTask.exceptionPtr()));
1232  }
1233  return lastTransitionType();
1234  }
1235 
1237  std::shared_ptr<void> const& iRunResource,
1238  edm::WaitingTaskHolder iHolder) {
1239  if (iHolder.taskHasFailed()) {
1240  return;
1241  }
1242 
1243  // We must be careful with the status object here and in code this function calls. IF we want
1244  // endRun to be called, then we must call resetResources before the things waiting on
1245  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1246  // endRun to be called much later than it should be, because status is holding iRunResource).
1247  // Note that this must be done explicitly. Relying on the destructor does not work well
1248  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1249  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1250  // destroyed inside this function and lumiWork.
1251  auto status =
1252  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1253 
1254  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1255  if (iHolder.taskHasFailed()) {
1256  status->resetResources();
1257  return;
1258  }
1259 
1260  status->setResumer(std::move(iResumer));
1261 
1263  *iHolder.group(), [this, iHolder, status = std::move(status)]() mutable {
1264  //make the services available
1266  // Caught exception is propagated via WaitingTaskHolder
1267  CMS_SA_ALLOW try {
1269 
1270  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1271  {
1272  SendSourceTerminationSignalIfException sentry(actReg_.get());
1273 
1274  input_->doBeginLumi(lumiPrincipal, &processContext_);
1275  sentry.completedSuccessfully();
1276  }
1277 
1279  if (rng.isAvailable()) {
1280  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1281  rng->preBeginLumi(lb);
1282  }
1283 
1284  //Task to start the stream beginLumis
1285  auto beginStreamsTask =
1286  make_waiting_task([this, holder = iHolder, status](std::exception_ptr const* iPtr) mutable {
1287  if (iPtr) {
1288  status->resetResources();
1289  holder.doneWaiting(*iPtr);
1290  } else {
1291  status->globalBeginDidSucceed();
1292  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1293 
1294  if (looper_) {
1295  // Caught exception is propagated via WaitingTaskHolder
1296  CMS_SA_ALLOW try {
1297  //make the services available
1298  ServiceRegistry::Operate operateLooper(serviceToken_);
1299  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1300  } catch (...) {
1301  status->resetResources();
1302  holder.doneWaiting(std::current_exception());
1303  return;
1304  }
1305  }
1307 
1308  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1309  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1310  streamQueues_[i].pause();
1311 
1312  auto eventTask = edm::make_waiting_task(
1313  [this, i, h = std::move(holder)](
1314  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1315  if (exceptionFromBeginStreamLumi) {
1317  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1319  } else {
1321  }
1322  });
1323  auto& event = principalCache_.eventPrincipal(i);
1324  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1325  // held by the container as this lambda may not finish executing before all the tasks it
1326  // spawns have already started to run.
1327  auto eventSetupImpls = &status->eventSetupImpls();
1328  auto lp = status->lumiPrincipal().get();
1331  event.setLuminosityBlockPrincipal(lp);
1332  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1333  beginStreamTransitionAsync<Traits>(WaitingTaskHolder(*holder.group(), eventTask),
1334  *schedule_,
1335  i,
1336  transitionInfo,
1337  serviceToken_,
1338  subProcesses_);
1339  });
1340  }
1341  }
1342  }); // beginStreamTask
1343 
1344  //task to start the global begin lumi
1345  WaitingTaskHolder beginStreamsHolder{*iHolder.group(), beginStreamsTask};
1346 
1347  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1348  {
1349  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1351  beginGlobalTransitionAsync<Traits>(
1352  beginStreamsHolder, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1353  }
1354  } catch (...) {
1355  status->resetResources();
1356  iHolder.doneWaiting(std::current_exception());
1357  }
1358  }); // task in sourceResourcesAcquirer
1359  }; // end lumiWork
1360 
1361  auto queueLumiWorkTask = make_waiting_task(
1362  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1363  if (iPtr) {
1364  iHolder.doneWaiting(*iPtr);
1365  }
1366  lumiQueue_->pushAndPause(*iHolder.group(), std::move(lumiWorkLambda));
1367  });
1368 
1369  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1370  // We only get here inside this block if there is an EventSetup
1371  // module not able to handle concurrent IOVs (usually an ESSource)
1372  // and the new sync value is outside the current IOV of that module.
1373 
1374  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1375 
1376  queueWhichWaitsForIOVsToFinish_.push(*iHolder.group(), [this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1377  // Caught exception is propagated via WaitingTaskHolder
1378  CMS_SA_ALLOW try {
1379  SendSourceTerminationSignalIfException sentry(actReg_.get());
1380  // Pass in iSync to let the EventSetup system know which run and lumi
1381  // need to be processed and prepare IOVs for it.
1382  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1383  // lumi is done and no longer needs its EventSetup IOVs.
1384  espController_->eventSetupForInstanceAsync(
1385  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1386  sentry.completedSuccessfully();
1387  } catch (...) {
1388  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1389  }
1391  });
1392 
1393  } else {
1395 
1396  // This holder will be used to wait until the EventSetup IOVs are ready
1397  WaitingTaskHolder queueLumiWorkTaskHolder{*iHolder.group(), queueLumiWorkTask};
1398  // Caught exception is propagated via WaitingTaskHolder
1399  CMS_SA_ALLOW try {
1400  SendSourceTerminationSignalIfException sentry(actReg_.get());
1401 
1402  // Pass in iSync to let the EventSetup system know which run and lumi
1403  // need to be processed and prepare IOVs for it.
1404  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1405  // lumi is done and no longer needs its EventSetup IOVs.
1406  espController_->eventSetupForInstanceAsync(
1407  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1408  sentry.completedSuccessfully();
1409 
1410  } catch (...) {
1411  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1412  }
1413  }
1414  }
1415 
1417  {
1418  //all streams are sharing the same status at the moment
1419  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1420  status->needToContinueLumi();
1421  status->startProcessingEvents();
1422  }
1423 
1424  unsigned int streamIndex = 0;
1425  tbb::task_arena arena{tbb::task_arena::attach()};
1426  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1427  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1428  }
1429  iHolder.group()->run(
1430  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1431  }
1432 
1433  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1434  if (setDeferredException(*iPtr)) {
1435  WaitingTaskHolder tmp(holder);
1436  tmp.doneWaiting(*iPtr);
1437  } else {
1439  }
1440  }
1441 
1443  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1444  // Get some needed info out of the status object before moving
1445  // it into finalTaskForThisLumi.
1446  auto& lp = *(iLumiStatus->lumiPrincipal());
1447  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1448  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1449  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1450  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1451 
1452  // group is used later in this function, and lives outside of iTask
1453  tbb::task_group& taskGroup = *iTask.group();
1454  auto finalTaskForThisLumi = edm::make_waiting_task(
1455  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1456  std::exception_ptr ptr;
1457  if (iPtr) {
1458  handleEndLumiExceptions(iPtr, iTask);
1459  } else {
1460  // Caught exception is passed to handleEndLumiExceptions()
1461  CMS_SA_ALLOW try {
1463  if (looper_) {
1464  auto& lumiPrincipal = *(status->lumiPrincipal());
1465  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1466  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1467  }
1468  } catch (...) {
1469  ptr = std::current_exception();
1470  }
1471  }
1473 
1474  // Try hard to clean up resources so the
1475  // process can terminate in a controlled
1476  // fashion even after exceptions have occurred.
1477  // Caught exception is passed to handleEndLumiExceptions()
1478  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1479  if (not ptr) {
1480  ptr = std::current_exception();
1481  }
1482  }
1483  // Caught exception is passed to handleEndLumiExceptions()
1484  CMS_SA_ALLOW try {
1485  status->resumeGlobalLumiQueue();
1487  } catch (...) {
1488  if (not ptr) {
1489  ptr = std::current_exception();
1490  }
1491  }
1492  // Caught exception is passed to handleEndLumiExceptions()
1493  CMS_SA_ALLOW try {
1494  // This call to status.resetResources() must occur before iTask is destroyed.
1495  // Otherwise there will be a data race which could result in endRun
1496  // being delayed until it is too late to successfully call it.
1497  status->resetResources();
1498  status.reset();
1499  } catch (...) {
1500  if (not ptr) {
1501  ptr = std::current_exception();
1502  }
1503  }
1504 
1505  if (ptr) {
1506  handleEndLumiExceptions(&ptr, iTask);
1507  }
1508  });
1509 
1510  auto writeT = edm::make_waiting_task(
1511  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(taskGroup, finalTaskForThisLumi)](
1512  std::exception_ptr const* iExcept) mutable {
1513  if (iExcept) {
1514  task.doneWaiting(*iExcept);
1515  } else {
1516  //Only call writeLumi if beginLumi succeeded
1517  if (didGlobalBeginSucceed) {
1518  writeLumiAsync(std::move(task), lumiPrincipal);
1519  }
1520  }
1521  });
1522 
1523  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1524 
1525  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1527  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup, writeT),
1528  *schedule_,
1529  transitionInfo,
1530  serviceToken_,
1531  subProcesses_,
1532  cleaningUpAfterException);
1533  }
1534 
1535  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1536  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1537  if (iPtr) {
1538  handleEndLumiExceptions(iPtr, iTask);
1539  }
1540  auto status = streamLumiStatus_[iStreamIndex];
1541  //reset status before releasing queue else get race condtion
1542  streamLumiStatus_[iStreamIndex].reset();
1544  streamQueues_[iStreamIndex].resume();
1545 
1546  //are we the last one?
1547  if (status->streamFinishedLumi()) {
1549  }
1550  });
1551 
1552  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1553 
1554  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1555  // therefore we do not want to hold the shared_ptr
1556  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1557  lumiStatus->setEndTime();
1558 
1559  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1560 
1561  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1562  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1563 
1564  if (lumiStatus->didGlobalBeginSucceed()) {
1565  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1566  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1567  lumiPrincipal.endTime());
1569  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1570  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1571  *schedule_,
1572  iStreamIndex,
1573  transitionInfo,
1574  serviceToken_,
1575  subProcesses_,
1576  cleaningUpAfterException);
1577  }
1578  }
1579 
1581  if (streamLumiActive_.load() > 0) {
1582  FinalWaitingTask globalWaitTask;
1583  {
1584  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1585  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1586  if (streamLumiStatus_[i]) {
1587  streamEndLumiAsync(globalTaskHolder, i);
1588  }
1589  }
1590  }
1591  do {
1592  taskGroup_.wait();
1593  } while (not globalWaitTask.done());
1594  if (globalWaitTask.exceptionPtr() != nullptr) {
1595  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1596  }
1597  }
1598  }
1599 
1600  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1602  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1603  << "Illegal attempt to insert run into cache\n"
1604  << "Contact a Framework Developer\n";
1605  }
1606  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1607  preg(),
1609  historyAppender_.get(),
1610  0,
1611  true,
1613  {
1614  SendSourceTerminationSignalIfException sentry(actReg_.get());
1615  input_->readRun(*rp, *historyAppender_);
1616  sentry.completedSuccessfully();
1617  }
1618  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1619  principalCache_.insert(rp);
1620  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1621  }
1622 
1623  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1624  principalCache_.merge(input_->runAuxiliary(), preg());
1625  auto runPrincipal = principalCache_.runPrincipalPtr();
1626  {
1627  SendSourceTerminationSignalIfException sentry(actReg_.get());
1628  input_->readAndMergeRun(*runPrincipal);
1629  sentry.completedSuccessfully();
1630  }
1631  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1632  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1633  }
1634 
1637  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1638  << "Illegal attempt to insert lumi into cache\n"
1639  << "Run is invalid\n"
1640  << "Contact a Framework Developer\n";
1641  }
1643  assert(lbp);
1644  lbp->setAux(*input_->luminosityBlockAuxiliary());
1645  {
1646  SendSourceTerminationSignalIfException sentry(actReg_.get());
1647  input_->readLuminosityBlock(*lbp, *historyAppender_);
1648  sentry.completedSuccessfully();
1649  }
1650  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1651  iStatus.lumiPrincipal() = std::move(lbp);
1652  }
1653 
1655  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1656  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1657  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1658  input_->processHistoryRegistry().reducedProcessHistoryID(
1659  input_->luminosityBlockAuxiliary()->processHistoryID()));
1660  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1661  assert(lumiOK);
1662  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1663  {
1664  SendSourceTerminationSignalIfException sentry(actReg_.get());
1665  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1666  sentry.completedSuccessfully();
1667  }
1668  return input_->luminosityBlock();
1669  }
1670 
1672  auto subsT = edm::make_waiting_task([this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1673  if (iExcept) {
1674  task.doneWaiting(*iExcept);
1675  } else {
1677  for (auto& s : subProcesses_) {
1678  s.writeProcessBlockAsync(task, processBlockType);
1679  }
1680  }
1681  });
1683  schedule_->writeProcessBlockAsync(WaitingTaskHolder(*task.group(), subsT),
1684  principalCache_.processBlockPrincipal(processBlockType),
1685  &processContext_,
1686  actReg_.get());
1687  }
1688 
1690  ProcessHistoryID const& phid,
1691  RunNumber_t run,
1692  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1693  auto subsT = edm::make_waiting_task(
1694  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1695  if (iExcept) {
1696  task.doneWaiting(*iExcept);
1697  } else {
1699  for (auto& s : subProcesses_) {
1700  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1701  }
1702  }
1703  });
1705  schedule_->writeRunAsync(WaitingTaskHolder(*task.group(), subsT),
1707  &processContext_,
1708  actReg_.get(),
1709  mergeableRunProductMetadata);
1710  }
1711 
1713  principalCache_.deleteRun(phid, run);
1714  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1715  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1716  }
1717 
1719  auto subsT = edm::make_waiting_task([this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1720  if (iExcept) {
1721  task.doneWaiting(*iExcept);
1722  } else {
1724  for (auto& s : subProcesses_) {
1725  s.writeLumiAsync(task, lumiPrincipal);
1726  }
1727  }
1728  });
1730 
1731  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1732 
1733  schedule_->writeLumiAsync(WaitingTaskHolder{*task.group(), subsT}, lumiPrincipal, &processContext_, actReg_.get());
1734  }
1735 
1737  for (auto& s : subProcesses_) {
1738  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1739  }
1740  iStatus.lumiPrincipal()->clearPrincipal();
1741  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1742  }
1743 
1745  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1746  iStatus.endLumi();
1747  return false;
1748  }
1749 
1750  if (iStatus.wasEventProcessingStopped()) {
1751  return false;
1752  }
1753 
1754  if (shouldWeStop()) {
1756  iStatus.stopProcessingEvents();
1757  iStatus.endLumi();
1758  return false;
1759  }
1760 
1762  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1763  CMS_SA_ALLOW try {
1764  //need to use lock in addition to the serial task queue because
1765  // of delayed provenance reading and reading data in response to
1766  // edm::Refs etc
1767  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1768 
1769  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1770  if (InputSource::IsLumi == itemType) {
1771  iStatus.haveContinuedLumi();
1772  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1773  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1774  readAndMergeLumi(iStatus);
1775  itemType = nextTransitionType();
1776  }
1777  if (InputSource::IsLumi == itemType) {
1778  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1779  input_->luminosityBlockAuxiliary()->beginTime()));
1780  }
1781  }
1782  if (InputSource::IsEvent != itemType) {
1783  iStatus.stopProcessingEvents();
1784 
1785  //IsFile may continue processing the lumi and
1786  // looper_ can cause the input source to declare a new IsRun which is actually
1787  // just a continuation of the previous run
1788  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1789  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1790  iStatus.endLumi();
1791  }
1792  return false;
1793  }
1794  readEvent(iStreamIndex);
1795  } catch (...) {
1796  bool expected = false;
1797  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1798  deferredExceptionPtr_ = std::current_exception();
1799  iStatus.endLumi();
1800  }
1801  return false;
1802  }
1803  return true;
1804  }
1805 
1806  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1807  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1809  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1810  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1811  auto status = streamLumiStatus_[iStreamIndex].get();
1812  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1813  CMS_SA_ALLOW try {
1814  if (readNextEventForStream(iStreamIndex, *status)) {
1815  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1816  if (iPtr) {
1817  // Try to end the stream properly even if an exception was
1818  // thrown on an event.
1819  bool expected = false;
1820  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1821  // This is the case where the exception in iPtr is the primary
1822  // exception and we want to see its message.
1823  deferredExceptionPtr_ = *iPtr;
1824  WaitingTaskHolder tempHolder(iTask);
1825  tempHolder.doneWaiting(*iPtr);
1826  }
1827  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1828  //the stream will stop now
1829  return;
1830  }
1831  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1832  });
1833 
1834  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1835  } else {
1836  //the stream will stop now
1837  if (status->isLumiEnding()) {
1838  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1839  status->startNextLumi();
1840  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1841  }
1842  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1843  } else {
1844  iTask.doneWaiting(std::exception_ptr{});
1845  }
1846  }
1847  } catch (...) {
1848  // It is unlikely we will ever get in here ...
1849  // But if we do try to clean up and propagate the exception
1850  if (streamLumiStatus_[iStreamIndex]) {
1851  streamEndLumiAsync(iTask, iStreamIndex);
1852  }
1853  bool expected = false;
1854  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1855  auto e = std::current_exception();
1857  iTask.doneWaiting(e);
1858  }
1859  }
1860  });
1861  }
1862 
1863  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1864  //TODO this will have to become per stream
1865  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1866  StreamContext streamContext(event.streamID(), &processContext_);
1867 
1868  SendSourceTerminationSignalIfException sentry(actReg_.get());
1869  input_->readEvent(event, streamContext);
1870 
1871  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1872  sentry.completedSuccessfully();
1873 
1874  FDEBUG(1) << "\treadEvent\n";
1875  }
1876 
1877  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1878  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1879  }
1880 
1881  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1882  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1883 
1886  if (rng.isAvailable()) {
1887  Event ev(*pep, ModuleDescription(), nullptr);
1888  rng->postEventRead(ev);
1889  }
1890 
1891  WaitingTaskHolder finalizeEventTask(
1892  *iHolder.group(), make_waiting_task([this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1893  //NOTE: If we have a looper we only have one Stream
1894  if (looper_) {
1895  ServiceRegistry::Operate operateLooper(serviceToken_);
1896  processEventWithLooper(*pep, iStreamIndex);
1897  }
1898 
1899  FDEBUG(1) << "\tprocessEvent\n";
1900  pep->clearEventPrincipal();
1901  if (iPtr) {
1902  iHolder.doneWaiting(*iPtr);
1903  } else {
1904  iHolder.doneWaiting(std::exception_ptr());
1905  }
1906  }));
1907  WaitingTaskHolder afterProcessTask;
1908  if (subProcesses_.empty()) {
1909  afterProcessTask = std::move(finalizeEventTask);
1910  } else {
1911  //Need to run SubProcesses after schedule has finished
1912  // with the event
1913  afterProcessTask = WaitingTaskHolder(
1914  *iHolder.group(),
1915  make_waiting_task([this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1916  if (not iPtr) {
1917  //when run with 1 thread, we want to the order to be what
1918  // it was before. This requires reversing the order since
1919  // tasks are run last one in first one out
1920  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1921  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1922  }
1923  } else {
1924  finalizeEventTask.doneWaiting(*iPtr);
1925  }
1926  }));
1927  }
1928 
1929  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1930  EventTransitionInfo info(*pep, es);
1931  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, info, serviceToken_);
1932  }
1933 
1934  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1935  bool randomAccess = input_->randomAccess();
1936  ProcessingController::ForwardState forwardState = input_->forwardState();
1937  ProcessingController::ReverseState reverseState = input_->reverseState();
1938  ProcessingController pc(forwardState, reverseState, randomAccess);
1939 
1941  do {
1942  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1943  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1944  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1945 
1946  bool succeeded = true;
1947  if (randomAccess) {
1949  input_->skipEvents(-2);
1951  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1952  }
1953  }
1955  } while (!pc.lastOperationSucceeded());
1957  shouldWeStop_ = true;
1959  }
1960  }
1961 
1963  FDEBUG(1) << "\tshouldWeStop\n";
1964  if (shouldWeStop_)
1965  return true;
1966  if (!subProcesses_.empty()) {
1967  for (auto const& subProcess : subProcesses_) {
1968  if (subProcess.terminate()) {
1969  return true;
1970  }
1971  }
1972  return false;
1973  }
1974  return schedule_->terminate();
1975  }
1976 
1978 
1980 
1982 
1983  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1984  bool expected = false;
1985  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1986  deferredExceptionPtr_ = iException;
1987  return true;
1988  }
1989  return false;
1990  }
1991 
1993  std::unique_ptr<LogSystem> s;
1994  for (auto worker : schedule_->allWorkers()) {
1995  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1996  if (not s) {
1997  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1998  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1999  }
2000  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2001  }
2002  }
2003  }
2004 } // namespace edm
edm::ParameterSet::registerIt
ParameterSet const & registerIt()
Definition: ParameterSet.cc:113
ConfigurationDescriptions.h
edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:357
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
ParameterSetDescriptionFillerBase.h
IllegalParameters.h
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1712
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:80
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:338
edm::ProcessingController::lastOperationSucceeded
bool lastOperationSucceeded() const
Definition: ProcessingController.cc:86
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:296
edm::RunPrincipal::endTime
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::LuminosityBlockPrincipal::runPrincipal
RunPrincipal const & runPrincipal() const
Definition: LuminosityBlockPrincipal.h:45
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:916
edm::PrincipalCache::ProcessBlockType::New
edm::EventProcessor::respondToCloseInputFile
void respondToCloseInputFile()
Definition: EventProcessor.cc:883
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::totalEventsPassed
int totalEventsPassed() const
Definition: EventProcessor.cc:705
edm::SubProcessParentageHelper
Definition: SubProcessParentageHelper.h:21
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:317
edm::MergeableRunProductMetadata::postWriteRun
void postWriteRun()
Definition: MergeableRunProductMetadata.cc:155
input
static const std::string input
Definition: EdmProvDump.cc:48
ServiceRegistry.h
edm::EventProcessor::readRun
std::pair< ProcessHistoryID, RunNumber_t > readRun()
Definition: EventProcessor.cc:1600
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:761
edm::EventProcessor::shouldWeCloseOutput
bool shouldWeCloseOutput() const
Definition: EventProcessor.cc:927
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1689
MessageLogger.h
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
funct::false
false
Definition: Factorize.h:29
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:901
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:321
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:697
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:891
EventSetupProvider.h
ScheduleInfo.h
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
edm::EventProcessor::totalEvents
int totalEvents() const
Definition: EventProcessor.cc:703
propagate_const.h
edm::errors::LogicError
Definition: EDMException.h:37
edm::EventSetupImpl
Definition: EventSetupImpl.h:49
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:93
edm::LuminosityBlock
Definition: LuminosityBlock.h:50
BranchIDListHelper.h
mps_update.status
status
Definition: mps_update.py:69
CalibrationSummaryClient_cfi.params
params
Definition: CalibrationSummaryClient_cfi.py:14
WaitingTaskHolder.h
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
LuminosityBlock.h
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1736
edm
HLT enums.
Definition: AlignableModifier.h:19
RandomNumberGenerator.h
edm::EventProcessor::endJob
void endJob()
Definition: EventProcessor.cc:666
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edmLumisInFiles.description
description
Definition: edmLumisInFiles.py:11
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::eventsetup::synchronousEventSetupForInstance
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
Definition: EventSetupsController.cc:414
edm::TerminationOrigin::ExternalSignal
MessageForSource.h
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1442
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:762
edm::EventProcessor::readAndMergeRun
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
Definition: EventProcessor.cc:1623
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:324
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:351
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1863
edm::EventProcessor::processLumis
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
Definition: EventProcessor.cc:1214
edm::InputSourceFactory::get
static InputSourceFactory const * get()
Definition: InputSourceFactory.cc:18
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
Algorithms.h
edm::EventProcessor::beginRun
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:1048
edm::ModuleDescription::getUniqueID
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
Definition: ModuleDescription.cc:87
edm::EventProcessor::enableEndPaths
void enableEndPaths(bool active)
Definition: EventProcessor.cc:709
edm::EventProcessor::endUnfinishedRun
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:1122
edm::ScheduleItems
Definition: ScheduleItems.h:28
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
cms::cuda::assert
assert(be >=bs)
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:345
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
ProcessBlockPrincipal.h
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:210
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:333
ProcessHistoryRegistry.h
hgcal_conditions::parameters
Definition: HGCConditions.h:86
ESRecordsToProxyIndices.h
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:328
EventSetupsController.h
edm::ParameterSet::id
ParameterSetID id() const
Definition: ParameterSet.cc:189
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:322
edm::SerialTaskQueue::push
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:167
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:93
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:329
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:353
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::LuminosityBlockNumber_t
unsigned int LuminosityBlockNumber_t
Definition: RunLumiEventNumber.h:13
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:120
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:362
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
edm::InRun
Definition: BranchType.h:11
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::EventProcessor::totalEventsFailed
int totalEventsFailed() const
Definition: EventProcessor.cc:707
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:16
edm::ProcessingController
Definition: ProcessingController.h:29
edm::EventProcessor::setExceptionMessageRuns
void setExceptionMessageRuns(std::string &message)
Definition: EventProcessor.cc:1979
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:371
edm::ModuleDescription
Definition: ModuleDescription.h:21
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const noexcept
Definition: WaitingTaskHolder.h:71
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::eventsetup::ComponentFactory::get
static ComponentFactory< T > const * get()
EventSetupRecord.h
ParameterSetDescriptionFillerPluginFactory.h
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::EventProcessor::setExceptionMessageFiles
void setExceptionMessageFiles(std::string &message)
Definition: EventProcessor.cc:1977
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ModuleDescription.h
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::parameterSet
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1881
edm::ProcessingController::requestedTransition
Transition requestedTransition() const
Definition: ProcessingController.cc:82
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::EventProcessor::endProcessBlock
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
Definition: EventProcessor.cc:1011
EDMException.h
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::ServiceToken
Definition: ServiceToken.h:42
edm::InProcess
Definition: BranchType.h:11
edm::EventProcessor::inputProcessBlocks
void inputProcessBlocks()
Definition: EventProcessor.cc:970
ParentageRegistry.h
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
edm::RunPrincipal::beginTime
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:355
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1654
edm::LuminosityBlockProcessingStatus::endLumi
void endLumi()
Definition: LuminosityBlockProcessingStatus.h:78
edm::WaitingTask::exceptionPtr
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:325
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
edm::StreamContext
Definition: StreamContext.h:31
edm::EventProcessor::EventProcessor
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
Definition: EventProcessor.cc:234
std::swap
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
Definition: DataFrameContainer.h:209
edm::ExceptionCollector
Definition: ExceptionCollector.h:33
edm::eventsetup::EventSetupProvider
Definition: EventSetupProvider.h:50
edm::EventProcessor::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: EventProcessor.cc:699
edm::EventProcessor::endPathsEnabled
bool endPathsEnabled() const
Definition: EventProcessor.cc:711
DQM.reader
reader
Definition: DQM.py:105
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
EventPrincipal.h
Service.h
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:332
dqm-mbProfile.format
format
Definition: dqm-mbProfile.py:16
edm::EventProcessor::openOutputFiles
void openOutputFiles()
Definition: EventProcessor.cc:858
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1635
SubProcessParentageHelper.h
edm::EventProcessor::taskCleanup
void taskCleanup()
Definition: EventProcessor.cc:553
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:348
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::EventProcessor::clearCounters
void clearCounters()
Clears counters used by trigger report.
Definition: EventProcessor.cc:713
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:350
edm::validateLooper
void validateLooper(ParameterSet &pset)
Definition: EventProcessor.cc:195
edm::MergeableRunProductMetadata
Definition: MergeableRunProductMetadata.h:52
CommonParams.h
edm::LuminosityBlockProcessingStatus
Definition: LuminosityBlockProcessingStatus.h:41
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:269
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:352
ProcessDesc.h
edm::Hash< ProcessHistoryType >
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1671
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1992
edm::InEvent
Definition: BranchType.h:11
h
ConvertException.h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:370
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
ExceptionCollector.h
edm::LuminosityBlockProcessingStatus::lumiPrincipal
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
Definition: LuminosityBlockProcessingStatus.h:51
edm::CommonParams
Definition: CommonParams.h:14
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
InputSourceDescription.h
edm::IOVSyncValue
Definition: IOVSyncValue.h:31
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:316
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:354
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:370
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:360
streamTransitionAsync.h
edm::Parentage
Definition: Parentage.h:25
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
LooperFactory.h
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:280
HistoryAppender.h
UnixSignalHandlers.h
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:326
ProcessingController.h
edm::EventProcessor::deleteNonConsumedUnscheduledModules_
bool deleteNonConsumedUnscheduledModules_
Definition: EventProcessor.h:371
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1236
edm::ParameterSet
Definition: ParameterSet.h:47
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:323
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:334
edm::LuminosityBlockProcessingStatus::stopProcessingEvents
void stopProcessingEvents()
Definition: LuminosityBlockProcessingStatus.h:74
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
Event.h
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::InLumi
Definition: BranchType.h:11
EDLooperBase.h
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1535
edm::FinalWaitingTask::done
bool done() const
Definition: WaitingTask.h:82
LuminosityBlockProcessingStatus.h
trigObjTnPSource_cfi.filler
filler
Definition: trigObjTnPSource_cfi.py:21
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:320
thread_safety_macros.h
RunPrincipal.h
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:192
edm::RunPrincipal::run
RunNumber_t run() const
Definition: RunPrincipal.h:61
edm::LuminosityBlockProcessingStatus::haveContinuedLumi
void haveContinuedLumi()
Definition: LuminosityBlockProcessingStatus.h:81
Schedule.h
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::Service
Definition: Service.h:30
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:319
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1744
edm::SharedResourcesRegistry
Definition: SharedResourcesRegistry.h:39
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:356
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:340
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
edm::HistoryAppender
Definition: HistoryAppender.h:13
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:80
edm::ScheduleInfo
Definition: ScheduleInfo.h:32
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:101
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
get
#define get
edm::eventsetup::EventSetupsController
Definition: EventSetupsController.h:80
edm::Principal::clearPrincipal
void clearPrincipal()
Definition: Principal.cc:382
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:335
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:330
LuminosityBlockPrincipal.h
edm::EventProcessor::endUnfinishedLumi
void endUnfinishedLumi()
Definition: EventProcessor.cc:1580
edm::FinalWaitingTask
Definition: WaitingTask.h:76
WaitingTask.h
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
edm::EventProcessor::readFile
void readFile()
Definition: EventProcessor.cc:831
SubProcess.h
cmsLHEtoEOSManager.l
l
Definition: cmsLHEtoEOSManager.py:204
edm::PrincipalCache::ProcessBlockType
ProcessBlockType
Definition: PrincipalCache.h:57
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:363
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:57
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:341
FileBlock.h
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:16
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:347
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1983
edm::LuminosityBlockPrincipal::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockPrincipal.h:61
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1416
Registry.h
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:940
ScheduleItems.h
OccurrenceTraits.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::EventProcessor::~EventProcessor
~EventProcessor()
Definition: EventProcessor.cc:535
edm::EventProcessor::beginProcessBlock
void beginProcessBlock(bool &beginProcessBlockSucceeded)
Definition: EventProcessor.cc:949
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped
bool wasEventProcessingStopped() const
Definition: LuminosityBlockProcessingStatus.h:73
eostools.move
def move(src, dest)
Definition: eostools.py:511
writedatasetfile.run
run
Definition: writedatasetfile.py:27
SharedResourcesRegistry.h
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
edm::nonConsumedUnscheduledModules
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
Definition: PathsAndConsumesOfModules.cc:165
edm::EventProcessor::closeInputFile
void closeInputFile(bool cleaningUpAfterException)
Definition: EventProcessor.cc:849
edm::EventProcessor::taskGroup_
tbb::task_group taskGroup_
Definition: EventProcessor.h:314
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:349
MergeableRunProductMetadata.h
InputSourceFactory.h
edm::PathsAndConsumesOfModules::removeModules
void removeModules(std::vector< ModuleDescription const * > const &modules)
Definition: PathsAndConsumesOfModules.cc:53
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:292
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:344
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:327
Exception
Definition: hltDiff.cc:245
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:376
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1433
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::DelayedReader
Definition: DelayedReader.h:29
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1934
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
RootHandlers.h
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
ParameterSetID.h
edm::EventProcessor::fileBlockValid
bool fileBlockValid()
Definition: EventProcessor.h:202
globalTransitionAsync.h
DebugMacros.h
validateTopLevelParameterSets.h
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1149
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1981
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:302
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:760
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::closeOutputFiles
void closeOutputFiles()
Definition: EventProcessor.cc:866
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:337
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:730
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:922
StreamContext.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:344
MessageForParent.h
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:46
edm::EventProcessor::respondToOpenInputFile
void respondToOpenInputFile()
Definition: EventProcessor.cc:873
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
EventProcessor.h
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1877
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
ModuleChanger.h
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::Event
Definition: Event.h:73
MatrixUtil.merge
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:201
edm::ProcessingController::specifiedEventTransition
edm::EventID specifiedEventTransition() const
Definition: ProcessingController.cc:84
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1962
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::LuminosityBlockProcessingStatus::continuingLumi
bool continuingLumi() const
Definition: LuminosityBlockProcessingStatus.h:80
IOVSyncValue.h
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::errors::Configuration
Definition: EDMException.h:36
SystemBounds.h
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:290
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:331
edm::LuminosityBlockProcessingStatus::setNextSyncValue
void setNextSyncValue(IOVSyncValue const &iValue)
Definition: LuminosityBlockProcessingStatus.h:101
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1718
edm::EventProcessor::nextRunID
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
Definition: EventProcessor.cc:756
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1806
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:318
edm::ProcessingController::setLastOperationSucceeded
void setLastOperationSucceeded(bool value)
Definition: ProcessingController.cc:71
Breakpoints.h
edm::ModuleChanger
Definition: ModuleChanger.h:36
trackingPlots.common
common
Definition: trackingPlots.py:206
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
edm::serviceregistry::ServiceLegacy
ServiceLegacy
Definition: ServiceLegacy.h:29
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:719
common
Definition: common.py:1
edm::MergeableRunProductMetadata::writeLumi
void writeLumi(LuminosityBlockNumber_t lumi)
Definition: MergeableRunProductMetadata.cc:118
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:358
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:560
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18