CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
37 
39 
47 
52 
54 
66 
67 #include "MessageForSource.h"
68 #include "MessageForParent.h"
70 
71 #include "boost/range/adaptor/reversed.hpp"
72 
73 #include <cassert>
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 namespace {
91  //Sentry class to only send a signal if an
92  // exception occurs. An exception is identified
93  // by the destructor being called without first
94  // calling completedSuccessfully().
95  class SendSourceTerminationSignalIfException {
96  public:
97  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
98  reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if(reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() {
105  reg_ = nullptr;
106  }
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109  };
110 
111 }
112 
113 namespace edm {
114 
115  // ---------------------------------------------------------------
116  std::unique_ptr<InputSource>
118  CommonParams const& common,
119  std::shared_ptr<ProductRegistry> preg,
120  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122  std::shared_ptr<ActivityRegistry> areg,
123  std::shared_ptr<ProcessConfiguration const> processConfiguration,
124  PreallocationConfiguration const& allocations) {
125  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
126  if(main_input == nullptr) {
128  << "There must be exactly one source in the configuration.\n"
129  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
130  }
131 
132  std::string modtype(main_input->getParameter<std::string>("@module_type"));
133 
134  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
136  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
137  filler->fill(descriptions);
138 
139  try {
140  convertException::wrap([&]() {
141  descriptions.validate(*main_input, std::string("source"));
142  });
143  }
144  catch (cms::Exception & iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
163  ModuleDescription::getUniqueID());
164 
165  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
166  common.maxEventsInput_, common.maxLumisInput_,
167  common.maxSecondsUntilRampdown_, allocations);
168 
169  areg->preSourceConstructionSignal_(md);
170  std::unique_ptr<InputSource> input;
171  try {
172  //even if we have an exception, send the signal
173  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
174  convertException::wrap([&]() {
175  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
178  });
179  }
180  catch (cms::Exception& iException) {
181  std::ostringstream ost;
182  ost << "Constructing input source of type " << modtype;
183  iException.addContext(ost.str());
184  throw;
185  }
186  return input;
187  }
188 
189  // ---------------------------------------------------------------
190  std::shared_ptr<EDLooperBase>
193  ParameterSet& params) {
194  std::shared_ptr<EDLooperBase> vLooper;
195 
196  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
197 
198  if(loopers.empty()) {
199  return vLooper;
200  }
201 
202  assert(1 == loopers.size());
203 
204  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
205  itName != itNameEnd;
206  ++itName) {
207 
208  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
209  providerPSet->registerIt();
210  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
211  cp,
212  *providerPSet);
213  }
214  return vLooper;
215  }
216 
217  // ---------------------------------------------------------------
218  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,//std::string const& config,
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices) :
223  actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
249  forceESCacheClearOnNewRun_(false),
250  eventSetupDataToExcludeFromPrefetching_() {
251  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
252  processDesc->addServices(defaultServices, forcedServices);
253  init(processDesc, iToken, iLegacy);
254  }
255 
256  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,//std::string const& config,
257  std::vector<std::string> const& defaultServices,
258  std::vector<std::string> const& forcedServices) :
259  actReg_(),
260  preg_(),
262  serviceToken_(),
263  input_(),
264  espController_(new eventsetup::EventSetupsController),
265  esp_(),
266  act_table_(),
268  schedule_(),
269  subProcesses_(),
271  fb_(),
272  looper_(),
274  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
275  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
276  principalCache_(),
288  {
289  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
290  processDesc->addServices(defaultServices, forcedServices);
292  }
293 
294  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
295  ServiceToken const& token,
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
326  {
327  init(processDesc, token, legacy);
328  }
329 
330  void
331  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
332  ServiceToken const& iToken,
334 
335  //std::cerr << processDesc->dump() << std::endl;
336 
337  // register the empty parentage vector , once and for all
339 
340  // register the empty parameter set, once and for all.
342 
343  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
344 
345  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
346  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
347  bool const hasSubProcesses = !subProcessVParameterSet.empty();
348 
349  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
350  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
351  // set in here if the parameters were not explicitly set.
352  validateTopLevelParameterSets(parameterSet.get());
353 
354  // Now set some parameters specific to the main process.
355  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
356  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
357  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
358  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
359  << fileMode << ".\n"
360  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361  } else {
362  fileModeNoMerge_ = (fileMode == "NOMERGE");
363  }
364  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
365 
366  //threading
367  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
368 
369  // Even if numberOfThreads was set to zero in the Python configuration, the code
370  // in cmsRun.cpp should have reset it to something else.
371  assert(nThreads != 0);
372 
373  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
374  if (nStreams == 0) {
375  nStreams = nThreads;
376  }
377  if (nThreads > 1 or nStreams > 1) {
378  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
379  }
380  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
381  if (nConcurrentRuns != 1) {
382  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
383  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
384  }
385  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
386  if (nConcurrentLumis == 0) {
387  nConcurrentLumis = nConcurrentRuns;
388  }
389 
390  //Check that relationships between threading parameters make sense
391  /*
392  if(nThreads<nStreams) {
393  //bad
394  }
395  if(nConcurrentRuns>nStreams) {
396  //bad
397  }
398  if(nConcurrentRuns>nConcurrentLumis) {
399  //bad
400  }
401  */
402  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
403 
404  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
405 
406  // Now do general initialization
408 
409  //initialize the services
410  auto& serviceSets = processDesc->getServicesPSets();
411  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
412  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
413 
414  //make the services available
416 
417  if(nStreams>1) {
419  handler->willBeUsingThreads();
420  }
421 
422  // initialize miscellaneous items
423  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
424 
425  // initialize the event setup provider
426  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
427 
428  // initialize the looper, if any
429  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
430  if(looper_) {
431  looper_->setActionTable(items.act_table_.get());
432  looper_->attachTo(*items.actReg_);
433 
434  //For now loopers make us run only 1 transition at a time
435  nStreams=1;
436  nConcurrentLumis=1;
437  nConcurrentRuns=1;
438  }
439 
440  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
441 
442  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
443  streamQueues_.resize(nStreams);
444  streamLumiStatus_.resize(nStreams);
445 
446  // initialize the input source
447  input_ = makeInput(*parameterSet,
448  *common,
449  items.preg(),
450  items.branchIDListHelper(),
451  items.thinnedAssociationsHelper(),
452  items.actReg_,
453  items.processConfiguration(),
455 
456  // initialize the Schedule
457  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
458 
459  // set the data members
460  act_table_ = std::move(items.act_table_);
461  actReg_ = items.actReg_;
462  preg_ = items.preg();
464  branchIDListHelper_ = items.branchIDListHelper();
465  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
466  processConfiguration_ = items.processConfiguration();
468  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
469 
470  FDEBUG(2) << parameterSet << std::endl;
471 
473  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
474  // Reusable event principal
475  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
478  }
479 
480  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
481  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
482  historyAppender_.get(), index);
484  }
485 
486  // fill the subprocesses, if there are any
487  subProcesses_.reserve(subProcessVParameterSet.size());
488  for(auto& subProcessPSet : subProcessVParameterSet) {
489  subProcesses_.emplace_back(subProcessPSet,
490  *parameterSet,
491  preg(),
496  *actReg_,
497  token,
500  &processContext_);
501  }
502  }
503 
505  // Make the services available while everything is being deleted.
506  ServiceToken token = getToken();
507  ServiceRegistry::Operate op(token);
508 
509  // manually destroy all these things that may need the services around
510  // propagate_const<T> has no reset() function
511  espController_ = nullptr;
512  esp_ = nullptr;
513  schedule_ = nullptr;
514  input_ = nullptr;
515  looper_ = nullptr;
516  actReg_ = nullptr;
517 
520  }
521 
522  void
524  if(beginJobCalled_) return;
525  beginJobCalled_=true;
526  bk::beginJob();
527 
528  // StateSentry toerror(this); // should we add this ?
529  //make the services available
531 
536  actReg_->preallocateSignal_(bounds);
537  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
539 
540  //NOTE: this may throw
542  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
543 
544  //NOTE: This implementation assumes 'Job' means one call
545  // to EventProcessor::run
546  // If it really means once per 'application' then this code will
547  // have to be changed.
548  // Also have to deal with case where have 'run' then new Module
549  // added and do 'run'
550  // again. In that case the newly added Module needs its 'beginJob'
551  // to be called.
552 
553  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
554  // For now we delay calling beginOfJob until first beginOfRun
555  //if(looper_) {
556  // looper_->beginOfJob(es);
557  //}
558  try {
559  convertException::wrap([&]() {
560  input_->doBeginJob();
561  });
562  }
563  catch(cms::Exception& ex) {
564  ex.addContext("Calling beginJob for the source");
565  throw;
566  }
567  schedule_->beginJob(*preg_);
568  // toerror.succeeded(); // should we add this?
569  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
570  actReg_->postBeginJobSignal_();
571 
572  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
573  schedule_->beginStream(i);
574  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
575  }
576  }
577 
578  void
580  // Collects exceptions, so we don't throw before all operations are performed.
581  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
582 
583  //make the services available
585 
586  //NOTE: this really should go elsewhere in the future
587  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
588  c.call([this,i](){this->schedule_->endStream(i);});
589  for(auto& subProcess : subProcesses_) {
590  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
591  }
592  }
593  auto actReg = actReg_.get();
594  c.call([actReg](){actReg->preEndJobSignal_();});
595  schedule_->endJob(c);
596  for(auto& subProcess : subProcesses_) {
597  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
598  }
599  c.call(std::bind(&InputSource::doEndJob, input_.get()));
600  if(looper_) {
601  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
602  }
603  c.call([actReg](){actReg->postEndJobSignal_();});
604  if(c.hasThrown()) {
605  c.rethrow();
606  }
607  }
608 
611  return serviceToken_;
612  }
613 
614  std::vector<ModuleDescription const*>
616  return schedule_->getAllModuleDescriptions();
617  }
618 
619  int
621  return schedule_->totalEvents();
622  }
623 
624  int
626  return schedule_->totalEventsPassed();
627  }
628 
629  int
631  return schedule_->totalEventsFailed();
632  }
633 
634  void
636  schedule_->enableEndPaths(active);
637  }
638 
639  bool
641  return schedule_->endPathsEnabled();
642  }
643 
644  void
646  schedule_->getTriggerReport(rep);
647  }
648 
649  void
651  schedule_->clearCounters();
652  }
653 
654  namespace {
655 #include "TransitionProcessors.icc"
656  }
657 
658  bool
660  bool returnValue = false;
661 
662  // Look for a shutdown signal
663  if(shutdown_flag.load(std::memory_order_acquire)) {
664  returnValue = true;
665  returnCode = epSignal;
666  }
667  return returnValue;
668  }
669 
672  if (deferredExceptionPtrIsSet_.load()) {
674  return InputSource::IsStop;
675  }
676 
677  SendSourceTerminationSignalIfException sentry(actReg_.get());
678  InputSource::ItemType itemType;
679  //For now, do nothing with InputSource::IsSynchronize
680  do {
681  itemType = input_->nextItemType();
682  } while( itemType == InputSource::IsSynchronize);
683 
684  lastSourceTransition_ = itemType;
685  sentry.completedSuccessfully();
686 
688 
689  if(checkForAsyncStopRequest(returnCode)) {
690  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
692  }
693 
694  return lastSourceTransition_;
695  }
696 
697  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
699  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
700  }
701 
704  return input_->luminosityBlock();
705  }
706 
709 
712  {
713  beginJob(); //make sure this was called
714 
715  // make the services available
717 
719  try {
720  FilesProcessor fp(fileModeNoMerge_);
721 
722  convertException::wrap([&]() {
723  bool firstTime = true;
724  do {
725  if(not firstTime) {
727  rewindInput();
728  } else {
729  firstTime = false;
730  }
731  startingNewLoop();
732 
733  auto trans = fp.processFiles(*this);
734 
735  fp.normalEnd();
736 
737  if(deferredExceptionPtrIsSet_.load()) {
738  std::rethrow_exception(deferredExceptionPtr_);
739  }
740  if(trans != InputSource::IsStop) {
741  //problem with the source
742  doErrorStuff();
743 
744  throw cms::Exception("BadTransition")
745  << "Unexpected transition change "
746  << trans;
747 
748  }
749  } while(not endOfLoop());
750  }); // convertException::wrap
751 
752  } // Try block
753  catch (cms::Exception & e) {
754  if (!exceptionMessageLumis_.empty()) {
756  if (e.alreadyPrinted()) {
757  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
758  }
759  }
760  if (!exceptionMessageRuns_.empty()) {
762  if (e.alreadyPrinted()) {
763  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
764  }
765  }
766  if (!exceptionMessageFiles_.empty()) {
768  if (e.alreadyPrinted()) {
769  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
770  }
771  }
772  throw;
773  }
774  }
775 
776  return returnCode;
777  }
778 
780  FDEBUG(1) << " \treadFile\n";
781  size_t size = preg_->size();
782  SendSourceTerminationSignalIfException sentry(actReg_.get());
783 
785 
786  fb_ = input_->readFile();
787  if(size < preg_->size()) {
789  }
793  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
794  }
795  sentry.completedSuccessfully();
796  }
797 
798  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
799  if (fb_.get() != nullptr) {
800  SendSourceTerminationSignalIfException sentry(actReg_.get());
801  input_->closeFile(fb_.get(), cleaningUpAfterException);
802  sentry.completedSuccessfully();
803  }
804  FDEBUG(1) << "\tcloseInputFile\n";
805  }
806 
808  if (fb_.get() != nullptr) {
809  schedule_->openOutputFiles(*fb_);
810  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
811  }
812  FDEBUG(1) << "\topenOutputFiles\n";
813  }
814 
816  if (fb_.get() != nullptr) {
817  schedule_->closeOutputFiles();
818  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
819  }
820  FDEBUG(1) << "\tcloseOutputFiles\n";
821  }
822 
824  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
825  if (fb_.get() != nullptr) {
826  schedule_->respondToOpenInputFile(*fb_);
827  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
828  }
829  FDEBUG(1) << "\trespondToOpenInputFile\n";
830  }
831 
833  if (fb_.get() != nullptr) {
834  schedule_->respondToCloseInputFile(*fb_);
835  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
836  }
837  FDEBUG(1) << "\trespondToCloseInputFile\n";
838  }
839 
841  shouldWeStop_ = false;
842  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
843  // until after we've called beginOfJob
844  if(looper_ && looperBeginJobRun_) {
845  looper_->doStartingNewLoop();
846  }
847  FDEBUG(1) << "\tstartingNewLoop\n";
848  }
849 
851  if(looper_) {
852  ModuleChanger changer(schedule_.get(),preg_.get());
853  looper_->setModuleChanger(&changer);
854  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
855  looper_->setModuleChanger(nullptr);
856  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
857  else return false;
858  }
859  FDEBUG(1) << "\tendOfLoop\n";
860  return true;
861  }
862 
864  input_->repeat();
865  input_->rewind();
866  FDEBUG(1) << "\trewind\n";
867  }
868 
870  looper_->prepareForNextLoop(esp_.get());
871  FDEBUG(1) << "\tprepareForNextLoop\n";
872  }
873 
875  FDEBUG(1) << "\tshouldWeCloseOutput\n";
876  if(!subProcesses_.empty()) {
877  for(auto const& subProcess : subProcesses_) {
878  if(subProcess.shouldWeCloseOutput()) {
879  return true;
880  }
881  }
882  return false;
883  }
884  return schedule_->shouldWeCloseOutput();
885  }
886 
888  FDEBUG(1) << "\tdoErrorStuff\n";
889  LogError("StateMachine")
890  << "The EventProcessor state machine encountered an unexpected event\n"
891  << "and went to the error state\n"
892  << "Will attempt to terminate processing normally\n"
893  << "(IF using the looper the next loop will be attempted)\n"
894  << "This likely indicates a bug in an input module or corrupted input or both\n";
895  }
896 
// Starts processing of the run identified by (phid, run).
//
// Visible sequence: look up the cached RunPrincipal, let the input source
// begin the run, synchronize the EventSetup to the start of the run, do the
// looper's deferred begin-of-job if needed, run the global begin-run
// transition, then the per-stream begin-run transitions. Each transition is
// waited for, and an exception captured by its waiting task is rethrown here.
//
// Out-parameters:
//  globalBeginSucceeded - reset to false on entry; set to true only after the
//   global begin-run transition has completed without an exception.
//  eventSetupForInstanceSucceeded - set to true after
//   espController_->eventSetupForInstance(ts) returns. It is not assigned
//   anywhere else in the visible lines, so the caller presumably initializes it.
//
// NOTE(review): this listing was extracted from HTML and several original
// lines are missing (910, 927, 935, 952, 956, 960); the code below is
// reproduced exactly as extracted.
897  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded,
898  bool& eventSetupForInstanceSucceeded) {
899  globalBeginSucceeded = false;
900  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
901  {
// If doBeginRun throws, the sentry's destructor emits the source
// early-termination signal on the activity registry.
902  SendSourceTerminationSignalIfException sentry(actReg_.get());
903 
904  input_->doBeginRun(runPrincipal, &processContext_);
905  sentry.completedSuccessfully();
906  }
907 
// Sync value for the start of the run: lumi 0, event 0, run begin time.
908  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
909  runPrincipal.beginTime());
// NOTE(review): the condition guarding this cache clear (original line 910)
// is missing from the extracted listing.
911  espController_->forceCacheClear();
912  }
913  {
914  SendSourceTerminationSignalIfException sentry(actReg_.get());
915  espController_->eventSetupForInstance(ts);
916  eventSetupForInstanceSucceeded = true;
917  sentry.completedSuccessfully();
918  }
919  EventSetup const& es = esp_->eventSetup();
// The looper's beginOfJob needs an EventSetup, so it is run here on the first
// run instead of in beginJob, followed by the first doStartingNewLoop.
920  if(looper_ && looperBeginJobRun_== false) {
921  looper_->copyInfo(ScheduleInfo(schedule_.get()));
922  looper_->beginOfJob(es);
923  looperBeginJobRun_ = true;
924  looper_->doStartingNewLoop();
925  }
926  {
// Global begin-run transition: start it asynchronously, then block until done.
// NOTE(review): original lines 927 and 935 are missing here.
928  auto globalWaitTask = make_empty_waiting_task();
929  globalWaitTask->increment_ref_count();
930  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
931  *schedule_,
932  runPrincipal,
933  ts,
934  es,
936  subProcesses_);
937  globalWaitTask->wait_for_all();
// Propagate any exception captured by the asynchronous work.
938  if(globalWaitTask->exceptionPtr() != nullptr) {
939  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
940  }
941  }
942  globalBeginSucceeded = true;
943  FDEBUG(1) << "\tbeginRun " << run << "\n";
944  if(looper_) {
945  looper_->doBeginRun(runPrincipal, es, &processContext_);
946  }
947  {
// Per-stream begin-run transitions, waited for in the same way.
// NOTE(review): original lines 952, 956 and 960 are missing here.
948  //To wait, the ref count has to be 1+#streams
949  auto streamLoopWaitTask = make_empty_waiting_task();
950  streamLoopWaitTask->increment_ref_count();
951 
953 
954  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
955  *schedule_,
957  runPrincipal,
958  ts,
959  es,
961  subProcesses_);
962 
963  streamLoopWaitTask->wait_for_all();
964  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
965  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
966  }
967  }
968  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
// Placeholder: the looper has no per-stream begin-run hook enabled.
969  if(looper_) {
970  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
971  }
972  }
973 
975  bool globalBeginSucceeded, bool cleaningUpAfterException,
976  bool eventSetupForInstanceSucceeded) {
977  if (eventSetupForInstanceSucceeded) {
978  //If we skip empty runs, this would be called conditionally
979  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
980 
981  if(globalBeginSucceeded) {
983  t->increment_ref_count();
984  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
985  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
986  mergeableRunProductMetadata->preWriteRun();
987  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
988  t->wait_for_all();
989  mergeableRunProductMetadata->postWriteRun();
990  if(t->exceptionPtr()) {
991  std::rethrow_exception(*t->exceptionPtr());
992  }
993  }
994  }
995  deleteRunFromCache(phid, run);
996  }
997 
// Ends processing of the run identified by (phid, run).
//
// Visible sequence: stamp the cached RunPrincipal with the source's current
// timestamp as its end time, synchronize the EventSetup for the end of the
// run, run the per-stream end-run transitions (only when globalBeginSucceeded
// is true), then the global end-run transition (unconditionally), and finally
// notify the looper. Each transition is waited for, and an exception captured
// by its waiting task is rethrown here.
//
// cleaningUpAfterException is forwarded to both end transitions.
//
// NOTE(review): this listing was extracted from HTML and several original
// lines are missing (1002, 1015, 1019, 1040); the code below is reproduced
// exactly as extracted.
998  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
999  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1000  runPrincipal.setEndTime(input_->timestamp());
1001 
// NOTE(review): the start of the declaration of 'ts' (original line 1002) is
// missing; the visible remainder shows it is built using runPrincipal.endTime().
1003  runPrincipal.endTime());
1004  {
// If eventSetupForInstance throws, the sentry's destructor emits the source
// early-termination signal on the activity registry.
1005  SendSourceTerminationSignalIfException sentry(actReg_.get());
1006  espController_->eventSetupForInstance(ts);
1007  sentry.completedSuccessfully();
1008  }
1009  EventSetup const& es = esp_->eventSetup();
// Stream end-run transitions are skipped when the global begin did not succeed.
1010  if(globalBeginSucceeded){
1011  //To wait, the ref count has to be 1+#streams
1012  auto streamLoopWaitTask = make_empty_waiting_task();
1013  streamLoopWaitTask->increment_ref_count();
1014 
1016 
1017  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1018  *schedule_,
1020  runPrincipal,
1021  ts,
1022  es,
1023  serviceToken_,
1024  subProcesses_,
1025  cleaningUpAfterException);
1026 
1027  streamLoopWaitTask->wait_for_all();
// Propagate any exception captured by the asynchronous work.
1028  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1029  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1030  }
1031  }
1032  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
// Placeholder: the looper has no per-stream end-run hook enabled.
1033  if(looper_) {
1034  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1035  }
1036  {
// Global end-run transition; in the visible code this block is not guarded by
// globalBeginSucceeded.
1037  auto globalWaitTask = make_empty_waiting_task();
1038  globalWaitTask->increment_ref_count();
1039 
1041  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1042  *schedule_,
1043  runPrincipal,
1044  ts,
1045  es,
1046  serviceToken_,
1047  subProcesses_,
1048  cleaningUpAfterException);
1049  globalWaitTask->wait_for_all();
1050  if(globalWaitTask->exceptionPtr() != nullptr) {
1051  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1052  }
1053  }
1054  FDEBUG(1) << "\tendRun " << run << "\n";
1055  if(looper_) {
1056  looper_->doEndRun(runPrincipal, es, &processContext_);
1057  }
1058  }
1059 
// Processes luminosity blocks synchronously from the caller's point of view:
// starts the asynchronous lumi work, blocks until the waiting task completes,
// rethrows any exception the task captured, and returns lastTransitionType().
//
// iRunResource is passed through to beginLumiAsync when a new lumi is started.
//
// NOTE(review): this listing was extracted from HTML; the line carrying the
// return type (original line 1060) and original line 1066 are missing. The
// code below is reproduced exactly as extracted.
1061  EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1062  auto waitTask = make_empty_waiting_task();
1063  waitTask->increment_ref_count();
1064 
// A non-zero streamLumiActive_ means a lumi is already in progress, so resume
// it; otherwise begin a new one.
1065  if(streamLumiActive_> 0) {
1067  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1068  } else {
// Sync value for the new lumi: the source's current run and lumi number
// (event 0) with the begin time from the lumi auxiliary.
1069  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1070  input_->luminosityBlockAuxiliary()->beginTime()),
1071  iRunResource,
1072  WaitingTaskHolder{waitTask.get()});
1073  }
1074  waitTask->wait_for_all();
1075 
// Propagate any exception captured by the asynchronous work.
1076  if(waitTask->exceptionPtr() != nullptr) {
1077  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1078  }
1079  return lastTransitionType();
1080  }
1081 
1082  void
1084  std::shared_ptr<void> const& iRunResource, edm::WaitingTaskHolder iHolder) {
1085  if(iHolder.taskHasFailed()) { return; }
1086 
1087  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1088 
1089  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1090  if(iHolder.taskHasFailed()) { return; }
1091 
1092  status->setResumer(std::move(iResumer));
1093 
1094  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1095  //make the services available
1097 
1098  try {
1100 
1101  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1102  {
1103  SendSourceTerminationSignalIfException sentry(actReg_.get());
1104 
1105  input_->doBeginLumi(lumiPrincipal, &processContext_);
1106  sentry.completedSuccessfully();
1107  }
1108 
1110  if(rng.isAvailable()) {
1111  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1112  rng->preBeginLumi(lb);
1113  }
1114 
1115  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1116 
1117  //Task to start the stream beginLumis
1118  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1119  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1120  if (iPtr) {
1121  holder.doneWaiting(*iPtr);
1122  } else {
1123 
1124  status->globalBeginDidSucceed();
1125  EventSetup const& es = esp_->eventSetup();
1126  if(looper_) {
1127  try {
1128  //make the services available
1130  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1131  }catch(...) {
1132  holder.doneWaiting(std::current_exception());
1133  return;
1134  }
1135  }
1137 
1138  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1139  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1140  streamQueues_[i].pause();
1141 
1142  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1143  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1144  {
1145  if(iPtr) {
1146  h.doneWaiting(*iPtr);
1147  } else {
1149  }
1150  });
1151  auto& event = principalCache_.eventPrincipal(i);
1154  auto lp = status->lumiPrincipal();
1155  event.setLuminosityBlockPrincipal(lp.get());
1156  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1157  *schedule_,i,*lp,ts,es,
1159  });
1160  }
1161  }
1162  });
1163 
1164  //task to start the global begin lumi
1165  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1166  EventSetup const& es = esp_->eventSetup();
1167  {
1169  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1170  *schedule_,
1171  *(status->lumiPrincipal()),
1172  ts,
1173  es,
1174  serviceToken_,
1175  subProcesses_);
1176  }
1177  } catch(...) {
1178  iHolder.doneWaiting(std::current_exception());
1179  }
1180  });
1181  };
1182 
1183  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1184  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1185  if(espController_->isWithinValidityInterval(iSync)) {
1186  iovQueue_.pause();
1187  lumiQueue_->pushAndPause(std::move(lumiWork));
1188  } else {
1189  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1190  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1191  try {
1192  SendSourceTerminationSignalIfException sentry(actReg_.get());
1193  espController_->eventSetupForInstance(iSync);
1194  sentry.completedSuccessfully();
1195  } catch(...) {
1196  iHolder.doneWaiting(std::current_exception());
1197  return;
1198  }
1199  iovQueue_.pause();
1200  lumiQueue_->pushAndPause(std::move(lumiWork));
1201  });
1202  }
1203  }
1204 
// Restarts the per-stream event loops for a luminosity block that is already open.
// NOTE(review): listing line 1206 (function name and parameter list) and line 1223
// (body of the last spawned task) are missing from this listing. The use of iHolder
// below suggests EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) - confirm.
1205  void
1207  {
1208  //all streams are sharing the same status at the moment
1209  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1210  status->needToContinueLumi();
1211  status->startProcessingEvents();
1212  }
1213 
// Every stream except the last one is restarted through an enqueued task that
// carries its own copy of iHolder.
1214  unsigned int streamIndex = 0;
1215  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1216  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1217  [this,streamIndex,h = iHolder](){
1218  handleNextEventForStreamAsync(std::move(h), streamIndex);
1219  }) );
1220 
1221  }
// After the loop streamIndex equals numberOfStreams()-1; that last stream is started
// with spawn and its task takes iHolder by move.
1222  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1224  }) );
1225  }
1226 
// Ends the global (non-stream) part of a luminosity block.
//   iTask       - holder that is signalled, with the first exception seen (if any), once
//                 all clean-up in task `t` has run.
//   iLumiStatus - processing status of the luminosity block being ended.
// Two waiting tasks are built and chained: `writeT` is handed to endGlobalTransitionAsync
// and holds `t`; `t` performs the final clean-up and signals iTask.
// NOTE(review): listing lines 1241, 1253, 1255, 1281 and 1291 are missing from this listing.
1227  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1228  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1229  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
// ptr keeps the first exception encountered; later failures do not overwrite it.
1230  std::exception_ptr ptr;
1231  //use an easier to remember variable name
1232  auto status = std::move(items.first);
1233  if(iPtr) {
1234  ptr = *iPtr;
1235  WaitingTaskHolder tmp(items.second);
1236  //set the exception early to prevent a beginLumi from running
1237  // we use a copy to keep t from resetting on doneWaiting call.
1238  tmp.doneWaiting(ptr);
1239  } else {
1240  try {
1242  if(looper_) {
1243  auto& lp = *(status->lumiPrincipal());
1244  EventSetup const& es = esp_->eventSetup();
1245  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1246  }
1247  }catch(...) {
1248  if(not ptr) {
1249  ptr = std::current_exception();
1250  }
1251  }
1252  }
1254  try {
1256  //release our hold on the IOV
1257  iovQueue_.resume();
1258  status->resumeGlobalLumiQueue();
1259  } catch(...) {
1260  if( not ptr) {
1261  ptr = std::current_exception();
1262  }
1263  }
// Drop this task's reference to the status before signalling the caller.
1264  try {
1265  status.reset();
1266  } catch(...) {
1267  if( not ptr) {
1268  ptr = std::current_exception();
1269  }
1270  }
1271  //have to wait until reset is called since that could call endRun
1272  items.second.doneWaiting(ptr);
1273  });
1274 
// writeT forwards an exception straight to `t`; otherwise it acts only when the
// global begin succeeded (the statement inside that branch is missing from this listing).
1275  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1276  if(iExcept) {
1277  task.doneWaiting(*iExcept);
1278  } else {
1279  //Only call writeLumi if beginLumi succeeded
1280  if(status->didGlobalBeginSucceed()) {
1282  }
1283  }
1284  });
1285  auto& lp = *(iLumiStatus->lumiPrincipal());
1286 
// The sync value uses the largest event number of this run/lumi together with the
// principal's beginTime().
1287  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1288  lp.beginTime());
1289 
1290 
1292  EventSetup const& es = esp_->eventSetup();
1293 
1294  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1295  *schedule_,
1296  lp,
1297  ts,
1298  es,
1299  serviceToken_,
1300  subProcesses_,
1301  iLumiStatus->cleaningUpAfterException());
1302  }
1303 
// Ends the luminosity block on a single stream.
// NOTE(review): listing line 1304 (return type, name and first parameter) is missing. The
// lambda below captures iTask, so this is presumably
// EventProcessor::streamEndLumiAsync(edm::WaitingTaskHolder iTask, ...) - confirm.
//   iStreamIndex - index into streamLumiStatus_ / streamQueues_ for the stream being ended.
//   iLumiStatus  - status of the luminosity block being ended.
1305  unsigned int iStreamIndex,
1306  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1307 
// Task `t` clears the stream's status slot, resumes the stream's queue, checks whether
// this was the last stream to finish and then signals iTask with any exception it was given.
1308  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1309  std::exception_ptr ptr;
1310  if(iPtr) {
1311  ptr = *iPtr;
1312  }
1313  auto status =streamLumiStatus_[iStreamIndex];
1314  //reset status before releasing queue else get race condtion
1315  streamLumiStatus_[iStreamIndex].reset();
1317  streamQueues_[iStreamIndex].resume();
1318 
1319  //are we the last one?
// NOTE(review): the statement inside this branch (listing line 1321) is missing from this listing.
1320  if( status->streamFinishedLumi()) {
1322  }
1323  iTask.doneWaiting(ptr);
1324  });
1325 
1326  edm::WaitingTaskHolder lumiDoneTask{t};
1327 
1328  iLumiStatus->setEndTime();
1329 
// The stream end transition is only started when the global begin succeeded; there is
// no else branch, so otherwise lumiDoneTask just goes out of scope.
1330  if(iLumiStatus->didGlobalBeginSucceed()) {
1331  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1332  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1333  lumiPrincipal.endTime());
1334  EventSetup const& es = esp_->eventSetup();
1335 
1336  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1337 
1339  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1340  *schedule_,iStreamIndex,
1341  lumiPrincipal,ts,es,
1342  serviceToken_,
1343  subProcesses_,cleaningUpAfterException);
1344  }
1345  }
1346 
1347 
// NOTE(review): listing line 1348 (the signature) is missing from this listing; the body
// reads like a no-argument EventProcessor member that closes luminosity blocks still
// open on the streams (presumably endUnfinishedLumi) - confirm.
// When at least one stream is still active, streamEndLumiAsync is called for every stream
// that has a status, the caller blocks until those finish, and a recorded exception is rethrown.
1349  if(streamLumiActive_.load() > 0) {
1350  auto globalWaitTask = make_empty_waiting_task();
1351  globalWaitTask->increment_ref_count();
1352  {
// The holder lives only inside this scope, i.e. it is destroyed before wait_for_all() is called.
1353  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1354  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1355  if(streamLumiStatus_[i]) {
1356  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1357  }
1358  }
1359  }
1360  globalWaitTask->wait_for_all();
1361  if(globalWaitTask->exceptionPtr() != nullptr) {
1362  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1363  }
1364  }
1365  }
1366 
// Creates a RunPrincipal from the input source's run auxiliary, has the source fill it,
// inserts it into principalCache_ and returns (reduced process history ID, run number).
// NOTE(review): listing lines 1368-1369 (the condition and start of the throw statement
// that the message below belongs to) and line 1375 (part of the make_shared argument
// list) are missing from this listing.
1367  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1370  << "EventProcessor::readRun\n"
1371  << "Illegal attempt to insert run into cache\n"
1372  << "Contact a Framework Developer\n";
1373  }
1374  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(),
1376  0, true, &mergeableRunProductProcesses_);
1377  {
// The sentry sends the early-termination signal from its destructor unless
// completedSuccessfully() was reached, i.e. when readRun throws.
1378  SendSourceTerminationSignalIfException sentry(actReg_.get());
1379  input_->readRun(*rp, *historyAppender_);
1380  sentry.completedSuccessfully();
1381  }
1382  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1383  principalCache_.insert(rp);
1384  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1385  }
1386 
1387  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1388  principalCache_.merge(input_->runAuxiliary(), preg());
1389  auto runPrincipal =principalCache_.runPrincipalPtr();
1390  {
1391  SendSourceTerminationSignalIfException sentry(actReg_.get());
1392  input_->readAndMergeRun(*runPrincipal);
1393  sentry.completedSuccessfully();
1394  }
1395  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1396  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1397  }
1398 
// NOTE(review): listing lines 1399-1401 (signature, guard condition and start of the throw
// statement) and line 1407 (the definition of lbp) are missing from this listing. The
// message text and the use of iStatus suggest
// EventProcessor::readLuminosityBlock(LuminosityBlockProcessingStatus& iStatus) - confirm.
// Visible behaviour: sets the principal's auxiliary from the input source, has the source
// read the luminosity block, attaches the cached run principal and stores the principal in iStatus.
1402  << "EventProcessor::readLuminosityBlock\n"
1403  << "Illegal attempt to insert lumi into cache\n"
1404  << "Run is invalid\n"
1405  << "Contact a Framework Developer\n";
1406  }
1408  assert(lbp);
1409  lbp->setAux(*input_->luminosityBlockAuxiliary());
1410  {
// The sentry sends the early-termination signal if readLuminosityBlock throws.
1411  SendSourceTerminationSignalIfException sentry(actReg_.get());
1412  input_->readLuminosityBlock(*lbp, *historyAppender_);
1413  sentry.completedSuccessfully();
1414  }
1415  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1416  iStatus.lumiPrincipal() = std::move(lbp);
1417  }
1418 
// NOTE(review): listing line 1419 (the signature) is missing from this listing; the body
// matches the index entry readAndMergeLumi(LuminosityBlockProcessingStatus&) - confirm.
// Merges the luminosity block the input source points at into the principal held by
// iStatus and returns the source's current luminosity block number.
1420  auto& lumiPrincipal = *iStatus.lumiPrincipal();
// The incoming lumi must either have the same identity as the held one or share its
// reduced process history ID.
1421  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1422  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1423  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1424  assert(lumiOK);
1425  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1426  {
// The sentry sends the early-termination signal if readAndMergeLumi throws.
1427  SendSourceTerminationSignalIfException sentry(actReg_.get());
1428  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1429  sentry.completedSuccessfully();
1430  }
1431  return input_->luminosityBlock();
1432  }
1433 
// NOTE(review): listing line 1434 (return type, name and leading parameters) and lines
// 1441 and 1447 are missing from this listing; the body matches the index entry
// writeRunAsync(WaitingTaskHolder, ProcessHistoryID const& phid, RunNumber_t run,
// MergeableRunProductMetadata const*) - confirm.
// The schedule writes the run first; the follow-up task subsT then either forwards an
// exception to `task` or asks every SubProcess to write the run.
1435  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1436  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task,mergeableRunProductMetadata]
1437  (std::exception_ptr const* iExcept) mutable {
1438  if(iExcept) {
1439  task.doneWaiting(*iExcept);
1440  } else {
1442  for(auto&s : subProcesses_) {
1443  s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1444  }
1445  }
1446  });
1448  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run),
1449  &processContext_, actReg_.get(), mergeableRunProductMetadata);
1450  }
1451 
// NOTE(review): listing line 1452 (the signature) is missing from this listing; the body
// and debug text match deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run) - confirm.
// Removes the run from principalCache_ and asks every SubProcess to do the same.
1453  principalCache_.deleteRun(phid, run);
1454  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1455  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1456  }
1457 
// Writes a luminosity block: the schedule writes first, then the follow-up task subsT
// either forwards an exception to `task` or asks every SubProcess to write the lumi.
//   task    - holder passed on to the SubProcess writes / signalled on error.
//   iStatus - status object that provides the luminosity block principal.
// NOTE(review): listing lines 1463 and 1469 are missing from this listing.
1458  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, std::shared_ptr<LuminosityBlockProcessingStatus> iStatus) {
1459  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1460  if(iExcept) {
1461  task.doneWaiting(*iExcept);
1462  } else {
1464  for(auto&s : subProcesses_) {
1465  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1466  }
1467  }
1468  });
1470 
1471  std::shared_ptr<LuminosityBlockPrincipal> const& lumiPrincipal = iStatus->lumiPrincipal();
// The run's mergeable-run-product metadata is told about this lumi before the write is started.
1472  lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1473 
1474  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *lumiPrincipal, &processContext_, actReg_.get());
1475  }
1476 
// NOTE(review): listing line 1477 (the signature) is missing from this listing; the body
// matches the index entry deleteLumiFromCache(LuminosityBlockProcessingStatus&) - confirm.
// Asks every SubProcess to drop the lumi, then clears the luminosity block principal held by iStatus.
1478  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1479  iStatus.lumiPrincipal()->clearPrincipal();
1480  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1481  }
1482 
// Tries to read the next event from the input source for stream iStreamIndex.
// Returns true only when an event was read. Returns false when processing should stop,
// when a deferred exception is already recorded, when event processing was stopped for
// this lumi, when the next item from the source is not an event, or when reading threw
// (the exception is then stored in deferredExceptionPtr_ if none was stored before).
// NOTE(review): listing line 1484 (second parameter, used below as iStatus) and line 1497
// are missing from this listing.
1483  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1485  if(shouldWeStop()) {
1486  return false;
1487  }
1488 
1489  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1490  return false;
1491  }
1492 
1493  if(iStatus.wasEventProcessingStopped()) {
1494  return false;
1495  }
1496 
1498  try {
1499  //need to use lock in addition to the serial task queue because
1500  // of delayed provenance reading and reading data in response to
1501  // edm::Refs etc
1502  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1503 
// When this lumi is being continued, the item is treated as IsLumi without asking for
// the next transition type.
1504  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1505  if(InputSource::IsLumi == itemType) {
1506  iStatus.haveContinuedLumi();
// Keep merging while the source presents the same run and luminosity block again.
1507  while(itemType == InputSource::IsLumi and
1508  iStatus.lumiPrincipal()->run() == input_->run() and
1509  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1510  readAndMergeLumi(iStatus);
1511  itemType = nextTransitionType();
1512  }
// Still IsLumi after the loop: record the sync value built from the source's current
// run, luminosity block and begin time.
1513  if(InputSource::IsLumi == itemType) {
1514  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1515  input_->luminosityBlockAuxiliary()->beginTime()));
1516  }
1517  }
1518  if(InputSource::IsEvent != itemType) {
1519  iStatus.stopProcessingEvents();
1520 
1521  //IsFile may continue processing the lumi and
1522  // looper_ can cause the input source to declare a new IsRun which is actually
1523  // just a continuation of the previous run
1524  if(InputSource::IsStop == itemType or
1525  InputSource::IsLumi == itemType or
1526  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1527  iStatus.endLumi();
1528  }
1529  return false;
1530  }
1531  readEvent(iStreamIndex);
1532  } catch (...) {
// Only the first exception is kept; the compare-exchange lets exactly one caller store it.
1533  bool expected =false;
1534  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1535  deferredExceptionPtr_ = std::current_exception();
1536  }
1537  return false;
1538  }
1539  return true;
1540  }
1541 
// Per-stream event loop step, run on the source's serial queue chain.
// NOTE(review): listing line 1542 (return type, name and first parameter - used below as
// iTask) and lines 1546 and 1580 are missing from this listing; the body matches the index
// entry handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex) - confirm.
// If an event could be read it is processed and this function is called again from the
// completion task; otherwise the stream either ends its lumi or just signals iTask.
1543  unsigned int iStreamIndex)
1544  {
1545  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1547  auto& status = streamLumiStatus_[iStreamIndex];
1548  try {
1549  if(readNextEventForStream(iStreamIndex, *status) ) {
1550  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1551  if(iPtr) {
// Only the first failure is recorded as the deferred exception and passed to iTask.
1552  bool expected = false;
1553  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1554  deferredExceptionPtr_ = *iPtr;
1555  iTask.doneWaiting(*iPtr);
1556  }
1557  //the stream will stop now
1558  return;
1559  }
1560  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1561  });
1562 
1563  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1564  } else {
1565  //the stream will stop now
1566  if(status->isLumiEnding()) {
// The next lumi is begun only when the last source transition was IsLumi and no other
// stream has already started it.
1567  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1568  status->startNextLumi();
1569  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1570  }
1571  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1572  } else {
1573  iTask.doneWaiting(std::exception_ptr{});
1574  }
1575  }
1576  } catch(...) {
1577  bool expected = false;
1578  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1579  auto e =std::current_exception();
1581  iTask.doneWaiting(e);
1582  }
1583  }
1584  });
1585  }
1586 
1587  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1588  //TODO this will have to become per stream
1589  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1590  StreamContext streamContext(event.streamID(), &processContext_);
1591 
1592  SendSourceTerminationSignalIfException sentry(actReg_.get());
1593  input_->readEvent(event, streamContext);
1594 
1595  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1596  sentry.completedSuccessfully();
1597 
1598  FDEBUG(1) << "\treadEvent\n";
1599  }
1600 
// NOTE(review): listing line 1601 (return type, name and first parameter - used below as
// iHolder) is missing from this listing; the body matches the index entry
// processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex) - confirm.
// Spawns a task that calls processEventAsyncImpl with copies of iHolder and iStreamIndex
// (the lambda captures by copy).
1602  unsigned int iStreamIndex) {
1603  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1604  processEventAsyncImpl(iHolder, iStreamIndex);
1605  }) );
1606  }
1607 
// NOTE(review): listing line 1608 (return type, name and first parameter - used below as
// iHolder) and lines 1612, 1613 and 1626 are missing from this listing; the body matches
// the index entry processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) - confirm.
// Processes the event held by stream iStreamIndex: the schedule runs first, then the
// SubProcesses (if any), then a finalize task that clears the event principal and signals iHolder.
1609  unsigned int iStreamIndex) {
1610  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1611 
// NOTE(review): the declaration of rng is on a line missing from this listing.
1614  if(rng.isAvailable()) {
1615  Event ev(*pep, ModuleDescription(), nullptr);
1616  rng->postEventRead(ev);
1617  }
1618 
// Last step of the chain: optional looper call, clear the event principal, then pass the
// incoming exception (or an empty exception_ptr) to iHolder.
1619  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1620  tbb::task::allocate_root(),
1621  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1622  {
1623 
1624  //NOTE: If we have a looper we only have one Stream
1625  if(looper_) {
1627  processEventWithLooper(*pep);
1628  }
1629 
1630  FDEBUG(1) << "\tprocessEvent\n";
1631  pep->clearEventPrincipal();
1632  if(iPtr) {
1633  iHolder.doneWaiting(*iPtr);
1634  } else {
1635  iHolder.doneWaiting(std::exception_ptr());
1636  }
1637  }
1638  )
1639  );
// Without SubProcesses the schedule hands over directly to the finalize task.
1640  WaitingTaskHolder afterProcessTask;
1641  if(subProcesses_.empty()) {
1642  afterProcessTask = std::move(finalizeEventTask);
1643  } else {
1644  //Need to run SubProcesses after schedule has finished
1645  // with the event
1646  afterProcessTask = WaitingTaskHolder(
1647  make_waiting_task(tbb::task::allocate_root(),
1648  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1649  {
1650  if(not iPtr) {
1651  //when run with 1 thread, we want to the order to be what
1652  // it was before. This requires reversing the order since
1653  // tasks are run last one in first one out
1654  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1655  subProcess.doEventAsync(finalizeEventTask,*pep);
1656  }
1657  } else {
1658  finalizeEventTask.doneWaiting(*iPtr);
1659  }
1660  })
1661  );
1662  }
1663 
1664  schedule_->processOneEventAsync(std::move(afterProcessTask),
1665  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1666 
1667  }
1668 
// NOTE(review): listing line 1669 (the signature) and lines 1675, 1683, 1686 and 1694 are
// missing from this listing, so the declaration of `status` and the conditions guarding
// skipEvents/goToEvent are not visible. The body matches the index entry
// processEventWithLooper(EventPrincipal&), with the parameter used below as iPrincipal - confirm.
// Runs the looper on the event and repeats until the ProcessingController reports that the
// last requested operation succeeded; a looper status other than kContinue sets shouldWeStop_.
1670  bool randomAccess = input_->randomAccess();
1671  ProcessingController::ForwardState forwardState = input_->forwardState();
1672  ProcessingController::ReverseState reverseState = input_->reverseState();
1673  ProcessingController pc(forwardState, reverseState, randomAccess);
1674 
1676  do {
1677 
1678  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1679  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1680 
// Repositioning of the input source is only attempted when the source supports random access.
1681  bool succeeded = true;
1682  if(randomAccess) {
1684  input_->skipEvents(-2);
1685  }
1687  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1688  }
1689  }
1690  pc.setLastOperationSucceeded(succeeded);
1691  } while(!pc.lastOperationSucceeded());
1692  if(status != EDLooperBase::kContinue) {
1693  shouldWeStop_ = true;
1695  }
1696  }
1697 
// NOTE(review): listing line 1698 (the signature) is missing from this listing; the debug
// text and the index entry `bool shouldWeStop() const` suggest EventProcessor::shouldWeStop - confirm.
// Returns true when shouldWeStop_ is set. With SubProcesses present the answer is true if
// any of them reports terminate() and false otherwise (the schedule is not consulted in
// that case); without SubProcesses the schedule's terminate() decides.
1699  FDEBUG(1) << "\tshouldWeStop\n";
1700  if(shouldWeStop_) return true;
1701  if(!subProcesses_.empty()) {
1702  for(auto const& subProcess : subProcesses_) {
1703  if(subProcess.terminate()) {
1704  return true;
1705  }
1706  }
1707  return false;
1708  }
1709  return schedule_->terminate();
1710  }
1711 
// NOTE(review): listing line 1712 (the signature) is missing from this listing; presumably
// setExceptionMessageFiles(std::string& message) from the index - confirm.
// Stores a copy of message in exceptionMessageFiles_.
1713  exceptionMessageFiles_ = message;
1714  }
1715 
// NOTE(review): listing line 1716 (the signature) is missing from this listing; presumably
// setExceptionMessageRuns(std::string& message) from the index - confirm.
// Stores a copy of message in exceptionMessageRuns_.
1717  exceptionMessageRuns_ = message;
1718  }
1719 
// NOTE(review): listing line 1720 (the signature) is missing from this listing; presumably
// setExceptionMessageLumis(std::string& message) from the index - confirm.
// Stores a copy of message in exceptionMessageLumis_.
1721  exceptionMessageLumis_ = message;
1722  }
1723 
1724  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1725  bool expected =false;
1726  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1727  deferredExceptionPtr_ = iException;
1728  return true;
1729  }
1730  return false;
1731  }
1732 }
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:65
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
Definition: Exception.cc:251
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:15
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:45
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:85
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:77
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:81
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
int totalEvents() const
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:686
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:100
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31