CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
36 
38 
47 
52 
54 
66 
67 #include "MessageForSource.h"
68 #include "MessageForParent.h"
70 
71 #include "boost/range/adaptor/reversed.hpp"
72 
73 #include <cassert>
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 namespace {
91  //Sentry class to only send a signal if an
92  // exception occurs. An exception is identified
93  // by the destructor being called without first
94  // calling completedSuccessfully().
95  class SendSourceTerminationSignalIfException {
96  public:
97  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
98  reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if(reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() {
105  reg_ = nullptr;
106  }
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109  };
110 
111 }
112 
113 namespace edm {
114 
115  // ---------------------------------------------------------------
116  std::unique_ptr<InputSource>
118  CommonParams const& common,
119  std::shared_ptr<ProductRegistry> preg,
120  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122  std::shared_ptr<ActivityRegistry> areg,
123  std::shared_ptr<ProcessConfiguration const> processConfiguration,
124  PreallocationConfiguration const& allocations) {
125  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
126  if(main_input == nullptr) {
128  << "There must be exactly one source in the configuration.\n"
129  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
130  }
131 
132  std::string modtype(main_input->getParameter<std::string>("@module_type"));
133 
134  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
136  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
137  filler->fill(descriptions);
138 
139  try {
140  convertException::wrap([&]() {
141  descriptions.validate(*main_input, std::string("source"));
142  });
143  }
144  catch (cms::Exception & iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
163  ModuleDescription::getUniqueID());
164 
165  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
166  common.maxEventsInput_, common.maxLumisInput_,
167  common.maxSecondsUntilRampdown_, allocations);
168 
169  areg->preSourceConstructionSignal_(md);
170  std::unique_ptr<InputSource> input;
171  try {
172  //even if we have an exception, send the signal
173  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
174  convertException::wrap([&]() {
175  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
178  });
179  }
180  catch (cms::Exception& iException) {
181  std::ostringstream ost;
182  ost << "Constructing input source of type " << modtype;
183  iException.addContext(ost.str());
184  throw;
185  }
186  return input;
187  }
188 
189  // ---------------------------------------------------------------
190  std::shared_ptr<EDLooperBase>
193  ParameterSet& params) {
194  std::shared_ptr<EDLooperBase> vLooper;
195 
196  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
197 
198  if(loopers.empty()) {
199  return vLooper;
200  }
201 
202  assert(1 == loopers.size());
203 
204  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
205  itName != itNameEnd;
206  ++itName) {
207 
208  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
209  providerPSet->registerIt();
210  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
211  cp,
212  *providerPSet);
213  }
214  return vLooper;
215  }
216 
217  // ---------------------------------------------------------------
218  EventProcessor::EventProcessor(std::string const& config,
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices) :
223  actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
249  forceESCacheClearOnNewRun_(false),
250  eventSetupDataToExcludeFromPrefetching_() {
251  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
252  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
253  processDesc->addServices(defaultServices, forcedServices);
254  init(processDesc, iToken, iLegacy);
255  }
256 
258  std::vector<std::string> const& defaultServices,
259  std::vector<std::string> const& forcedServices) :
260  actReg_(),
261  preg_(),
263  serviceToken_(),
264  input_(),
265  espController_(new eventsetup::EventSetupsController),
266  esp_(),
267  act_table_(),
269  schedule_(),
270  subProcesses_(),
272  fb_(),
273  looper_(),
275  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
276  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
277  principalCache_(),
289  {
290  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
291  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
292  processDesc->addServices(defaultServices, forcedServices);
294  }
295 
296  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
297  ServiceToken const& token,
299  actReg_(),
300  preg_(),
302  serviceToken_(),
303  input_(),
304  espController_(new eventsetup::EventSetupsController),
305  esp_(),
306  act_table_(),
308  schedule_(),
309  subProcesses_(),
311  fb_(),
312  looper_(),
314  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
315  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
316  principalCache_(),
328  {
329  init(processDesc, token, legacy);
330  }
331 
332 
334  actReg_(),
335  preg_(),
337  serviceToken_(),
338  input_(),
339  espController_(new eventsetup::EventSetupsController),
340  esp_(),
341  act_table_(),
343  schedule_(),
344  subProcesses_(),
346  fb_(),
347  looper_(),
349  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
350  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
351  principalCache_(),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
374 
375  void
376  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
377  ServiceToken const& iToken,
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
395  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
396  // set in here if the parameters were not explicitly set.
397  validateTopLevelParameterSets(parameterSet.get());
398 
399  // Now set some parameters specific to the main process.
400  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
401  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
402  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
403  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
404  << fileMode << ".\n"
405  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
406  } else {
407  fileModeNoMerge_ = (fileMode == "NOMERGE");
408  }
409  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
410 
411  //threading
412  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
413 
414  // Even if numberOfThreads was set to zero in the Python configuration, the code
415  // in cmsRun.cpp should have reset it to something else.
416  assert(nThreads != 0);
417 
418  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if (nStreams == 0) {
420  nStreams = nThreads;
421  }
422  if(nThreads > 1) {
423  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
424  }
425  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
426  if (nConcurrentRuns != 1) {
427  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
428  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
429  }
430  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
431  if (nConcurrentLumis == 0) {
432  nConcurrentLumis = nConcurrentRuns;
433  }
434 
435  //Check that relationships between threading parameters makes sense
436  /*
437  if(nThreads<nStreams) {
438  //bad
439  }
440  if(nConcurrentRuns>nStreams) {
441  //bad
442  }
443  if(nConcurrentRuns>nConcurrentLumis) {
444  //bad
445  }
446  */
447  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
448 
449  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
450 
451  // Now do general initialization
453 
454  //initialize the services
455  auto& serviceSets = processDesc->getServicesPSets();
456  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
457  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
458 
459  //make the services available
461 
462  if(nStreams>1) {
464  handler->willBeUsingThreads();
465  }
466 
467  // intialize miscellaneous items
468  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
469 
470  // intialize the event setup provider
471  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
472 
473  // initialize the looper, if any
474  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
475  if(looper_) {
476  looper_->setActionTable(items.act_table_.get());
477  looper_->attachTo(*items.actReg_);
478 
479  //For now loopers make us run only 1 transition at a time
480  nStreams=1;
481  nConcurrentLumis=1;
482  nConcurrentRuns=1;
483  }
484 
485  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
486 
487  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
488  streamQueues_.resize(nStreams);
489  streamLumiStatus_.resize(nStreams);
490 
491  // initialize the input source
492  input_ = makeInput(*parameterSet,
493  *common,
494  items.preg(),
495  items.branchIDListHelper(),
496  items.thinnedAssociationsHelper(),
497  items.actReg_,
498  items.processConfiguration(),
500 
501  // intialize the Schedule
502  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
503 
504  // set the data members
505  act_table_ = std::move(items.act_table_);
506  actReg_ = items.actReg_;
507  preg_ = items.preg();
508  branchIDListHelper_ = items.branchIDListHelper();
509  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
510  processConfiguration_ = items.processConfiguration();
512  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
513 
514  FDEBUG(2) << parameterSet << std::endl;
515 
517  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
518  // Reusable event principal
519  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
522  }
523 
524  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
525  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
526  historyAppender_.get(), index);
528  }
529 
530  // fill the subprocesses, if there are any
531  subProcesses_.reserve(subProcessVParameterSet.size());
532  for(auto& subProcessPSet : subProcessVParameterSet) {
533  subProcesses_.emplace_back(subProcessPSet,
534  *parameterSet,
535  preg(),
540  *actReg_,
541  token,
544  &processContext_);
545  }
546  }
547 
549  // Make the services available while everything is being deleted.
550  ServiceToken token = getToken();
551  ServiceRegistry::Operate op(token);
552 
553  // manually destroy all these thing that may need the services around
554  // propagate_const<T> has no reset() function
555  espController_ = nullptr;
556  esp_ = nullptr;
557  schedule_ = nullptr;
558  input_ = nullptr;
559  looper_ = nullptr;
560  actReg_ = nullptr;
561 
564  }
565 
566  void
568  if(beginJobCalled_) return;
569  beginJobCalled_=true;
570  bk::beginJob();
571 
572  // StateSentry toerror(this); // should we add this ?
573  //make the services available
575 
580  actReg_->preallocateSignal_(bounds);
581  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
583 
584  //NOTE: this may throw
586  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
587 
588  //NOTE: This implementation assumes 'Job' means one call
589  // the EventProcessor::run
590  // If it really means once per 'application' then this code will
591  // have to be changed.
592  // Also have to deal with case where have 'run' then new Module
593  // added and do 'run'
594  // again. In that case the newly added Module needs its 'beginJob'
595  // to be called.
596 
597  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
598  // For now we delay calling beginOfJob until first beginOfRun
599  //if(looper_) {
600  // looper_->beginOfJob(es);
601  //}
602  try {
603  convertException::wrap([&]() {
604  input_->doBeginJob();
605  });
606  }
607  catch(cms::Exception& ex) {
608  ex.addContext("Calling beginJob for the source");
609  throw;
610  }
611  schedule_->beginJob(*preg_);
612  // toerror.succeeded(); // should we add this?
613  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
614  actReg_->postBeginJobSignal_();
615 
616  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
617  schedule_->beginStream(i);
618  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
619  }
620  }
621 
622  void
624  // Collects exceptions, so we don't throw before all operations are performed.
625  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
626 
627  //make the services available
629 
630  //NOTE: this really should go elsewhere in the future
631  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
632  c.call([this,i](){this->schedule_->endStream(i);});
633  for(auto& subProcess : subProcesses_) {
634  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
635  }
636  }
637  auto actReg = actReg_.get();
638  c.call([actReg](){actReg->preEndJobSignal_();});
639  schedule_->endJob(c);
640  for(auto& subProcess : subProcesses_) {
641  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
642  }
643  c.call(std::bind(&InputSource::doEndJob, input_.get()));
644  if(looper_) {
645  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
646  }
647  c.call([actReg](){actReg->postEndJobSignal_();});
648  if(c.hasThrown()) {
649  c.rethrow();
650  }
651  }
652 
655  return serviceToken_;
656  }
657 
658  std::vector<ModuleDescription const*>
660  return schedule_->getAllModuleDescriptions();
661  }
662 
663  int
665  return schedule_->totalEvents();
666  }
667 
668  int
670  return schedule_->totalEventsPassed();
671  }
672 
673  int
675  return schedule_->totalEventsFailed();
676  }
677 
678  void
680  schedule_->enableEndPaths(active);
681  }
682 
683  bool
685  return schedule_->endPathsEnabled();
686  }
687 
688  void
690  schedule_->getTriggerReport(rep);
691  }
692 
693  void
695  schedule_->clearCounters();
696  }
697 
698  namespace {
699 #include "TransitionProcessors.icc"
700  }
701 
702  bool
704  bool returnValue = false;
705 
706  // Look for a shutdown signal
707  if(shutdown_flag.load(std::memory_order_acquire)) {
708  returnValue = true;
709  returnCode = epSignal;
710  }
711  return returnValue;
712  }
713 
716  if (deferredExceptionPtrIsSet_.load()) {
718  return InputSource::IsStop;
719  }
720 
721  SendSourceTerminationSignalIfException sentry(actReg_.get());
722  InputSource::ItemType itemType;
723  //For now, do nothing with InputSource::IsSynchronize
724  do {
725  itemType = input_->nextItemType();
726  } while( itemType == InputSource::IsSynchronize);
727 
728  lastSourceTransition_ = itemType;
729  sentry.completedSuccessfully();
730 
732 
733  if(checkForAsyncStopRequest(returnCode)) {
734  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
736  }
737 
738  return lastSourceTransition_;
739  }
740 
741  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
743  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
744  }
745 
748  return input_->luminosityBlock();
749  }
750 
753 
756  {
757  beginJob(); //make sure this was called
758 
759  // make the services available
761 
763  try {
764  FilesProcessor fp(fileModeNoMerge_);
765 
766  convertException::wrap([&]() {
767  bool firstTime = true;
768  do {
769  if(not firstTime) {
771  rewindInput();
772  } else {
773  firstTime = false;
774  }
775  startingNewLoop();
776 
777  auto trans = fp.processFiles(*this);
778 
779  fp.normalEnd();
780 
781  if(deferredExceptionPtrIsSet_.load()) {
782  std::rethrow_exception(deferredExceptionPtr_);
783  }
784  if(trans != InputSource::IsStop) {
785  //problem with the source
786  doErrorStuff();
787 
788  throw cms::Exception("BadTransition")
789  << "Unexpected transition change "
790  << trans;
791 
792  }
793  } while(not endOfLoop());
794  }); // convertException::wrap
795 
796  } // Try block
797  catch (cms::Exception & e) {
798  if (!exceptionMessageLumis_.empty()) {
800  if (e.alreadyPrinted()) {
801  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
802  }
803  }
804  if (!exceptionMessageRuns_.empty()) {
806  if (e.alreadyPrinted()) {
807  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
808  }
809  }
810  if (!exceptionMessageFiles_.empty()) {
812  if (e.alreadyPrinted()) {
813  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
814  }
815  }
816  throw;
817  }
818  }
819 
820  return returnCode;
821  }
822 
824  FDEBUG(1) << " \treadFile\n";
825  size_t size = preg_->size();
826  SendSourceTerminationSignalIfException sentry(actReg_.get());
827 
828  fb_ = input_->readFile();
829  if(size < preg_->size()) {
831  }
835  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
836  }
837  sentry.completedSuccessfully();
838  }
839 
840  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
841  if (fb_.get() != nullptr) {
842  SendSourceTerminationSignalIfException sentry(actReg_.get());
843  input_->closeFile(fb_.get(), cleaningUpAfterException);
844  sentry.completedSuccessfully();
845  }
846  FDEBUG(1) << "\tcloseInputFile\n";
847  }
848 
850  if (fb_.get() != nullptr) {
851  schedule_->openOutputFiles(*fb_);
852  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
853  }
854  FDEBUG(1) << "\topenOutputFiles\n";
855  }
856 
858  if (fb_.get() != nullptr) {
859  schedule_->closeOutputFiles();
860  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
861  }
862  FDEBUG(1) << "\tcloseOutputFiles\n";
863  }
864 
866  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
867  if (fb_.get() != nullptr) {
868  schedule_->respondToOpenInputFile(*fb_);
869  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
870  }
871  FDEBUG(1) << "\trespondToOpenInputFile\n";
872  }
873 
875  if (fb_.get() != nullptr) {
876  schedule_->respondToCloseInputFile(*fb_);
877  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
878  }
879  FDEBUG(1) << "\trespondToCloseInputFile\n";
880  }
881 
883  shouldWeStop_ = false;
884  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
885  // until after we've called beginOfJob
886  if(looper_ && looperBeginJobRun_) {
887  looper_->doStartingNewLoop();
888  }
889  FDEBUG(1) << "\tstartingNewLoop\n";
890  }
891 
893  if(looper_) {
894  ModuleChanger changer(schedule_.get(),preg_.get());
895  looper_->setModuleChanger(&changer);
896  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
897  looper_->setModuleChanger(nullptr);
898  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
899  else return false;
900  }
901  FDEBUG(1) << "\tendOfLoop\n";
902  return true;
903  }
904 
906  input_->repeat();
907  input_->rewind();
908  FDEBUG(1) << "\trewind\n";
909  }
910 
912  looper_->prepareForNextLoop(esp_.get());
913  FDEBUG(1) << "\tprepareForNextLoop\n";
914  }
915 
917  FDEBUG(1) << "\tshouldWeCloseOutput\n";
918  if(!subProcesses_.empty()) {
919  for(auto const& subProcess : subProcesses_) {
920  if(subProcess.shouldWeCloseOutput()) {
921  return true;
922  }
923  }
924  return false;
925  }
926  return schedule_->shouldWeCloseOutput();
927  }
928 
930  FDEBUG(1) << "\tdoErrorStuff\n";
931  LogError("StateMachine")
932  << "The EventProcessor state machine encountered an unexpected event\n"
933  << "and went to the error state\n"
934  << "Will attempt to terminate processing normally\n"
935  << "(IF using the looper the next loop will be attempted)\n"
936  << "This likely indicates a bug in an input module or corrupted input or both\n";
937  }
938 
939  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded) {
940  globalBeginSucceeded = false;
941  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
942  {
943  SendSourceTerminationSignalIfException sentry(actReg_.get());
944 
945  input_->doBeginRun(runPrincipal, &processContext_);
946  sentry.completedSuccessfully();
947  }
948 
949  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
950  runPrincipal.beginTime());
952  espController_->forceCacheClear();
953  }
954  {
955  SendSourceTerminationSignalIfException sentry(actReg_.get());
956  espController_->eventSetupForInstance(ts);
957  sentry.completedSuccessfully();
958  }
959  EventSetup const& es = esp_->eventSetup();
960  if(looper_ && looperBeginJobRun_== false) {
961  looper_->copyInfo(ScheduleInfo(schedule_.get()));
962  looper_->beginOfJob(es);
963  looperBeginJobRun_ = true;
964  looper_->doStartingNewLoop();
965  }
966  {
968  auto globalWaitTask = make_empty_waiting_task();
969  globalWaitTask->increment_ref_count();
970  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
971  *schedule_,
972  runPrincipal,
973  ts,
974  es,
976  subProcesses_);
977  globalWaitTask->wait_for_all();
978  if(globalWaitTask->exceptionPtr() != nullptr) {
979  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
980  }
981  }
982  globalBeginSucceeded = true;
983  FDEBUG(1) << "\tbeginRun " << run << "\n";
984  if(looper_) {
985  looper_->doBeginRun(runPrincipal, es, &processContext_);
986  }
987  {
988  //To wait, the ref count has to be 1+#streams
989  auto streamLoopWaitTask = make_empty_waiting_task();
990  streamLoopWaitTask->increment_ref_count();
991 
993 
994  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
995  *schedule_,
997  runPrincipal,
998  ts,
999  es,
1000  serviceToken_,
1001  subProcesses_);
1002 
1003  streamLoopWaitTask->wait_for_all();
1004  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1005  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1006  }
1007  }
1008  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1009  if(looper_) {
1010  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1011  }
1012  }
1013 
1014  void EventProcessor::endUnfinishedRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1015  //If we skip empty runs, this would be called conditionally
1016  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1017 
1018  if(globalBeginSucceeded) {
1020  t->increment_ref_count();
1021  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run);
1022  t->wait_for_all();
1023  if(t->exceptionPtr()) {
1024  std::rethrow_exception(*t->exceptionPtr());
1025  }
1026  }
1027  deleteRunFromCache(phid, run);
1028  }
1029 
1030  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1031  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1032  runPrincipal.setEndTime(input_->timestamp());
1033 
1035  runPrincipal.endTime());
1036  {
1037  SendSourceTerminationSignalIfException sentry(actReg_.get());
1038  espController_->eventSetupForInstance(ts);
1039  sentry.completedSuccessfully();
1040  }
1041  EventSetup const& es = esp_->eventSetup();
1042  if(globalBeginSucceeded){
1043  //To wait, the ref count has to be 1+#streams
1044  auto streamLoopWaitTask = make_empty_waiting_task();
1045  streamLoopWaitTask->increment_ref_count();
1046 
1048 
1049  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1050  *schedule_,
1052  runPrincipal,
1053  ts,
1054  es,
1055  serviceToken_,
1056  subProcesses_,
1057  cleaningUpAfterException);
1058 
1059  streamLoopWaitTask->wait_for_all();
1060  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1061  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1062  }
1063  }
1064  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1065  if(looper_) {
1066  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1067  }
1068  {
1069  auto globalWaitTask = make_empty_waiting_task();
1070  globalWaitTask->increment_ref_count();
1071 
1073  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1074  *schedule_,
1075  runPrincipal,
1076  ts,
1077  es,
1078  serviceToken_,
1079  subProcesses_,
1080  cleaningUpAfterException);
1081  globalWaitTask->wait_for_all();
1082  if(globalWaitTask->exceptionPtr() != nullptr) {
1083  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1084  }
1085  }
1086  FDEBUG(1) << "\tendRun " << run << "\n";
1087  if(looper_) {
1088  looper_->doEndRun(runPrincipal, es, &processContext_);
1089  }
1090  }
1091 
1093  EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1094  auto waitTask = make_empty_waiting_task();
1095  waitTask->increment_ref_count();
1096 
1097  if(streamLumiActive_> 0) {
1099  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1100  } else {
1101  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1102  input_->luminosityBlockAuxiliary()->beginTime()),
1103  iRunResource,
1104  WaitingTaskHolder{waitTask.get()});
1105  }
1106  waitTask->wait_for_all();
1107 
1108  if(waitTask->exceptionPtr() != nullptr) {
1109  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1110  }
1111  return lastTransitionType();
1112  }
1113 
1114  void
1116  std::shared_ptr<void> const& iRunResource, edm::WaitingTaskHolder iHolder) {
1117  if(iHolder.taskHasFailed()) { return; }
1118 
1119  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1120 
1121  auto lumiWork = [this, iHolder, iSync, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1122  if(iHolder.taskHasFailed()) { return; }
1123 
1124  status->setResumer(std::move(iResumer));
1125 
1126  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1127  //make the services available
1129 
1130  try {
1132 
1133  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1134  {
1135  SendSourceTerminationSignalIfException sentry(actReg_.get());
1136 
1137  input_->doBeginLumi(lumiPrincipal, &processContext_);
1138  sentry.completedSuccessfully();
1139  }
1140 
1142  if(rng.isAvailable()) {
1143  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1144  rng->preBeginLumi(lb);
1145  }
1146 
1147  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1148 
1149  //Task to start the stream beginLumis
1150  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1151  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1152  if (iPtr) {
1153  holder.doneWaiting(*iPtr);
1154  } else {
1155 
1156  status->globalBeginDidSucceed();
1157  EventSetup const& es = esp_->eventSetup();
1158  if(looper_) {
1159  try {
1160  //make the services available
1162  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1163  }catch(...) {
1164  holder.doneWaiting(std::current_exception());
1165  return;
1166  }
1167  }
1169 
1170  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1171  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1172  streamQueues_[i].pause();
1173 
1174  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1175  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1176  {
1177  if(iPtr) {
1178  h.doneWaiting(*iPtr);
1179  } else {
1181  }
1182  });
1183  auto& event = principalCache_.eventPrincipal(i);
1186  auto lp = status->lumiPrincipal();
1187  event.setLuminosityBlockPrincipal(lp.get());
1188  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1189  *schedule_,i,*lp,ts,es,
1191  });
1192  }
1193  }
1194  });
1195 
1196  //task to start the global begin lumi
1197  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1198  EventSetup const& es = esp_->eventSetup();
1199  {
1201  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1202  *schedule_,
1203  *(status->lumiPrincipal()),
1204  ts,
1205  es,
1206  serviceToken_,
1207  subProcesses_);
1208  }
1209  } catch(...) {
1210  iHolder.doneWaiting(std::current_exception());
1211  }
1212  });
1213  };
1214 
1215  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1216  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1217  if(espController_->isWithinValidityInterval(iSync)) {
1218  iovQueue_.pause();
1219  lumiQueue_->pushAndPause(std::move(lumiWork));
1220  } else {
1221  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1222  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1223  try {
1224  SendSourceTerminationSignalIfException sentry(actReg_.get());
1225  espController_->eventSetupForInstance(iSync);
1226  sentry.completedSuccessfully();
1227  } catch(...) {
1228  iHolder.doneWaiting(std::current_exception());
1229  return;
1230  }
1231  iovQueue_.pause();
1232  lumiQueue_->pushAndPause(std::move(lumiWork));
1233  });
1234  }
1235  }
1236 
1237  void
1239  {
1240  //all streams are sharing the same status at the moment
1241  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1242  status->needToContinueLumi();
1243  status->startProcessingEvents();
1244  }
1245 
1246  unsigned int streamIndex = 0;
1247  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1248  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1249  [this,streamIndex,h = iHolder](){
1250  handleNextEventForStreamAsync(std::move(h), streamIndex);
1251  }) );
1252 
1253  }
1254  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1256  }) );
1257  }
1258 
1259  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1260  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [t = std::move(iTask), status = iLumiStatus, this] (std::exception_ptr const* iPtr) mutable {
1261  std::exception_ptr ptr;
1262  if(iPtr) {
1263  ptr = *iPtr;
1265  //set the exception early to prevent a beginLumi from running
1266  // we use a copy to keep t from resetting on doneWaiting call.
1267  tmp.doneWaiting(ptr);
1268  } else {
1269  try {
1271  if(looper_) {
1272  auto& lp = *(status->lumiPrincipal());
1273  EventSetup const& es = esp_->eventSetup();
1274  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1275  }
1276  }catch(...) {
1277  if(not ptr) {
1278  ptr = std::current_exception();
1279  }
1280  }
1281  }
1283  //release our hold on the IOV
1284  iovQueue_.resume();
1285  status->resumeGlobalLumiQueue();
1286  try {
1287  status.reset();
1288  } catch(...) {
1289  if( not ptr) {
1290  ptr = std::current_exception();
1291  }
1292  }
1293  //have to wait until reset is called since that could call endRun
1294  t.doneWaiting(ptr);
1295  });
1296 
1297  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1298  if(iExcept) {
1299  task.doneWaiting(*iExcept);
1300  } else {
1301  //Only call writeLumi if beginLumi succeeded
1302  if(status->didGlobalBeginSucceed()) {
1304  }
1305  }
1306  });
1307  auto& lp = *(iLumiStatus->lumiPrincipal());
1308 
1309  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1310  lp.beginTime());
1311 
1312 
1314  EventSetup const& es = esp_->eventSetup();
1315 
1316  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1317  *schedule_,
1318  lp,
1319  ts,
1320  es,
1321  serviceToken_,
1322  subProcesses_,
1323  iLumiStatus->cleaningUpAfterException());
1324  }
1325 
1327  unsigned int iStreamIndex,
1328  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1329 
1330  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1331  std::exception_ptr ptr;
1332  if(iPtr) {
1333  ptr = *iPtr;
1334  }
1335  auto status =streamLumiStatus_[iStreamIndex];
1336  //reset status before releasing queue else get race condtion
1337  streamLumiStatus_[iStreamIndex].reset();
1339  streamQueues_[iStreamIndex].resume();
1340 
1341  //are we the last one?
1342  if( status->streamFinishedLumi()) {
1344  }
1345  iTask.doneWaiting(ptr);
1346  });
1347 
1348  edm::WaitingTaskHolder lumiDoneTask{t};
1349 
1350  iLumiStatus->setEndTime();
1351 
1352  if(iLumiStatus->didGlobalBeginSucceed()) {
1353  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1354  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1355  lumiPrincipal.endTime());
1356  EventSetup const& es = esp_->eventSetup();
1357 
1358  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1359 
1361  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1362  *schedule_,iStreamIndex,
1363  lumiPrincipal,ts,es,
1364  serviceToken_,
1365  subProcesses_,cleaningUpAfterException);
1366  }
1367  }
1368 
1369 
1371  if(streamLumiActive_.load() > 0) {
1372  auto globalWaitTask = make_empty_waiting_task();
1373  globalWaitTask->increment_ref_count();
1374  {
1375  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1376  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1377  if(streamLumiStatus_[i]) {
1378  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1379  }
1380  }
1381  }
1382  globalWaitTask->wait_for_all();
1383  if(globalWaitTask->exceptionPtr() != nullptr) {
1384  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1385  }
1386  }
1387  }
1388 
1389  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1392  << "EventProcessor::readRun\n"
1393  << "Illegal attempt to insert run into cache\n"
1394  << "Contact a Framework Developer\n";
1395  }
1396  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1397  {
1398  SendSourceTerminationSignalIfException sentry(actReg_.get());
1399  input_->readRun(*rp, *historyAppender_);
1400  sentry.completedSuccessfully();
1401  }
1402  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1403  principalCache_.insert(rp);
1404  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1405  }
1406 
1407  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1408  principalCache_.merge(input_->runAuxiliary(), preg());
1409  auto runPrincipal =principalCache_.runPrincipalPtr();
1410  {
1411  SendSourceTerminationSignalIfException sentry(actReg_.get());
1412  input_->readAndMergeRun(*runPrincipal);
1413  sentry.completedSuccessfully();
1414  }
1415  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1416  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1417  }
1418 
1422  << "EventProcessor::readLuminosityBlock\n"
1423  << "Illegal attempt to insert lumi into cache\n"
1424  << "Run is invalid\n"
1425  << "Contact a Framework Developer\n";
1426  }
1428  assert(lbp);
1429  lbp->setAux(*input_->luminosityBlockAuxiliary());
1430  {
1431  SendSourceTerminationSignalIfException sentry(actReg_.get());
1432  input_->readLuminosityBlock(*lbp, *historyAppender_);
1433  sentry.completedSuccessfully();
1434  }
1435  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1436  iStatus.lumiPrincipal() = std::move(lbp);
1437  }
1438 
1440  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1441  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1442  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1443  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1444  assert(lumiOK);
1445  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1446  {
1447  SendSourceTerminationSignalIfException sentry(actReg_.get());
1448  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1449  sentry.completedSuccessfully();
1450  }
1451  return input_->luminosityBlock();
1452  }
1453 
1455  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task](std::exception_ptr const* iExcept) mutable {
1456  if(iExcept) {
1457  task.doneWaiting(*iExcept);
1458  } else {
1460  for(auto&s : subProcesses_) {
1461  s.writeRunAsync(task,phid,run);
1462  }
1463  }
1464  });
1466  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run), &processContext_, actReg_.get());
1467  }
1468 
1470  principalCache_.deleteRun(phid, run);
1471  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1472  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1473  }
1474 
1475  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, std::shared_ptr<LuminosityBlockProcessingStatus> iStatus) {
1476  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1477  if(iExcept) {
1478  task.doneWaiting(*iExcept);
1479  } else {
1481  for(auto&s : subProcesses_) {
1482  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1483  }
1484  }
1485  });
1487 
1488  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *(iStatus->lumiPrincipal()), &processContext_, actReg_.get());
1489  }
1490 
1492  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1493  iStatus.lumiPrincipal()->clearPrincipal();
1494  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1495  }
1496 
1497  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1499  if(shouldWeStop()) {
1500  return false;
1501  }
1502 
1503  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1504  return false;
1505  }
1506 
1507  if(iStatus.wasEventProcessingStopped()) {
1508  return false;
1509  }
1510 
1512  try {
1513  //need to use lock in addition to the serial task queue because
1514  // of delayed provenance reading and reading data in response to
1515  // edm::Refs etc
1516  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1517 
1518  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1519  if(InputSource::IsLumi == itemType) {
1520  iStatus.haveContinuedLumi();
1521  while(itemType == InputSource::IsLumi and
1522  iStatus.lumiPrincipal()->run() == input_->run() and
1523  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1524  readAndMergeLumi(iStatus);
1525  itemType = nextTransitionType();
1526  }
1527  if(InputSource::IsLumi == itemType) {
1528  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1529  input_->luminosityBlockAuxiliary()->beginTime()));
1530  }
1531  }
1532  if(InputSource::IsEvent != itemType) {
1533  iStatus.stopProcessingEvents();
1534 
1535  //IsFile may continue processing the lumi and
1536  // looper_ can cause the input source to declare a new IsRun which is actually
1537  // just a continuation of the previous run
1538  if(InputSource::IsStop == itemType or
1539  InputSource::IsLumi == itemType or
1540  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1541  iStatus.endLumi();
1542  }
1543  return false;
1544  }
1545  readEvent(iStreamIndex);
1546  } catch (...) {
1547  bool expected =false;
1548  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1549  deferredExceptionPtr_ = std::current_exception();
1550  }
1551  return false;
1552  }
1553  return true;
1554  }
1555 
1557  unsigned int iStreamIndex)
1558  {
1559  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1561  auto& status = streamLumiStatus_[iStreamIndex];
1562  try {
1563  if(readNextEventForStream(iStreamIndex, *status) ) {
1564  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1565  if(iPtr) {
1566  bool expected = false;
1567  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1568  deferredExceptionPtr_ = *iPtr;
1569  iTask.doneWaiting(*iPtr);
1570  }
1571  //the stream will stop now
1572  return;
1573  }
1574  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1575  });
1576 
1577  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1578  } else {
1579  //the stream will stop now
1580  if(status->isLumiEnding()) {
1581  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1582  status->startNextLumi();
1583  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1584  }
1585  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1586  } else {
1587  iTask.doneWaiting(std::exception_ptr{});
1588  }
1589  }
1590  } catch(...) {
1591  bool expected = false;
1592  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1593  auto e =std::current_exception();
1595  iTask.doneWaiting(e);
1596  }
1597  }
1598  });
1599  }
1600 
1601  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1602  //TODO this will have to become per stream
1603  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1604  StreamContext streamContext(event.streamID(), &processContext_);
1605 
1606  SendSourceTerminationSignalIfException sentry(actReg_.get());
1607  input_->readEvent(event, streamContext);
1608 
1609  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1610  sentry.completedSuccessfully();
1611 
1612  FDEBUG(1) << "\treadEvent\n";
1613  }
1614 
1616  unsigned int iStreamIndex) {
1617  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1618  processEventAsyncImpl(iHolder, iStreamIndex);
1619  }) );
1620  }
1621 
1623  unsigned int iStreamIndex) {
1624  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1625 
1628  if(rng.isAvailable()) {
1629  Event ev(*pep, ModuleDescription(), nullptr);
1630  rng->postEventRead(ev);
1631  }
1632 
1633  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1634  tbb::task::allocate_root(),
1635  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1636  {
1637 
1638  //NOTE: If we have a looper we only have one Stream
1639  if(looper_) {
1641  processEventWithLooper(*pep);
1642  }
1643 
1644  FDEBUG(1) << "\tprocessEvent\n";
1645  pep->clearEventPrincipal();
1646  if(iPtr) {
1647  iHolder.doneWaiting(*iPtr);
1648  } else {
1649  iHolder.doneWaiting(std::exception_ptr());
1650  }
1651  }
1652  )
1653  );
1654  WaitingTaskHolder afterProcessTask;
1655  if(subProcesses_.empty()) {
1656  afterProcessTask = std::move(finalizeEventTask);
1657  } else {
1658  //Need to run SubProcesses after schedule has finished
1659  // with the event
1660  afterProcessTask = WaitingTaskHolder(
1661  make_waiting_task(tbb::task::allocate_root(),
1662  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1663  {
1664  if(not iPtr) {
1665  //when run with 1 thread, we want to the order to be what
1666  // it was before. This requires reversing the order since
1667  // tasks are run last one in first one out
1668  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1669  subProcess.doEventAsync(finalizeEventTask,*pep);
1670  }
1671  } else {
1672  finalizeEventTask.doneWaiting(*iPtr);
1673  }
1674  })
1675  );
1676  }
1677 
1678  schedule_->processOneEventAsync(std::move(afterProcessTask),
1679  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1680 
1681  }
1682 
1684  bool randomAccess = input_->randomAccess();
1685  ProcessingController::ForwardState forwardState = input_->forwardState();
1686  ProcessingController::ReverseState reverseState = input_->reverseState();
1687  ProcessingController pc(forwardState, reverseState, randomAccess);
1688 
1690  do {
1691 
1692  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1693  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1694 
1695  bool succeeded = true;
1696  if(randomAccess) {
1698  input_->skipEvents(-2);
1699  }
1701  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1702  }
1703  }
1704  pc.setLastOperationSucceeded(succeeded);
1705  } while(!pc.lastOperationSucceeded());
1706  if(status != EDLooperBase::kContinue) {
1707  shouldWeStop_ = true;
1709  }
1710  }
1711 
1713  FDEBUG(1) << "\tshouldWeStop\n";
1714  if(shouldWeStop_) return true;
1715  if(!subProcesses_.empty()) {
1716  for(auto const& subProcess : subProcesses_) {
1717  if(subProcess.terminate()) {
1718  return true;
1719  }
1720  }
1721  return false;
1722  }
1723  return schedule_->terminate();
1724  }
1725 
1728  }
1729 
1732  }
1733 
1736  }
1737 
1738  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1739  bool expected =false;
1740  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1741  deferredExceptionPtr_ = iException;
1742  return true;
1743  }
1744  return false;
1745  }
1746 }
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
Definition: config.py:1
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:61
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:15
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
config
Definition: looper.py:287
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
ServiceToken serviceToken_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run)
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< edm::ParameterSet > parameterSet() const
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:684
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31