CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
37 
39 
48 
53 
55 
67 
68 #include "MessageForSource.h"
69 #include "MessageForParent.h"
71 
72 #include "boost/range/adaptor/reversed.hpp"
73 
74 #include <cassert>
75 #include <exception>
76 #include <iomanip>
77 #include <iostream>
78 #include <utility>
79 #include <sstream>
80 
81 #include <sys/ipc.h>
82 #include <sys/msg.h>
83 
84 #include "tbb/task.h"
85 
86 //Used for CPU affinity
87 #ifndef __APPLE__
88 #include <sched.h>
89 #endif
90 
91 namespace {
92  //Sentry class to only send a signal if an
93  // exception occurs. An exception is identified
94  // by the destructor being called without first
95  // calling completedSuccessfully().
96  class SendSourceTerminationSignalIfException {
97  public:
98  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
99  reg_(iReg) {}
100  ~SendSourceTerminationSignalIfException() {
101  if(reg_) {
102  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
103  }
104  }
105  void completedSuccessfully() {
106  reg_ = nullptr;
107  }
108  private:
109  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
110  };
111 
112 }
113 
114 namespace edm {
115 
116  // ---------------------------------------------------------------
117  std::unique_ptr<InputSource>
119  CommonParams const& common,
120  std::shared_ptr<ProductRegistry> preg,
121  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
122  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
123  std::shared_ptr<ActivityRegistry> areg,
124  std::shared_ptr<ProcessConfiguration const> processConfiguration,
125  PreallocationConfiguration const& allocations) {
126  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
127  if(main_input == nullptr) {
129  << "There must be exactly one source in the configuration.\n"
130  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
131  }
132 
133  std::string modtype(main_input->getParameter<std::string>("@module_type"));
134 
135  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
137  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
138  filler->fill(descriptions);
139 
140  try {
141  convertException::wrap([&]() {
142  descriptions.validate(*main_input, std::string("source"));
143  });
144  }
145  catch (cms::Exception & iException) {
146  std::ostringstream ost;
147  ost << "Validating configuration of input source of type " << modtype;
148  iException.addContext(ost.str());
149  throw;
150  }
151 
152  main_input->registerIt();
153 
154  // Fill in "ModuleDescription", in case the input source produces
155  // any EDProducts, which would be registered in the ProductRegistry.
156  // Also fill in the process history item for this process.
157  // There is no module label for the unnamed input source, so
158  // just use "source".
159  // Only the tracked parameters belong in the process configuration.
160  ModuleDescription md(main_input->id(),
161  main_input->getParameter<std::string>("@module_type"),
162  "source",
163  processConfiguration.get(),
164  ModuleDescription::getUniqueID());
165 
166  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
167  common.maxEventsInput_, common.maxLumisInput_,
168  common.maxSecondsUntilRampdown_, allocations);
169 
170  areg->preSourceConstructionSignal_(md);
171  std::unique_ptr<InputSource> input;
172  try {
173  //even if we have an exception, send the signal
174  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
175  convertException::wrap([&]() {
176  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
177  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
178  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
179  });
180  }
181  catch (cms::Exception& iException) {
182  std::ostringstream ost;
183  ost << "Constructing input source of type " << modtype;
184  iException.addContext(ost.str());
185  throw;
186  }
187  return input;
188  }
189 
190  // ---------------------------------------------------------------
191  std::shared_ptr<EDLooperBase>
194  ParameterSet& params) {
195  std::shared_ptr<EDLooperBase> vLooper;
196 
197  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
198 
199  if(loopers.empty()) {
200  return vLooper;
201  }
202 
203  assert(1 == loopers.size());
204 
205  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
206  itName != itNameEnd;
207  ++itName) {
208 
209  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
210  providerPSet->registerIt();
211  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
212  cp,
213  *providerPSet);
214  }
215  return vLooper;
216  }
217 
218  // ---------------------------------------------------------------
219  EventProcessor::EventProcessor(std::string const& config,
220  ServiceToken const& iToken,
222  std::vector<std::string> const& defaultServices,
223  std::vector<std::string> const& forcedServices) :
224  actReg_(),
225  preg_(),
226  branchIDListHelper_(),
227  serviceToken_(),
228  input_(),
229  espController_(new eventsetup::EventSetupsController),
230  esp_(),
231  act_table_(),
232  processConfiguration_(),
233  schedule_(),
234  subProcesses_(),
235  historyAppender_(new HistoryAppender),
236  fb_(),
237  looper_(),
238  deferredExceptionPtrIsSet_(false),
239  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
240  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
241  principalCache_(),
242  beginJobCalled_(false),
243  shouldWeStop_(false),
244  fileModeNoMerge_(false),
245  exceptionMessageFiles_(),
246  exceptionMessageRuns_(),
247  exceptionMessageLumis_(),
248  forceLooperToEnd_(false),
249  looperBeginJobRun_(false),
250  forceESCacheClearOnNewRun_(false),
251  eventSetupDataToExcludeFromPrefetching_() {
252  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
253  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
254  processDesc->addServices(defaultServices, forcedServices);
255  init(processDesc, iToken, iLegacy);
256  }
257 
259  std::vector<std::string> const& defaultServices,
260  std::vector<std::string> const& forcedServices) :
261  actReg_(),
262  preg_(),
264  serviceToken_(),
265  input_(),
266  espController_(new eventsetup::EventSetupsController),
267  esp_(),
268  act_table_(),
270  schedule_(),
271  subProcesses_(),
273  fb_(),
274  looper_(),
276  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
277  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
278  principalCache_(),
290  {
291  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
292  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
293  processDesc->addServices(defaultServices, forcedServices);
295  }
296 
297  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
298  ServiceToken const& token,
300  actReg_(),
301  preg_(),
303  serviceToken_(),
304  input_(),
305  espController_(new eventsetup::EventSetupsController),
306  esp_(),
307  act_table_(),
309  schedule_(),
310  subProcesses_(),
312  fb_(),
313  looper_(),
315  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
316  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
317  principalCache_(),
329  {
330  init(processDesc, token, legacy);
331  }
332 
333 
335  actReg_(),
336  preg_(),
338  serviceToken_(),
339  input_(),
340  espController_(new eventsetup::EventSetupsController),
341  esp_(),
342  act_table_(),
344  schedule_(),
345  subProcesses_(),
347  fb_(),
348  looper_(),
350  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
351  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
352  principalCache_(),
364  {
365  if(isPython) {
366  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
367  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
369  }
370  else {
371  auto processDesc = std::make_shared<ProcessDesc>(config);
373  }
374  }
375 
376  void
377  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
378  ServiceToken const& iToken,
380 
381  //std::cerr << processDesc->dump() << std::endl;
382 
383  // register the empty parentage vector , once and for all
385 
386  // register the empty parameter set, once and for all.
388 
389  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
390 
391  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
392  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
393  bool const hasSubProcesses = !subProcessVParameterSet.empty();
394 
395  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
396  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
397  // set in here if the parameters were not explicitly set.
398  validateTopLevelParameterSets(parameterSet.get());
399 
400  // Now set some parameters specific to the main process.
401  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
402  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
403  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
404  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
405  << fileMode << ".\n"
406  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
407  } else {
408  fileModeNoMerge_ = (fileMode == "NOMERGE");
409  }
410  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
411 
412  //threading
413  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
414 
415  // Even if numberOfThreads was set to zero in the Python configuration, the code
416  // in cmsRun.cpp should have reset it to something else.
417  assert(nThreads != 0);
418 
419  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
420  if (nStreams == 0) {
421  nStreams = nThreads;
422  }
423  if (nThreads > 1 or nStreams > 1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
427  if (nConcurrentRuns != 1) {
428  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
429  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
430  }
431  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
432  if (nConcurrentLumis == 0) {
433  nConcurrentLumis = nConcurrentRuns;
434  }
435 
436  //Check that relationships between threading parameters makes sense
437  /*
438  if(nThreads<nStreams) {
439  //bad
440  }
441  if(nConcurrentRuns>nStreams) {
442  //bad
443  }
444  if(nConcurrentRuns>nConcurrentLumis) {
445  //bad
446  }
447  */
448  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
449 
450  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
451 
452  // Now do general initialization
454 
455  //initialize the services
456  auto& serviceSets = processDesc->getServicesPSets();
457  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
458  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
459 
460  //make the services available
462 
463  if(nStreams>1) {
465  handler->willBeUsingThreads();
466  }
467 
468  // intialize miscellaneous items
469  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
470 
471  // intialize the event setup provider
472  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
473 
474  // initialize the looper, if any
475  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
476  if(looper_) {
477  looper_->setActionTable(items.act_table_.get());
478  looper_->attachTo(*items.actReg_);
479 
480  //For now loopers make us run only 1 transition at a time
481  nStreams=1;
482  nConcurrentLumis=1;
483  nConcurrentRuns=1;
484  }
485 
486  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
487 
488  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489  streamQueues_.resize(nStreams);
490  streamLumiStatus_.resize(nStreams);
491 
492  // initialize the input source
493  input_ = makeInput(*parameterSet,
494  *common,
495  items.preg(),
496  items.branchIDListHelper(),
497  items.thinnedAssociationsHelper(),
498  items.actReg_,
499  items.processConfiguration(),
501 
502  // intialize the Schedule
503  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
504 
505  // set the data members
506  act_table_ = std::move(items.act_table_);
507  actReg_ = items.actReg_;
508  preg_ = items.preg();
510  branchIDListHelper_ = items.branchIDListHelper();
511  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
512  processConfiguration_ = items.processConfiguration();
514  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
515 
516  FDEBUG(2) << parameterSet << std::endl;
517 
519  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
520  // Reusable event principal
521  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
524  }
525 
526  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
527  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
528  historyAppender_.get(), index);
530  }
531 
532  // fill the subprocesses, if there are any
533  subProcesses_.reserve(subProcessVParameterSet.size());
534  for(auto& subProcessPSet : subProcessVParameterSet) {
535  subProcesses_.emplace_back(subProcessPSet,
536  *parameterSet,
537  preg(),
542  *actReg_,
543  token,
546  &processContext_);
547  }
548  }
549 
551  // Make the services available while everything is being deleted.
552  ServiceToken token = getToken();
553  ServiceRegistry::Operate op(token);
554 
555  // manually destroy all these thing that may need the services around
556  // propagate_const<T> has no reset() function
557  espController_ = nullptr;
558  esp_ = nullptr;
559  schedule_ = nullptr;
560  input_ = nullptr;
561  looper_ = nullptr;
562  actReg_ = nullptr;
563 
566  }
567 
568  void
570  if(beginJobCalled_) return;
571  beginJobCalled_=true;
572  bk::beginJob();
573 
574  // StateSentry toerror(this); // should we add this ?
575  //make the services available
577 
582  actReg_->preallocateSignal_(bounds);
583  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
585 
586  //NOTE: this may throw
588  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
589 
590  //NOTE: This implementation assumes 'Job' means one call
591  // the EventProcessor::run
592  // If it really means once per 'application' then this code will
593  // have to be changed.
594  // Also have to deal with case where have 'run' then new Module
595  // added and do 'run'
596  // again. In that case the newly added Module needs its 'beginJob'
597  // to be called.
598 
599  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
600  // For now we delay calling beginOfJob until first beginOfRun
601  //if(looper_) {
602  // looper_->beginOfJob(es);
603  //}
604  try {
605  convertException::wrap([&]() {
606  input_->doBeginJob();
607  });
608  }
609  catch(cms::Exception& ex) {
610  ex.addContext("Calling beginJob for the source");
611  throw;
612  }
613  schedule_->beginJob(*preg_);
614  // toerror.succeeded(); // should we add this?
615  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
616  actReg_->postBeginJobSignal_();
617 
618  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
619  schedule_->beginStream(i);
620  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
621  }
622  }
623 
624  void
626  // Collects exceptions, so we don't throw before all operations are performed.
627  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
628 
629  //make the services available
631 
632  //NOTE: this really should go elsewhere in the future
633  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
634  c.call([this,i](){this->schedule_->endStream(i);});
635  for(auto& subProcess : subProcesses_) {
636  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
637  }
638  }
639  auto actReg = actReg_.get();
640  c.call([actReg](){actReg->preEndJobSignal_();});
641  schedule_->endJob(c);
642  for(auto& subProcess : subProcesses_) {
643  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
644  }
645  c.call(std::bind(&InputSource::doEndJob, input_.get()));
646  if(looper_) {
647  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
648  }
649  c.call([actReg](){actReg->postEndJobSignal_();});
650  if(c.hasThrown()) {
651  c.rethrow();
652  }
653  }
654 
657  return serviceToken_;
658  }
659 
660  std::vector<ModuleDescription const*>
662  return schedule_->getAllModuleDescriptions();
663  }
664 
665  int
667  return schedule_->totalEvents();
668  }
669 
670  int
672  return schedule_->totalEventsPassed();
673  }
674 
675  int
677  return schedule_->totalEventsFailed();
678  }
679 
680  void
682  schedule_->enableEndPaths(active);
683  }
684 
685  bool
687  return schedule_->endPathsEnabled();
688  }
689 
690  void
692  schedule_->getTriggerReport(rep);
693  }
694 
695  void
697  schedule_->clearCounters();
698  }
699 
700  namespace {
701 #include "TransitionProcessors.icc"
702  }
703 
704  bool
706  bool returnValue = false;
707 
708  // Look for a shutdown signal
709  if(shutdown_flag.load(std::memory_order_acquire)) {
710  returnValue = true;
711  returnCode = epSignal;
712  }
713  return returnValue;
714  }
715 
718  if (deferredExceptionPtrIsSet_.load()) {
720  return InputSource::IsStop;
721  }
722 
723  SendSourceTerminationSignalIfException sentry(actReg_.get());
724  InputSource::ItemType itemType;
725  //For now, do nothing with InputSource::IsSynchronize
726  do {
727  itemType = input_->nextItemType();
728  } while( itemType == InputSource::IsSynchronize);
729 
730  lastSourceTransition_ = itemType;
731  sentry.completedSuccessfully();
732 
734 
735  if(checkForAsyncStopRequest(returnCode)) {
736  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
738  }
739 
740  return lastSourceTransition_;
741  }
742 
743  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
745  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
746  }
747 
750  return input_->luminosityBlock();
751  }
752 
755 
758  {
759  beginJob(); //make sure this was called
760 
761  // make the services available
763 
765  try {
766  FilesProcessor fp(fileModeNoMerge_);
767 
768  convertException::wrap([&]() {
769  bool firstTime = true;
770  do {
771  if(not firstTime) {
773  rewindInput();
774  } else {
775  firstTime = false;
776  }
777  startingNewLoop();
778 
779  auto trans = fp.processFiles(*this);
780 
781  fp.normalEnd();
782 
783  if(deferredExceptionPtrIsSet_.load()) {
784  std::rethrow_exception(deferredExceptionPtr_);
785  }
786  if(trans != InputSource::IsStop) {
787  //problem with the source
788  doErrorStuff();
789 
790  throw cms::Exception("BadTransition")
791  << "Unexpected transition change "
792  << trans;
793 
794  }
795  } while(not endOfLoop());
796  }); // convertException::wrap
797 
798  } // Try block
799  catch (cms::Exception & e) {
800  if (!exceptionMessageLumis_.empty()) {
802  if (e.alreadyPrinted()) {
803  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
804  }
805  }
806  if (!exceptionMessageRuns_.empty()) {
808  if (e.alreadyPrinted()) {
809  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
810  }
811  }
812  if (!exceptionMessageFiles_.empty()) {
814  if (e.alreadyPrinted()) {
815  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
816  }
817  }
818  throw;
819  }
820  }
821 
822  return returnCode;
823  }
824 
826  FDEBUG(1) << " \treadFile\n";
827  size_t size = preg_->size();
828  SendSourceTerminationSignalIfException sentry(actReg_.get());
829 
831 
832  fb_ = input_->readFile();
833  if(size < preg_->size()) {
835  }
839  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
840  }
841  sentry.completedSuccessfully();
842  }
843 
844  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
845  if (fb_.get() != nullptr) {
846  SendSourceTerminationSignalIfException sentry(actReg_.get());
847  input_->closeFile(fb_.get(), cleaningUpAfterException);
848  sentry.completedSuccessfully();
849  }
850  FDEBUG(1) << "\tcloseInputFile\n";
851  }
852 
854  if (fb_.get() != nullptr) {
855  schedule_->openOutputFiles(*fb_);
856  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
857  }
858  FDEBUG(1) << "\topenOutputFiles\n";
859  }
860 
862  if (fb_.get() != nullptr) {
863  schedule_->closeOutputFiles();
864  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
865  }
866  FDEBUG(1) << "\tcloseOutputFiles\n";
867  }
868 
870  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
871  if (fb_.get() != nullptr) {
872  schedule_->respondToOpenInputFile(*fb_);
873  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
874  }
875  FDEBUG(1) << "\trespondToOpenInputFile\n";
876  }
877 
879  if (fb_.get() != nullptr) {
880  schedule_->respondToCloseInputFile(*fb_);
881  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
882  }
883  FDEBUG(1) << "\trespondToCloseInputFile\n";
884  }
885 
887  shouldWeStop_ = false;
888  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
889  // until after we've called beginOfJob
890  if(looper_ && looperBeginJobRun_) {
891  looper_->doStartingNewLoop();
892  }
893  FDEBUG(1) << "\tstartingNewLoop\n";
894  }
895 
897  if(looper_) {
898  ModuleChanger changer(schedule_.get(),preg_.get());
899  looper_->setModuleChanger(&changer);
900  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
901  looper_->setModuleChanger(nullptr);
902  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
903  else return false;
904  }
905  FDEBUG(1) << "\tendOfLoop\n";
906  return true;
907  }
908 
910  input_->repeat();
911  input_->rewind();
912  FDEBUG(1) << "\trewind\n";
913  }
914 
916  looper_->prepareForNextLoop(esp_.get());
917  FDEBUG(1) << "\tprepareForNextLoop\n";
918  }
919 
921  FDEBUG(1) << "\tshouldWeCloseOutput\n";
922  if(!subProcesses_.empty()) {
923  for(auto const& subProcess : subProcesses_) {
924  if(subProcess.shouldWeCloseOutput()) {
925  return true;
926  }
927  }
928  return false;
929  }
930  return schedule_->shouldWeCloseOutput();
931  }
932 
934  FDEBUG(1) << "\tdoErrorStuff\n";
935  LogError("StateMachine")
936  << "The EventProcessor state machine encountered an unexpected event\n"
937  << "and went to the error state\n"
938  << "Will attempt to terminate processing normally\n"
939  << "(IF using the looper the next loop will be attempted)\n"
940  << "This likely indicates a bug in an input module or corrupted input or both\n";
941  }
942 
943  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded,
944  bool& eventSetupForInstanceSucceeded) {
945  globalBeginSucceeded = false;
946  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
947  {
948  SendSourceTerminationSignalIfException sentry(actReg_.get());
949 
950  input_->doBeginRun(runPrincipal, &processContext_);
951  sentry.completedSuccessfully();
952  }
953 
954  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
955  runPrincipal.beginTime());
957  espController_->forceCacheClear();
958  }
959  {
960  SendSourceTerminationSignalIfException sentry(actReg_.get());
961  espController_->eventSetupForInstance(ts);
962  eventSetupForInstanceSucceeded = true;
963  sentry.completedSuccessfully();
964  }
965  EventSetup const& es = esp_->eventSetup();
966  if(looper_ && looperBeginJobRun_== false) {
967  looper_->copyInfo(ScheduleInfo(schedule_.get()));
968  looper_->beginOfJob(es);
969  looperBeginJobRun_ = true;
970  looper_->doStartingNewLoop();
971  }
972  {
974  auto globalWaitTask = make_empty_waiting_task();
975  globalWaitTask->increment_ref_count();
976  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
977  *schedule_,
978  runPrincipal,
979  ts,
980  es,
982  subProcesses_);
983  globalWaitTask->wait_for_all();
984  if(globalWaitTask->exceptionPtr() != nullptr) {
985  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
986  }
987  }
988  globalBeginSucceeded = true;
989  FDEBUG(1) << "\tbeginRun " << run << "\n";
990  if(looper_) {
991  looper_->doBeginRun(runPrincipal, es, &processContext_);
992  }
993  {
994  //To wait, the ref count has to be 1+#streams
995  auto streamLoopWaitTask = make_empty_waiting_task();
996  streamLoopWaitTask->increment_ref_count();
997 
999 
1000  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1001  *schedule_,
1003  runPrincipal,
1004  ts,
1005  es,
1006  serviceToken_,
1007  subProcesses_);
1008 
1009  streamLoopWaitTask->wait_for_all();
1010  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1011  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1012  }
1013  }
1014  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1015  if(looper_) {
1016  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1017  }
1018  }
1019 
1021  bool globalBeginSucceeded, bool cleaningUpAfterException,
1022  bool eventSetupForInstanceSucceeded) {
1023  if (eventSetupForInstanceSucceeded) {
1024  //If we skip empty runs, this would be called conditionally
1025  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1026 
1027  if(globalBeginSucceeded) {
1029  t->increment_ref_count();
1030  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1031  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1032  mergeableRunProductMetadata->preWriteRun();
1033  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
1034  t->wait_for_all();
1035  mergeableRunProductMetadata->postWriteRun();
1036  if(t->exceptionPtr()) {
1037  std::rethrow_exception(*t->exceptionPtr());
1038  }
1039  }
1040  }
1041  deleteRunFromCache(phid, run);
1042  }
1043 
1044  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1045  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1046  runPrincipal.setEndTime(input_->timestamp());
1047 
1049  runPrincipal.endTime());
1050  {
1051  SendSourceTerminationSignalIfException sentry(actReg_.get());
1052  espController_->eventSetupForInstance(ts);
1053  sentry.completedSuccessfully();
1054  }
1055  EventSetup const& es = esp_->eventSetup();
1056  if(globalBeginSucceeded){
1057  //To wait, the ref count has to be 1+#streams
1058  auto streamLoopWaitTask = make_empty_waiting_task();
1059  streamLoopWaitTask->increment_ref_count();
1060 
1062 
1063  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1064  *schedule_,
1066  runPrincipal,
1067  ts,
1068  es,
1069  serviceToken_,
1070  subProcesses_,
1071  cleaningUpAfterException);
1072 
1073  streamLoopWaitTask->wait_for_all();
1074  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1075  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1076  }
1077  }
1078  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1079  if(looper_) {
1080  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1081  }
1082  {
1083  auto globalWaitTask = make_empty_waiting_task();
1084  globalWaitTask->increment_ref_count();
1085 
1087  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1088  *schedule_,
1089  runPrincipal,
1090  ts,
1091  es,
1092  serviceToken_,
1093  subProcesses_,
1094  cleaningUpAfterException);
1095  globalWaitTask->wait_for_all();
1096  if(globalWaitTask->exceptionPtr() != nullptr) {
1097  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1098  }
1099  }
1100  FDEBUG(1) << "\tendRun " << run << "\n";
1101  if(looper_) {
1102  looper_->doEndRun(runPrincipal, es, &processContext_);
1103  }
1104  }
1105 
1107  EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1108  auto waitTask = make_empty_waiting_task();
1109  waitTask->increment_ref_count();
1110 
1111  if(streamLumiActive_> 0) {
1113  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1114  } else {
1115  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1116  input_->luminosityBlockAuxiliary()->beginTime()),
1117  iRunResource,
1118  WaitingTaskHolder{waitTask.get()});
1119  }
1120  waitTask->wait_for_all();
1121 
1122  if(waitTask->exceptionPtr() != nullptr) {
1123  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1124  }
1125  return lastTransitionType();
1126  }
1127 
1128  void
1130  std::shared_ptr<void> const& iRunResource, edm::WaitingTaskHolder iHolder) {
1131  if(iHolder.taskHasFailed()) { return; }
1132 
1133  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1134 
1135  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1136  if(iHolder.taskHasFailed()) { return; }
1137 
1138  status->setResumer(std::move(iResumer));
1139 
1140  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1141  //make the services available
1143 
1144  try {
1146 
1147  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginLumi(lumiPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1156  if(rng.isAvailable()) {
1157  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1158  rng->preBeginLumi(lb);
1159  }
1160 
1161  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1162 
1163  //Task to start the stream beginLumis
1164  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1165  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1166  if (iPtr) {
1167  holder.doneWaiting(*iPtr);
1168  } else {
1169 
1170  status->globalBeginDidSucceed();
1171  EventSetup const& es = esp_->eventSetup();
1172  if(looper_) {
1173  try {
1174  //make the services available
1176  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1177  }catch(...) {
1178  holder.doneWaiting(std::current_exception());
1179  return;
1180  }
1181  }
1183 
1184  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1185  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1186  streamQueues_[i].pause();
1187 
1188  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1189  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1190  {
1191  if(iPtr) {
1192  h.doneWaiting(*iPtr);
1193  } else {
1195  }
1196  });
1197  auto& event = principalCache_.eventPrincipal(i);
1200  auto lp = status->lumiPrincipal();
1201  event.setLuminosityBlockPrincipal(lp.get());
1202  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1203  *schedule_,i,*lp,ts,es,
1205  });
1206  }
1207  }
1208  });
1209 
1210  //task to start the global begin lumi
1211  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1212  EventSetup const& es = esp_->eventSetup();
1213  {
1215  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1216  *schedule_,
1217  *(status->lumiPrincipal()),
1218  ts,
1219  es,
1220  serviceToken_,
1221  subProcesses_);
1222  }
1223  } catch(...) {
1224  iHolder.doneWaiting(std::current_exception());
1225  }
1226  });
1227  };
1228 
1229  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1230  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1231  if(espController_->isWithinValidityInterval(iSync)) {
1232  iovQueue_.pause();
1233  lumiQueue_->pushAndPause(std::move(lumiWork));
1234  } else {
1235  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1236  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1237  try {
1238  SendSourceTerminationSignalIfException sentry(actReg_.get());
1239  espController_->eventSetupForInstance(iSync);
1240  sentry.completedSuccessfully();
1241  } catch(...) {
1242  iHolder.doneWaiting(std::current_exception());
1243  return;
1244  }
1245  iovQueue_.pause();
1246  lumiQueue_->pushAndPause(std::move(lumiWork));
1247  });
1248  }
1249  }
1250 
1251  void
1253  {
1254  //all streams are sharing the same status at the moment
1255  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1256  status->needToContinueLumi();
1257  status->startProcessingEvents();
1258  }
1259 
1260  unsigned int streamIndex = 0;
1261  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1262  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1263  [this,streamIndex,h = iHolder](){
1264  handleNextEventForStreamAsync(std::move(h), streamIndex);
1265  }) );
1266 
1267  }
1268  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1270  }) );
1271  }
1272 
1273  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1274  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1275  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
1276  std::exception_ptr ptr;
1277  //use an easier to remember variable name
1278  auto status = std::move(items.first);
1279  if(iPtr) {
1280  ptr = *iPtr;
1281  WaitingTaskHolder tmp(items.second);
1282  //set the exception early to prevent a beginLumi from running
1283  // we use a copy to keep t from resetting on doneWaiting call.
1284  tmp.doneWaiting(ptr);
1285  } else {
1286  try {
1288  if(looper_) {
1289  auto& lp = *(status->lumiPrincipal());
1290  EventSetup const& es = esp_->eventSetup();
1291  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1292  }
1293  }catch(...) {
1294  if(not ptr) {
1295  ptr = std::current_exception();
1296  }
1297  }
1298  }
1300  try {
1302  //release our hold on the IOV
1303  iovQueue_.resume();
1304  status->resumeGlobalLumiQueue();
1305  } catch(...) {
1306  if( not ptr) {
1307  ptr = std::current_exception();
1308  }
1309  }
1310  try {
1311  status.reset();
1312  } catch(...) {
1313  if( not ptr) {
1314  ptr = std::current_exception();
1315  }
1316  }
1317  //have to wait until reset is called since that could call endRun
1318  items.second.doneWaiting(ptr);
1319  });
1320 
1321  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1322  if(iExcept) {
1323  task.doneWaiting(*iExcept);
1324  } else {
1325  //Only call writeLumi if beginLumi succeeded
1326  if(status->didGlobalBeginSucceed()) {
1328  }
1329  }
1330  });
1331  auto& lp = *(iLumiStatus->lumiPrincipal());
1332 
1333  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1334  lp.beginTime());
1335 
1336 
1338  EventSetup const& es = esp_->eventSetup();
1339 
1340  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1341  *schedule_,
1342  lp,
1343  ts,
1344  es,
1345  serviceToken_,
1346  subProcesses_,
1347  iLumiStatus->cleaningUpAfterException());
1348  }
1349 
1351  unsigned int iStreamIndex,
1352  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1353 
1354  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1355  std::exception_ptr ptr;
1356  if(iPtr) {
1357  ptr = *iPtr;
1358  }
1359  auto status =streamLumiStatus_[iStreamIndex];
1360  //reset status before releasing queue else get race condtion
1361  streamLumiStatus_[iStreamIndex].reset();
1363  streamQueues_[iStreamIndex].resume();
1364 
1365  //are we the last one?
1366  if( status->streamFinishedLumi()) {
1368  }
1369  iTask.doneWaiting(ptr);
1370  });
1371 
1372  edm::WaitingTaskHolder lumiDoneTask{t};
1373 
1374  iLumiStatus->setEndTime();
1375 
1376  if(iLumiStatus->didGlobalBeginSucceed()) {
1377  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1378  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1379  lumiPrincipal.endTime());
1380  EventSetup const& es = esp_->eventSetup();
1381 
1382  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1383 
1385  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1386  *schedule_,iStreamIndex,
1387  lumiPrincipal,ts,es,
1388  serviceToken_,
1389  subProcesses_,cleaningUpAfterException);
1390  }
1391  }
1392 
1393 
1395  if(streamLumiActive_.load() > 0) {
1396  auto globalWaitTask = make_empty_waiting_task();
1397  globalWaitTask->increment_ref_count();
1398  {
1399  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1400  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1401  if(streamLumiStatus_[i]) {
1402  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1403  }
1404  }
1405  }
1406  globalWaitTask->wait_for_all();
1407  if(globalWaitTask->exceptionPtr() != nullptr) {
1408  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1409  }
1410  }
1411  }
1412 
1413  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1416  << "EventProcessor::readRun\n"
1417  << "Illegal attempt to insert run into cache\n"
1418  << "Contact a Framework Developer\n";
1419  }
1420  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(),
1422  0, true, &mergeableRunProductProcesses_);
1423  {
1424  SendSourceTerminationSignalIfException sentry(actReg_.get());
1425  input_->readRun(*rp, *historyAppender_);
1426  sentry.completedSuccessfully();
1427  }
1428  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1429  principalCache_.insert(rp);
1430  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1431  }
1432 
1433  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1434  principalCache_.merge(input_->runAuxiliary(), preg());
1435  auto runPrincipal =principalCache_.runPrincipalPtr();
1436  {
1437  SendSourceTerminationSignalIfException sentry(actReg_.get());
1438  input_->readAndMergeRun(*runPrincipal);
1439  sentry.completedSuccessfully();
1440  }
1441  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1442  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1443  }
1444 
1448  << "EventProcessor::readLuminosityBlock\n"
1449  << "Illegal attempt to insert lumi into cache\n"
1450  << "Run is invalid\n"
1451  << "Contact a Framework Developer\n";
1452  }
1454  assert(lbp);
1455  lbp->setAux(*input_->luminosityBlockAuxiliary());
1456  {
1457  SendSourceTerminationSignalIfException sentry(actReg_.get());
1458  input_->readLuminosityBlock(*lbp, *historyAppender_);
1459  sentry.completedSuccessfully();
1460  }
1461  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1462  iStatus.lumiPrincipal() = std::move(lbp);
1463  }
1464 
1466  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1467  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1468  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1469  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1470  assert(lumiOK);
1471  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1472  {
1473  SendSourceTerminationSignalIfException sentry(actReg_.get());
1474  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1475  sentry.completedSuccessfully();
1476  }
1477  return input_->luminosityBlock();
1478  }
1479 
1481  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1482  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task,mergeableRunProductMetadata]
1483  (std::exception_ptr const* iExcept) mutable {
1484  if(iExcept) {
1485  task.doneWaiting(*iExcept);
1486  } else {
1488  for(auto&s : subProcesses_) {
1489  s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1490  }
1491  }
1492  });
1494  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run),
1495  &processContext_, actReg_.get(), mergeableRunProductMetadata);
1496  }
1497 
1499  principalCache_.deleteRun(phid, run);
1500  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1501  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1502  }
1503 
1504  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, std::shared_ptr<LuminosityBlockProcessingStatus> iStatus) {
1505  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1506  if(iExcept) {
1507  task.doneWaiting(*iExcept);
1508  } else {
1510  for(auto&s : subProcesses_) {
1511  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1512  }
1513  }
1514  });
1516 
1517  std::shared_ptr<LuminosityBlockPrincipal> const& lumiPrincipal = iStatus->lumiPrincipal();
1518  lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1519 
1520  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *lumiPrincipal, &processContext_, actReg_.get());
1521  }
1522 
1524  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1525  iStatus.lumiPrincipal()->clearPrincipal();
1526  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1527  }
1528 
1529  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1531  if(shouldWeStop()) {
1532  return false;
1533  }
1534 
1535  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1536  return false;
1537  }
1538 
1539  if(iStatus.wasEventProcessingStopped()) {
1540  return false;
1541  }
1542 
1544  try {
1545  //need to use lock in addition to the serial task queue because
1546  // of delayed provenance reading and reading data in response to
1547  // edm::Refs etc
1548  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1549 
1550  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1551  if(InputSource::IsLumi == itemType) {
1552  iStatus.haveContinuedLumi();
1553  while(itemType == InputSource::IsLumi and
1554  iStatus.lumiPrincipal()->run() == input_->run() and
1555  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1556  readAndMergeLumi(iStatus);
1557  itemType = nextTransitionType();
1558  }
1559  if(InputSource::IsLumi == itemType) {
1560  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1561  input_->luminosityBlockAuxiliary()->beginTime()));
1562  }
1563  }
1564  if(InputSource::IsEvent != itemType) {
1565  iStatus.stopProcessingEvents();
1566 
1567  //IsFile may continue processing the lumi and
1568  // looper_ can cause the input source to declare a new IsRun which is actually
1569  // just a continuation of the previous run
1570  if(InputSource::IsStop == itemType or
1571  InputSource::IsLumi == itemType or
1572  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1573  iStatus.endLumi();
1574  }
1575  return false;
1576  }
1577  readEvent(iStreamIndex);
1578  } catch (...) {
1579  bool expected =false;
1580  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1581  deferredExceptionPtr_ = std::current_exception();
1582  }
1583  return false;
1584  }
1585  return true;
1586  }
1587 
1589  unsigned int iStreamIndex)
1590  {
1591  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1593  auto& status = streamLumiStatus_[iStreamIndex];
1594  try {
1595  if(readNextEventForStream(iStreamIndex, *status) ) {
1596  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1597  if(iPtr) {
1598  bool expected = false;
1599  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1600  deferredExceptionPtr_ = *iPtr;
1601  iTask.doneWaiting(*iPtr);
1602  }
1603  //the stream will stop now
1604  return;
1605  }
1606  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1607  });
1608 
1609  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1610  } else {
1611  //the stream will stop now
1612  if(status->isLumiEnding()) {
1613  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1614  status->startNextLumi();
1615  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1616  }
1617  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1618  } else {
1619  iTask.doneWaiting(std::exception_ptr{});
1620  }
1621  }
1622  } catch(...) {
1623  bool expected = false;
1624  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1625  auto e =std::current_exception();
1627  iTask.doneWaiting(e);
1628  }
1629  }
1630  });
1631  }
1632 
1633  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1634  //TODO this will have to become per stream
1635  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1636  StreamContext streamContext(event.streamID(), &processContext_);
1637 
1638  SendSourceTerminationSignalIfException sentry(actReg_.get());
1639  input_->readEvent(event, streamContext);
1640 
1641  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1642  sentry.completedSuccessfully();
1643 
1644  FDEBUG(1) << "\treadEvent\n";
1645  }
1646 
1648  unsigned int iStreamIndex) {
1649  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1650  processEventAsyncImpl(iHolder, iStreamIndex);
1651  }) );
1652  }
1653 
1655  unsigned int iStreamIndex) {
1656  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1657 
1660  if(rng.isAvailable()) {
1661  Event ev(*pep, ModuleDescription(), nullptr);
1662  rng->postEventRead(ev);
1663  }
1664 
1665  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1666  tbb::task::allocate_root(),
1667  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1668  {
1669 
1670  //NOTE: If we have a looper we only have one Stream
1671  if(looper_) {
1673  processEventWithLooper(*pep);
1674  }
1675 
1676  FDEBUG(1) << "\tprocessEvent\n";
1677  pep->clearEventPrincipal();
1678  if(iPtr) {
1679  iHolder.doneWaiting(*iPtr);
1680  } else {
1681  iHolder.doneWaiting(std::exception_ptr());
1682  }
1683  }
1684  )
1685  );
1686  WaitingTaskHolder afterProcessTask;
1687  if(subProcesses_.empty()) {
1688  afterProcessTask = std::move(finalizeEventTask);
1689  } else {
1690  //Need to run SubProcesses after schedule has finished
1691  // with the event
1692  afterProcessTask = WaitingTaskHolder(
1693  make_waiting_task(tbb::task::allocate_root(),
1694  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1695  {
1696  if(not iPtr) {
1697  //when run with 1 thread, we want to the order to be what
1698  // it was before. This requires reversing the order since
1699  // tasks are run last one in first one out
1700  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1701  subProcess.doEventAsync(finalizeEventTask,*pep);
1702  }
1703  } else {
1704  finalizeEventTask.doneWaiting(*iPtr);
1705  }
1706  })
1707  );
1708  }
1709 
1710  schedule_->processOneEventAsync(std::move(afterProcessTask),
1711  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1712 
1713  }
1714 
1716  bool randomAccess = input_->randomAccess();
1717  ProcessingController::ForwardState forwardState = input_->forwardState();
1718  ProcessingController::ReverseState reverseState = input_->reverseState();
1719  ProcessingController pc(forwardState, reverseState, randomAccess);
1720 
1722  do {
1723 
1724  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1725  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1726 
1727  bool succeeded = true;
1728  if(randomAccess) {
1730  input_->skipEvents(-2);
1731  }
1733  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1734  }
1735  }
1736  pc.setLastOperationSucceeded(succeeded);
1737  } while(!pc.lastOperationSucceeded());
1738  if(status != EDLooperBase::kContinue) {
1739  shouldWeStop_ = true;
1741  }
1742  }
1743 
1745  FDEBUG(1) << "\tshouldWeStop\n";
1746  if(shouldWeStop_) return true;
1747  if(!subProcesses_.empty()) {
1748  for(auto const& subProcess : subProcesses_) {
1749  if(subProcess.terminate()) {
1750  return true;
1751  }
1752  }
1753  return false;
1754  }
1755  return schedule_->terminate();
1756  }
1757 
1759  exceptionMessageFiles_ = message;
1760  }
1761 
1763  exceptionMessageRuns_ = message;
1764  }
1765 
1767  exceptionMessageLumis_ = message;
1768  }
1769 
1770  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1771  bool expected =false;
1772  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1773  deferredExceptionPtr_ = iException;
1774  return true;
1775  }
1776  return false;
1777  }
1778 }
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
Definition: config.py:1
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:65
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:15
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:45
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:85
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
config
Definition: looper.py:289
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:77
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:81
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
int totalEvents() const
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:686
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:100
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31