CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
38 
40 
48 
53 
56 
69 
70 #include "MessageForSource.h"
71 #include "MessageForParent.h"
73 
74 #include "boost/range/adaptor/reversed.hpp"
75 
76 #include <cassert>
77 #include <exception>
78 #include <iomanip>
79 #include <iostream>
80 #include <utility>
81 #include <sstream>
82 
83 #include <sys/ipc.h>
84 #include <sys/msg.h>
85 
86 #include "tbb/task.h"
87 
88 //Used for CPU affinity
89 #ifndef __APPLE__
90 #include <sched.h>
91 #endif
92 
93 namespace {
94  //Sentry class to only send a signal if an
95  // exception occurs. An exception is identified
96  // by the destructor being called without first
97  // calling completedSuccessfully().
98  class SendSourceTerminationSignalIfException {
99  public:
100  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
101  ~SendSourceTerminationSignalIfException() {
102  if (reg_) {
103  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
104  }
105  }
106  void completedSuccessfully() { reg_ = nullptr; }
107 
108  private:
109  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
110  };
111 
112 } // namespace
113 
114 namespace edm {
115 
116  // ---------------------------------------------------------------
117  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
118  CommonParams const& common,
119  std::shared_ptr<ProductRegistry> preg,
120  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122  std::shared_ptr<ActivityRegistry> areg,
123  std::shared_ptr<ProcessConfiguration const> processConfiguration,
124  PreallocationConfiguration const& allocations) {
125  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
126  if (main_input == nullptr) {
128  << "There must be exactly one source in the configuration.\n"
129  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
130  }
131 
132  std::string modtype(main_input->getParameter<std::string>("@module_type"));
133 
134  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
136  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
137  filler->fill(descriptions);
138 
139  try {
140  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
141  } catch (cms::Exception& iException) {
142  std::ostringstream ost;
143  ost << "Validating configuration of input source of type " << modtype;
144  iException.addContext(ost.str());
145  throw;
146  }
147 
148  main_input->registerIt();
149 
150  // Fill in "ModuleDescription", in case the input source produces
151  // any EDProducts, which would be registered in the ProductRegistry.
152  // Also fill in the process history item for this process.
153  // There is no module label for the unnamed input source, so
154  // just use "source".
155  // Only the tracked parameters belong in the process configuration.
156  ModuleDescription md(main_input->id(),
157  main_input->getParameter<std::string>("@module_type"),
158  "source",
159  processConfiguration.get(),
161 
162  InputSourceDescription isdesc(md,
163  preg,
164  branchIDListHelper,
165  thinnedAssociationsHelper,
166  areg,
167  common.maxEventsInput_,
168  common.maxLumisInput_,
169  common.maxSecondsUntilRampdown_,
170  allocations);
171 
172  areg->preSourceConstructionSignal_(md);
173  std::unique_ptr<InputSource> input;
174  try {
175  //even if we have an exception, send the signal
176  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
177  convertException::wrap([&]() {
178  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
179  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
180  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
181  });
182  } catch (cms::Exception& iException) {
183  std::ostringstream ost;
184  ost << "Constructing input source of type " << modtype;
185  iException.addContext(ost.str());
186  throw;
187  }
188  return input;
189  }
190 
191  // ---------------------------------------------------------------
192  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
194  ParameterSet& params) {
195  std::shared_ptr<EDLooperBase> vLooper;
196 
197  std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");
198 
199  if (loopers.empty()) {
200  return vLooper;
201  }
202 
203  assert(1 == loopers.size());
204 
205  for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
206  ++itName) {
207  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
208  providerPSet->registerIt();
209  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
210  }
211  return vLooper;
212  }
213 
214  // ---------------------------------------------------------------
215  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
216  ServiceToken const& iToken,
218  std::vector<std::string> const& defaultServices,
219  std::vector<std::string> const& forcedServices)
220  : actReg_(),
221  preg_(),
222  branchIDListHelper_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
252 
253  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
254  std::vector<std::string> const& defaultServices,
255  std::vector<std::string> const& forcedServices)
256  : actReg_(),
257  preg_(),
258  branchIDListHelper_(),
259  serviceToken_(),
260  input_(),
261  espController_(new eventsetup::EventSetupsController),
262  esp_(),
263  act_table_(),
264  processConfiguration_(),
265  schedule_(),
266  subProcesses_(),
267  historyAppender_(new HistoryAppender),
268  fb_(),
269  looper_(),
270  deferredExceptionPtrIsSet_(false),
271  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
272  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
273  principalCache_(),
274  beginJobCalled_(false),
275  shouldWeStop_(false),
276  fileModeNoMerge_(false),
277  exceptionMessageFiles_(),
278  exceptionMessageRuns_(),
279  exceptionMessageLumis_(false),
280  forceLooperToEnd_(false),
281  looperBeginJobRun_(false),
282  forceESCacheClearOnNewRun_(false),
283  asyncStopRequestedWhileProcessingEvents_(false),
284  eventSetupDataToExcludeFromPrefetching_() {
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
289 
290  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
291  ServiceToken const& token,
293  : actReg_(),
294  preg_(),
295  branchIDListHelper_(),
296  serviceToken_(),
297  input_(),
298  espController_(new eventsetup::EventSetupsController),
299  esp_(),
300  act_table_(),
301  processConfiguration_(),
302  schedule_(),
303  subProcesses_(),
304  historyAppender_(new HistoryAppender),
305  fb_(),
306  looper_(),
307  deferredExceptionPtrIsSet_(false),
308  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
309  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
310  principalCache_(),
311  beginJobCalled_(false),
312  shouldWeStop_(false),
313  fileModeNoMerge_(false),
314  exceptionMessageFiles_(),
315  exceptionMessageRuns_(),
316  exceptionMessageLumis_(false),
317  forceLooperToEnd_(false),
318  looperBeginJobRun_(false),
319  forceESCacheClearOnNewRun_(false),
320  asyncStopRequestedWhileProcessingEvents_(false),
321  eventSetupDataToExcludeFromPrefetching_() {
322  init(processDesc, token, legacy);
323  }
324 
325  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
326  ServiceToken const& iToken,
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
358 
359  //threading
360  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
361 
362  // Even if numberOfThreads was set to zero in the Python configuration, the code
363  // in cmsRun.cpp should have reset it to something else.
364  assert(nThreads != 0);
365 
366  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
367  if (nStreams == 0) {
368  nStreams = nThreads;
369  }
370  if (nThreads > 1 or nStreams > 1) {
371  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
372  }
373  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
374  if (nConcurrentRuns != 1) {
375  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
376  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
377  }
378  unsigned int nConcurrentLumis =
379  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
380  if (nConcurrentLumis == 0) {
381  nConcurrentLumis = nConcurrentRuns;
382  }
383 
384  //Check that relationships between threading parameters makes sense
385  /*
386  if(nThreads<nStreams) {
387  //bad
388  }
389  if(nConcurrentRuns>nStreams) {
390  //bad
391  }
392  if(nConcurrentRuns>nConcurrentLumis) {
393  //bad
394  }
395  */
396  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
397 
398  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
399 
400  // Now do general initialization
402 
403  //initialize the services
404  auto& serviceSets = processDesc->getServicesPSets();
405  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
406  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
407 
408  //make the services available
410 
411  if (nStreams > 1) {
413  handler->willBeUsingThreads();
414  }
415 
416  // intialize miscellaneous items
417  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
418 
419  // intialize the event setup provider
420  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
421  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
422 
423  // initialize the looper, if any
425  if (looper_) {
426  looper_->setActionTable(items.act_table_.get());
427  looper_->attachTo(*items.actReg_);
428 
429  //For now loopers make us run only 1 transition at a time
430  nStreams = 1;
431  nConcurrentLumis = 1;
432  nConcurrentRuns = 1;
433  }
434  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
435 
436  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
437 
438  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
439  streamQueues_.resize(nStreams);
440  streamLumiStatus_.resize(nStreams);
441 
442  // initialize the input source
444  *common,
445  items.preg(),
446  items.branchIDListHelper(),
447  items.thinnedAssociationsHelper(),
448  items.actReg_,
449  items.processConfiguration(),
451 
452  // intialize the Schedule
453  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
454 
455  // set the data members
456  act_table_ = std::move(items.act_table_);
457  actReg_ = items.actReg_;
458  preg_ = items.preg();
460  branchIDListHelper_ = items.branchIDListHelper();
461  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
462  processConfiguration_ = items.processConfiguration();
464  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
465 
466  FDEBUG(2) << parameterSet << std::endl;
467 
469  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
470  // Reusable event principal
471  auto ep = std::make_shared<EventPrincipal>(preg(),
475  historyAppender_.get(),
476  index);
478  }
479 
480  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
481  auto lp =
482  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
484  }
485 
486  // fill the subprocesses, if there are any
487  subProcesses_.reserve(subProcessVParameterSet.size());
488  for (auto& subProcessPSet : subProcessVParameterSet) {
489  subProcesses_.emplace_back(subProcessPSet,
490  *parameterSet,
491  preg(),
496  *actReg_,
497  token,
500  &processContext_);
501  }
502  }
503 
505  // Make the services available while everything is being deleted.
508 
509  // manually destroy all these thing that may need the services around
510  // propagate_const<T> has no reset() function
511  espController_ = nullptr;
512  esp_ = nullptr;
513  schedule_ = nullptr;
514  input_ = nullptr;
515  looper_ = nullptr;
516  actReg_ = nullptr;
517 
520  }
521 
523  if (beginJobCalled_)
524  return;
525  beginJobCalled_ = true;
526  bk::beginJob();
527 
528  // StateSentry toerror(this); // should we add this ?
529  //make the services available
531 
536  actReg_->preallocateSignal_(bounds);
537  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
539 
540  //NOTE: this may throw
542  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
543 
546  }
547  //NOTE: This implementation assumes 'Job' means one call
548  // the EventProcessor::run
549  // If it really means once per 'application' then this code will
550  // have to be changed.
551  // Also have to deal with case where have 'run' then new Module
552  // added and do 'run'
553  // again. In that case the newly added Module needs its 'beginJob'
554  // to be called.
555 
556  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
557  // For now we delay calling beginOfJob until first beginOfRun
558  //if(looper_) {
559  // looper_->beginOfJob(es);
560  //}
561  try {
562  convertException::wrap([&]() { input_->doBeginJob(); });
563  } catch (cms::Exception& ex) {
564  ex.addContext("Calling beginJob for the source");
565  throw;
566  }
567  espController_->finishConfiguration();
568  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
569  // toerror.succeeded(); // should we add this?
570  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
571  actReg_->postBeginJobSignal_();
572 
573  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
574  schedule_->beginStream(i);
575  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
576  }
577  }
578 
580  // Collects exceptions, so we don't throw before all operations are performed.
582  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
583 
584  //make the services available
586 
587  //NOTE: this really should go elsewhere in the future
588  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
589  c.call([this, i]() { this->schedule_->endStream(i); });
590  for (auto& subProcess : subProcesses_) {
591  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
592  }
593  }
594  auto actReg = actReg_.get();
595  c.call([actReg]() { actReg->preEndJobSignal_(); });
596  schedule_->endJob(c);
597  for (auto& subProcess : subProcesses_) {
598  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
599  }
600  c.call(std::bind(&InputSource::doEndJob, input_.get()));
601  if (looper_) {
602  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
603  }
604  c.call([actReg]() { actReg->postEndJobSignal_(); });
605  if (c.hasThrown()) {
606  c.rethrow();
607  }
608  }
609 
611 
612  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
613  return schedule_->getAllModuleDescriptions();
614  }
615 
616  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
617 
618  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
619 
620  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
621 
622  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
623 
624  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
625 
626  void EventProcessor::getTriggerReport(TriggerReport& rep) const { schedule_->getTriggerReport(rep); }
627 
628  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
629 
630  namespace {
631 #include "TransitionProcessors.icc"
632  }
633 
635  bool returnValue = false;
636 
637  // Look for a shutdown signal
638  if (shutdown_flag.load(std::memory_order_acquire)) {
639  returnValue = true;
641  }
642  return returnValue;
643  }
644 
646  if (deferredExceptionPtrIsSet_.load()) {
648  return InputSource::IsStop;
649  }
650 
651  SendSourceTerminationSignalIfException sentry(actReg_.get());
652  InputSource::ItemType itemType;
653  //For now, do nothing with InputSource::IsSynchronize
654  do {
655  itemType = input_->nextItemType();
656  } while (itemType == InputSource::IsSynchronize);
657 
658  lastSourceTransition_ = itemType;
659  sentry.completedSuccessfully();
660 
662 
664  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
666  }
667 
668  return lastSourceTransition_;
669  }
670 
671  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
672  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
673  }
674 
676 
680  {
681  beginJob(); //make sure this was called
682 
683  // make the services available
685 
687  try {
688  FilesProcessor fp(fileModeNoMerge_);
689 
690  convertException::wrap([&]() {
691  bool firstTime = true;
692  do {
693  if (not firstTime) {
695  rewindInput();
696  } else {
697  firstTime = false;
698  }
699  startingNewLoop();
700 
701  auto trans = fp.processFiles(*this);
702 
703  fp.normalEnd();
704 
705  if (deferredExceptionPtrIsSet_.load()) {
706  std::rethrow_exception(deferredExceptionPtr_);
707  }
708  if (trans != InputSource::IsStop) {
709  //problem with the source
710  doErrorStuff();
711 
712  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
713  }
714  } while (not endOfLoop());
715  }); // convertException::wrap
716 
717  } // Try block
718  catch (cms::Exception& e) {
720  std::string message(
721  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
722  e.addAdditionalInfo(message);
723  if (e.alreadyPrinted()) {
724  LogAbsolute("Additional Exceptions") << message;
725  }
726  }
727  if (!exceptionMessageRuns_.empty()) {
728  e.addAdditionalInfo(exceptionMessageRuns_);
729  if (e.alreadyPrinted()) {
730  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
731  }
732  }
733  if (!exceptionMessageFiles_.empty()) {
734  e.addAdditionalInfo(exceptionMessageFiles_);
735  if (e.alreadyPrinted()) {
736  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
737  }
738  }
739  throw;
740  }
741  }
742 
743  return returnCode;
744  }
745 
747  FDEBUG(1) << " \treadFile\n";
748  size_t size = preg_->size();
749  SendSourceTerminationSignalIfException sentry(actReg_.get());
750 
752 
753  fb_ = input_->readFile();
754  if (size < preg_->size()) {
756  }
759  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
760  }
761  sentry.completedSuccessfully();
762  }
763 
764  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
765  if (fb_.get() != nullptr) {
766  SendSourceTerminationSignalIfException sentry(actReg_.get());
767  input_->closeFile(fb_.get(), cleaningUpAfterException);
768  sentry.completedSuccessfully();
769  }
770  FDEBUG(1) << "\tcloseInputFile\n";
771  }
772 
774  if (fb_.get() != nullptr) {
775  schedule_->openOutputFiles(*fb_);
776  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
777  }
778  FDEBUG(1) << "\topenOutputFiles\n";
779  }
780 
782  if (fb_.get() != nullptr) {
783  schedule_->closeOutputFiles();
784  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
785  }
786  FDEBUG(1) << "\tcloseOutputFiles\n";
787  }
788 
791  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
792  if (fb_.get() != nullptr) {
793  schedule_->respondToOpenInputFile(*fb_);
794  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
795  }
796  FDEBUG(1) << "\trespondToOpenInputFile\n";
797  }
798 
800  if (fb_.get() != nullptr) {
801  schedule_->respondToCloseInputFile(*fb_);
802  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
803  }
804  FDEBUG(1) << "\trespondToCloseInputFile\n";
805  }
806 
808  shouldWeStop_ = false;
809  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
810  // until after we've called beginOfJob
811  if (looper_ && looperBeginJobRun_) {
812  looper_->doStartingNewLoop();
813  }
814  FDEBUG(1) << "\tstartingNewLoop\n";
815  }
816 
818  if (looper_) {
819  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
820  looper_->setModuleChanger(&changer);
821  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
822  looper_->setModuleChanger(nullptr);
824  return true;
825  else
826  return false;
827  }
828  FDEBUG(1) << "\tendOfLoop\n";
829  return true;
830  }
831 
833  input_->repeat();
834  input_->rewind();
835  FDEBUG(1) << "\trewind\n";
836  }
837 
839  looper_->prepareForNextLoop(esp_.get());
840  FDEBUG(1) << "\tprepareForNextLoop\n";
841  }
842 
844  FDEBUG(1) << "\tshouldWeCloseOutput\n";
845  if (!subProcesses_.empty()) {
846  for (auto const& subProcess : subProcesses_) {
847  if (subProcess.shouldWeCloseOutput()) {
848  return true;
849  }
850  }
851  return false;
852  }
853  return schedule_->shouldWeCloseOutput();
854  }
855 
857  FDEBUG(1) << "\tdoErrorStuff\n";
858  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
859  << "and went to the error state\n"
860  << "Will attempt to terminate processing normally\n"
861  << "(IF using the looper the next loop will be attempted)\n"
862  << "This likely indicates a bug in an input module or corrupted input or both\n";
863  }
864 
867  bool& globalBeginSucceeded,
868  bool& eventSetupForInstanceSucceeded) {
869  globalBeginSucceeded = false;
870  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
871  {
872  SendSourceTerminationSignalIfException sentry(actReg_.get());
873 
874  input_->doBeginRun(runPrincipal, &processContext_);
875  sentry.completedSuccessfully();
876  }
877 
878  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
880  espController_->forceCacheClear();
881  }
882  {
883  SendSourceTerminationSignalIfException sentry(actReg_.get());
884  espController_->eventSetupForInstance(ts);
885  eventSetupForInstanceSucceeded = true;
886  sentry.completedSuccessfully();
887  }
888  auto const& es = esp_->eventSetupImpl();
889  if (looper_ && looperBeginJobRun_ == false) {
890  looper_->copyInfo(ScheduleInfo(schedule_.get()));
891  looper_->beginOfJob(es);
892  looperBeginJobRun_ = true;
893  looper_->doStartingNewLoop();
894  }
895  {
897  auto globalWaitTask = make_empty_waiting_task();
898  globalWaitTask->increment_ref_count();
899  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
900  *schedule_,
901  runPrincipal,
902  ts,
903  es,
904  nullptr,
906  subProcesses_);
907  globalWaitTask->wait_for_all();
908  if (globalWaitTask->exceptionPtr() != nullptr) {
909  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
910  }
911  }
912  globalBeginSucceeded = true;
913  FDEBUG(1) << "\tbeginRun " << run << "\n";
914  if (looper_) {
915  looper_->doBeginRun(runPrincipal, es, &processContext_);
916  }
917  {
918  //To wait, the ref count has to be 1+#streams
919  auto streamLoopWaitTask = make_empty_waiting_task();
920  streamLoopWaitTask->increment_ref_count();
921 
923 
924  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
925  *schedule_,
927  runPrincipal,
928  ts,
929  es,
930  nullptr,
932  subProcesses_);
933 
934  streamLoopWaitTask->wait_for_all();
935  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
936  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
937  }
938  }
939  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
940  if (looper_) {
941  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
942  }
943  }
944 
947  bool globalBeginSucceeded,
948  bool cleaningUpAfterException,
949  bool eventSetupForInstanceSucceeded) {
950  if (eventSetupForInstanceSucceeded) {
951  //If we skip empty runs, this would be called conditionally
952  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
953 
954  if (globalBeginSucceeded) {
956  t->increment_ref_count();
957  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
958  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
959  mergeableRunProductMetadata->preWriteRun();
960  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
961  t->wait_for_all();
962  mergeableRunProductMetadata->postWriteRun();
963  if (t->exceptionPtr()) {
964  std::rethrow_exception(*t->exceptionPtr());
965  }
966  }
967  }
968  deleteRunFromCache(phid, run);
969  }
970 
973  bool globalBeginSucceeded,
974  bool cleaningUpAfterException) {
975  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
976  runPrincipal.setEndTime(input_->timestamp());
977 
978  IOVSyncValue ts(
980  runPrincipal.endTime());
981  {
982  SendSourceTerminationSignalIfException sentry(actReg_.get());
983  espController_->eventSetupForInstance(ts);
984  sentry.completedSuccessfully();
985  }
986  auto const& es = esp_->eventSetupImpl();
987  if (globalBeginSucceeded) {
988  //To wait, the ref count has to be 1+#streams
989  auto streamLoopWaitTask = make_empty_waiting_task();
990  streamLoopWaitTask->increment_ref_count();
991 
993 
994  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
995  *schedule_,
997  runPrincipal,
998  ts,
999  es,
1000  nullptr,
1001  serviceToken_,
1002  subProcesses_,
1003  cleaningUpAfterException);
1004 
1005  streamLoopWaitTask->wait_for_all();
1006  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1007  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1008  }
1009  }
1010  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1011  if (looper_) {
1012  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1013  }
1014  {
1015  auto globalWaitTask = make_empty_waiting_task();
1016  globalWaitTask->increment_ref_count();
1017 
1019  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1020  *schedule_,
1021  runPrincipal,
1022  ts,
1023  es,
1024  nullptr,
1025  serviceToken_,
1026  subProcesses_,
1027  cleaningUpAfterException);
1028  globalWaitTask->wait_for_all();
1029  if (globalWaitTask->exceptionPtr() != nullptr) {
1030  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1031  }
1032  }
1033  FDEBUG(1) << "\tendRun " << run << "\n";
1034  if (looper_) {
1035  looper_->doEndRun(runPrincipal, es, &processContext_);
1036  }
1037  }
1038 
1039  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1040  auto waitTask = make_empty_waiting_task();
1041  waitTask->increment_ref_count();
1042 
1043  if (streamLumiActive_ > 0) {
1045  // Continue after opening a new input file
1046  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1047  } else {
1048  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1049  input_->luminosityBlockAuxiliary()->beginTime()),
1050  iRunResource,
1051  WaitingTaskHolder{waitTask.get()});
1052  }
1053  waitTask->wait_for_all();
1054 
1055  if (waitTask->exceptionPtr() != nullptr) {
1056  std::rethrow_exception(*(waitTask->exceptionPtr()));
1057  }
1058  return lastTransitionType();
1059  }
1060 
1062  std::shared_ptr<void> const& iRunResource,
1063  edm::WaitingTaskHolder iHolder) {
1064  if (iHolder.taskHasFailed()) {
1065  return;
1066  }
1067 
1068  // We must be careful with the status object here and in code this function calls. IF we want
1069  // endRun to be called, then we must call resetResources before the things waiting on
1070  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1071  // endRun to be called much later than it should be, because status is holding iRunResource).
1072  // Note that this must be done explicitly. Relying on the destructor does not work well
1073  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1074  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1075  // destroyed inside this function and lumiWork.
1076  auto status =
1077  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1078 
1079  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1080  if (iHolder.taskHasFailed()) {
1081  status->resetResources();
1082  return;
1083  }
1084 
1085  status->setResumer(std::move(iResumer));
1086 
1087  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1088  //make the services available
1090  // Caught exception is propagated via WaitingTaskHolder
1091  CMS_SA_ALLOW try {
1093 
1094  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1095  {
1096  SendSourceTerminationSignalIfException sentry(actReg_.get());
1097 
1098  input_->doBeginLumi(lumiPrincipal, &processContext_);
1099  sentry.completedSuccessfully();
1100  }
1101 
1103  if (rng.isAvailable()) {
1104  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1105  rng->preBeginLumi(lb);
1106  }
1107 
1108  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1109 
1110  //Task to start the stream beginLumis
1111  auto beginStreamsTask = make_waiting_task(
1112  tbb::task::allocate_root(), [this, holder = iHolder, status, ts](std::exception_ptr const* iPtr) mutable {
1113  if (iPtr) {
1114  status->resetResources();
1115  holder.doneWaiting(*iPtr);
1116  } else {
1117  status->globalBeginDidSucceed();
1118  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1119 
1120  if (looper_) {
1121  // Caught exception is propagated via WaitingTaskHolder
1122  CMS_SA_ALLOW try {
1123  //make the services available
1124  ServiceRegistry::Operate operateLooper(serviceToken_);
1125  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1126  } catch (...) {
1127  status->resetResources();
1128  holder.doneWaiting(std::current_exception());
1129  return;
1130  }
1131  }
1133 
1134  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1135  streamQueues_[i].push([this, i, status, holder, ts, &es]() mutable {
1136  streamQueues_[i].pause();
1137 
1138  auto eventTask =
1139  edm::make_waiting_task(tbb::task::allocate_root(),
1140  [this, i, h = std::move(holder)](
1141  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1142  if (exceptionFromBeginStreamLumi) {
1144  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1146  } else {
1148  }
1149  });
1150  auto& event = principalCache_.eventPrincipal(i);
1151  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1152  // held by the container as this lambda may not finish executing before all the tasks it
1153  // spawns have already started to run.
1154  auto eventSetupImpls = &status->eventSetupImpls();
1155  auto lp = status->lumiPrincipal().get();
1158  event.setLuminosityBlockPrincipal(lp);
1159  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1160  *schedule_,
1161  i,
1162  *lp,
1163  ts,
1164  es,
1165  eventSetupImpls,
1166  serviceToken_,
1167  subProcesses_);
1168  });
1169  }
1170  }
1171  }); // beginStreamTask
1172 
1173  //task to start the global begin lumi
1174  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1175 
1176  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1177  {
1179  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1180  *schedule_,
1181  lumiPrincipal,
1182  ts,
1183  es,
1184  &status->eventSetupImpls(),
1185  serviceToken_,
1186  subProcesses_);
1187  }
1188  } catch (...) {
1189  status->resetResources();
1190  iHolder.doneWaiting(std::current_exception());
1191  }
1192  }); // task in sourceResourcesAcquirer
1193  }; // end lumiWork
1194 
1195  auto queueLumiWorkTask = make_waiting_task(
1196  tbb::task::allocate_root(),
1197  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1198  if (iPtr) {
1199  iHolder.doneWaiting(*iPtr);
1200  }
1201  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1202  });
1203 
1204  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1205  // We only get here inside this block if there is an EventSetup
1206  // module not able to handle concurrent IOVs (usually an ESSource)
1207  // and the new sync value is outside the current IOV of that module.
1208 
1209  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1210 
1211  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1212  // Caught exception is propagated via WaitingTaskHolder
1213  CMS_SA_ALLOW try {
1214  SendSourceTerminationSignalIfException sentry(actReg_.get());
1215  // Pass in iSync to let the EventSetup system know which run and lumi
1216  // need to be processed and prepare IOVs for it.
1217  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1218  // lumi is done and no longer needs its EventSetup IOVs.
1219  espController_->eventSetupForInstance(
1220  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1221  sentry.completedSuccessfully();
1222  } catch (...) {
1223  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1224  }
1226  });
1227 
1228  } else {
1230 
1231  // This holder will be used to wait until the EventSetup IOVs are ready
1232  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1233  // Caught exception is propagated via WaitingTaskHolder
1234  CMS_SA_ALLOW try {
1235  SendSourceTerminationSignalIfException sentry(actReg_.get());
1236 
1237  // Pass in iSync to let the EventSetup system know which run and lumi
1238  // need to be processed and prepare IOVs for it.
1239  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1240  // lumi is done and no longer needs its EventSetup IOVs.
1241  espController_->eventSetupForInstance(
1242  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1243  sentry.completedSuccessfully();
1244 
1245  } catch (...) {
1246  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1247  }
1248  }
1249  }
1250 
1252  {
1253  //all streams are sharing the same status at the moment
1254  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1255  status->needToContinueLumi();
1256  status->startProcessingEvents();
1257  }
1258 
1259  unsigned int streamIndex = 0;
1260  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1261  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1262  handleNextEventForStreamAsync(std::move(h), streamIndex);
1263  }));
1264  }
1265  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1266  handleNextEventForStreamAsync(std::move(h), streamIndex);
1267  }));
1268  }
1269 
1270  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
1271  if (setDeferredException(*iPtr)) {
1272  WaitingTaskHolder tmp(holder);
1273  tmp.doneWaiting(*iPtr);
1274  } else {
1276  }
1277  }
1278 
1280  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1281  // Get some needed info out of the status object before moving
1282  // it into finalTaskForThisLumi.
1283  auto& lp = *(iLumiStatus->lumiPrincipal());
1284  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1285  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1286  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1287  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1288 
1289  auto finalTaskForThisLumi = edm::make_waiting_task(
1290  tbb::task::allocate_root(),
1291  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1292  std::exception_ptr ptr;
1293  if (iPtr) {
1294  handleEndLumiExceptions(iPtr, iTask);
1295  } else {
1296  // Caught exception is passed to handleEndLumiExceptions()
1297  CMS_SA_ALLOW try {
1299  if (looper_) {
1300  auto& lumiPrincipal = *(status->lumiPrincipal());
1301  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1302  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1303  }
1304  } catch (...) {
1305  ptr = std::current_exception();
1306  }
1307  }
1309 
1310  // Try hard to clean up resources so the
1311  // process can terminate in a controlled
1312  // fashion even after exceptions have occurred.
1313  // Caught exception is passed to handleEndLumiExceptions()
1314  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1315  if (not ptr) {
1316  ptr = std::current_exception();
1317  }
1318  }
1319  // Caught exception is passed to handleEndLumiExceptions()
1320  CMS_SA_ALLOW try {
1321  status->resumeGlobalLumiQueue();
1323  } catch (...) {
1324  if (not ptr) {
1325  ptr = std::current_exception();
1326  }
1327  }
1328  // Caught exception is passed to handleEndLumiExceptions()
1329  CMS_SA_ALLOW try {
1330  // This call to status.resetResources() must occur before iTask is destroyed.
1331  // Otherwise there will be a data race which could result in endRun
1332  // being delayed until it is too late to successfully call it.
1333  status->resetResources();
1334  status.reset();
1335  } catch (...) {
1336  if (not ptr) {
1337  ptr = std::current_exception();
1338  }
1339  }
1340 
1341  if (ptr) {
1342  handleEndLumiExceptions(&ptr, iTask);
1343  }
1344  });
1345 
1346  auto writeT = edm::make_waiting_task(
1347  tbb::task::allocate_root(),
1348  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1349  std::exception_ptr const* iExcept) mutable {
1350  if (iExcept) {
1351  task.doneWaiting(*iExcept);
1352  } else {
1353  //Only call writeLumi if beginLumi succeeded
1354  if (didGlobalBeginSucceed) {
1355  writeLumiAsync(std::move(task), lumiPrincipal);
1356  }
1357  }
1358  });
1359 
1360  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1361 
1363 
1364  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1365  *schedule_,
1366  lp,
1367  ts,
1368  es,
1369  eventSetupImpls,
1370  serviceToken_,
1371  subProcesses_,
1372  cleaningUpAfterException);
1373  }
1374 
1375  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1376  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1377  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1378  if (iPtr) {
1379  handleEndLumiExceptions(iPtr, iTask);
1380  }
1381  auto status = streamLumiStatus_[iStreamIndex];
1382  //reset status before releasing queue else get race condtion
1383  streamLumiStatus_[iStreamIndex].reset();
1385  streamQueues_[iStreamIndex].resume();
1386 
1387  //are we the last one?
1388  if (status->streamFinishedLumi()) {
1390  }
1391  });
1392 
1393  edm::WaitingTaskHolder lumiDoneTask{t};
1394 
1395  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1396  // therefore we do not want to hold the shared_ptr
1397  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1398  lumiStatus->setEndTime();
1399 
1400  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1401 
1402  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1403  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1404 
1405  if (lumiStatus->didGlobalBeginSucceed()) {
1406  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1407  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1408  lumiPrincipal.endTime());
1410  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1411  *schedule_,
1412  iStreamIndex,
1413  lumiPrincipal,
1414  ts,
1415  es,
1416  eventSetupImpls,
1417  serviceToken_,
1418  subProcesses_,
1419  cleaningUpAfterException);
1420  }
1421  }
1422 
1424  if (streamLumiActive_.load() > 0) {
1425  auto globalWaitTask = make_empty_waiting_task();
1426  globalWaitTask->increment_ref_count();
1427  {
1428  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1429  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1430  if (streamLumiStatus_[i]) {
1431  streamEndLumiAsync(globalTaskHolder, i);
1432  }
1433  }
1434  }
1435  globalWaitTask->wait_for_all();
1436  if (globalWaitTask->exceptionPtr() != nullptr) {
1437  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1438  }
1439  }
1440  }
1441 
1442  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
1444  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1445  << "Illegal attempt to insert run into cache\n"
1446  << "Contact a Framework Developer\n";
1447  }
1448  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1449  preg(),
1451  historyAppender_.get(),
1452  0,
1453  true,
1455  {
1456  SendSourceTerminationSignalIfException sentry(actReg_.get());
1457  input_->readRun(*rp, *historyAppender_);
1458  sentry.completedSuccessfully();
1459  }
1460  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1461  principalCache_.insert(rp);
1462  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1463  }
1464 
1465  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1466  principalCache_.merge(input_->runAuxiliary(), preg());
1467  auto runPrincipal = principalCache_.runPrincipalPtr();
1468  {
1469  SendSourceTerminationSignalIfException sentry(actReg_.get());
1470  input_->readAndMergeRun(*runPrincipal);
1471  sentry.completedSuccessfully();
1472  }
1473  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1474  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1475  }
1476 
1479  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1480  << "Illegal attempt to insert lumi into cache\n"
1481  << "Run is invalid\n"
1482  << "Contact a Framework Developer\n";
1483  }
1485  assert(lbp);
1486  lbp->setAux(*input_->luminosityBlockAuxiliary());
1487  {
1488  SendSourceTerminationSignalIfException sentry(actReg_.get());
1489  input_->readLuminosityBlock(*lbp, *historyAppender_);
1490  sentry.completedSuccessfully();
1491  }
1492  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1493  iStatus.lumiPrincipal() = std::move(lbp);
1494  }
1495 
1497  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1498  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1499  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1500  input_->processHistoryRegistry().reducedProcessHistoryID(
1501  input_->luminosityBlockAuxiliary()->processHistoryID()));
1502  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1503  assert(lumiOK);
1504  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1505  {
1506  SendSourceTerminationSignalIfException sentry(actReg_.get());
1507  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1508  sentry.completedSuccessfully();
1509  }
1510  return input_->luminosityBlock();
1511  }
1512 
1514  ProcessHistoryID const& phid,
1515  RunNumber_t run,
1516  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1517  auto subsT = edm::make_waiting_task(
1518  tbb::task::allocate_root(),
1519  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1520  if (iExcept) {
1521  task.doneWaiting(*iExcept);
1522  } else {
1524  for (auto& s : subProcesses_) {
1525  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1526  }
1527  }
1528  });
1530  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1532  &processContext_,
1533  actReg_.get(),
1534  mergeableRunProductMetadata);
1535  }
1536 
1538  principalCache_.deleteRun(phid, run);
1539  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1540  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1541  }
1542 
1544  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1545  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1546  if (iExcept) {
1547  task.doneWaiting(*iExcept);
1548  } else {
1550  for (auto& s : subProcesses_) {
1551  s.writeLumiAsync(task, lumiPrincipal);
1552  }
1553  }
1554  });
1556 
1557  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1558 
1559  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1560  }
1561 
1563  for (auto& s : subProcesses_) {
1564  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1565  }
1566  iStatus.lumiPrincipal()->clearPrincipal();
1567  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1568  }
1569 
1571  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1572  iStatus.endLumi();
1573  return false;
1574  }
1575 
1576  if (iStatus.wasEventProcessingStopped()) {
1577  return false;
1578  }
1579 
1580  if (shouldWeStop()) {
1582  iStatus.stopProcessingEvents();
1583  iStatus.endLumi();
1584  return false;
1585  }
1586 
1588  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1589  CMS_SA_ALLOW try {
1590  //need to use lock in addition to the serial task queue because
1591  // of delayed provenance reading and reading data in response to
1592  // edm::Refs etc
1593  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1594 
1595  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1596  if (InputSource::IsLumi == itemType) {
1597  iStatus.haveContinuedLumi();
1598  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1599  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1600  readAndMergeLumi(iStatus);
1601  itemType = nextTransitionType();
1602  }
1603  if (InputSource::IsLumi == itemType) {
1604  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1605  input_->luminosityBlockAuxiliary()->beginTime()));
1606  }
1607  }
1608  if (InputSource::IsEvent != itemType) {
1609  iStatus.stopProcessingEvents();
1610 
1611  //IsFile may continue processing the lumi and
1612  // looper_ can cause the input source to declare a new IsRun which is actually
1613  // just a continuation of the previous run
1614  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1615  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1616  iStatus.endLumi();
1617  }
1618  return false;
1619  }
1620  readEvent(iStreamIndex);
1621  } catch (...) {
1622  bool expected = false;
1623  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1624  deferredExceptionPtr_ = std::current_exception();
1625  iStatus.endLumi();
1626  }
1627  return false;
1628  }
1629  return true;
1630  }
1631 
1632  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1633  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
1635  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1636  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1637  auto status = streamLumiStatus_[iStreamIndex].get();
1638  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1639  CMS_SA_ALLOW try {
1640  if (readNextEventForStream(iStreamIndex, *status)) {
1641  auto recursionTask = make_waiting_task(
1642  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1643  if (iPtr) {
1644  // Try to end the stream properly even if an exception was
1645  // thrown on an event.
1646  bool expected = false;
1647  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1648  // This is the case where the exception in iPtr is the primary
1649  // exception and we want to see its message.
1650  deferredExceptionPtr_ = *iPtr;
1651  WaitingTaskHolder tempHolder(iTask);
1652  tempHolder.doneWaiting(*iPtr);
1653  }
1654  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1655  //the stream will stop now
1656  return;
1657  }
1658  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1659  });
1660 
1661  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1662  } else {
1663  //the stream will stop now
1664  if (status->isLumiEnding()) {
1665  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1666  status->startNextLumi();
1667  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1668  }
1669  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1670  } else {
1671  iTask.doneWaiting(std::exception_ptr{});
1672  }
1673  }
1674  } catch (...) {
1675  // It is unlikely we will ever get in here ...
1676  // But if we do try to clean up and propagate the exception
1677  if (streamLumiStatus_[iStreamIndex]) {
1678  streamEndLumiAsync(iTask, iStreamIndex);
1679  }
1680  bool expected = false;
1681  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1682  auto e = std::current_exception();
1684  iTask.doneWaiting(e);
1685  }
1686  }
1687  });
1688  }
1689 
1690  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1691  //TODO this will have to become per stream
1692  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1693  StreamContext streamContext(event.streamID(), &processContext_);
1694 
1695  SendSourceTerminationSignalIfException sentry(actReg_.get());
1696  input_->readEvent(event, streamContext);
1697 
1698  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1699  sentry.completedSuccessfully();
1700 
1701  FDEBUG(1) << "\treadEvent\n";
1702  }
1703 
1704  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1705  tbb::task::spawn(
1706  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1707  }
1708 
1709  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1710  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1711 
1714  if (rng.isAvailable()) {
1715  Event ev(*pep, ModuleDescription(), nullptr);
1716  rng->postEventRead(ev);
1717  }
1718 
1719  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1720  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1721  //NOTE: If we have a looper we only have one Stream
1722  if (looper_) {
1723  ServiceRegistry::Operate operateLooper(serviceToken_);
1724  processEventWithLooper(*pep, iStreamIndex);
1725  }
1726 
1727  FDEBUG(1) << "\tprocessEvent\n";
1728  pep->clearEventPrincipal();
1729  if (iPtr) {
1730  iHolder.doneWaiting(*iPtr);
1731  } else {
1732  iHolder.doneWaiting(std::exception_ptr());
1733  }
1734  }));
1735  WaitingTaskHolder afterProcessTask;
1736  if (subProcesses_.empty()) {
1737  afterProcessTask = std::move(finalizeEventTask);
1738  } else {
1739  //Need to run SubProcesses after schedule has finished
1740  // with the event
1741  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1742  tbb::task::allocate_root(),
1743  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1744  if (not iPtr) {
1745  //when run with 1 thread, we want to the order to be what
1746  // it was before. This requires reversing the order since
1747  // tasks are run last one in first one out
1748  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1749  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1750  }
1751  } else {
1752  finalizeEventTask.doneWaiting(*iPtr);
1753  }
1754  }));
1755  }
1756 
1757  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1758  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, *pep, es, serviceToken_);
1759  }
1760 
1761  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
1762  bool randomAccess = input_->randomAccess();
1763  ProcessingController::ForwardState forwardState = input_->forwardState();
1764  ProcessingController::ReverseState reverseState = input_->reverseState();
1765  ProcessingController pc(forwardState, reverseState, randomAccess);
1766 
1768  do {
1769  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1770  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1771  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1772 
1773  bool succeeded = true;
1774  if (randomAccess) {
1776  input_->skipEvents(-2);
1778  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1779  }
1780  }
1782  } while (!pc.lastOperationSucceeded());
1784  shouldWeStop_ = true;
1786  }
1787  }
1788 
1790  FDEBUG(1) << "\tshouldWeStop\n";
1791  if (shouldWeStop_)
1792  return true;
1793  if (!subProcesses_.empty()) {
1794  for (auto const& subProcess : subProcesses_) {
1795  if (subProcess.terminate()) {
1796  return true;
1797  }
1798  }
1799  return false;
1800  }
1801  return schedule_->terminate();
1802  }
1803 
1805 
1807 
1809 
1810  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1811  bool expected = false;
1812  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1813  deferredExceptionPtr_ = iException;
1814  return true;
1815  }
1816  return false;
1817  }
1818 
1820  std::unique_ptr<LogSystem> s;
1821  for (auto worker : schedule_->allWorkers()) {
1822  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1823  if (not s) {
1824  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1825  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1826  }
1827  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1828  }
1829  }
1830  }
1831 } // namespace edm
edm::ParameterSet::registerIt
ParameterSet const & registerIt()
Definition: ParameterSet.cc:106
ConfigurationDescriptions.h
edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:349
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::LogAbsolute
Definition: MessageLogger.h:469
ParameterSetDescriptionFillerBase.h
IllegalParameters.h
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1537
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:79
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:330
edm::ProcessingController::lastOperationSucceeded
bool lastOperationSucceeded() const
Definition: ProcessingController.cc:86
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:291
edm::RunPrincipal::endTime
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::LuminosityBlockPrincipal::runPrincipal
RunPrincipal const & runPrincipal() const
Definition: LuminosityBlockPrincipal.h:45
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:336
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:832
edm::EventProcessor::respondToCloseInputFile
void respondToCloseInputFile()
Definition: EventProcessor.cc:799
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:355
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::totalEventsPassed
int totalEventsPassed() const
Definition: EventProcessor.cc:618
edm::SubProcessParentageHelper
Definition: SubProcessParentageHelper.h:21
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:309
edm::MergeableRunProductMetadata::postWriteRun
void postWriteRun()
Definition: MergeableRunProductMetadata.cc:155
input
static const std::string input
Definition: EdmProvDump.cc:48
ServiceRegistry.h
edm::EventProcessor::readRun
std::pair< ProcessHistoryID, RunNumber_t > readRun()
Definition: EventProcessor.cc:1442
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:693
edm::EventProcessor::shouldWeCloseOutput
bool shouldWeCloseOutput() const
Definition: EventProcessor.cc:843
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1513
MessageLogger.h
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
funct::false
false
Definition: Factorize.h:34
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:817
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:313
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:610
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:807
EventSetupProvider.h
ScheduleInfo.h
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2129
edm::EventProcessor::totalEvents
int totalEvents() const
Definition: EventProcessor.cc:616
propagate_const.h
edm::errors::LogicError
Definition: EDMException.h:37
edm::EventSetupImpl
Definition: EventSetupImpl.h:44
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:88
edm::LuminosityBlock
Definition: LuminosityBlock.h:50
BranchIDListHelper.h
mps_update.status
status
Definition: mps_update.py:69
CalibrationSummaryClient_cfi.params
params
Definition: CalibrationSummaryClient_cfi.py:14
WaitingTaskHolder.h
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:65
LuminosityBlock.h
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1562
edm
HLT enums.
Definition: AlignableModifier.h:19
RandomNumberGenerator.h
edm::EventProcessor::endJob
void endJob()
Definition: EventProcessor.cc:579
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:76
edm::TerminationOrigin::ExternalSignal
MessageForSource.h
edm::EventProcessor::getTriggerReport
void getTriggerReport(TriggerReport &rep) const
Definition: EventProcessor.cc:626
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1279
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:677
edm::EventProcessor::readAndMergeRun
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
Definition: EventProcessor.cc:1465
edm::LogInfo
Definition: MessageLogger.h:254
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:316
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:343
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1690
edm::EventProcessor::processLumis
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
Definition: EventProcessor.cc:1039
edm::InputSourceFactory::get
static InputSourceFactory const * get()
Definition: InputSourceFactory.cc:18
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
Algorithms.h
edm::EventProcessor::beginRun
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:865
edm::ModuleDescription::getUniqueID
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
Definition: ModuleDescription.cc:87
edm::EventProcessor::enableEndPaths
void enableEndPaths(bool active)
Definition: EventProcessor.cc:622
edm::EventProcessor::endUnfinishedRun
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:945
edm::ScheduleItems
Definition: ScheduleItems.h:28
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
cms::cuda::assert
assert(be >=bs)
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:258
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:215
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:337
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:192
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:325
ProcessHistoryRegistry.h
ESRecordsToProxyIndices.h
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:320
EventSetupsController.h
edm::ParameterSet::id
ParameterSetID id() const
Definition: ParameterSet.cc:182
edm::LuminosityBlockPrincipal::run
RunNumber_t run() const
Definition: LuminosityBlockPrincipal.h:66
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:314
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:321
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:345
edm::LuminosityBlockNumber_t
unsigned int LuminosityBlockNumber_t
Definition: RunLumiEventNumber.h:13
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:117
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:354
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:27
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::EventProcessor::totalEventsFailed
int totalEventsFailed() const
Definition: EventProcessor.cc:620
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:14
edm::ProcessingController
Definition: ProcessingController.h:29
edm::EventProcessor::setExceptionMessageRuns
void setExceptionMessageRuns(std::string &message)
Definition: EventProcessor.cc:1806
edm::ModuleDescription
Definition: ModuleDescription.h:21
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::eventsetup::ComponentFactory::get
static ComponentFactory< T > const * get()
EventSetupRecord.h
ParameterSetDescriptionFillerPluginFactory.h
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::EventProcessor::setExceptionMessageFiles
void setExceptionMessageFiles(std::string &message)
Definition: EventProcessor.cc:1804
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:104
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ModuleDescription.h
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:57
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1709
edm::ProcessingController::requestedTransition
Transition requestedTransition() const
Definition: ProcessingController.cc:82
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:210
EDMException.h
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::ServiceToken
Definition: ServiceToken.h:40
ParentageRegistry.h
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:20
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
edm::RunPrincipal::beginTime
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:347
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1496
edm::LuminosityBlockProcessingStatus::endLumi
void endLumi()
Definition: LuminosityBlockProcessingStatus.h:78
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:317
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:180
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
edm::StreamContext
Definition: StreamContext.h:31
edm::EventProcessor::EventProcessor
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
Definition: EventProcessor.cc:215
edm::ExceptionCollector
Definition: ExceptionCollector.h:33
edm::eventsetup::EventSetupProvider
Definition: EventSetupProvider.h:49
edm::EventProcessor::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: EventProcessor.cc:612
edm::SerialTaskQueue::push
void push(const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:187
edm::EventProcessor::endPathsEnabled
bool endPathsEnabled() const
Definition: EventProcessor.cc:624
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const
Definition: WaitingTaskHolder.h:58
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:53
EventPrincipal.h
Service.h
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:324
edm::EventProcessor::openOutputFiles
void openOutputFiles()
Definition: EventProcessor.cc:773
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1477
SubProcessParentageHelper.h
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:340
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::ActivityRegistry
Definition: ActivityRegistry.h:132
edm::EventProcessor::clearCounters
void clearCounters()
Clears counters used by trigger report.
Definition: EventProcessor.cc:628
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:342
edm::MergeableRunProductMetadata
Definition: MergeableRunProductMetadata.h:52
CommonParams.h
edm::LuminosityBlockProcessingStatus
Definition: LuminosityBlockProcessingStatus.h:41
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:57
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:130
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:344
ProcessDesc.h
edm::Hash< ProcessHistoryType >
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1819
h
ConvertException.h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:344
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:142
ExceptionCollector.h
OrderedSet.t
t
Definition: OrderedSet.py:90
edm::LuminosityBlockProcessingStatus::lumiPrincipal
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
Definition: LuminosityBlockProcessingStatus.h:51
edm::CommonParams
Definition: CommonParams.h:14
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
InputSourceDescription.h
edm::IOVSyncValue
Definition: IOVSyncValue.h:31
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:308
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:346
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:142
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:362
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:352
streamTransitionAsync.h
edm::Parentage
Definition: Parentage.h:25
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:96
LooperFactory.h
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:215
HistoryAppender.h
UnixSignalHandlers.h
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:318
ProcessingController.h
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1061
edm::ParameterSet
Definition: ParameterSet.h:36
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:315
edm::LogError
Definition: MessageLogger.h:183
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:326
edm::LuminosityBlockProcessingStatus::stopProcessingEvents
void stopProcessingEvents()
Definition: LuminosityBlockProcessingStatus.h:74
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:219
Event.h
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
EDLooperBase.h
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1375
LuminosityBlockProcessingStatus.h
trigObjTnPSource_cfi.filler
filler
Definition: trigObjTnPSource_cfi.py:21
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:312
thread_safety_macros.h
RunPrincipal.h
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:103
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:194
edm::RunPrincipal::run
RunNumber_t run() const
Definition: RunPrincipal.h:61
edm::LuminosityBlockProcessingStatus::haveContinuedLumi
void haveContinuedLumi()
Definition: LuminosityBlockProcessingStatus.h:81
Schedule.h
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::Service
Definition: Service.h:30
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:311
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1570
edm::SharedResourcesRegistry
Definition: SharedResourcesRegistry.h:39
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
cuy.rep
rep
Definition: cuy.py:1190
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:348
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:120
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:332
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
edm::HistoryAppender
Definition: HistoryAppender.h:13
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:79
edm::ScheduleInfo
Definition: ScheduleInfo.h:32
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:90
get
#define get
edm::eventsetup::EventSetupsController
Definition: EventSetupsController.h:79
edm::TriggerReport
Definition: TriggerReport.h:56
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:327
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:322
LuminosityBlockPrincipal.h
edm::EventProcessor::endUnfinishedLumi
void endUnfinishedLumi()
Definition: EventProcessor.cc:1423
WaitingTask.h
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
edm::EventProcessor::readFile
void readFile()
Definition: EventProcessor.cc:746
SubProcess.h
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:355
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:66
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:333
FileBlock.h
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:339
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1810
edm::LuminosityBlockPrincipal::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockPrincipal.h:61
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1251
Registry.h
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:856
OccurrenceTraits.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::EventProcessor::~EventProcessor
~EventProcessor()
Definition: EventProcessor.cc:504
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped
bool wasEventProcessingStopped() const
Definition: LuminosityBlockProcessingStatus.h:73
eostools.move
def move(src, dest)
Definition: eostools.py:511
writedatasetfile.run
run
Definition: writedatasetfile.py:27
SharedResourcesRegistry.h
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:79
edm::EventProcessor::closeInputFile
void closeInputFile(bool cleaningUpAfterException)
Definition: EventProcessor.cc:764
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:341
MergeableRunProductMetadata.h
InputSourceFactory.h
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:287
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:49
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:336
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:57
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:319
Exception
Definition: hltDiff.cc:246
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:367
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::parameterSet
ParameterSet const & parameterSet(Provenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1270
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1761
RootHandlers.h
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:296
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::LuminosityBlockProcessingStatus::setNextSyncValue
void setNextSyncValue(IOVSyncValue iValue)
Definition: LuminosityBlockProcessingStatus.h:101
ParameterSetID.h
globalTransitionAsync.h
DebugMacros.h
validateTopLevelParameterSets.h
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:971
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1808
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:297
cms::Exception
Definition: Exception.h:70
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:675
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::closeOutputFiles
void closeOutputFiles()
Definition: EventProcessor.cc:781
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:329
edm::LuminosityBlockPrincipal::beginTime
Timestamp const & beginTime() const
Definition: LuminosityBlockPrincipal.h:55
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:645
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:838
StreamContext.h
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:61
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:325
MessageForParent.h
edm::EventProcessor::respondToOpenInputFile
void respondToOpenInputFile()
Definition: EventProcessor.cc:789
EventProcessor.h
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1704
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
ModuleChanger.h
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::Event
Definition: Event.h:73
edm::ProcessingController::specifiedEventTransition
edm::EventID specifiedEventTransition() const
Definition: ProcessingController.cc:84
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1789
CommonMethods.cp
def cp(fromDir, toDir, listOfFiles, overwrite=False, smallList=False)
Definition: CommonMethods.py:192
edm::LuminosityBlockProcessingStatus::continuingLumi
bool continuingLumi() const
Definition: LuminosityBlockProcessingStatus.h:80
IOVSyncValue.h
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::PrincipalCache::insert
void insert(std::shared_ptr< RunPrincipal > rp)
Definition: PrincipalCache.cc:81
edm::errors::Configuration
Definition: EDMException.h:36
SystemBounds.h
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:285
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:323
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1543
edm::EventProcessor::nextRunID
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
Definition: EventProcessor.cc:671
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1632
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:310
edm::ProcessingController::setLastOperationSucceeded
void setLastOperationSucceeded(bool value)
Definition: ProcessingController.cc:71
Breakpoints.h
edm::ModuleChanger
Definition: ModuleChanger.h:36
trackingPlots.common
common
Definition: trackingPlots.py:205
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:130
edm::serviceregistry::ServiceLegacy
ServiceLegacy
Definition: ServiceLegacy.h:29
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:634
common
Definition: common.py:1
edm::MergeableRunProductMetadata::writeLumi
void writeLumi(LuminosityBlockNumber_t lumi)
Definition: MergeableRunProductMetadata.cc:118
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:350
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:522
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:16
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18