CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
41 
43 
51 
56 
59 
72 
73 #include "MessageForSource.h"
74 #include "MessageForParent.h"
76 
77 #include "boost/range/adaptor/reversed.hpp"
78 
79 #include <cassert>
80 #include <exception>
81 #include <iomanip>
82 #include <iostream>
83 #include <utility>
84 #include <sstream>
85 
86 #include <sys/ipc.h>
87 #include <sys/msg.h>
88 
89 #include "tbb/task.h"
90 
91 //Used for CPU affinity
92 #ifndef __APPLE__
93 #include <sched.h>
94 #endif
95 
96 namespace {
97  //Sentry class to only send a signal if an
98  // exception occurs. An exception is identified
99  // by the destructor being called without first
100  // calling completedSuccessfully().
101  class SendSourceTerminationSignalIfException {
102  public:
103  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg) : reg_(iReg) {}
104  ~SendSourceTerminationSignalIfException() {
105  if (reg_) {
106  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
107  }
108  }
109  void completedSuccessfully() { reg_ = nullptr; }
110 
111  private:
112  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
113  };
114 
115 } // namespace
116 
117 namespace edm {
118 
119  // ---------------------------------------------------------------
 // Builds the job's input source from the "@main_input" parameter set of `params`.
 //
 // Visible steps: look up "@main_input", validate it against the descriptions
 // produced by `filler`, register the parameter set, build a ModuleDescription
 // labelled "source" and an InputSourceDescription, emit the pre/post source
 // construction signals on `areg`, create the source through InputSourceFactory
 // and connect its pre/postEventReadFromSourceSignal_ to the registry's signals.
 //
 // Any cms::Exception raised during validation or construction is rethrown
 // after a context line naming the source type has been added to it.
 //
 // NOTE(review): listing lines 130, 138 and 163 are absent from this extract
 // (presumably the `throw` that opens the error message, the creation of
 // `filler`, and the last ModuleDescription argument) - confirm against the
 // real file before editing this function.
120  std::unique_ptr<InputSource> makeInput(ParameterSet& params,
121  CommonParams const& common,
122  std::shared_ptr<ProductRegistry> preg,
123  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
124  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
125  std::shared_ptr<ActivityRegistry> areg,
126  std::shared_ptr<ProcessConfiguration const> processConfiguration,
127  PreallocationConfiguration const& allocations) {
128  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
129  if (main_input == nullptr) {
 // The statement these two message lines belong to starts on a line missing
 // from this listing; main_input is dereferenced unconditionally below, so
 // this branch presumably throws - TODO confirm.
131  << "There must be exactly one source in the configuration.\n"
132  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
133  }
134 
135  std::string modtype(main_input->getParameter<std::string>("@module_type"));
136 
137  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
139  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
140  filler->fill(descriptions);
141 
 // Validate the source configuration; on failure add the source type to the
 // exception context and let the same exception propagate.
142  try {
143  convertException::wrap([&]() { descriptions.validate(*main_input, std::string("source")); });
144  } catch (cms::Exception& iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
164 
165  InputSourceDescription isdesc(md,
166  preg,
167  branchIDListHelper,
168  thinnedAssociationsHelper,
169  areg,
170  common.maxEventsInput_,
171  common.maxLumisInput_,
172  common.maxSecondsUntilRampdown_,
173  allocations);
174 
175  areg->preSourceConstructionSignal_(md);
176  std::unique_ptr<InputSource> input;
177  try {
178  // Even if we have an exception, send the signal: `sentry` owns no object, but
 // a std::shared_ptr built from (nullptr, deleter) still runs its deleter on
 // destruction, so postSourceConstructionSignal_ fires when this try block is
 // left, normally or by exception.
179  std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
180  convertException::wrap([&]() {
 // The factory result is released and re-wrapped as unique_ptr<InputSource>.
181  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
 // Forward the source's event-read signals to the registry's signals.
182  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
183  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
184  });
185  } catch (cms::Exception& iException) {
186  std::ostringstream ost;
187  ost << "Constructing input source of type " << modtype;
188  iException.addContext(ost.str());
189  throw;
190  }
191  return input;
192  }
193 
194  // ---------------------------------------------------------------
 // Creates the looper named in the "@all_loopers" parameter of `params`.
 //
 // Returns an empty shared_ptr when no looper is configured. Otherwise exactly
 // one looper name is expected (enforced with assert); its parameter set is
 // registered and handed to eventsetup::LooperFactory together with
 // `esController` and `cp`.
 //
 // NOTE(review): `cp` is not declared in the visible lines; it is presumably
 // the parameter on listing line 196, which is absent from this extract.
195  std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
197  ParameterSet& params) {
198  std::shared_ptr<EDLooperBase> vLooper;
199 
200  std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");
201 
202  if (loopers.empty()) {
203  return vLooper;
204  }
205 
206  assert(1 == loopers.size());
207 
 // With the assert above this loop body runs once in builds where assert is
 // active.
208  for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
209  ++itName) {
 // NOTE(review): the pointer returned by getPSetForUpdate is used without a
 // null check here, whereas makeInput tests its result against nullptr.
210  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
211  providerPSet->registerIt();
212  vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
213  }
214  return vLooper;
215  }
216 
217  // ---------------------------------------------------------------
 // Constructs a processor from a top-level parameter set and an existing
 // service token.
 //
 // The parameter set is moved into a new ProcessDesc, `defaultServices` and
 // `forcedServices` are added to it, and init() performs the real set-up.
 //
 // NOTE(review): `iLegacy` is not declared in the visible lines; it is
 // presumably the parameter on listing line 220, which is absent from this
 // extract.
218  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices)
223  : actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
 // createAcquirerForSourceDelayedReader() is called twice, once for `.first`
 // and once for `.second` - presumably both calls describe the same shared
 // resource; TODO confirm.
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(false),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
249  forceESCacheClearOnNewRun_(false),
 // NOTE(review): the other two constructors in this file also initialize
 // asyncStopRequestedWhileProcessingEvents_(false) at this position; this one
 // does not. Confirm the member has a default initializer in the class
 // definition, otherwise it is left uninitialized on this path.
250  eventSetupDataToExcludeFromPrefetching_() {
251  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
252  processDesc->addServices(defaultServices, forcedServices);
253  init(processDesc, iToken, iLegacy);
254  }
255 
 // Constructs a processor from a top-level parameter set without an external
 // service token.
 //
 // The parameter set is moved into a new ProcessDesc and `defaultServices` /
 // `forcedServices` are added to it.
 //
 // NOTE(review): listing line 290 is absent from this extract; by analogy with
 // the other constructors it presumably holds the call to init() - TODO
 // confirm.
256  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet, //std::string const& config,
257  std::vector<std::string> const& defaultServices,
258  std::vector<std::string> const& forcedServices)
259  : actReg_(),
260  preg_(),
261  branchIDListHelper_(),
262  serviceToken_(),
263  input_(),
264  espController_(new eventsetup::EventSetupsController),
265  esp_(),
266  act_table_(),
267  processConfiguration_(),
268  schedule_(),
269  subProcesses_(),
270  historyAppender_(new HistoryAppender),
271  fb_(),
272  looper_(),
273  deferredExceptionPtrIsSet_(false),
 // createAcquirerForSourceDelayedReader() is called twice, once for `.first`
 // and once for `.second` - presumably both calls describe the same shared
 // resource; TODO confirm.
274  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
275  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
276  principalCache_(),
277  beginJobCalled_(false),
278  shouldWeStop_(false),
279  fileModeNoMerge_(false),
280  exceptionMessageFiles_(),
281  exceptionMessageRuns_(),
282  exceptionMessageLumis_(false),
283  forceLooperToEnd_(false),
284  looperBeginJobRun_(false),
285  forceESCacheClearOnNewRun_(false),
286  asyncStopRequestedWhileProcessingEvents_(false),
287  eventSetupDataToExcludeFromPrefetching_() {
288  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
289  processDesc->addServices(defaultServices, forcedServices);
291  }
292 
 // Constructs a processor from an already-built ProcessDesc and an existing
 // service token; all set-up is delegated to init().
 //
 // NOTE(review): `legacy` is not declared in the visible lines; it is
 // presumably the parameter on listing line 295, which is absent from this
 // extract.
293  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
294  ServiceToken const& token,
296  : actReg_(),
297  preg_(),
298  branchIDListHelper_(),
299  serviceToken_(),
300  input_(),
301  espController_(new eventsetup::EventSetupsController),
302  esp_(),
303  act_table_(),
304  processConfiguration_(),
305  schedule_(),
306  subProcesses_(),
307  historyAppender_(new HistoryAppender),
308  fb_(),
309  looper_(),
310  deferredExceptionPtrIsSet_(false),
 // createAcquirerForSourceDelayedReader() is called twice, once for `.first`
 // and once for `.second` - presumably both calls describe the same shared
 // resource; TODO confirm.
311  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
312  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
313  principalCache_(),
314  beginJobCalled_(false),
315  shouldWeStop_(false),
316  fileModeNoMerge_(false),
317  exceptionMessageFiles_(),
318  exceptionMessageRuns_(),
319  exceptionMessageLumis_(false),
320  forceLooperToEnd_(false),
321  looperBeginJobRun_(false),
322  forceESCacheClearOnNewRun_(false),
323  asyncStopRequestedWhileProcessingEvents_(false),
324  eventSetupDataToExcludeFromPrefetching_() {
325  init(processDesc, token, legacy);
326  }
327 
 // Shared set-up routine called by the constructors.
 //
 // Visible steps, in order: read and check the "options" parameter set
 // (fileMode, thread / stream / concurrent run and lumi counts), initialize
 // services, create the EventSetup provider, configure the looper if one
 // exists, size the lumi and stream queues, build the schedule, copy the
 // helper objects out of `items` into data members, create the per-stream
 // event principals, the per-lumi principals and two process-block
 // principals, and finally construct the subprocesses.
 //
 // Throws Exception(errors::Configuration) for an unknown fileMode or when
 // numberOfConcurrentRuns is not 1.
 //
 // NOTE(review): many listing lines are absent from this extract (the embedded
 // numbering has gaps). In particular `iLegacy`, `items`, `handler` and the
 // assignments to `looper_` and `input_` are not visible here; consult the
 // real file before changing this function.
328  void EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
329  ServiceToken const& iToken,
331  //std::cerr << processDesc->dump() << std::endl;
332 
333  // register the empty parentage vector, once and for all
335 
336  // register the empty parameter set, once and for all.
338 
339  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
340 
341  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
342  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
343  bool const hasSubProcesses = !subProcessVParameterSet.empty();
344 
345  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
346  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
347  // set in here if the parameters were not explicitly set.
 // (The statement this comment describes is on a line absent from this listing.)
349 
350  // Now set some parameters specific to the main process.
351  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
352  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
353  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
354  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
355  << fileMode << ".\n"
356  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
357  } else {
358  fileModeNoMerge_ = (fileMode == "NOMERGE");
359  }
360  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
361 
362  //threading
363  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
364 
365  // Even if numberOfThreads was set to zero in the Python configuration, the code
366  // in cmsRun.cpp should have reset it to something else.
367  assert(nThreads != 0);
368 
 // A stream count of zero means "use one stream per thread".
369  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
370  if (nStreams == 0) {
371  nStreams = nThreads;
372  }
373  if (nThreads > 1 or nStreams > 1) {
374  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
375  }
376  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
377  if (nConcurrentRuns != 1) {
378  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
379  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
380  }
 // A concurrent-lumi count of zero falls back to nConcurrentRuns, which is 1
 // at this point because of the check above.
381  unsigned int nConcurrentLumis =
382  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
383  if (nConcurrentLumis == 0) {
384  nConcurrentLumis = nConcurrentRuns;
385  }
386 
387  //Check that the relationships between the threading parameters make sense
388  /*
389  if(nThreads<nStreams) {
390  //bad
391  }
392  if(nConcurrentRuns>nStreams) {
393  //bad
394  }
395  if(nConcurrentRuns>nConcurrentLumis) {
396  //bad
397  }
398  */
399  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
400 
401  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
402 
403  // Now do general initialization
 // (`items`, used from here on, is presumably declared on the absent line 404.)
405 
406  //initialize the services
407  auto& serviceSets = processDesc->getServicesPSets();
408  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
409  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
410 
411  //make the services available
413 
414  if (nStreams > 1) {
 // (`handler` is presumably obtained on the absent line 415.)
416  handler->willBeUsingThreads();
417  }
418 
419  // initialize miscellaneous items
420  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
421 
422  // initialize the event setup provider
423  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
424  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
425 
426  // initialize the looper, if any
428  if (looper_) {
429  looper_->setActionTable(items.act_table_.get());
430  looper_->attachTo(*items.actReg_);
431 
432  //For now loopers make us run only 1 transition at a time
433  nStreams = 1;
434  nConcurrentLumis = 1;
435  nConcurrentRuns = 1;
436  }
437  espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
438 
 // nThreads is left unchanged by the looper branch above; only the stream,
 // lumi and run counts are forced to 1.
439  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
440 
441  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
442  streamQueues_.resize(nStreams);
443  streamLumiStatus_.resize(nStreams);
444 
445  // initialize the input source
 // (The call these arguments belong to starts on the absent line 446.)
447  *common,
448  items.preg(),
449  items.branchIDListHelper(),
450  items.thinnedAssociationsHelper(),
451  items.actReg_,
452  items.processConfiguration(),
454 
455  // initialize the Schedule
456  schedule_ = items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_);
457 
458  // set the data members
459  act_table_ = std::move(items.act_table_);
460  actReg_ = items.actReg_;
461  preg_ = items.preg();
463  branchIDListHelper_ = items.branchIDListHelper();
464  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
465  processConfiguration_ = items.processConfiguration();
467  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
468 
 // NOTE(review): `parameterSet` is a std::shared_ptr here, so this streams the
 // pointer rather than the ParameterSet it refers to - confirm whether
 // `*parameterSet` was intended.
469  FDEBUG(2) << parameterSet << std::endl;
470 
 // One event principal is created per stream.
472  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
473  // Reusable event principal
474  auto ep = std::make_shared<EventPrincipal>(preg(),
478  historyAppender_.get(),
479  index);
481  }
482 
 // One luminosity-block principal is created per preallocated lumi slot.
483  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
484  auto lp =
485  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
487  }
488 
 // Two process-block principals are created; what is done with `pb` and
 // `pbForInput` is on lines absent from this listing.
489  {
490  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
492 
493  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
495  }
496 
497  // fill the subprocesses, if there are any
498  subProcesses_.reserve(subProcessVParameterSet.size());
499  for (auto& subProcessPSet : subProcessVParameterSet) {
500  subProcesses_.emplace_back(subProcessPSet,
501  *parameterSet,
502  preg(),
507  *actReg_,
508  token,
511  &processContext_);
512  }
513  }
514 
516  // Make the services available while everything is being deleted.
519 
520  // manually destroy all these thing that may need the services around
521  // propagate_const<T> has no reset() function
522  espController_ = nullptr;
523  esp_ = nullptr;
524  schedule_ = nullptr;
525  input_ = nullptr;
526  looper_ = nullptr;
527  actReg_ = nullptr;
528 
531  }
532 
534 
536  if (beginJobCalled_)
537  return;
538  beginJobCalled_ = true;
539  bk::beginJob();
540 
541  // StateSentry toerror(this); // should we add this ?
542  //make the services available
544 
549  actReg_->preallocateSignal_(bounds);
550  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
552 
553  //NOTE: this may throw
555  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
556 
559  }
560  //NOTE: This implementation assumes 'Job' means one call
561  // the EventProcessor::run
562  // If it really means once per 'application' then this code will
563  // have to be changed.
564  // Also have to deal with case where have 'run' then new Module
565  // added and do 'run'
566  // again. In that case the newly added Module needs its 'beginJob'
567  // to be called.
568 
569  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
570  // For now we delay calling beginOfJob until first beginOfRun
571  //if(looper_) {
572  // looper_->beginOfJob(es);
573  //}
574  try {
575  convertException::wrap([&]() { input_->doBeginJob(); });
576  } catch (cms::Exception& ex) {
577  ex.addContext("Calling beginJob for the source");
578  throw;
579  }
580  espController_->finishConfiguration();
581  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices());
582  // toerror.succeeded(); // should we add this?
583  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
584  actReg_->postBeginJobSignal_();
585 
586  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
587  schedule_->beginStream(i);
588  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
589  }
590  }
591 
593  // Collects exceptions, so we don't throw before all operations are performed.
595  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
596 
597  //make the services available
599 
600  //NOTE: this really should go elsewhere in the future
601  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
602  c.call([this, i]() { this->schedule_->endStream(i); });
603  for (auto& subProcess : subProcesses_) {
604  c.call([&subProcess, i]() { subProcess.doEndStream(i); });
605  }
606  }
607  auto actReg = actReg_.get();
608  c.call([actReg]() { actReg->preEndJobSignal_(); });
609  schedule_->endJob(c);
610  for (auto& subProcess : subProcesses_) {
611  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
612  }
613  c.call(std::bind(&InputSource::doEndJob, input_.get()));
614  if (looper_) {
615  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
616  }
617  c.call([actReg]() { actReg->postEndJobSignal_(); });
618  if (c.hasThrown()) {
619  c.rethrow();
620  }
621  }
622 
624 
625  std::vector<ModuleDescription const*> EventProcessor::getAllModuleDescriptions() const {
626  return schedule_->getAllModuleDescriptions();
627  }
628 
629  int EventProcessor::totalEvents() const { return schedule_->totalEvents(); }
630 
631  int EventProcessor::totalEventsPassed() const { return schedule_->totalEventsPassed(); }
632 
633  int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }
634 
635  void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }
636 
637  bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }
638 
639  void EventProcessor::clearCounters() { schedule_->clearCounters(); }
640 
641  namespace {
642 #include "TransitionProcessors.icc"
643  }
644 
646  bool returnValue = false;
647 
648  // Look for a shutdown signal
649  if (shutdown_flag.load(std::memory_order_acquire)) {
650  returnValue = true;
652  }
653  return returnValue;
654  }
655 
657  if (deferredExceptionPtrIsSet_.load()) {
659  return InputSource::IsStop;
660  }
661 
662  SendSourceTerminationSignalIfException sentry(actReg_.get());
663  InputSource::ItemType itemType;
664  //For now, do nothing with InputSource::IsSynchronize
665  do {
666  itemType = input_->nextItemType();
667  } while (itemType == InputSource::IsSynchronize);
668 
669  lastSourceTransition_ = itemType;
670  sentry.completedSuccessfully();
671 
673 
675  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
677  }
678 
679  return lastSourceTransition_;
680  }
681 
682  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> EventProcessor::nextRunID() {
683  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
684  }
685 
687 
691  {
692  beginJob(); //make sure this was called
693 
694  // make the services available
696 
698  try {
699  FilesProcessor fp(fileModeNoMerge_);
700 
701  convertException::wrap([&]() {
702  bool firstTime = true;
703  do {
704  if (not firstTime) {
706  rewindInput();
707  } else {
708  firstTime = false;
709  }
710  startingNewLoop();
711 
712  auto trans = fp.processFiles(*this);
713 
714  fp.normalEnd();
715 
716  if (deferredExceptionPtrIsSet_.load()) {
717  std::rethrow_exception(deferredExceptionPtr_);
718  }
719  if (trans != InputSource::IsStop) {
720  //problem with the source
721  doErrorStuff();
722 
723  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
724  }
725  } while (not endOfLoop());
726  }); // convertException::wrap
727 
728  } // Try block
729  catch (cms::Exception& e) {
731  std::string message(
732  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
733  e.addAdditionalInfo(message);
734  if (e.alreadyPrinted()) {
735  LogAbsolute("Additional Exceptions") << message;
736  }
737  }
738  if (!exceptionMessageRuns_.empty()) {
739  e.addAdditionalInfo(exceptionMessageRuns_);
740  if (e.alreadyPrinted()) {
741  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
742  }
743  }
744  if (!exceptionMessageFiles_.empty()) {
745  e.addAdditionalInfo(exceptionMessageFiles_);
746  if (e.alreadyPrinted()) {
747  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
748  }
749  }
750  throw;
751  }
752  }
753 
754  return returnCode;
755  }
756 
758  FDEBUG(1) << " \treadFile\n";
759  size_t size = preg_->size();
760  SendSourceTerminationSignalIfException sentry(actReg_.get());
761 
763 
764  fb_ = input_->readFile();
765  if (size < preg_->size()) {
767  }
770  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
771  }
772  sentry.completedSuccessfully();
773  }
774 
775  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
776  if (fb_.get() != nullptr) {
777  SendSourceTerminationSignalIfException sentry(actReg_.get());
778  input_->closeFile(fb_.get(), cleaningUpAfterException);
779  sentry.completedSuccessfully();
780  }
781  FDEBUG(1) << "\tcloseInputFile\n";
782  }
783 
785  if (fb_.get() != nullptr) {
786  schedule_->openOutputFiles(*fb_);
787  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
788  }
789  FDEBUG(1) << "\topenOutputFiles\n";
790  }
791 
793  if (fb_.get() != nullptr) {
794  schedule_->closeOutputFiles();
795  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
796  }
797  FDEBUG(1) << "\tcloseOutputFiles\n";
798  }
799 
802  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
803  if (fb_.get() != nullptr) {
804  schedule_->respondToOpenInputFile(*fb_);
805  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
806  }
807  FDEBUG(1) << "\trespondToOpenInputFile\n";
808  }
809 
811  if (fb_.get() != nullptr) {
812  schedule_->respondToCloseInputFile(*fb_);
813  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
814  }
815  FDEBUG(1) << "\trespondToCloseInputFile\n";
816  }
817 
819  shouldWeStop_ = false;
820  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
821  // until after we've called beginOfJob
822  if (looper_ && looperBeginJobRun_) {
823  looper_->doStartingNewLoop();
824  }
825  FDEBUG(1) << "\tstartingNewLoop\n";
826  }
827 
829  if (looper_) {
830  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
831  looper_->setModuleChanger(&changer);
832  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
833  looper_->setModuleChanger(nullptr);
835  return true;
836  else
837  return false;
838  }
839  FDEBUG(1) << "\tendOfLoop\n";
840  return true;
841  }
842 
844  input_->repeat();
845  input_->rewind();
846  FDEBUG(1) << "\trewind\n";
847  }
848 
850  looper_->prepareForNextLoop(esp_.get());
851  FDEBUG(1) << "\tprepareForNextLoop\n";
852  }
853 
855  FDEBUG(1) << "\tshouldWeCloseOutput\n";
856  if (!subProcesses_.empty()) {
857  for (auto const& subProcess : subProcesses_) {
858  if (subProcess.shouldWeCloseOutput()) {
859  return true;
860  }
861  }
862  return false;
863  }
864  return schedule_->shouldWeCloseOutput();
865  }
866 
868  FDEBUG(1) << "\tdoErrorStuff\n";
869  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
870  << "and went to the error state\n"
871  << "Will attempt to terminate processing normally\n"
872  << "(IF using the looper the next loop will be attempted)\n"
873  << "This likely indicates a bug in an input module or corrupted input or both\n";
874  }
875 
 // Runs the global "begin process block" transition and blocks until it has
 // finished.
 //
 // The process-block principal is filled with this process's name, the
 // transition is started through beginGlobalTransitionAsync for the schedule
 // and the subprocesses, and wait_for_all() blocks until the waiting task is
 // released. An exception stored in the waiting task is rethrown.
 //
 // `beginProcessBlockSucceeded` is set to true only on the success path; it
 // is never written otherwise, so the caller presumably initializes it to
 // false - TODO confirm.
 //
 // NOTE(review): `processBlockPrincipal` and `Traits` are not declared in the
 // visible lines (listing lines 877 and 880 are absent from this extract).
876  void EventProcessor::beginProcessBlock(bool& beginProcessBlockSucceeded) {
878  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
879 
881  auto globalWaitTask = make_empty_waiting_task();
 // The reference count is raised before the task is handed out and waited on.
882  globalWaitTask->increment_ref_count();
883 
884  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
885  beginGlobalTransitionAsync<Traits>(
886  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
887 
888  globalWaitTask->wait_for_all();
889  if (globalWaitTask->exceptionPtr() != nullptr) {
890  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
891  }
892  beginProcessBlockSucceeded = true;
893  }
894 
897  // For now the input source always returns false from readProcessBlock,
898  // so this does nothing at all.
899  // Eventually the ProcessBlockPrincipal needs to be properly filled
900  // and cleared. The delayed reader needs to be set. The correct process name
901  // needs to be supplied.
902  while (input_->readProcessBlock()) {
903  DelayedReader* reader = nullptr;
904  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName(), reader);
905 
907  auto globalWaitTask = make_empty_waiting_task();
908  globalWaitTask->increment_ref_count();
909 
910  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
911  beginGlobalTransitionAsync<Traits>(
912  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
913 
914  globalWaitTask->wait_for_all();
915  if (globalWaitTask->exceptionPtr() != nullptr) {
916  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
917  }
918 
919  auto writeWaitTask = edm::make_empty_waiting_task();
920  writeWaitTask->increment_ref_count();
922  writeWaitTask->wait_for_all();
923  if (writeWaitTask->exceptionPtr()) {
924  std::rethrow_exception(*writeWaitTask->exceptionPtr());
925  }
926 
927  processBlockPrincipal.clearPrincipal();
928  for (auto& s : subProcesses_) {
929  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
930  }
931  }
932  }
933 
 // Runs the global "end process block" transition, optionally waits on a
 // write step, and then clears the process-block principals.
 //
 // The end transition is started through endGlobalTransitionAsync and waited
 // for; an exception stored in the waiting task is rethrown. The write step is
 // entered only when `beginProcessBlockSucceeded` is true. Finally the
 // principal is cleared here and in every subprocess
 // (ProcessBlockType::New).
 //
 // NOTE(review): listing lines 935, 937, 945, 946 and 957 are absent from
 // this extract. `processBlockPrincipal` and `Traits` are therefore not
 // declared in the visible lines, and the call that hands `writeWaitTask` to
 // the writer is not visible either.
 //
 // Because exceptions are rethrown before the clearing code at the end, the
 // principals are not cleared on those paths.
934  void EventProcessor::endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded) {
936 
938  auto globalWaitTask = make_empty_waiting_task();
939  globalWaitTask->increment_ref_count();
940 
941  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
942  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
943  *schedule_,
944  transitionInfo,
947  cleaningUpAfterException);
948 
949  globalWaitTask->wait_for_all();
950  if (globalWaitTask->exceptionPtr() != nullptr) {
951  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
952  }
953 
 // Only wait on the write task when the matching begin transition succeeded.
954  if (beginProcessBlockSucceeded) {
955  auto writeWaitTask = edm::make_empty_waiting_task();
956  writeWaitTask->increment_ref_count();
958  writeWaitTask->wait_for_all();
959  if (writeWaitTask->exceptionPtr()) {
960  std::rethrow_exception(*writeWaitTask->exceptionPtr());
961  }
962  }
963 
964  processBlockPrincipal.clearPrincipal();
965  for (auto& s : subProcesses_) {
966  s.clearProcessBlockPrincipal(ProcessBlockType::New);
967  }
968  }
969 
972  bool& globalBeginSucceeded,
973  bool& eventSetupForInstanceSucceeded) {
974  globalBeginSucceeded = false;
975  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
976  {
977  SendSourceTerminationSignalIfException sentry(actReg_.get());
978 
979  input_->doBeginRun(runPrincipal, &processContext_);
980  sentry.completedSuccessfully();
981  }
982 
983  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
985  espController_->forceCacheClear();
986  }
987  {
988  SendSourceTerminationSignalIfException sentry(actReg_.get());
989  espController_->eventSetupForInstance(ts);
990  eventSetupForInstanceSucceeded = true;
991  sentry.completedSuccessfully();
992  }
993  auto const& es = esp_->eventSetupImpl();
994  if (looper_ && looperBeginJobRun_ == false) {
995  looper_->copyInfo(ScheduleInfo(schedule_.get()));
996  looper_->beginOfJob(es);
997  looperBeginJobRun_ = true;
998  looper_->doStartingNewLoop();
999  }
1000  {
1002  auto globalWaitTask = make_empty_waiting_task();
1003  globalWaitTask->increment_ref_count();
1004  RunTransitionInfo transitionInfo(runPrincipal, es);
1005  beginGlobalTransitionAsync<Traits>(
1006  WaitingTaskHolder(globalWaitTask.get()), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1007  globalWaitTask->wait_for_all();
1008  if (globalWaitTask->exceptionPtr() != nullptr) {
1009  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1010  }
1011  }
1012  globalBeginSucceeded = true;
1013  FDEBUG(1) << "\tbeginRun " << run << "\n";
1014  if (looper_) {
1015  looper_->doBeginRun(runPrincipal, es, &processContext_);
1016  }
1017  {
1018  //To wait, the ref count has to be 1+#streams
1019  auto streamLoopWaitTask = make_empty_waiting_task();
1020  streamLoopWaitTask->increment_ref_count();
1021 
1023 
1024  RunTransitionInfo transitionInfo(runPrincipal, es);
1025  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1026  *schedule_,
1028  transitionInfo,
1029  serviceToken_,
1030  subProcesses_);
1031 
1032  streamLoopWaitTask->wait_for_all();
1033  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1034  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1035  }
1036  }
1037  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1038  if (looper_) {
1039  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1040  }
1041  }
1042 
1044  RunNumber_t run,
1045  bool globalBeginSucceeded,
1046  bool cleaningUpAfterException,
1047  bool eventSetupForInstanceSucceeded) {
1048  if (eventSetupForInstanceSucceeded) {
1049  //If we skip empty runs, this would be called conditionally
1050  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1051 
1052  if (globalBeginSucceeded) {
1054  t->increment_ref_count();
1055  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1056  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1057  mergeableRunProductMetadata->preWriteRun();
1058  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
1059  t->wait_for_all();
1060  mergeableRunProductMetadata->postWriteRun();
1061  if (t->exceptionPtr()) {
1062  std::rethrow_exception(*t->exceptionPtr());
1063  }
1064  }
1065  }
1066  deleteRunFromCache(phid, run);
1067  }
1068 
1070  RunNumber_t run,
1071  bool globalBeginSucceeded,
1072  bool cleaningUpAfterException) {
1073  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1074  runPrincipal.setEndTime(input_->timestamp());
1075 
1076  IOVSyncValue ts(
1078  runPrincipal.endTime());
1079  {
1080  SendSourceTerminationSignalIfException sentry(actReg_.get());
1081  espController_->eventSetupForInstance(ts);
1082  sentry.completedSuccessfully();
1083  }
1084  auto const& es = esp_->eventSetupImpl();
1085  if (globalBeginSucceeded) {
1086  //To wait, the ref count has to be 1+#streams
1087  auto streamLoopWaitTask = make_empty_waiting_task();
1088  streamLoopWaitTask->increment_ref_count();
1089 
1091 
1092  RunTransitionInfo transitionInfo(runPrincipal, es);
1093  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1094  *schedule_,
1096  transitionInfo,
1097  serviceToken_,
1098  subProcesses_,
1099  cleaningUpAfterException);
1100 
1101  streamLoopWaitTask->wait_for_all();
1102  if (streamLoopWaitTask->exceptionPtr() != nullptr) {
1103  std::rethrow_exception(*(streamLoopWaitTask->exceptionPtr()));
1104  }
1105  }
1106  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1107  if (looper_) {
1108  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1109  }
1110  {
1111  auto globalWaitTask = make_empty_waiting_task();
1112  globalWaitTask->increment_ref_count();
1113 
1114  RunTransitionInfo transitionInfo(runPrincipal, es);
1116  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1117  *schedule_,
1118  transitionInfo,
1119  serviceToken_,
1120  subProcesses_,
1121  cleaningUpAfterException);
1122  globalWaitTask->wait_for_all();
1123  if (globalWaitTask->exceptionPtr() != nullptr) {
1124  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1125  }
1126  }
1127  FDEBUG(1) << "\tendRun " << run << "\n";
1128  if (looper_) {
1129  looper_->doEndRun(runPrincipal, es, &processContext_);
1130  }
1131  }
1132 
  // Starts (or resumes) luminosity-block processing and blocks the calling
  // thread until the asynchronous work signals completion through waitTask.
  //
  // iRunResource: opaque shared resource that is forwarded to beginLumiAsync()
  //               when a new lumi has to be started.
  // Returns the value of lastTransitionType() once the wait has finished.
  // Rethrows any exception that was recorded on waitTask.
1133  InputSource::ItemType EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1134  auto waitTask = make_empty_waiting_task();
  // The ref count is raised before the task is handed to any holder; wait_for_all() below blocks on it.
1135  waitTask->increment_ref_count();
1136 
  // A positive streamLumiActive_ selects the "continue" path instead of beginning a new lumi.
  // NOTE(review): source line 1138 is absent from this listing.
1137  if (streamLumiActive_ > 0) {
1139  // Continue after opening a new input file
1140  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1141  } else {
  // Begin a new lumi: the sync value is built from the run and lumi currently reported by
  // the input source (event number 0) and the begin time of the lumi auxiliary.
1142  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1143  input_->luminosityBlockAuxiliary()->beginTime()),
1144  iRunResource,
1145  WaitingTaskHolder{waitTask.get()});
1146  }
1147  waitTask->wait_for_all();
1148 
  // Surface an exception captured by the asynchronous work on the calling thread.
1149  if (waitTask->exceptionPtr() != nullptr) {
1150  std::rethrow_exception(*(waitTask->exceptionPtr()));
1151  }
1152  return lastTransitionType();
1153  }
1154 
1156  std::shared_ptr<void> const& iRunResource,
1157  edm::WaitingTaskHolder iHolder) {
1158  if (iHolder.taskHasFailed()) {
1159  return;
1160  }
1161 
1162  // We must be careful with the status object here and in code this function calls. IF we want
1163  // endRun to be called, then we must call resetResources before the things waiting on
1164  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1165  // endRun to be called much later than it should be, because status is holding iRunResource).
1166  // Note that this must be done explicitly. Relying on the destructor does not work well
1167  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1168  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1169  // destroyed inside this function and lumiWork.
1170  auto status =
1171  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1172 
1173  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1174  if (iHolder.taskHasFailed()) {
1175  status->resetResources();
1176  return;
1177  }
1178 
1179  status->setResumer(std::move(iResumer));
1180 
1181  sourceResourcesAcquirer_.serialQueueChain().push([this, iHolder, status = std::move(status)]() mutable {
1182  //make the services available
1184  // Caught exception is propagated via WaitingTaskHolder
1185  CMS_SA_ALLOW try {
1187 
1188  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1189  {
1190  SendSourceTerminationSignalIfException sentry(actReg_.get());
1191 
1192  input_->doBeginLumi(lumiPrincipal, &processContext_);
1193  sentry.completedSuccessfully();
1194  }
1195 
1197  if (rng.isAvailable()) {
1198  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1199  rng->preBeginLumi(lb);
1200  }
1201 
1202  //Task to start the stream beginLumis
1203  auto beginStreamsTask = make_waiting_task(
1204  tbb::task::allocate_root(), [this, holder = iHolder, status](std::exception_ptr const* iPtr) mutable {
1205  if (iPtr) {
1206  status->resetResources();
1207  holder.doneWaiting(*iPtr);
1208  } else {
1209  status->globalBeginDidSucceed();
1210  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1211 
1212  if (looper_) {
1213  // Caught exception is propagated via WaitingTaskHolder
1214  CMS_SA_ALLOW try {
1215  //make the services available
1216  ServiceRegistry::Operate operateLooper(serviceToken_);
1217  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1218  } catch (...) {
1219  status->resetResources();
1220  holder.doneWaiting(std::current_exception());
1221  return;
1222  }
1223  }
1225 
1226  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1227  streamQueues_[i].push([this, i, status, holder, &es]() mutable {
1228  streamQueues_[i].pause();
1229 
1230  auto eventTask =
1231  edm::make_waiting_task(tbb::task::allocate_root(),
1232  [this, i, h = std::move(holder)](
1233  std::exception_ptr const* exceptionFromBeginStreamLumi) mutable {
1234  if (exceptionFromBeginStreamLumi) {
1236  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1238  } else {
1240  }
1241  });
1242  auto& event = principalCache_.eventPrincipal(i);
1243  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1244  // held by the container as this lambda may not finish executing before all the tasks it
1245  // spawns have already started to run.
1246  auto eventSetupImpls = &status->eventSetupImpls();
1247  auto lp = status->lumiPrincipal().get();
1250  event.setLuminosityBlockPrincipal(lp);
1251  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1252  beginStreamTransitionAsync<Traits>(
1253  WaitingTaskHolder{eventTask}, *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1254  });
1255  }
1256  }
1257  }); // beginStreamTask
1258 
1259  //task to start the global begin lumi
1260  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1261 
1262  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1263  {
1264  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1266  beginGlobalTransitionAsync<Traits>(
1267  beginStreamsHolder, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1268  }
1269  } catch (...) {
1270  status->resetResources();
1271  iHolder.doneWaiting(std::current_exception());
1272  }
1273  }); // task in sourceResourcesAcquirer
1274  }; // end lumiWork
1275 
1276  auto queueLumiWorkTask = make_waiting_task(
1277  tbb::task::allocate_root(),
1278  [this, lumiWorkLambda = std::move(lumiWork), iHolder](std::exception_ptr const* iPtr) mutable {
1279  if (iPtr) {
1280  iHolder.doneWaiting(*iPtr);
1281  }
1282  lumiQueue_->pushAndPause(std::move(lumiWorkLambda));
1283  });
1284 
1285  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1286  // We only get here inside this block if there is an EventSetup
1287  // module not able to handle concurrent IOVs (usually an ESSource)
1288  // and the new sync value is outside the current IOV of that module.
1289 
1290  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1291 
1292  queueWhichWaitsForIOVsToFinish_.push([this, queueLumiWorkTaskHolder, iSync, status]() mutable {
1293  // Caught exception is propagated via WaitingTaskHolder
1294  CMS_SA_ALLOW try {
1295  SendSourceTerminationSignalIfException sentry(actReg_.get());
1296  // Pass in iSync to let the EventSetup system know which run and lumi
1297  // need to be processed and prepare IOVs for it.
1298  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1299  // lumi is done and no longer needs its EventSetup IOVs.
1300  espController_->eventSetupForInstance(
1301  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1302  sentry.completedSuccessfully();
1303  } catch (...) {
1304  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1305  }
1307  });
1308 
1309  } else {
1311 
1312  // This holder will be used to wait until the EventSetup IOVs are ready
1313  WaitingTaskHolder queueLumiWorkTaskHolder{queueLumiWorkTask};
1314  // Caught exception is propagated via WaitingTaskHolder
1315  CMS_SA_ALLOW try {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317 
1318  // Pass in iSync to let the EventSetup system know which run and lumi
1319  // need to be processed and prepare IOVs for it.
1320  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1321  // lumi is done and no longer needs its EventSetup IOVs.
1322  espController_->eventSetupForInstance(
1323  iSync, queueLumiWorkTaskHolder, status->endIOVWaitingTasks(), status->eventSetupImpls());
1324  sentry.completedSuccessfully();
1325 
1326  } catch (...) {
1327  queueLumiWorkTaskHolder.doneWaiting(std::current_exception());
1328  }
1329  }
1330  }
1331 
1333  {
1334  //all streams are sharing the same status at the moment
1335  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1336  status->needToContinueLumi();
1337  status->startProcessingEvents();
1338  }
1339 
1340  unsigned int streamIndex = 0;
1341  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1342  tbb::task::enqueue(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = iHolder]() {
1343  handleNextEventForStreamAsync(h, streamIndex);
1344  }));
1345  }
1346  tbb::task::spawn(*edm::make_functor_task(tbb::task::allocate_root(), [this, streamIndex, h = std::move(iHolder)]() {
1347  handleNextEventForStreamAsync(h, streamIndex);
1348  }));
1349  }
1350 
  // Records an exception raised while ending a luminosity block.
  //
  // iPtr:   pointer to the exception to record; it is dereferenced unconditionally,
  //         so it must not be null.
  // holder: waiting-task holder that is notified when this exception becomes the
  //         deferred exception.
1351  void EventProcessor::handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder) {
  // setDeferredException() returns true only when no deferred exception had been stored before.
1352  if (setDeferredException(*iPtr)) {
  // doneWaiting() is invoked on a copy, leaving the caller's holder object itself untouched.
1353  WaitingTaskHolder tmp(holder);
1354  tmp.doneWaiting(*iPtr);
1355  } else {
  // NOTE(review): the body of this branch (source line 1356) is absent from this listing.
1357  }
1358  }
1359 
1361  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1362  // Get some needed info out of the status object before moving
1363  // it into finalTaskForThisLumi.
1364  auto& lp = *(iLumiStatus->lumiPrincipal());
1365  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1366  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1367  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1368  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1369 
1370  auto finalTaskForThisLumi = edm::make_waiting_task(
1371  tbb::task::allocate_root(),
1372  [status = std::move(iLumiStatus), iTask = std::move(iTask), this](std::exception_ptr const* iPtr) mutable {
1373  std::exception_ptr ptr;
1374  if (iPtr) {
1375  handleEndLumiExceptions(iPtr, iTask);
1376  } else {
1377  // Caught exception is passed to handleEndLumiExceptions()
1378  CMS_SA_ALLOW try {
1380  if (looper_) {
1381  auto& lumiPrincipal = *(status->lumiPrincipal());
1382  EventSetupImpl const& eventSetupImpl = status->eventSetupImpl(esp_->subProcessIndex());
1383  looper_->doEndLuminosityBlock(lumiPrincipal, eventSetupImpl, &processContext_);
1384  }
1385  } catch (...) {
1386  ptr = std::current_exception();
1387  }
1388  }
1390 
1391  // Try hard to clean up resources so the
1392  // process can terminate in a controlled
1393  // fashion even after exceptions have occurred.
1394  // Caught exception is passed to handleEndLumiExceptions()
1395  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1396  if (not ptr) {
1397  ptr = std::current_exception();
1398  }
1399  }
1400  // Caught exception is passed to handleEndLumiExceptions()
1401  CMS_SA_ALLOW try {
1402  status->resumeGlobalLumiQueue();
1404  } catch (...) {
1405  if (not ptr) {
1406  ptr = std::current_exception();
1407  }
1408  }
1409  // Caught exception is passed to handleEndLumiExceptions()
1410  CMS_SA_ALLOW try {
1411  // This call to status.resetResources() must occur before iTask is destroyed.
1412  // Otherwise there will be a data race which could result in endRun
1413  // being delayed until it is too late to successfully call it.
1414  status->resetResources();
1415  status.reset();
1416  } catch (...) {
1417  if (not ptr) {
1418  ptr = std::current_exception();
1419  }
1420  }
1421 
1422  if (ptr) {
1423  handleEndLumiExceptions(&ptr, iTask);
1424  }
1425  });
1426 
1427  auto writeT = edm::make_waiting_task(
1428  tbb::task::allocate_root(),
1429  [this, didGlobalBeginSucceed, &lumiPrincipal = lp, task = WaitingTaskHolder(finalTaskForThisLumi)](
1430  std::exception_ptr const* iExcept) mutable {
1431  if (iExcept) {
1432  task.doneWaiting(*iExcept);
1433  } else {
1434  //Only call writeLumi if beginLumi succeeded
1435  if (didGlobalBeginSucceed) {
1436  writeLumiAsync(std::move(task), lumiPrincipal);
1437  }
1438  }
1439  });
1440 
1441  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1442 
1443  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1445  endGlobalTransitionAsync<Traits>(
1446  WaitingTaskHolder(writeT), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1447  }
1448 
  // Ends the current luminosity block on one stream.
  //
  // iTask:        holder passed to handleEndLumiExceptions() if the end-stream
  //               transition reports an exception.
  // iStreamIndex: index of the stream whose lumi is being ended.
  //
  // The end-stream transition is only started when the lumi's global begin
  // succeeded; the follow-up work lives in the task 't' created below.
1449  void EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1450  auto t = edm::make_waiting_task(tbb::task::allocate_root(),
1451  [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1452  if (iPtr) {
1453  handleEndLumiExceptions(iPtr, iTask);
1454  }
  // Keep a local shared_ptr copy so the status can still be queried after the slot is cleared.
1455  auto status = streamLumiStatus_[iStreamIndex];
1456  //reset status before releasing queue else get race condition
1457  streamLumiStatus_[iStreamIndex].reset();
  // NOTE(review): source line 1458 is absent from this listing.
1459  streamQueues_[iStreamIndex].resume();
1460 
1461  //are we the last one?
1462  if (status->streamFinishedLumi()) {
  // NOTE(review): the body of this branch (source line 1463) is absent from this listing.
1464  }
1465  });
1466 
1467  edm::WaitingTaskHolder lumiDoneTask{t};
1468 
1469  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1470  // therefore we do not want to hold the shared_ptr
1471  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1472  lumiStatus->setEndTime();
1473 
1474  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1475 
  // Everything needed from the status is read out here, before the transition is launched.
1476  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1477  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1478 
  // When the global begin did not succeed no end-stream transition is started here and
  // lumiDoneTask simply goes out of scope at the end of the function.
1479  if (lumiStatus->didGlobalBeginSucceed()) {
1480  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1481  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1482  lumiPrincipal.endTime());
  // NOTE(review): source line 1483 is absent from this listing; 'ts' and 'Traits' are not otherwise explained by the visible lines.
1484  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1485  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1486  *schedule_,
1487  iStreamIndex,
1488  transitionInfo,
1489  serviceToken_,
1490  subProcesses_,
1491  cleaningUpAfterException);
1492  }
1493  }
1494 
1496  if (streamLumiActive_.load() > 0) {
1497  auto globalWaitTask = make_empty_waiting_task();
1498  globalWaitTask->increment_ref_count();
1499  {
1500  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1501  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1502  if (streamLumiStatus_[i]) {
1503  streamEndLumiAsync(globalTaskHolder, i);
1504  }
1505  }
1506  }
1507  globalWaitTask->wait_for_all();
1508  if (globalWaitTask->exceptionPtr() != nullptr) {
1509  std::rethrow_exception(*(globalWaitTask->exceptionPtr()));
1510  }
1511  }
1512  }
1513 
  // Creates a RunPrincipal for the run currently reported by the input source,
  // lets the source fill it, and inserts it into principalCache_.
  //
  // Returns the pair (reduced process history ID of the new run principal,
  // run number reported by the input source).
  // Throws edm::Exception(edm::errors::LogicError) from the guard at the top.
1514  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readRun() {
  // NOTE(review): the condition guarding this throw (source line 1515) is absent from this listing.
1516  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1517  << "Illegal attempt to insert run into cache\n"
1518  << "Contact a Framework Developer\n";
1519  }
  // NOTE(review): constructor arguments on source lines 1522 and 1526 are absent from this listing.
1520  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1521  preg(),
1523  historyAppender_.get(),
1524  0,
1525  true,
1527  {
  // The sentry emits preSourceEarlyTerminationSignal_ from its destructor unless
  // completedSuccessfully() is reached, i.e. only when readRun() throws.
1528  SendSourceTerminationSignalIfException sentry(actReg_.get());
1529  input_->readRun(*rp, *historyAppender_);
1530  sentry.completedSuccessfully();
1531  }
  // Source and principal must agree on the reduced process history before the run is cached.
1532  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1533  principalCache_.insert(rp);
1534  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1535  }
1536 
1537  std::pair<ProcessHistoryID, RunNumber_t> EventProcessor::readAndMergeRun() {
1538  principalCache_.merge(input_->runAuxiliary(), preg());
1539  auto runPrincipal = principalCache_.runPrincipalPtr();
1540  {
1541  SendSourceTerminationSignalIfException sentry(actReg_.get());
1542  input_->readAndMergeRun(*runPrincipal);
1543  sentry.completedSuccessfully();
1544  }
1545  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1546  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1547  }
1548 
1551  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1552  << "Illegal attempt to insert lumi into cache\n"
1553  << "Run is invalid\n"
1554  << "Contact a Framework Developer\n";
1555  }
1557  assert(lbp);
1558  lbp->setAux(*input_->luminosityBlockAuxiliary());
1559  {
1560  SendSourceTerminationSignalIfException sentry(actReg_.get());
1561  input_->readLuminosityBlock(*lbp, *historyAppender_);
1562  sentry.completedSuccessfully();
1563  }
1564  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1565  iStatus.lumiPrincipal() = std::move(lbp);
1566  }
1567 
1569  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1570  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1571  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1572  input_->processHistoryRegistry().reducedProcessHistoryID(
1573  input_->luminosityBlockAuxiliary()->processHistoryID()));
1574  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1575  assert(lumiOK);
1576  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1577  {
1578  SendSourceTerminationSignalIfException sentry(actReg_.get());
1579  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1580  sentry.completedSuccessfully();
1581  }
1582  return input_->luminosityBlock();
1583  }
1584 
1586  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1587  [this, task, processBlockType](std::exception_ptr const* iExcept) mutable {
1588  if (iExcept) {
1589  task.doneWaiting(*iExcept);
1590  } else {
1592  for (auto& s : subProcesses_) {
1593  s.writeProcessBlockAsync(task, processBlockType);
1594  }
1595  }
1596  });
1598  schedule_->writeProcessBlockAsync(WaitingTaskHolder(subsT),
1599  principalCache_.processBlockPrincipal(processBlockType),
1600  &processContext_,
1601  actReg_.get());
1602  }
1603 
1605  ProcessHistoryID const& phid,
1606  RunNumber_t run,
1607  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1608  auto subsT = edm::make_waiting_task(
1609  tbb::task::allocate_root(),
1610  [this, phid, run, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
1611  if (iExcept) {
1612  task.doneWaiting(*iExcept);
1613  } else {
1615  for (auto& s : subProcesses_) {
1616  s.writeRunAsync(task, phid, run, mergeableRunProductMetadata);
1617  }
1618  }
1619  });
1621  schedule_->writeRunAsync(WaitingTaskHolder(subsT),
1623  &processContext_,
1624  actReg_.get(),
1625  mergeableRunProductMetadata);
1626  }
1627 
1629  principalCache_.deleteRun(phid, run);
1630  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1631  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1632  }
1633 
1635  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(),
1636  [this, task, &lumiPrincipal](std::exception_ptr const* iExcept) mutable {
1637  if (iExcept) {
1638  task.doneWaiting(*iExcept);
1639  } else {
1641  for (auto& s : subProcesses_) {
1642  s.writeLumiAsync(task, lumiPrincipal);
1643  }
1644  }
1645  });
1647 
1648  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1649 
1650  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, lumiPrincipal, &processContext_, actReg_.get());
1651  }
1652 
1654  for (auto& s : subProcesses_) {
1655  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1656  }
1657  iStatus.lumiPrincipal()->clearPrincipal();
1658  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1659  }
1660 
1662  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1663  iStatus.endLumi();
1664  return false;
1665  }
1666 
1667  if (iStatus.wasEventProcessingStopped()) {
1668  return false;
1669  }
1670 
1671  if (shouldWeStop()) {
1673  iStatus.stopProcessingEvents();
1674  iStatus.endLumi();
1675  return false;
1676  }
1677 
1679  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1680  CMS_SA_ALLOW try {
1681  //need to use lock in addition to the serial task queue because
1682  // of delayed provenance reading and reading data in response to
1683  // edm::Refs etc
1684  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1685 
1686  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1687  if (InputSource::IsLumi == itemType) {
1688  iStatus.haveContinuedLumi();
1689  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1690  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1691  readAndMergeLumi(iStatus);
1692  itemType = nextTransitionType();
1693  }
1694  if (InputSource::IsLumi == itemType) {
1695  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1696  input_->luminosityBlockAuxiliary()->beginTime()));
1697  }
1698  }
1699  if (InputSource::IsEvent != itemType) {
1700  iStatus.stopProcessingEvents();
1701 
1702  //IsFile may continue processing the lumi and
1703  // looper_ can cause the input source to declare a new IsRun which is actually
1704  // just a continuation of the previous run
1705  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1706  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1707  iStatus.endLumi();
1708  }
1709  return false;
1710  }
1711  readEvent(iStreamIndex);
1712  } catch (...) {
1713  bool expected = false;
1714  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1715  deferredExceptionPtr_ = std::current_exception();
1716  iStatus.endLumi();
1717  }
1718  return false;
1719  }
1720  return true;
1721  }
1722 
  // Queues the work that reads the next event for one stream and starts its
  // processing; when no further event is read it ends the stream's lumi or
  // signals iTask.
  //
  // iTask:        holder that is notified when the stream stops or fails.
  // iStreamIndex: index of the stream being driven.
  //
  // All work runs inside a lambda pushed onto sourceResourcesAcquirer_'s serial queue chain.
1723  void EventProcessor::handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) {
1724  sourceResourcesAcquirer_.serialQueueChain().push([this, iTask, iStreamIndex]() mutable {
  // NOTE(review): source line 1725 is absent from this listing.
1726  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1727  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1728  auto status = streamLumiStatus_[iStreamIndex].get();
1729  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1730  CMS_SA_ALLOW try {
1731  if (readNextEventForStream(iStreamIndex, *status)) {
  // An event was read: recursionTask runs once its processing has finished and either
  // ends the stream's lumi (on failure) or calls this function again for the next event.
1732  auto recursionTask = make_waiting_task(
1733  tbb::task::allocate_root(), [this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1734  if (iPtr) {
1735  // Try to end the stream properly even if an exception was
1736  // thrown on an event.
1737  bool expected = false;
1738  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1739  // This is the case where the exception in iPtr is the primary
1740  // exception and we want to see its message.
1741  deferredExceptionPtr_ = *iPtr;
1742  WaitingTaskHolder tempHolder(iTask);
1743  tempHolder.doneWaiting(*iPtr);
1744  }
1745  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1746  //the stream will stop now
1747  return;
1748  }
1749  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1750  });
1751 
1752  processEventAsync(WaitingTaskHolder(recursionTask), iStreamIndex);
1753  } else {
1754  //the stream will stop now
1755  if (status->isLumiEnding()) {
  // The next lumi is begun here only when the last transition was IsLumi and the status
  // reports that it has not been started yet; startNextLumi() is called first.
1756  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1757  status->startNextLumi();
1758  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1759  }
1760  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1761  } else {
  // The lumi is not ending: signal iTask with an empty exception_ptr.
1762  iTask.doneWaiting(std::exception_ptr{});
1763  }
1764  }
1765  } catch (...) {
1766  // It is unlikely we will ever get in here ...
1767  // But if we do try to clean up and propagate the exception
1768  if (streamLumiStatus_[iStreamIndex]) {
1769  streamEndLumiAsync(iTask, iStreamIndex);
1770  }
1771  bool expected = false;
1772  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1773  auto e = std::current_exception();
  // NOTE(review): source line 1774 is absent from this listing.
1775  iTask.doneWaiting(e);
1776  }
1777  }
1778  });
1779  }
1780 
1781  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1782  //TODO this will have to become per stream
1783  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1784  StreamContext streamContext(event.streamID(), &processContext_);
1785 
1786  SendSourceTerminationSignalIfException sentry(actReg_.get());
1787  input_->readEvent(event, streamContext);
1788 
1789  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1790  sentry.completedSuccessfully();
1791 
1792  FDEBUG(1) << "\treadEvent\n";
1793  }
1794 
1795  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1796  tbb::task::spawn(
1797  *make_functor_task(tbb::task::allocate_root(), [=]() { processEventAsyncImpl(iHolder, iStreamIndex); }));
1798  }
1799 
  // Processes the event held by one stream: runs the schedule on it, then the
  // SubProcesses (if any), then a finalization step that clears the event
  // principal and signals iHolder.
  //
  // iHolder:      holder that receives the outcome (an exception, or an empty
  //               exception_ptr on success).
  // iStreamIndex: index of the stream whose event principal is processed.
1800  void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
1801  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1802 
  // NOTE(review): source lines 1803-1804, which presumably declare 'rng', are absent from this listing.
1805  if (rng.isAvailable()) {
1806  Event ev(*pep, ModuleDescription(), nullptr);
1807  rng->postEventRead(ev);
1808  }
1809 
  // Last step of the chain: optional looper call, clear the event principal, then report to iHolder.
1810  WaitingTaskHolder finalizeEventTask(make_waiting_task(
1811  tbb::task::allocate_root(), [this, pep, iHolder, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1812  //NOTE: If we have a looper we only have one Stream
1813  if (looper_) {
1814  ServiceRegistry::Operate operateLooper(serviceToken_);
1815  processEventWithLooper(*pep, iStreamIndex);
1816  }
1817 
1818  FDEBUG(1) << "\tprocessEvent\n";
1819  pep->clearEventPrincipal();
1820  if (iPtr) {
1821  iHolder.doneWaiting(*iPtr);
1822  } else {
1823  iHolder.doneWaiting(std::exception_ptr());
1824  }
1825  }));
  // afterProcessTask is what the schedule signals when it is done with the event:
  // the finalization task itself when there are no SubProcesses, otherwise a task that
  // first hands the event to every SubProcess.
1826  WaitingTaskHolder afterProcessTask;
1827  if (subProcesses_.empty()) {
1828  afterProcessTask = std::move(finalizeEventTask);
1829  } else {
1830  //Need to run SubProcesses after schedule has finished
1831  // with the event
1832  afterProcessTask = WaitingTaskHolder(make_waiting_task(
1833  tbb::task::allocate_root(),
1834  [this, pep, finalizeEventTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1835  if (not iPtr) {
1836  //when run with 1 thread, we want to the order to be what
1837  // it was before. This requires reversing the order since
1838  // tasks are run last one in first one out
1839  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1840  subProcess.doEventAsync(finalizeEventTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1841  }
1842  } else {
  // The schedule failed: skip the SubProcesses and pass the exception straight to finalization.
1843  finalizeEventTask.doneWaiting(*iPtr);
1844  }
1845  }));
1846  }
1847 
  // Launch the schedule on this stream using the EventSetup of the stream's current lumi status.
1848  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1849  EventTransitionInfo info(*pep, es);
1850  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex, info, serviceToken_);
1851  }
1852 
  // Lets the looper handle one event and applies the event repositioning it
  // requests through the ProcessingController.
  //
  // iPrincipal:   event principal handed to looper_->doDuringLoop().
  // iStreamIndex: index of the stream whose lumi status supplies the EventSetup.
  //
  // NOTE(review): several source lines of this function (1859, 1867, 1869, 1873,
  // 1875, 1877) are absent from this listing, so the visible braces do not
  // balance and some branch conditions cannot be read here.
1853  void EventProcessor::processEventWithLooper(EventPrincipal& iPrincipal, unsigned int iStreamIndex) {
  // The controller is seeded with the input source's random-access capability and its forward/reverse state.
1854  bool randomAccess = input_->randomAccess();
1855  ProcessingController::ForwardState forwardState = input_->forwardState();
1856  ProcessingController::ReverseState reverseState = input_->reverseState();
1857  ProcessingController pc(forwardState, reverseState, randomAccess);
1858 
  // The loop repeats until pc.lastOperationSucceeded() returns true.
1860  do {
1861  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1862  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1863  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1864 
1865  bool succeeded = true;
  // Repositioning of the input source is only attempted when it supports random access.
1866  if (randomAccess) {
1868  input_->skipEvents(-2);
1870  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1871  }
1872  }
1874  } while (!pc.lastOperationSucceeded());
1876  shouldWeStop_ = true;
1878  }
1879  }
1880 
1882  FDEBUG(1) << "\tshouldWeStop\n";
1883  if (shouldWeStop_)
1884  return true;
1885  if (!subProcesses_.empty()) {
1886  for (auto const& subProcess : subProcesses_) {
1887  if (subProcess.terminate()) {
1888  return true;
1889  }
1890  }
1891  return false;
1892  }
1893  return schedule_->terminate();
1894  }
1895 
1897 
1899 
1901 
1902  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1903  bool expected = false;
1904  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1905  deferredExceptionPtr_ = iException;
1906  return true;
1907  }
1908  return false;
1909  }
1910 
1912  std::unique_ptr<LogSystem> s;
1913  for (auto worker : schedule_->allWorkers()) {
1914  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
1915  if (not s) {
1916  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1917  (*s) << "The following modules require synchronizing on LuminosityBlock boundaries:";
1918  }
1919  (*s) << "\n " << worker->description().moduleName() << " " << worker->description().moduleLabel();
1920  }
1921  }
1922  }
1923 } // namespace edm
edm::ParameterSet::registerIt
ParameterSet const & registerIt()
Definition: ParameterSet.cc:113
ConfigurationDescriptions.h
edm::EventProcessor::looperBeginJobRun_
bool looperBeginJobRun_
Definition: EventProcessor.h:353
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::SharedResourcesAcquirer::serialQueueChain
SerialTaskQueueChain & serialQueueChain() const
Definition: SharedResourcesAcquirer.h:54
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
ParameterSetDescriptionFillerBase.h
IllegalParameters.h
edm::EventProcessor::deleteRunFromCache
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
Definition: EventProcessor.cc:1628
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
edm::EDLooperBase::Status
Status
Definition: EDLooperBase.h:79
edm::EventProcessor::historyAppender_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
Definition: EventProcessor.h:334
edm::ProcessingController::lastOperationSucceeded
bool lastOperationSucceeded() const
Definition: ProcessingController.cc:86
bk::beginJob
void beginJob()
Definition: Breakpoints.cc:14
edm::EventProcessor::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: EventProcessor.h:295
edm::RunPrincipal::endTime
Timestamp const & endTime() const
Definition: RunPrincipal.h:69
processOptions_cff.fileMode
fileMode
Definition: processOptions_cff.py:5
edm::LuminosityBlockPrincipal::runPrincipal
RunPrincipal const & runPrincipal() const
Definition: LuminosityBlockPrincipal.h:45
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
edm::EventProcessor::rewindInput
void rewindInput()
Definition: EventProcessor.cc:843
edm::PrincipalCache::ProcessBlockType::New
edm::EventProcessor::respondToCloseInputFile
void respondToCloseInputFile()
Definition: EventProcessor.cc:810
edm::TerminationOrigin::ExceptionFromThisContext
mps_fire.i
i
Definition: mps_fire.py:428
edm::PreallocationConfiguration::numberOfRuns
unsigned int numberOfRuns() const
Definition: PreallocationConfiguration.h:37
edm::EventProcessor::totalEventsPassed
int totalEventsPassed() const
Definition: EventProcessor.cc:631
edm::SubProcessParentageHelper
Definition: SubProcessParentageHelper.h:21
edm::EventProcessor::preg_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: EventProcessor.h:313
edm::MergeableRunProductMetadata::postWriteRun
void postWriteRun()
Definition: MergeableRunProductMetadata.cc:155
input
static const std::string input
Definition: EdmProvDump.cc:48
ServiceRegistry.h
edm::EventProcessor::readRun
std::pair< ProcessHistoryID, RunNumber_t > readRun()
Definition: EventProcessor.cc:1514
edm::popSubProcessVParameterSet
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:713
edm::EventProcessor::shouldWeCloseOutput
bool shouldWeCloseOutput() const
Definition: EventProcessor.cc:854
edm::EventProcessor::writeRunAsync
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
Definition: EventProcessor.cc:1604
MessageLogger.h
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::EventProcessor::StatusCode
StatusCode
Definition: EventProcessor.h:74
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
funct::false
false
Definition: Factorize.h:29
edm::EventProcessor::endOfLoop
bool endOfLoop()
Definition: EventProcessor.cc:828
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::EventProcessor::input_
edm::propagate_const< std::unique_ptr< InputSource > > input_
Definition: EventProcessor.h:317
edm::EventProcessor::getToken
ServiceToken getToken()
Definition: EventProcessor.cc:623
edm::EventProcessor::startingNewLoop
void startingNewLoop()
Definition: EventProcessor.cc:818
EventSetupProvider.h
ScheduleInfo.h
edm::ParameterSet::getUntrackedParameterSet
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: ParameterSet.cc:2136
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
edm::EventProcessor::totalEvents
int totalEvents() const
Definition: EventProcessor.cc:629
propagate_const.h
edm::errors::LogicError
Definition: EDMException.h:37
edm::EventSetupImpl
Definition: EventSetupImpl.h:48
edm::validateTopLevelParameterSets
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
Definition: validateTopLevelParameterSets.cc:89
edm::LuminosityBlock
Definition: LuminosityBlock.h:50
BranchIDListHelper.h
mps_update.status
status
Definition: mps_update.py:69
CalibrationSummaryClient_cfi.params
params
Definition: CalibrationSummaryClient_cfi.py:14
WaitingTaskHolder.h
edm::PreallocationConfiguration::numberOfThreads
unsigned int numberOfThreads() const
Definition: PreallocationConfiguration.h:34
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
LuminosityBlock.h
edm::EventProcessor::deleteLumiFromCache
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1653
edm
HLT enums.
Definition: AlignableModifier.h:19
RandomNumberGenerator.h
edm::EventProcessor::endJob
void endJob()
Definition: EventProcessor.cc:592
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::PrincipalCache::setProcessHistoryRegistry
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
Definition: PrincipalCache.h:87
edm::TerminationOrigin::ExternalSignal
MessageForSource.h
edm::EventProcessor::globalEndLumiAsync
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
Definition: EventProcessor.cc:1360
edm::ParentageRegistry::instance
static ParentageRegistry * instance()
Definition: ParentageRegistry.cc:4
edm::EventProcessor::runToCompletion
StatusCode runToCompletion()
Definition: EventProcessor.cc:688
edm::EventProcessor::readAndMergeRun
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
Definition: EventProcessor.cc:1537
edm::EventProcessor::esp_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: EventProcessor.h:320
edm::EventProcessor::shouldWeStop_
bool shouldWeStop_
Definition: EventProcessor.h:347
edm::EventProcessor::readEvent
void readEvent(unsigned int iStreamIndex)
Definition: EventProcessor.cc:1781
edm::EventProcessor::processLumis
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
Definition: EventProcessor.cc:1133
edm::InputSourceFactory::get
static InputSourceFactory const * get()
Definition: InputSourceFactory.cc:18
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
Algorithms.h
edm::EventProcessor::beginRun
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:970
edm::ModuleDescription::getUniqueID
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
Definition: ModuleDescription.cc:87
edm::EventProcessor::enableEndPaths
void enableEndPaths(bool active)
Definition: EventProcessor.cc:635
edm::EventProcessor::endUnfinishedRun
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
Definition: EventProcessor.cc:1043
edm::ScheduleItems
Definition: ScheduleItems.h:28
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
cms::cuda::assert
assert(be >=bs)
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::ParentageRegistry::clear
void clear()
Not thread safe.
Definition: ParentageRegistry.cc:28
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::EventProcessor::deferredExceptionPtr_
std::exception_ptr deferredExceptionPtr_
Definition: EventProcessor.h:341
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
ProcessBlockPrincipal.h
edm::fillLooper
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
Definition: EventProcessor.cc:195
personalPlayback.fp
fp
Definition: personalPlayback.py:523
edm::EventProcessor::lumiQueue_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
Definition: EventProcessor.h:329
ProcessHistoryRegistry.h
ESRecordsToProxyIndices.h
edm::EventProcessor::processContext_
ProcessContext processContext_
Definition: EventProcessor.h:324
EventSetupsController.h
edm::ParameterSet::id
ParameterSetID id() const
Definition: ParameterSet.cc:189
edm::EventProcessor::lastSourceTransition_
InputSource::ItemType lastSourceTransition_
Definition: EventProcessor.h:318
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
edm::WaitingTaskHolder::doneWaiting
void doneWaiting(std::exception_ptr iExcept)
Definition: WaitingTaskHolder.h:75
edm::EventProcessor::pathsAndConsumesOfModules_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: EventProcessor.h:325
createJobs.tmp
tmp
align.sh
Definition: createJobs.py:716
edm::EventProcessor::exceptionMessageFiles_
std::string exceptionMessageFiles_
Definition: EventProcessor.h:349
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::LuminosityBlockNumber_t
unsigned int LuminosityBlockNumber_t
Definition: RunLumiEventNumber.h:13
edm::makeInput
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Definition: EventProcessor.cc:120
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
bool asyncStopRequestedWhileProcessingEvents_
Definition: EventProcessor.h:358
edm::PrincipalCache::runPrincipalPtr
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:28
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::EventProcessor::totalEventsFailed
int totalEventsFailed() const
Definition: EventProcessor.cc:633
edm::PathsAndConsumesOfModules::initialize
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
Definition: PathsAndConsumesOfModules.cc:14
edm::ProcessingController
Definition: ProcessingController.h:29
edm::EventProcessor::setExceptionMessageRuns
void setExceptionMessageRuns(std::string &message)
Definition: EventProcessor.cc:1898
runTheMatrix.nStreams
nStreams
Definition: runTheMatrix.py:362
edm::ModuleDescription
Definition: ModuleDescription.h:21
mps_monitormerge.items
list items
Definition: mps_monitormerge.py:29
groupFilesInBlocks.reverse
reverse
Definition: groupFilesInBlocks.py:131
edm::eventsetup::ComponentFactory::get
static ComponentFactory< T > const * get()
EventSetupRecord.h
ParameterSetDescriptionFillerPluginFactory.h
edm::InputSource::IsRun
Definition: InputSource.h:78
edm::EventProcessor::setExceptionMessageFiles
void setExceptionMessageFiles(std::string &message)
Definition: EventProcessor.cc:1896
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ModuleDescription.h
edm::ProcessingController::kToPreviousEvent
Definition: ProcessingController.h:58
edm::EventProcessor::processEventAsyncImpl
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1800
edm::ProcessingController::requestedTransition
Transition requestedTransition() const
Definition: ProcessingController.cc:82
edm::first
T first(std::pair< T, U > const &p)
Definition: ParameterSet.cc:217
edm::EventProcessor::endProcessBlock
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
Definition: EventProcessor.cc:934
EDMException.h
edm::PreallocationConfiguration::numberOfLuminosityBlocks
unsigned int numberOfLuminosityBlocks() const
Definition: PreallocationConfiguration.h:36
edm::ServiceToken
Definition: ServiceToken.h:40
edm::EventProcessor::inputProcessBlocks
void inputProcessBlocks()
Definition: EventProcessor.cc:895
ParentageRegistry.h
edm::PrincipalCache::runPrincipal
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
Definition: PrincipalCache.cc:21
edm::pset::Registry::clear
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::ProcessContext::setProcessConfiguration
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: ProcessContext.cc:19
edm::RunPrincipal::beginTime
Timestamp const & beginTime() const
Definition: RunPrincipal.h:67
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::EventProcessor::exceptionMessageLumis_
std::atomic< bool > exceptionMessageLumis_
Definition: EventProcessor.h:351
edm::ProcessingController::ForwardState
ForwardState
Definition: ProcessingController.h:31
edm::EventProcessor::readAndMergeLumi
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1568
edm::LuminosityBlockProcessingStatus::endLumi
void endLumi()
Definition: LuminosityBlockProcessingStatus.h:78
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
Definition: EventProcessor.h:321
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::EventProcessor::epSignal
Definition: EventProcessor.h:78
edm::StreamContext
Definition: StreamContext.h:31
edm::EventProcessor::EventProcessor
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
Definition: EventProcessor.cc:218
edm::ExceptionCollector
Definition: ExceptionCollector.h:33
edm::eventsetup::EventSetupProvider
Definition: EventSetupProvider.h:49
edm::EventProcessor::getAllModuleDescriptions
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
Definition: EventProcessor.cc:625
edm::SerialTaskQueue::push
void push(const T &iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueue.h:187
edm::EventProcessor::endPathsEnabled
bool endPathsEnabled() const
Definition: EventProcessor.cc:637
DQM.reader
reader
Definition: DQM.py:105
edm::WaitingTaskHolder::taskHasFailed
bool taskHasFailed() const
Definition: WaitingTaskHolder.h:58
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
edm::PrincipalCache::merge
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:54
EventPrincipal.h
Service.h
edm::EventProcessor::streamQueues_
std::vector< edm::SerialTaskQueue > streamQueues_
Definition: EventProcessor.h:328
edm::EventProcessor::openOutputFiles
void openOutputFiles()
Definition: EventProcessor.cc:784
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::EventProcessor::readLuminosityBlock
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
Definition: EventProcessor.cc:1549
SubProcessParentageHelper.h
edm::EventProcessor::taskCleanup
void taskCleanup()
Definition: EventProcessor.cc:533
edm::InputSource::IsSynchronize
Definition: InputSource.h:78
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::EventProcessor::sourceMutex_
std::shared_ptr< std::recursive_mutex > sourceMutex_
Definition: EventProcessor.h:344
edm::RunPrincipal::setEndTime
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
edm::ActivityRegistry
Definition: ActivityRegistry.h:133
edm::EventProcessor::clearCounters
void clearCounters()
Clears counters used by trigger report.
Definition: EventProcessor.cc:639
edm::EventProcessor::beginJobCalled_
bool beginJobCalled_
Definition: EventProcessor.h:346
edm::MergeableRunProductMetadata
Definition: MergeableRunProductMetadata.h:52
CommonParams.h
edm::LuminosityBlockProcessingStatus
Definition: LuminosityBlockProcessingStatus.h:41
edm::ProcessingController::kToSpecifiedEvent
Definition: ProcessingController.h:58
edm::checkForModuleDependencyCorrectness
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
Definition: PathsAndConsumesOfModules.cc:130
edm::EventProcessor::fileModeNoMerge_
bool fileModeNoMerge_
Definition: EventProcessor.h:348
ProcessDesc.h
edm::Hash< ProcessHistoryType >
edm::EventProcessor::writeProcessBlockAsync
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
Definition: EventProcessor.cc:1585
edm::EventProcessor::warnAboutModulesRequiringLuminosityBLockSynchronization
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
Definition: EventProcessor.cc:1911
h
ConvertException.h
runTheMatrix.nThreads
nThreads
Definition: runTheMatrix.py:361
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
ExceptionCollector.h
edm::LuminosityBlockProcessingStatus::lumiPrincipal
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
Definition: LuminosityBlockProcessingStatus.h:51
edm::CommonParams
Definition: CommonParams.h:14
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::IllegalParameters::setThrowAnException
static void setThrowAnException(bool v)
Definition: IllegalParameters.h:16
InputSourceDescription.h
edm::IOVSyncValue
Definition: IOVSyncValue.h:31
edm::EventProcessor::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: EventProcessor.h:312
edm::EventID::maxEventNumber
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::EventProcessor::exceptionMessageRuns_
std::string exceptionMessageRuns_
Definition: EventProcessor.h:350
edm::PrincipalCache::preReadFile
void preReadFile()
Definition: PrincipalCache.cc:149
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::EventProcessor::printDependencies_
bool printDependencies_
Definition: EventProcessor.h:366
edm::EventProcessor::preallocations_
PreallocationConfiguration preallocations_
Definition: EventProcessor.h:356
streamTransitionAsync.h
edm::Parentage
Definition: Parentage.h:25
edm::SerialTaskQueue::pause
bool pause()
Pauses processing of additional tasks from the queue.
Definition: SerialTaskQueue.h:99
LooperFactory.h
edm::SubProcess::doEndJob
void doEndJob()
Definition: SubProcess.cc:225
HistoryAppender.h
UnixSignalHandlers.h
summarizeEdmComparisonLogfiles.succeeded
succeeded
Definition: summarizeEdmComparisonLogfiles.py:101
edm::EventProcessor::act_table_
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: EventProcessor.h:322
ProcessingController.h
edm::serviceregistry::kConfigurationOverrides
Definition: ServiceLegacy.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
edm::EventProcessor::beginLumiAsync
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1155
edm::ParameterSet
Definition: ParameterSet.h:47
edm::EventProcessor::espController_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
Definition: EventProcessor.h:319
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
edm::EventProcessor::streamLumiStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
Definition: EventProcessor.h:330
edm::LuminosityBlockProcessingStatus::stopProcessingEvents
void stopProcessingEvents()
Definition: LuminosityBlockProcessingStatus.h:74
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
Event.h
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:30
EDLooperBase.h
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::InputSource::IsLumi
Definition: InputSource.h:78
edm::EventProcessor::streamEndLumiAsync
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1449
LuminosityBlockProcessingStatus.h
trigObjTnPSource_cfi.filler
filler
Definition: trigObjTnPSource_cfi.py:21
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
edm::EventProcessor::serviceToken_
ServiceToken serviceToken_
Definition: EventProcessor.h:316
thread_safety_macros.h
RunPrincipal.h
edm::serviceregistry::kOverlapIsError
Definition: ServiceLegacy.h:29
edm::PrincipalCache::deleteRun
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
Definition: PrincipalCache.cc:110
edm::EventProcessor::lastTransitionType
InputSource::ItemType lastTransitionType() const
Definition: EventProcessor.h:192
edm::RunPrincipal::run
RunNumber_t run() const
Definition: RunPrincipal.h:61
edm::LuminosityBlockProcessingStatus::haveContinuedLumi
void haveContinuedLumi()
Definition: LuminosityBlockProcessingStatus.h:81
Schedule.h
edm::InputSource::doEndJob
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
edm::Service
Definition: Service.h:30
edm::EventProcessor::thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: EventProcessor.h:315
edm::EventProcessor::readNextEventForStream
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
Definition: EventProcessor.cc:1661
edm::SharedResourcesRegistry
Definition: SharedResourcesRegistry.h:39
runEdmFileComparison.returnCode
returnCode
Definition: runEdmFileComparison.py:263
edm::EventProcessor::forceLooperToEnd_
bool forceLooperToEnd_
Definition: EventProcessor.h:352
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
edm::PrincipalCache::insertForInput
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:98
edm::InputSource::IsStop
Definition: InputSource.h:78
edm::PrincipalCache::adjustEventsToNewProductRegistry
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
Definition: PrincipalCache.cc:127
edm::EventProcessor::fb_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
Definition: EventProcessor.h:336
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
edm::HistoryAppender
Definition: HistoryAppender.h:13
edm::EDLooperBase::kContinue
Definition: EDLooperBase.h:79
edm::ScheduleInfo
Definition: ScheduleInfo.h:32
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
edm::EDLooperBase::endOfJob
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
get
#define get
edm::eventsetup::EventSetupsController
Definition: EventSetupsController.h:79
edm::Principal::clearPrincipal
void clearPrincipal()
Definition: Principal.cc:373
edm::EventProcessor::streamLumiActive_
std::atomic< unsigned int > streamLumiActive_
Definition: EventProcessor.h:331
edm::EventProcessor::mergeableRunProductProcesses_
MergeableRunProductProcesses mergeableRunProductProcesses_
Definition: EventProcessor.h:326
LuminosityBlockPrincipal.h
edm::EventProcessor::endUnfinishedLumi
void endUnfinishedLumi()
Definition: EventProcessor.cc:1495
WaitingTask.h
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
edm::EventProcessor::readFile
void readFile()
Definition: EventProcessor.cc:757
SubProcess.h
edm::PrincipalCache::ProcessBlockType
ProcessBlockType
Definition: PrincipalCache.h:57
edm::PrincipalCache::inputProcessBlockPrincipal
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
Definition: PrincipalCache.h:55
edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
StatusCode asyncStopStatusCodeFromProcessingEvents_
Definition: EventProcessor.h:359
edm::LimitedTaskQueue::Resumer
Definition: LimitedTaskQueue.h:68
edm::EventProcessor::looper_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
Definition: EventProcessor.h:337
FileBlock.h
edm::ProcessBlockPrincipal::fillProcessBlockPrincipal
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
Definition: ProcessBlockPrincipal.cc:15
edm::EventProcessor::sourceResourcesAcquirer_
SharedResourcesAcquirer sourceResourcesAcquirer_
Definition: EventProcessor.h:343
edm::EventProcessor::setDeferredException
bool setDeferredException(std::exception_ptr)
Definition: EventProcessor.cc:1902
edm::LuminosityBlockPrincipal::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockPrincipal.h:61
edm::InputSource::ItemType
ItemType
Definition: InputSource.h:78
edm::EventProcessor::continueLumiAsync
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
Definition: EventProcessor.cc:1332
Registry.h
edm::EventProcessor::doErrorStuff
void doErrorStuff()
Definition: EventProcessor.cc:867
ScheduleItems.h
OccurrenceTraits.h
edm::PreallocationConfiguration
Definition: PreallocationConfiguration.h:27
edm::EventProcessor::~EventProcessor
~EventProcessor()
Definition: EventProcessor.cc:515
edm::EventProcessor::beginProcessBlock
void beginProcessBlock(bool &beginProcessBlockSucceeded)
Definition: EventProcessor.cc:876
edm::PrincipalCache::ProcessBlockType::Input
edm::ParentageRegistry::insertMapped
bool insertMapped(value_type const &v)
Definition: ParentageRegistry.cc:24
edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped
bool wasEventProcessingStopped() const
Definition: LuminosityBlockProcessingStatus.h:73
eostools.move
def move(src, dest)
Definition: eostools.py:511
writedatasetfile.run
run
Definition: writedatasetfile.py:27
SharedResourcesRegistry.h
edm::RunPrincipal::mergeableRunProductMetadata
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:79
edm::EventProcessor::closeInputFile
void closeInputFile(bool cleaningUpAfterException)
Definition: EventProcessor.cc:775
edm::EventProcessor::principalCache_
PrincipalCache principalCache_
Definition: EventProcessor.h:345
MergeableRunProductMetadata.h
InputSourceFactory.h
edm::EventProcessor::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: EventProcessor.h:291
edm::PrincipalCache::getAvailableLumiPrincipalPtr
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
Definition: PrincipalCache.cc:50
edm::EventProcessor::deferredExceptionPtrIsSet_
std::atomic< bool > deferredExceptionPtrIsSet_
Definition: EventProcessor.h:340
edm::PrincipalCache::hasRunPrincipal
bool hasRunPrincipal() const
Definition: PrincipalCache.h:66
edm::EventProcessor::epSuccess
Definition: EventProcessor.h:75
ev
bool ev
Definition: Hydjet2Hadronizer.cc:95
edm::EventProcessor::processConfiguration_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: EventProcessor.h:323
Exception
Definition: hltDiff.cc:246
edm::EventProcessor::run
StatusCode run()
Definition: EventProcessor.h:371
edm::MergeableRunProductMetadata::preWriteRun
void preWriteRun()
Definition: MergeableRunProductMetadata.cc:125
edm::parameterSet
ParameterSet const & parameterSet(Provenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::EventProcessor::handleEndLumiExceptions
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
Definition: EventProcessor.cc:1351
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::DelayedReader
Definition: DelayedReader.h:29
edm::EventProcessor::processEventWithLooper
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1853
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
RootHandlers.h
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::PreallocationConfiguration::numberOfStreams
unsigned int numberOfStreams() const
Definition: PreallocationConfiguration.h:35
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::FileBlock::ParallelProcesses
Definition: FileBlock.h:28
edm::PrincipalCache::processBlockPrincipal
ProcessBlockPrincipal & processBlockPrincipal() const
Definition: PrincipalCache.h:54
ParameterSetID.h
globalTransitionAsync.h
DebugMacros.h
validateTopLevelParameterSets.h
edm::EventProcessor::endRun
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
Definition: EventProcessor.cc:1069
edm::EventProcessor::setExceptionMessageLumis
void setExceptionMessageLumis()
Definition: EventProcessor.cc:1900
edm::EventProcessor::looper
std::shared_ptr< EDLooperBase const > looper() const
Definition: EventProcessor.h:301
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::EventProcessor::nextLuminosityBlockID
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
Definition: EventProcessor.cc:686
edm::InputSource::IsEvent
Definition: InputSource.h:78
edm::EventProcessor::closeOutputFiles
void closeOutputFiles()
Definition: EventProcessor.cc:792
edm::EventProcessor::subProcesses_
std::vector< SubProcess > subProcesses_
Definition: EventProcessor.h:333
edm::EventProcessor::nextTransitionType
InputSource::ItemType nextTransitionType()
Definition: EventProcessor.cc:656
edm::EventProcessor::prepareForNextLoop
void prepareForNextLoop()
Definition: EventProcessor.cc:849
StreamContext.h
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::PrincipalCache::eventPrincipal
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: PrincipalCache.h:70
edm::EventProcessor::init
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
Definition: EventProcessor.cc:328
MessageForParent.h
edm::EventProcessor::respondToOpenInputFile
void respondToOpenInputFile()
Definition: EventProcessor.cc:800
edm::PrincipalCache::insert
void insert(std::unique_ptr< ProcessBlockPrincipal >)
Definition: PrincipalCache.cc:96
EventProcessor.h
edm::EventProcessor::processEventAsync
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1795
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
ModuleChanger.h
edm::ProcessingController::ReverseState
ReverseState
Definition: ProcessingController.h:38
edm::Event
Definition: Event.h:73
edm::ProcessingController::specifiedEventTransition
edm::EventID specifiedEventTransition() const
Definition: ProcessingController.cc:84
edm::EventProcessor::shouldWeStop
bool shouldWeStop() const
Definition: EventProcessor.cc:1881
CommonMethods.cp
def cp(fromDir, toDir, listOfFiles, overwrite=False, smallList=False)
Definition: CommonMethods.py:192
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::LuminosityBlockProcessingStatus::continuingLumi
bool continuingLumi() const
Definition: LuminosityBlockProcessingStatus.h:80
IOVSyncValue.h
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::errors::Configuration
Definition: EDMException.h:36
SystemBounds.h
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::EventProcessor::preg
std::shared_ptr< ProductRegistry const > preg() const
Definition: EventProcessor.h:289
edm::EventProcessor::schedule_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: EventProcessor.h:327
edm::LuminosityBlockProcessingStatus::setNextSyncValue
void setNextSyncValue(IOVSyncValue const &iValue)
Definition: LuminosityBlockProcessingStatus.h:101
edm::EventProcessor::writeLumiAsync
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
Definition: EventProcessor.cc:1634
edm::EventProcessor::nextRunID
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
Definition: EventProcessor.cc:682
edm::EventProcessor::handleNextEventForStreamAsync
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
Definition: EventProcessor.cc:1723
edm::EventProcessor::branchIDListHelper_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: EventProcessor.h:314
edm::ProcessingController::setLastOperationSucceeded
void setLastOperationSucceeded(bool value)
Definition: ProcessingController.cc:71
Breakpoints.h
edm::ModuleChanger
Definition: ModuleChanger.h:36
trackingPlots.common
common
Definition: trackingPlots.py:206
edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: PrincipalCache.cc:137
edm::serviceregistry::ServiceLegacy
ServiceLegacy
Definition: ServiceLegacy.h:29
edm::LuminosityBlockID::maxLuminosityBlockNumber
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Definition: LuminosityBlockID.h:84
edm::EventProcessor::checkForAsyncStopRequest
bool checkForAsyncStopRequest(StatusCode &)
Definition: EventProcessor.cc:645
common
Definition: common.py:1
edm::MergeableRunProductMetadata::writeLumi
void writeLumi(LuminosityBlockNumber_t lumi)
Definition: MergeableRunProductMetadata.cc:118
edm::EventProcessor::forceESCacheClearOnNewRun_
bool forceESCacheClearOnNewRun_
Definition: EventProcessor.h:354
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::EventProcessor::beginJob
void beginJob()
Definition: EventProcessor.cc:535
edm::PrincipalCache::setNumberOfConcurrentPrincipals
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
Definition: PrincipalCache.cc:17
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318
edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
Definition: MergeableRunProductProcesses.cc:18