CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
36 
38 
47 
52 
54 
66 
67 #include "MessageForSource.h"
68 #include "MessageForParent.h"
70 
71 #include "boost/range/adaptor/reversed.hpp"
72 
73 #include <cassert>
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 namespace {
91  //Sentry class to only send a signal if an
92  // exception occurs. An exception is identified
93  // by the destructor being called without first
94  // calling completedSuccessfully().
95  class SendSourceTerminationSignalIfException {
96  public:
97  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
98  reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if(reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() {
105  reg_ = nullptr;
106  }
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109  };
110 
111 }
112 
113 namespace edm {
114 
115  // ---------------------------------------------------------------
116  std::unique_ptr<InputSource>
118  CommonParams const& common,
119  std::shared_ptr<ProductRegistry> preg,
120  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122  std::shared_ptr<ActivityRegistry> areg,
123  std::shared_ptr<ProcessConfiguration const> processConfiguration,
124  PreallocationConfiguration const& allocations) {
125  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
126  if(main_input == nullptr) {
128  << "There must be exactly one source in the configuration.\n"
129  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
130  }
131 
132  std::string modtype(main_input->getParameter<std::string>("@module_type"));
133 
134  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
136  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
137  filler->fill(descriptions);
138 
139  try {
140  convertException::wrap([&]() {
141  descriptions.validate(*main_input, std::string("source"));
142  });
143  }
144  catch (cms::Exception & iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
163  ModuleDescription::getUniqueID());
164 
165  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
166  common.maxEventsInput_, common.maxLumisInput_,
167  common.maxSecondsUntilRampdown_, allocations);
168 
169  areg->preSourceConstructionSignal_(md);
170  std::unique_ptr<InputSource> input;
171  try {
172  //even if we have an exception, send the signal
173  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
174  convertException::wrap([&]() {
175  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
178  });
179  }
180  catch (cms::Exception& iException) {
181  std::ostringstream ost;
182  ost << "Constructing input source of type " << modtype;
183  iException.addContext(ost.str());
184  throw;
185  }
186  return input;
187  }
188 
189  // ---------------------------------------------------------------
190  std::shared_ptr<EDLooperBase>
193  ParameterSet& params) {
194  std::shared_ptr<EDLooperBase> vLooper;
195 
196  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
197 
198  if(loopers.empty()) {
199  return vLooper;
200  }
201 
202  assert(1 == loopers.size());
203 
204  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
205  itName != itNameEnd;
206  ++itName) {
207 
208  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
209  providerPSet->registerIt();
210  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
211  cp,
212  *providerPSet);
213  }
214  return vLooper;
215  }
216 
217  // ---------------------------------------------------------------
218  EventProcessor::EventProcessor(std::string const& config,
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices) :
223  actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
249  forceESCacheClearOnNewRun_(false),
250  eventSetupDataToExcludeFromPrefetching_() {
251  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
252  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
253  processDesc->addServices(defaultServices, forcedServices);
254  init(processDesc, iToken, iLegacy);
255  }
256 
258  std::vector<std::string> const& defaultServices,
259  std::vector<std::string> const& forcedServices) :
260  actReg_(),
261  preg_(),
263  serviceToken_(),
264  input_(),
265  espController_(new eventsetup::EventSetupsController),
266  esp_(),
267  act_table_(),
269  schedule_(),
270  subProcesses_(),
272  fb_(),
273  looper_(),
275  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
276  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
277  principalCache_(),
289  {
290  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
291  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
292  processDesc->addServices(defaultServices, forcedServices);
294  }
295 
296  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
297  ServiceToken const& token,
299  actReg_(),
300  preg_(),
302  serviceToken_(),
303  input_(),
304  espController_(new eventsetup::EventSetupsController),
305  esp_(),
306  act_table_(),
308  schedule_(),
309  subProcesses_(),
311  fb_(),
312  looper_(),
314  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
315  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
316  principalCache_(),
328  {
329  init(processDesc, token, legacy);
330  }
331 
332 
334  actReg_(),
335  preg_(),
337  serviceToken_(),
338  input_(),
339  espController_(new eventsetup::EventSetupsController),
340  esp_(),
341  act_table_(),
343  schedule_(),
344  subProcesses_(),
346  fb_(),
347  looper_(),
349  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
350  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
351  principalCache_(),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
374 
375  void
376  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
377  ServiceToken const& iToken,
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
395  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
396  // set in here if the parameters were not explicitly set.
397  validateTopLevelParameterSets(parameterSet.get());
398 
399  // Now set some parameters specific to the main process.
400  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
401  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
402  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
403  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
404  << fileMode << ".\n"
405  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
406  } else {
407  fileModeNoMerge_ = (fileMode == "NOMERGE");
408  }
409  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
410 
411  //threading
412  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
413 
414  // Even if numberOfThreads was set to zero in the Python configuration, the code
415  // in cmsRun.cpp should have reset it to something else.
416  assert(nThreads != 0);
417 
418  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if (nStreams == 0) {
420  nStreams = nThreads;
421  }
422  if(nThreads > 1) {
423  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
424  }
425  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
426  if (nConcurrentRuns != 1) {
427  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
428  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
429  }
430  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
431  if (nConcurrentLumis == 0) {
432  nConcurrentLumis = nConcurrentRuns;
433  }
434 
435  //Check that relationships between threading parameters makes sense
436  /*
437  if(nThreads<nStreams) {
438  //bad
439  }
440  if(nConcurrentRuns>nStreams) {
441  //bad
442  }
443  if(nConcurrentRuns>nConcurrentLumis) {
444  //bad
445  }
446  */
447  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
448 
449  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
450 
451  // Now do general initialization
453 
454  //initialize the services
455  auto& serviceSets = processDesc->getServicesPSets();
456  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
457  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
458 
459  //make the services available
461 
462  if(nStreams>1) {
464  handler->willBeUsingThreads();
465  }
466 
467  // intialize miscellaneous items
468  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
469 
470  // intialize the event setup provider
471  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
472 
473  // initialize the looper, if any
474  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
475  if(looper_) {
476  looper_->setActionTable(items.act_table_.get());
477  looper_->attachTo(*items.actReg_);
478 
479  //For now loopers make us run only 1 transition at a time
480  nStreams=1;
481  nConcurrentLumis=1;
482  nConcurrentRuns=1;
483  }
484 
485  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
486 
487  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
488  streamQueues_.resize(nStreams);
489  streamLumiStatus_.resize(nStreams);
490 
491  // initialize the input source
492  input_ = makeInput(*parameterSet,
493  *common,
494  items.preg(),
495  items.branchIDListHelper(),
496  items.thinnedAssociationsHelper(),
497  items.actReg_,
498  items.processConfiguration(),
500 
501  // intialize the Schedule
502  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
503 
504  // set the data members
505  act_table_ = std::move(items.act_table_);
506  actReg_ = items.actReg_;
507  preg_ = items.preg();
508  branchIDListHelper_ = items.branchIDListHelper();
509  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
510  processConfiguration_ = items.processConfiguration();
512  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
513 
514  FDEBUG(2) << parameterSet << std::endl;
515 
517  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
518  // Reusable event principal
519  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
522  }
523 
524  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
525  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
526  historyAppender_.get(), index);
528  }
529 
530  // fill the subprocesses, if there are any
531  subProcesses_.reserve(subProcessVParameterSet.size());
532  for(auto& subProcessPSet : subProcessVParameterSet) {
533  subProcesses_.emplace_back(subProcessPSet,
534  *parameterSet,
535  preg(),
540  *actReg_,
541  token,
544  &processContext_);
545  }
546  }
547 
549  // Make the services available while everything is being deleted.
550  ServiceToken token = getToken();
551  ServiceRegistry::Operate op(token);
552 
553  // manually destroy all these thing that may need the services around
554  // propagate_const<T> has no reset() function
555  espController_ = nullptr;
556  esp_ = nullptr;
557  schedule_ = nullptr;
558  input_ = nullptr;
559  looper_ = nullptr;
560  actReg_ = nullptr;
561 
564  }
565 
566  void
568  if(beginJobCalled_) return;
569  beginJobCalled_=true;
570  bk::beginJob();
571 
572  // StateSentry toerror(this); // should we add this ?
573  //make the services available
575 
580  actReg_->preallocateSignal_(bounds);
581  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
583 
584  //NOTE: this may throw
586  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
587 
588  //NOTE: This implementation assumes 'Job' means one call
589  // the EventProcessor::run
590  // If it really means once per 'application' then this code will
591  // have to be changed.
592  // Also have to deal with case where have 'run' then new Module
593  // added and do 'run'
594  // again. In that case the newly added Module needs its 'beginJob'
595  // to be called.
596 
597  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
598  // For now we delay calling beginOfJob until first beginOfRun
599  //if(looper_) {
600  // looper_->beginOfJob(es);
601  //}
602  try {
603  convertException::wrap([&]() {
604  input_->doBeginJob();
605  });
606  }
607  catch(cms::Exception& ex) {
608  ex.addContext("Calling beginJob for the source");
609  throw;
610  }
611  schedule_->beginJob(*preg_);
612  // toerror.succeeded(); // should we add this?
613  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
614  actReg_->postBeginJobSignal_();
615 
616  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
617  schedule_->beginStream(i);
618  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
619  }
620  }
621 
622  void
624  // Collects exceptions, so we don't throw before all operations are performed.
625  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
626 
627  //make the services available
629 
630  //NOTE: this really should go elsewhere in the future
631  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
632  c.call([this,i](){this->schedule_->endStream(i);});
633  for(auto& subProcess : subProcesses_) {
634  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
635  }
636  }
637  auto actReg = actReg_.get();
638  c.call([actReg](){actReg->preEndJobSignal_();});
639  schedule_->endJob(c);
640  for(auto& subProcess : subProcesses_) {
641  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
642  }
643  c.call(std::bind(&InputSource::doEndJob, input_.get()));
644  if(looper_) {
645  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
646  }
647  c.call([actReg](){actReg->postEndJobSignal_();});
648  if(c.hasThrown()) {
649  c.rethrow();
650  }
651  }
652 
655  return serviceToken_;
656  }
657 
658  std::vector<ModuleDescription const*>
660  return schedule_->getAllModuleDescriptions();
661  }
662 
663  int
665  return schedule_->totalEvents();
666  }
667 
668  int
670  return schedule_->totalEventsPassed();
671  }
672 
673  int
675  return schedule_->totalEventsFailed();
676  }
677 
678  void
680  schedule_->enableEndPaths(active);
681  }
682 
683  bool
685  return schedule_->endPathsEnabled();
686  }
687 
688  void
690  schedule_->getTriggerReport(rep);
691  }
692 
693  void
695  schedule_->clearCounters();
696  }
697 
698  namespace {
699 #include "TransitionProcessors.icc"
700  }
701 
702  bool
704  bool returnValue = false;
705 
706  // Look for a shutdown signal
707  if(shutdown_flag.load(std::memory_order_acquire)) {
708  returnValue = true;
709  returnCode = epSignal;
710  }
711  return returnValue;
712  }
713 
716  if (deferredExceptionPtrIsSet_.load()) {
718  return InputSource::IsStop;
719  }
720 
721  SendSourceTerminationSignalIfException sentry(actReg_.get());
722  InputSource::ItemType itemType;
723  //For now, do nothing with InputSource::IsSynchronize
724  do {
725  itemType = input_->nextItemType();
726  } while( itemType == InputSource::IsSynchronize);
727 
728  lastSourceTransition_ = itemType;
729  sentry.completedSuccessfully();
730 
732 
733  if(checkForAsyncStopRequest(returnCode)) {
734  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
736  }
737 
738  return lastSourceTransition_;
739  }
740 
741  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
743  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
744  }
745 
748  return input_->luminosityBlock();
749  }
750 
753 
756  {
757  beginJob(); //make sure this was called
758 
759  // make the services available
761 
763  try {
764  FilesProcessor fp(fileModeNoMerge_);
765 
766  convertException::wrap([&]() {
767  bool firstTime = true;
768  do {
769  if(not firstTime) {
771  rewindInput();
772  } else {
773  firstTime = false;
774  }
775  startingNewLoop();
776 
777  auto trans = fp.processFiles(*this);
778 
779  fp.normalEnd();
780 
781  if(deferredExceptionPtrIsSet_.load()) {
782  std::rethrow_exception(deferredExceptionPtr_);
783  }
784  if(trans != InputSource::IsStop) {
785  //problem with the source
786  doErrorStuff();
787 
788  throw cms::Exception("BadTransition")
789  << "Unexpected transition change "
790  << trans;
791 
792  }
793  } while(not endOfLoop());
794  }); // convertException::wrap
795 
796  } // Try block
797  catch (cms::Exception & e) {
798  if (!exceptionMessageLumis_.empty()) {
800  if (e.alreadyPrinted()) {
801  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
802  }
803  }
804  if (!exceptionMessageRuns_.empty()) {
806  if (e.alreadyPrinted()) {
807  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
808  }
809  }
810  if (!exceptionMessageFiles_.empty()) {
812  if (e.alreadyPrinted()) {
813  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
814  }
815  }
816  throw;
817  }
818  }
819 
820  return returnCode;
821  }
822 
824  FDEBUG(1) << " \treadFile\n";
825  size_t size = preg_->size();
826  SendSourceTerminationSignalIfException sentry(actReg_.get());
827 
828  fb_ = input_->readFile();
829  if(size < preg_->size()) {
831  }
835  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
836  }
837  sentry.completedSuccessfully();
838  }
839 
840  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
841  if (fb_.get() != nullptr) {
842  SendSourceTerminationSignalIfException sentry(actReg_.get());
843  input_->closeFile(fb_.get(), cleaningUpAfterException);
844  sentry.completedSuccessfully();
845  }
846  FDEBUG(1) << "\tcloseInputFile\n";
847  }
848 
850  if (fb_.get() != nullptr) {
851  schedule_->openOutputFiles(*fb_);
852  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
853  }
854  FDEBUG(1) << "\topenOutputFiles\n";
855  }
856 
858  if (fb_.get() != nullptr) {
859  schedule_->closeOutputFiles();
860  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
861  }
862  FDEBUG(1) << "\tcloseOutputFiles\n";
863  }
864 
866  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
867  if (fb_.get() != nullptr) {
868  schedule_->respondToOpenInputFile(*fb_);
869  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
870  }
871  FDEBUG(1) << "\trespondToOpenInputFile\n";
872  }
873 
875  if (fb_.get() != nullptr) {
876  schedule_->respondToCloseInputFile(*fb_);
877  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
878  }
879  FDEBUG(1) << "\trespondToCloseInputFile\n";
880  }
881 
883  shouldWeStop_ = false;
884  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
885  // until after we've called beginOfJob
886  if(looper_ && looperBeginJobRun_) {
887  looper_->doStartingNewLoop();
888  }
889  FDEBUG(1) << "\tstartingNewLoop\n";
890  }
891 
893  if(looper_) {
894  ModuleChanger changer(schedule_.get(),preg_.get());
895  looper_->setModuleChanger(&changer);
896  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
897  looper_->setModuleChanger(nullptr);
898  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
899  else return false;
900  }
901  FDEBUG(1) << "\tendOfLoop\n";
902  return true;
903  }
904 
906  input_->repeat();
907  input_->rewind();
908  FDEBUG(1) << "\trewind\n";
909  }
910 
912  looper_->prepareForNextLoop(esp_.get());
913  FDEBUG(1) << "\tprepareForNextLoop\n";
914  }
915 
917  FDEBUG(1) << "\tshouldWeCloseOutput\n";
918  if(!subProcesses_.empty()) {
919  for(auto const& subProcess : subProcesses_) {
920  if(subProcess.shouldWeCloseOutput()) {
921  return true;
922  }
923  }
924  return false;
925  }
926  return schedule_->shouldWeCloseOutput();
927  }
928 
930  FDEBUG(1) << "\tdoErrorStuff\n";
931  LogError("StateMachine")
932  << "The EventProcessor state machine encountered an unexpected event\n"
933  << "and went to the error state\n"
934  << "Will attempt to terminate processing normally\n"
935  << "(IF using the looper the next loop will be attempted)\n"
936  << "This likely indicates a bug in an input module or corrupted input or both\n";
937  }
938 
939  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded) {
940  globalBeginSucceeded = false;
941  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
942  {
943  SendSourceTerminationSignalIfException sentry(actReg_.get());
944 
945  input_->doBeginRun(runPrincipal, &processContext_);
946  sentry.completedSuccessfully();
947  }
948 
949  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
950  runPrincipal.beginTime());
952  espController_->forceCacheClear();
953  }
954  {
955  SendSourceTerminationSignalIfException sentry(actReg_.get());
956  espController_->eventSetupForInstance(ts);
957  sentry.completedSuccessfully();
958  }
959  EventSetup const& es = esp_->eventSetup();
960  if(looper_ && looperBeginJobRun_== false) {
961  looper_->copyInfo(ScheduleInfo(schedule_.get()));
962  looper_->beginOfJob(es);
963  looperBeginJobRun_ = true;
964  looper_->doStartingNewLoop();
965  }
966  {
968  auto globalWaitTask = make_empty_waiting_task();
969  globalWaitTask->increment_ref_count();
970  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
971  *schedule_,
972  runPrincipal,
973  ts,
974  es,
976  subProcesses_);
977  globalWaitTask->wait_for_all();
978  if(globalWaitTask->exceptionPtr() != nullptr) {
979  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
980  }
981  }
982  globalBeginSucceeded = true;
983  FDEBUG(1) << "\tbeginRun " << run << "\n";
984  if(looper_) {
985  looper_->doBeginRun(runPrincipal, es, &processContext_);
986  }
987  {
988  //To wait, the ref count has to be 1+#streams
989  auto streamLoopWaitTask = make_empty_waiting_task();
990  streamLoopWaitTask->increment_ref_count();
991 
993 
994  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
995  *schedule_,
997  runPrincipal,
998  ts,
999  es,
1000  serviceToken_,
1001  subProcesses_);
1002 
1003  streamLoopWaitTask->wait_for_all();
1004  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1005  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1006  }
1007  }
1008  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1009  if(looper_) {
1010  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1011  }
1012  }
1013 
1014  void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1015  //If we skip empty runs, this would be called conditionally
1016  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1017 
1018  if(globalBeginSucceeded) {
1020  t->increment_ref_count();
1021  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run);
1022  t->wait_for_all();
1023  if(t->exceptionPtr()) {
1024  std::rethrow_exception(*t->exceptionPtr());
1025  }
1026  }
1027  deleteRunFromCache(phid, run);
1028  }
1029 
1030  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1031  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1032  runPrincipal.setEndTime(input_->timestamp());
1033 
1035  runPrincipal.endTime());
1036  {
1037  SendSourceTerminationSignalIfException sentry(actReg_.get());
1038  espController_->eventSetupForInstance(ts);
1039  sentry.completedSuccessfully();
1040  }
1041  EventSetup const& es = esp_->eventSetup();
1042  if(globalBeginSucceeded){
1043  //To wait, the ref count has to be 1+#streams
1044  auto streamLoopWaitTask = make_empty_waiting_task();
1045  streamLoopWaitTask->increment_ref_count();
1046 
1048 
1049  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1050  *schedule_,
1052  runPrincipal,
1053  ts,
1054  es,
1055  serviceToken_,
1056  subProcesses_,
1057  cleaningUpAfterException);
1058 
1059  streamLoopWaitTask->wait_for_all();
1060  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1061  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1062  }
1063  }
1064  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1065  if(looper_) {
1066  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1067  }
1068  {
1069  auto globalWaitTask = make_empty_waiting_task();
1070  globalWaitTask->increment_ref_count();
1071 
1073  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1074  *schedule_,
1075  runPrincipal,
1076  ts,
1077  es,
1078  serviceToken_,
1079  subProcesses_,
1080  cleaningUpAfterException);
1081  globalWaitTask->wait_for_all();
1082  if(globalWaitTask->exceptionPtr() != nullptr) {
1083  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1084  }
1085  }
1086  FDEBUG(1) << "\tendRun " << run << "\n";
1087  if(looper_) {
1088  looper_->doEndRun(runPrincipal, es, &processContext_);
1089  }
1090  }
1091 
1093  EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1094  auto waitTask = make_empty_waiting_task();
1095  waitTask->increment_ref_count();
1096 
1097  if(streamLumiActive_> 0) {
1099  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1100  } else {
1101  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1102  input_->luminosityBlockAuxiliary()->beginTime()),
1103  iRunResource,
1104  WaitingTaskHolder{waitTask.get()});
1105  }
1106  waitTask->wait_for_all();
1107 
1108  if(waitTask->exceptionPtr() != nullptr) {
1109  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1110  }
1111  return lastTransitionType();
1112  }
1113 
1114  void
1116  std::shared_ptr<void> const& iRunResource, edm::WaitingTaskHolder iHolder) {
1117  if(iHolder.taskHasFailed()) { return; }
1118 
1119  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1120 
1121  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1122  if(iHolder.taskHasFailed()) { return; }
1123 
1124  status->setResumer(std::move(iResumer));
1125 
1126  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1127  //make the services available
1129 
1130  try {
1132 
1133  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1134  {
1135  SendSourceTerminationSignalIfException sentry(actReg_.get());
1136 
1137  input_->doBeginLumi(lumiPrincipal, &processContext_);
1138  sentry.completedSuccessfully();
1139  }
1140 
1142  if(rng.isAvailable()) {
1143  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1144  rng->preBeginLumi(lb);
1145  }
1146 
1147  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1148 
1149  //Task to start the stream beginLumis
1150  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1151  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1152  if (iPtr) {
1153  holder.doneWaiting(*iPtr);
1154  } else {
1155 
1156  status->globalBeginDidSucceed();
1157  EventSetup const& es = esp_->eventSetup();
1158  if(looper_) {
1159  try {
1160  //make the services available
1162  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1163  }catch(...) {
1164  holder.doneWaiting(std::current_exception());
1165  return;
1166  }
1167  }
1169 
1170  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1171  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1172  streamQueues_[i].pause();
1173 
1174  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1175  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1176  {
1177  if(iPtr) {
1178  h.doneWaiting(*iPtr);
1179  } else {
1181  }
1182  });
1183  auto& event = principalCache_.eventPrincipal(i);
1186  auto lp = status->lumiPrincipal();
1187  event.setLuminosityBlockPrincipal(lp.get());
1188  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1189  *schedule_,i,*lp,ts,es,
1191  });
1192  }
1193  }
1194  });
1195 
1196  //task to start the global begin lumi
1197  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1198  EventSetup const& es = esp_->eventSetup();
1199  {
1201  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1202  *schedule_,
1203  *(status->lumiPrincipal()),
1204  ts,
1205  es,
1206  serviceToken_,
1207  subProcesses_);
1208  }
1209  } catch(...) {
1210  iHolder.doneWaiting(std::current_exception());
1211  }
1212  });
1213  };
1214 
1215  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1216  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1217  if(espController_->isWithinValidityInterval(iSync)) {
1218  iovQueue_.pause();
1219  lumiQueue_->pushAndPause(std::move(lumiWork));
1220  } else {
1221  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1222  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1223  try {
1224  SendSourceTerminationSignalIfException sentry(actReg_.get());
1225  espController_->eventSetupForInstance(iSync);
1226  sentry.completedSuccessfully();
1227  } catch(...) {
1228  iHolder.doneWaiting(std::current_exception());
1229  return;
1230  }
1231  iovQueue_.pause();
1232  lumiQueue_->pushAndPause(std::move(lumiWork));
1233  });
1234  }
1235  }
1236 
1237  void
1239  {
1240  //all streams are sharing the same status at the moment
1241  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1242  status->needToContinueLumi();
1243  status->startProcessingEvents();
1244  }
1245 
1246  unsigned int streamIndex = 0;
1247  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1248  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1249  [this,streamIndex,h = iHolder](){
1250  handleNextEventForStreamAsync(std::move(h), streamIndex);
1251  }) );
1252 
1253  }
1254  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1256  }) );
1257  }
1258 
1259  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1260  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1261  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
1262  std::exception_ptr ptr;
1263  //use an easier to remember variable name
1264  auto status = std::move(items.first);
1265  if(iPtr) {
1266  ptr = *iPtr;
1267  WaitingTaskHolder tmp(items.second);
1268  //set the exception early to prevent a beginLumi from running
1269  // we use a copy to keep t from resetting on doneWaiting call.
1270  tmp.doneWaiting(ptr);
1271  } else {
1272  try {
1274  if(looper_) {
1275  auto& lp = *(status->lumiPrincipal());
1276  EventSetup const& es = esp_->eventSetup();
1277  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1278  }
1279  }catch(...) {
1280  if(not ptr) {
1281  ptr = std::current_exception();
1282  }
1283  }
1284  }
1286  try {
1288  //release our hold on the IOV
1289  iovQueue_.resume();
1290  status->resumeGlobalLumiQueue();
1291  } catch(...) {
1292  if( not ptr) {
1293  ptr = std::current_exception();
1294  }
1295  }
1296  try {
1297  status.reset();
1298  } catch(...) {
1299  if( not ptr) {
1300  ptr = std::current_exception();
1301  }
1302  }
1303  //have to wait until reset is called since that could call endRun
1304  items.second.doneWaiting(ptr);
1305  });
1306 
1307  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1308  if(iExcept) {
1309  task.doneWaiting(*iExcept);
1310  } else {
1311  //Only call writeLumi if beginLumi succeeded
1312  if(status->didGlobalBeginSucceed()) {
1314  }
1315  }
1316  });
1317  auto& lp = *(iLumiStatus->lumiPrincipal());
1318 
1319  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1320  lp.beginTime());
1321 
1322 
1324  EventSetup const& es = esp_->eventSetup();
1325 
1326  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1327  *schedule_,
1328  lp,
1329  ts,
1330  es,
1331  serviceToken_,
1332  subProcesses_,
1333  iLumiStatus->cleaningUpAfterException());
1334  }
1335 
1337  unsigned int iStreamIndex,
1338  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1339 
1340  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1341  std::exception_ptr ptr;
1342  if(iPtr) {
1343  ptr = *iPtr;
1344  }
1345  auto status =streamLumiStatus_[iStreamIndex];
1346  //reset status before releasing queue else get race condtion
1347  streamLumiStatus_[iStreamIndex].reset();
1349  streamQueues_[iStreamIndex].resume();
1350 
1351  //are we the last one?
1352  if( status->streamFinishedLumi()) {
1354  }
1355  iTask.doneWaiting(ptr);
1356  });
1357 
1358  edm::WaitingTaskHolder lumiDoneTask{t};
1359 
1360  iLumiStatus->setEndTime();
1361 
1362  if(iLumiStatus->didGlobalBeginSucceed()) {
1363  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1364  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1365  lumiPrincipal.endTime());
1366  EventSetup const& es = esp_->eventSetup();
1367 
1368  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1369 
1371  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1372  *schedule_,iStreamIndex,
1373  lumiPrincipal,ts,es,
1374  serviceToken_,
1375  subProcesses_,cleaningUpAfterException);
1376  }
1377  }
1378 
1379 
1381  if(streamLumiActive_.load() > 0) {
1382  auto globalWaitTask = make_empty_waiting_task();
1383  globalWaitTask->increment_ref_count();
1384  {
1385  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1386  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1387  if(streamLumiStatus_[i]) {
1388  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1389  }
1390  }
1391  }
1392  globalWaitTask->wait_for_all();
1393  if(globalWaitTask->exceptionPtr() != nullptr) {
1394  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1395  }
1396  }
1397  }
1398 
1399  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1402  << "EventProcessor::readRun\n"
1403  << "Illegal attempt to insert run into cache\n"
1404  << "Contact a Framework Developer\n";
1405  }
1406  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1407  {
1408  SendSourceTerminationSignalIfException sentry(actReg_.get());
1409  input_->readRun(*rp, *historyAppender_);
1410  sentry.completedSuccessfully();
1411  }
1412  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1413  principalCache_.insert(rp);
1414  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1415  }
1416 
1417  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1418  principalCache_.merge(input_->runAuxiliary(), preg());
1419  auto runPrincipal =principalCache_.runPrincipalPtr();
1420  {
1421  SendSourceTerminationSignalIfException sentry(actReg_.get());
1422  input_->readAndMergeRun(*runPrincipal);
1423  sentry.completedSuccessfully();
1424  }
1425  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1426  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1427  }
1428 
1432  << "EventProcessor::readLuminosityBlock\n"
1433  << "Illegal attempt to insert lumi into cache\n"
1434  << "Run is invalid\n"
1435  << "Contact a Framework Developer\n";
1436  }
1438  assert(lbp);
1439  lbp->setAux(*input_->luminosityBlockAuxiliary());
1440  {
1441  SendSourceTerminationSignalIfException sentry(actReg_.get());
1442  input_->readLuminosityBlock(*lbp, *historyAppender_);
1443  sentry.completedSuccessfully();
1444  }
1445  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1446  iStatus.lumiPrincipal() = std::move(lbp);
1447  }
1448 
1450  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1451  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1452  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1453  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1454  assert(lumiOK);
1455  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1456  {
1457  SendSourceTerminationSignalIfException sentry(actReg_.get());
1458  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1459  sentry.completedSuccessfully();
1460  }
1461  return input_->luminosityBlock();
1462  }
1463 
1465  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task](std::exception_ptr const* iExcept) mutable {
1466  if(iExcept) {
1467  task.doneWaiting(*iExcept);
1468  } else {
1470  for(auto&s : subProcesses_) {
1471  s.writeRunAsync(task,phid,run);
1472  }
1473  }
1474  });
1476  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run), &processContext_, actReg_.get());
1477  }
1478 
1480  principalCache_.deleteRun(phid, run);
1481  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1482  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1483  }
1484 
1485  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, std::shared_ptr<LuminosityBlockProcessingStatus> iStatus) {
1486  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1487  if(iExcept) {
1488  task.doneWaiting(*iExcept);
1489  } else {
1491  for(auto&s : subProcesses_) {
1492  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1493  }
1494  }
1495  });
1497 
1498  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *(iStatus->lumiPrincipal()), &processContext_, actReg_.get());
1499  }
1500 
1502  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1503  iStatus.lumiPrincipal()->clearPrincipal();
1504  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1505  }
1506 
1507  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1509  if(shouldWeStop()) {
1510  return false;
1511  }
1512 
1513  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1514  return false;
1515  }
1516 
1517  if(iStatus.wasEventProcessingStopped()) {
1518  return false;
1519  }
1520 
1522  try {
1523  //need to use lock in addition to the serial task queue because
1524  // of delayed provenance reading and reading data in response to
1525  // edm::Refs etc
1526  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1527 
1528  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1529  if(InputSource::IsLumi == itemType) {
1530  iStatus.haveContinuedLumi();
1531  while(itemType == InputSource::IsLumi and
1532  iStatus.lumiPrincipal()->run() == input_->run() and
1533  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1534  readAndMergeLumi(iStatus);
1535  itemType = nextTransitionType();
1536  }
1537  if(InputSource::IsLumi == itemType) {
1538  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1539  input_->luminosityBlockAuxiliary()->beginTime()));
1540  }
1541  }
1542  if(InputSource::IsEvent != itemType) {
1543  iStatus.stopProcessingEvents();
1544 
1545  //IsFile may continue processing the lumi and
1546  // looper_ can cause the input source to declare a new IsRun which is actually
1547  // just a continuation of the previous run
1548  if(InputSource::IsStop == itemType or
1549  InputSource::IsLumi == itemType or
1550  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1551  iStatus.endLumi();
1552  }
1553  return false;
1554  }
1555  readEvent(iStreamIndex);
1556  } catch (...) {
1557  bool expected =false;
1558  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1559  deferredExceptionPtr_ = std::current_exception();
1560  }
1561  return false;
1562  }
1563  return true;
1564  }
1565 
1567  unsigned int iStreamIndex)
1568  {
1569  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1571  auto& status = streamLumiStatus_[iStreamIndex];
1572  try {
1573  if(readNextEventForStream(iStreamIndex, *status) ) {
1574  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1575  if(iPtr) {
1576  bool expected = false;
1577  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1578  deferredExceptionPtr_ = *iPtr;
1579  iTask.doneWaiting(*iPtr);
1580  }
1581  //the stream will stop now
1582  return;
1583  }
1584  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1585  });
1586 
1587  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1588  } else {
1589  //the stream will stop now
1590  if(status->isLumiEnding()) {
1591  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1592  status->startNextLumi();
1593  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1594  }
1595  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1596  } else {
1597  iTask.doneWaiting(std::exception_ptr{});
1598  }
1599  }
1600  } catch(...) {
1601  bool expected = false;
1602  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1603  auto e =std::current_exception();
1605  iTask.doneWaiting(e);
1606  }
1607  }
1608  });
1609  }
1610 
1611  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1612  //TODO this will have to become per stream
1613  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1614  StreamContext streamContext(event.streamID(), &processContext_);
1615 
1616  SendSourceTerminationSignalIfException sentry(actReg_.get());
1617  input_->readEvent(event, streamContext);
1618 
1619  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1620  sentry.completedSuccessfully();
1621 
1622  FDEBUG(1) << "\treadEvent\n";
1623  }
1624 
1626  unsigned int iStreamIndex) {
1627  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1628  processEventAsyncImpl(iHolder, iStreamIndex);
1629  }) );
1630  }
1631 
1633  unsigned int iStreamIndex) {
1634  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1635 
1638  if(rng.isAvailable()) {
1639  Event ev(*pep, ModuleDescription(), nullptr);
1640  rng->postEventRead(ev);
1641  }
1642 
1643  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1644  tbb::task::allocate_root(),
1645  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1646  {
1647 
1648  //NOTE: If we have a looper we only have one Stream
1649  if(looper_) {
1651  processEventWithLooper(*pep);
1652  }
1653 
1654  FDEBUG(1) << "\tprocessEvent\n";
1655  pep->clearEventPrincipal();
1656  if(iPtr) {
1657  iHolder.doneWaiting(*iPtr);
1658  } else {
1659  iHolder.doneWaiting(std::exception_ptr());
1660  }
1661  }
1662  )
1663  );
1664  WaitingTaskHolder afterProcessTask;
1665  if(subProcesses_.empty()) {
1666  afterProcessTask = std::move(finalizeEventTask);
1667  } else {
1668  //Need to run SubProcesses after schedule has finished
1669  // with the event
1670  afterProcessTask = WaitingTaskHolder(
1671  make_waiting_task(tbb::task::allocate_root(),
1672  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1673  {
1674  if(not iPtr) {
1675  //when run with 1 thread, we want to the order to be what
1676  // it was before. This requires reversing the order since
1677  // tasks are run last one in first one out
1678  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1679  subProcess.doEventAsync(finalizeEventTask,*pep);
1680  }
1681  } else {
1682  finalizeEventTask.doneWaiting(*iPtr);
1683  }
1684  })
1685  );
1686  }
1687 
1688  schedule_->processOneEventAsync(std::move(afterProcessTask),
1689  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1690 
1691  }
1692 
1694  bool randomAccess = input_->randomAccess();
1695  ProcessingController::ForwardState forwardState = input_->forwardState();
1696  ProcessingController::ReverseState reverseState = input_->reverseState();
1697  ProcessingController pc(forwardState, reverseState, randomAccess);
1698 
1700  do {
1701 
1702  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1703  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1704 
1705  bool succeeded = true;
1706  if(randomAccess) {
1708  input_->skipEvents(-2);
1709  }
1711  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1712  }
1713  }
1714  pc.setLastOperationSucceeded(succeeded);
1715  } while(!pc.lastOperationSucceeded());
1716  if(status != EDLooperBase::kContinue) {
1717  shouldWeStop_ = true;
1719  }
1720  }
1721 
1723  FDEBUG(1) << "\tshouldWeStop\n";
1724  if(shouldWeStop_) return true;
1725  if(!subProcesses_.empty()) {
1726  for(auto const& subProcess : subProcesses_) {
1727  if(subProcess.terminate()) {
1728  return true;
1729  }
1730  }
1731  return false;
1732  }
1733  return schedule_->terminate();
1734  }
1735 
1737  exceptionMessageFiles_ = message;
1738  }
1739 
1741  exceptionMessageRuns_ = message;
1742  }
1743 
1745  exceptionMessageLumis_ = message;
1746  }
1747 
1748  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1749  bool expected =false;
1750  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1751  deferredExceptionPtr_ = iException;
1752  return true;
1753  }
1754  return false;
1755  }
1756 }
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
Definition: config.py:1
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:61
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:15
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
config
Definition: looper.py:288
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
ServiceToken serviceToken_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1189
int totalEvents() const
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:684
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31