CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
37 
39 
47 
52 
54 
66 
67 #include "MessageForSource.h"
68 #include "MessageForParent.h"
70 
71 #include "boost/range/adaptor/reversed.hpp"
72 
73 #include <cassert>
74 #include <exception>
75 #include <iomanip>
76 #include <iostream>
77 #include <utility>
78 #include <sstream>
79 
80 #include <sys/ipc.h>
81 #include <sys/msg.h>
82 
83 #include "tbb/task.h"
84 
85 //Used for CPU affinity
86 #ifndef __APPLE__
87 #include <sched.h>
88 #endif
89 
90 namespace {
91  //Sentry class to only send a signal if an
92  // exception occurs. An exception is identified
93  // by the destructor being called without first
94  // calling completedSuccessfully().
95  class SendSourceTerminationSignalIfException {
96  public:
97  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
98  reg_(iReg) {}
99  ~SendSourceTerminationSignalIfException() {
100  if(reg_) {
101  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
102  }
103  }
104  void completedSuccessfully() {
105  reg_ = nullptr;
106  }
107  private:
108  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
109  };
110 
111 }
112 
113 namespace edm {
114 
115  // ---------------------------------------------------------------
116  std::unique_ptr<InputSource>
118  CommonParams const& common,
119  std::shared_ptr<ProductRegistry> preg,
120  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
121  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
122  std::shared_ptr<ActivityRegistry> areg,
123  std::shared_ptr<ProcessConfiguration const> processConfiguration,
124  PreallocationConfiguration const& allocations) {
125  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
126  if(main_input == nullptr) {
128  << "There must be exactly one source in the configuration.\n"
129  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
130  }
131 
132  std::string modtype(main_input->getParameter<std::string>("@module_type"));
133 
134  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
136  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
137  filler->fill(descriptions);
138 
139  try {
140  convertException::wrap([&]() {
141  descriptions.validate(*main_input, std::string("source"));
142  });
143  }
144  catch (cms::Exception & iException) {
145  std::ostringstream ost;
146  ost << "Validating configuration of input source of type " << modtype;
147  iException.addContext(ost.str());
148  throw;
149  }
150 
151  main_input->registerIt();
152 
153  // Fill in "ModuleDescription", in case the input source produces
154  // any EDProducts, which would be registered in the ProductRegistry.
155  // Also fill in the process history item for this process.
156  // There is no module label for the unnamed input source, so
157  // just use "source".
158  // Only the tracked parameters belong in the process configuration.
159  ModuleDescription md(main_input->id(),
160  main_input->getParameter<std::string>("@module_type"),
161  "source",
162  processConfiguration.get(),
163  ModuleDescription::getUniqueID());
164 
165  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
166  common.maxEventsInput_, common.maxLumisInput_,
167  common.maxSecondsUntilRampdown_, allocations);
168 
169  areg->preSourceConstructionSignal_(md);
170  std::unique_ptr<InputSource> input;
171  try {
172  //even if we have an exception, send the signal
173  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
174  convertException::wrap([&]() {
175  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
176  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
177  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
178  });
179  }
180  catch (cms::Exception& iException) {
181  std::ostringstream ost;
182  ost << "Constructing input source of type " << modtype;
183  iException.addContext(ost.str());
184  throw;
185  }
186  return input;
187  }
188 
189  // ---------------------------------------------------------------
190  std::shared_ptr<EDLooperBase>
193  ParameterSet& params) {
194  std::shared_ptr<EDLooperBase> vLooper;
195 
196  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
197 
198  if(loopers.empty()) {
199  return vLooper;
200  }
201 
202  assert(1 == loopers.size());
203 
204  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
205  itName != itNameEnd;
206  ++itName) {
207 
208  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
209  providerPSet->registerIt();
210  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
211  cp,
212  *providerPSet);
213  }
214  return vLooper;
215  }
216 
217  // ---------------------------------------------------------------
218  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,//std::string const& config,
219  ServiceToken const& iToken,
221  std::vector<std::string> const& defaultServices,
222  std::vector<std::string> const& forcedServices) :
223  actReg_(),
224  preg_(),
225  branchIDListHelper_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
231  processConfiguration_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
237  deferredExceptionPtrIsSet_(false),
238  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
239  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
243  fileModeNoMerge_(false),
244  exceptionMessageFiles_(),
245  exceptionMessageRuns_(),
246  exceptionMessageLumis_(),
247  forceLooperToEnd_(false),
248  looperBeginJobRun_(false),
249  forceESCacheClearOnNewRun_(false),
250  eventSetupDataToExcludeFromPrefetching_() {
251  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
252  processDesc->addServices(defaultServices, forcedServices);
253  init(processDesc, iToken, iLegacy);
254  }
255 
256  EventProcessor::EventProcessor(std::unique_ptr<ParameterSet> parameterSet,//std::string const& config,
257  std::vector<std::string> const& defaultServices,
258  std::vector<std::string> const& forcedServices) :
259  actReg_(),
260  preg_(),
262  serviceToken_(),
263  input_(),
264  espController_(new eventsetup::EventSetupsController),
265  esp_(),
266  act_table_(),
268  schedule_(),
269  subProcesses_(),
271  fb_(),
272  looper_(),
274  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
275  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
276  principalCache_(),
288  {
289  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
290  processDesc->addServices(defaultServices, forcedServices);
292  }
293 
294  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
295  ServiceToken const& token,
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
326  {
327  init(processDesc, token, legacy);
328  }
329 
330  void
331  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
332  ServiceToken const& iToken,
334 
335  //std::cerr << processDesc->dump() << std::endl;
336 
337  // register the empty parentage vector , once and for all
339 
340  // register the empty parameter set, once and for all.
342 
343  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
344 
345  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
346  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
347  bool const hasSubProcesses = !subProcessVParameterSet.empty();
348 
349  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
350  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
351  // set in here if the parameters were not explicitly set.
352  validateTopLevelParameterSets(parameterSet.get());
353 
354  // Now set some parameters specific to the main process.
355  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
356  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
357  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
358  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
359  << fileMode << ".\n"
360  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
361  } else {
362  fileModeNoMerge_ = (fileMode == "NOMERGE");
363  }
364  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
365 
366  //threading
367  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
368 
369  // Even if numberOfThreads was set to zero in the Python configuration, the code
370  // in cmsRun.cpp should have reset it to something else.
371  assert(nThreads != 0);
372 
373  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
374  if (nStreams == 0) {
375  nStreams = nThreads;
376  }
377  if (nThreads > 1 or nStreams > 1) {
378  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
379  }
380  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
381  if (nConcurrentRuns != 1) {
382  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
383  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
384  }
385  unsigned int nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
386  if (nConcurrentLumis == 0) {
387  nConcurrentLumis = nConcurrentRuns;
388  }
389 
390  //Check that the relationships between threading parameters make sense
391  /*
392  if(nThreads<nStreams) {
393  //bad
394  }
395  if(nConcurrentRuns>nStreams) {
396  //bad
397  }
398  if(nConcurrentRuns>nConcurrentLumis) {
399  //bad
400  }
401  */
402  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
403 
404  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
405 
406  // Now do general initialization
408 
409  //initialize the services
410  auto& serviceSets = processDesc->getServicesPSets();
411  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
412  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
413 
414  //make the services available
416 
417  if(nStreams>1) {
419  handler->willBeUsingThreads();
420  }
421 
422  // initialize miscellaneous items
423  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
424 
425  // initialize the event setup provider
426  esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get());
427 
428  // initialize the looper, if any
429  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
430  if(looper_) {
431  looper_->setActionTable(items.act_table_.get());
432  looper_->attachTo(*items.actReg_);
433 
434  //For now loopers make us run only 1 transition at a time
435  nStreams=1;
436  nConcurrentLumis=1;
437  nConcurrentRuns=1;
438  }
439 
440  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
441 
442  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
443  streamQueues_.resize(nStreams);
444  streamLumiStatus_.resize(nStreams);
445 
446  // initialize the input source
447  input_ = makeInput(*parameterSet,
448  *common,
449  items.preg(),
450  items.branchIDListHelper(),
451  items.thinnedAssociationsHelper(),
452  items.actReg_,
453  items.processConfiguration(),
455 
456  // initialize the Schedule
457  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
458 
459  // set the data members
460  act_table_ = std::move(items.act_table_);
461  actReg_ = items.actReg_;
462  preg_ = items.preg();
464  branchIDListHelper_ = items.branchIDListHelper();
465  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
466  processConfiguration_ = items.processConfiguration();
468  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
469 
470  FDEBUG(2) << parameterSet << std::endl;
471 
473  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
474  // Reusable event principal
475  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
478  }
479 
480  for(unsigned int index =0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
481  auto lp = std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_,
482  historyAppender_.get(), index);
484  }
485 
486  // fill the subprocesses, if there are any
487  subProcesses_.reserve(subProcessVParameterSet.size());
488  for(auto& subProcessPSet : subProcessVParameterSet) {
489  subProcesses_.emplace_back(subProcessPSet,
490  *parameterSet,
491  preg(),
496  *actReg_,
497  token,
500  &processContext_);
501  }
502  }
503 
505  // Make the services available while everything is being deleted.
506  ServiceToken token = getToken();
507  ServiceRegistry::Operate op(token);
508 
509  // manually destroy all these things that may need the services around
510  // propagate_const<T> has no reset() function
511  espController_ = nullptr;
512  esp_ = nullptr;
513  schedule_ = nullptr;
514  input_ = nullptr;
515  looper_ = nullptr;
516  actReg_ = nullptr;
517 
520  }
521 
522  void
524  if(beginJobCalled_) return;
525  beginJobCalled_=true;
526  bk::beginJob();
527 
528  // StateSentry toerror(this); // should we add this ?
529  //make the services available
531 
536  actReg_->preallocateSignal_(bounds);
537  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
539 
540  //NOTE: this may throw
542  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
543 
546  }
547  //NOTE: This implementation assumes 'Job' means one call
548  // to EventProcessor::run
549  // If it really means once per 'application' then this code will
550  // have to be changed.
551  // Also have to deal with case where have 'run' then new Module
552  // added and do 'run'
553  // again. In that case the newly added Module needs its 'beginJob'
554  // to be called.
555 
556  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
557  // For now we delay calling beginOfJob until first beginOfRun
558  //if(looper_) {
559  // looper_->beginOfJob(es);
560  //}
561  try {
562  convertException::wrap([&]() {
563  input_->doBeginJob();
564  });
565  }
566  catch(cms::Exception& ex) {
567  ex.addContext("Calling beginJob for the source");
568  throw;
569  }
570  schedule_->beginJob(*preg_);
571  // toerror.succeeded(); // should we add this?
572  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
573  actReg_->postBeginJobSignal_();
574 
575  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
576  schedule_->beginStream(i);
577  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
578  }
579  }
580 
581  void
583  // Collects exceptions, so we don't throw before all operations are performed.
584  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
585 
586  //make the services available
588 
589  //NOTE: this really should go elsewhere in the future
590  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
591  c.call([this,i](){this->schedule_->endStream(i);});
592  for(auto& subProcess : subProcesses_) {
593  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
594  }
595  }
596  auto actReg = actReg_.get();
597  c.call([actReg](){actReg->preEndJobSignal_();});
598  schedule_->endJob(c);
599  for(auto& subProcess : subProcesses_) {
600  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
601  }
602  c.call(std::bind(&InputSource::doEndJob, input_.get()));
603  if(looper_) {
604  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
605  }
606  c.call([actReg](){actReg->postEndJobSignal_();});
607  if(c.hasThrown()) {
608  c.rethrow();
609  }
610  }
611 
614  return serviceToken_;
615  }
616 
617  std::vector<ModuleDescription const*>
619  return schedule_->getAllModuleDescriptions();
620  }
621 
622  int
624  return schedule_->totalEvents();
625  }
626 
627  int
629  return schedule_->totalEventsPassed();
630  }
631 
632  int
634  return schedule_->totalEventsFailed();
635  }
636 
637  void
639  schedule_->enableEndPaths(active);
640  }
641 
642  bool
644  return schedule_->endPathsEnabled();
645  }
646 
647  void
649  schedule_->getTriggerReport(rep);
650  }
651 
652  void
654  schedule_->clearCounters();
655  }
656 
657  namespace {
658 #include "TransitionProcessors.icc"
659  }
660 
661  bool
663  bool returnValue = false;
664 
665  // Look for a shutdown signal
666  if(shutdown_flag.load(std::memory_order_acquire)) {
667  returnValue = true;
668  returnCode = epSignal;
669  }
670  return returnValue;
671  }
672 
675  if (deferredExceptionPtrIsSet_.load()) {
677  return InputSource::IsStop;
678  }
679 
680  SendSourceTerminationSignalIfException sentry(actReg_.get());
681  InputSource::ItemType itemType;
682  //For now, do nothing with InputSource::IsSynchronize
683  do {
684  itemType = input_->nextItemType();
685  } while( itemType == InputSource::IsSynchronize);
686 
687  lastSourceTransition_ = itemType;
688  sentry.completedSuccessfully();
689 
691 
692  if(checkForAsyncStopRequest(returnCode)) {
693  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
695  }
696 
697  return lastSourceTransition_;
698  }
699 
700  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
702  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
703  }
704 
707  return input_->luminosityBlock();
708  }
709 
712 
715  {
716  beginJob(); //make sure this was called
717 
718  // make the services available
720 
722  try {
723  FilesProcessor fp(fileModeNoMerge_);
724 
725  convertException::wrap([&]() {
726  bool firstTime = true;
727  do {
728  if(not firstTime) {
730  rewindInput();
731  } else {
732  firstTime = false;
733  }
734  startingNewLoop();
735 
736  auto trans = fp.processFiles(*this);
737 
738  fp.normalEnd();
739 
740  if(deferredExceptionPtrIsSet_.load()) {
741  std::rethrow_exception(deferredExceptionPtr_);
742  }
743  if(trans != InputSource::IsStop) {
744  //problem with the source
745  doErrorStuff();
746 
747  throw cms::Exception("BadTransition")
748  << "Unexpected transition change "
749  << trans;
750 
751  }
752  } while(not endOfLoop());
753  }); // convertException::wrap
754 
755  } // Try block
756  catch (cms::Exception & e) {
757  if (!exceptionMessageLumis_.empty()) {
759  if (e.alreadyPrinted()) {
760  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
761  }
762  }
763  if (!exceptionMessageRuns_.empty()) {
765  if (e.alreadyPrinted()) {
766  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
767  }
768  }
769  if (!exceptionMessageFiles_.empty()) {
771  if (e.alreadyPrinted()) {
772  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
773  }
774  }
775  throw;
776  }
777  }
778 
779  return returnCode;
780  }
781 
783  FDEBUG(1) << " \treadFile\n";
784  size_t size = preg_->size();
785  SendSourceTerminationSignalIfException sentry(actReg_.get());
786 
788 
789  fb_ = input_->readFile();
790  if(size < preg_->size()) {
792  }
796  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
797  }
798  sentry.completedSuccessfully();
799  }
800 
801  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
802  if (fb_.get() != nullptr) {
803  SendSourceTerminationSignalIfException sentry(actReg_.get());
804  input_->closeFile(fb_.get(), cleaningUpAfterException);
805  sentry.completedSuccessfully();
806  }
807  FDEBUG(1) << "\tcloseInputFile\n";
808  }
809 
811  if (fb_.get() != nullptr) {
812  schedule_->openOutputFiles(*fb_);
813  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
814  }
815  FDEBUG(1) << "\topenOutputFiles\n";
816  }
817 
819  if (fb_.get() != nullptr) {
820  schedule_->closeOutputFiles();
821  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
822  }
823  FDEBUG(1) << "\tcloseOutputFiles\n";
824  }
825 
827  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
828  if (fb_.get() != nullptr) {
829  schedule_->respondToOpenInputFile(*fb_);
830  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
831  }
832  FDEBUG(1) << "\trespondToOpenInputFile\n";
833  }
834 
836  if (fb_.get() != nullptr) {
837  schedule_->respondToCloseInputFile(*fb_);
838  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
839  }
840  FDEBUG(1) << "\trespondToCloseInputFile\n";
841  }
842 
844  shouldWeStop_ = false;
845  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
846  // until after we've called beginOfJob
847  if(looper_ && looperBeginJobRun_) {
848  looper_->doStartingNewLoop();
849  }
850  FDEBUG(1) << "\tstartingNewLoop\n";
851  }
852 
854  if(looper_) {
855  ModuleChanger changer(schedule_.get(),preg_.get());
856  looper_->setModuleChanger(&changer);
857  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
858  looper_->setModuleChanger(nullptr);
859  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
860  else return false;
861  }
862  FDEBUG(1) << "\tendOfLoop\n";
863  return true;
864  }
865 
867  input_->repeat();
868  input_->rewind();
869  FDEBUG(1) << "\trewind\n";
870  }
871 
873  looper_->prepareForNextLoop(esp_.get());
874  FDEBUG(1) << "\tprepareForNextLoop\n";
875  }
876 
878  FDEBUG(1) << "\tshouldWeCloseOutput\n";
879  if(!subProcesses_.empty()) {
880  for(auto const& subProcess : subProcesses_) {
881  if(subProcess.shouldWeCloseOutput()) {
882  return true;
883  }
884  }
885  return false;
886  }
887  return schedule_->shouldWeCloseOutput();
888  }
889 
891  FDEBUG(1) << "\tdoErrorStuff\n";
892  LogError("StateMachine")
893  << "The EventProcessor state machine encountered an unexpected event\n"
894  << "and went to the error state\n"
895  << "Will attempt to terminate processing normally\n"
896  << "(IF using the looper the next loop will be attempted)\n"
897  << "This likely indicates a bug in an input module or corrupted input or both\n";
898  }
899 
900  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded,
901  bool& eventSetupForInstanceSucceeded) {
902  globalBeginSucceeded = false;
903  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
904  {
905  SendSourceTerminationSignalIfException sentry(actReg_.get());
906 
907  input_->doBeginRun(runPrincipal, &processContext_);
908  sentry.completedSuccessfully();
909  }
910 
911  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
912  runPrincipal.beginTime());
914  espController_->forceCacheClear();
915  }
916  {
917  SendSourceTerminationSignalIfException sentry(actReg_.get());
918  espController_->eventSetupForInstance(ts);
919  eventSetupForInstanceSucceeded = true;
920  sentry.completedSuccessfully();
921  }
922  EventSetup const& es = esp_->eventSetup();
923  if(looper_ && looperBeginJobRun_== false) {
924  looper_->copyInfo(ScheduleInfo(schedule_.get()));
925  looper_->beginOfJob(es);
926  looperBeginJobRun_ = true;
927  looper_->doStartingNewLoop();
928  }
929  {
931  auto globalWaitTask = make_empty_waiting_task();
932  globalWaitTask->increment_ref_count();
933  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
934  *schedule_,
935  runPrincipal,
936  ts,
937  es,
939  subProcesses_);
940  globalWaitTask->wait_for_all();
941  if(globalWaitTask->exceptionPtr() != nullptr) {
942  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
943  }
944  }
945  globalBeginSucceeded = true;
946  FDEBUG(1) << "\tbeginRun " << run << "\n";
947  if(looper_) {
948  looper_->doBeginRun(runPrincipal, es, &processContext_);
949  }
950  {
951  //To wait, the ref count has to be 1+#streams
952  auto streamLoopWaitTask = make_empty_waiting_task();
953  streamLoopWaitTask->increment_ref_count();
954 
956 
957  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
958  *schedule_,
960  runPrincipal,
961  ts,
962  es,
964  subProcesses_);
965 
966  streamLoopWaitTask->wait_for_all();
967  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
968  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
969  }
970  }
971  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
972  if(looper_) {
973  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
974  }
975  }
976 
978  bool globalBeginSucceeded, bool cleaningUpAfterException,
979  bool eventSetupForInstanceSucceeded) {
980  if (eventSetupForInstanceSucceeded) {
981  //If we skip empty runs, this would be called conditionally
982  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
983 
984  if(globalBeginSucceeded) {
986  t->increment_ref_count();
987  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
988  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
989  mergeableRunProductMetadata->preWriteRun();
990  writeRunAsync(edm::WaitingTaskHolder{t.get()}, phid, run, mergeableRunProductMetadata);
991  t->wait_for_all();
992  mergeableRunProductMetadata->postWriteRun();
993  if(t->exceptionPtr()) {
994  std::rethrow_exception(*t->exceptionPtr());
995  }
996  }
997  }
998  deleteRunFromCache(phid, run);
999  }
1000 
1001  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1002  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1003  runPrincipal.setEndTime(input_->timestamp());
1004 
1006  runPrincipal.endTime());
1007  {
1008  SendSourceTerminationSignalIfException sentry(actReg_.get());
1009  espController_->eventSetupForInstance(ts);
1010  sentry.completedSuccessfully();
1011  }
1012  EventSetup const& es = esp_->eventSetup();
1013  if(globalBeginSucceeded){
1014  //To wait, the ref count has to be 1+#streams
1015  auto streamLoopWaitTask = make_empty_waiting_task();
1016  streamLoopWaitTask->increment_ref_count();
1017 
1019 
1020  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(streamLoopWaitTask.get()),
1021  *schedule_,
1023  runPrincipal,
1024  ts,
1025  es,
1026  serviceToken_,
1027  subProcesses_,
1028  cleaningUpAfterException);
1029 
1030  streamLoopWaitTask->wait_for_all();
1031  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1032  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1033  }
1034  }
1035  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1036  if(looper_) {
1037  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1038  }
1039  {
1040  auto globalWaitTask = make_empty_waiting_task();
1041  globalWaitTask->increment_ref_count();
1042 
1044  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1045  *schedule_,
1046  runPrincipal,
1047  ts,
1048  es,
1049  serviceToken_,
1050  subProcesses_,
1051  cleaningUpAfterException);
1052  globalWaitTask->wait_for_all();
1053  if(globalWaitTask->exceptionPtr() != nullptr) {
1054  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1055  }
1056  }
1057  FDEBUG(1) << "\tendRun " << run << "\n";
1058  if(looper_) {
1059  looper_->doEndRun(runPrincipal, es, &processContext_);
1060  }
1061  }
1062 
1064  EventProcessor::processLumis(std::shared_ptr<void> const& iRunResource) {
1065  auto waitTask = make_empty_waiting_task();
1066  waitTask->increment_ref_count();
1067 
1068  if(streamLumiActive_> 0) {
1070  continueLumiAsync(WaitingTaskHolder{waitTask.get()});
1071  } else {
1072  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1073  input_->luminosityBlockAuxiliary()->beginTime()),
1074  iRunResource,
1075  WaitingTaskHolder{waitTask.get()});
1076  }
1077  waitTask->wait_for_all();
1078 
1079  if(waitTask->exceptionPtr() != nullptr) {
1080  std::rethrow_exception(* (waitTask->exceptionPtr()) );
1081  }
1082  return lastTransitionType();
1083  }
1084 
1085  void
1087  std::shared_ptr<void> const& iRunResource, edm::WaitingTaskHolder iHolder) {
1088  if(iHolder.taskHasFailed()) { return; }
1089 
1090  auto status= std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource) ;
1091 
1092  auto lumiWork = [this, iHolder, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1093  if(iHolder.taskHasFailed()) { return; }
1094 
1095  status->setResumer(std::move(iResumer));
1096 
1097  sourceResourcesAcquirer_.serialQueueChain().push([this,iHolder,status]() mutable {
1098  //make the services available
1100 
1101  try {
1103 
1104  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1105  {
1106  SendSourceTerminationSignalIfException sentry(actReg_.get());
1107 
1108  input_->doBeginLumi(lumiPrincipal, &processContext_);
1109  sentry.completedSuccessfully();
1110  }
1111 
1113  if(rng.isAvailable()) {
1114  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1115  rng->preBeginLumi(lb);
1116  }
1117 
1118  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1119 
1120  //Task to start the stream beginLumis
1121  auto beginStreamsTask= make_waiting_task(tbb::task::allocate_root()
1122  ,[this, holder = iHolder, status, ts] (std::exception_ptr const* iPtr) mutable {
1123  if (iPtr) {
1124  holder.doneWaiting(*iPtr);
1125  } else {
1126 
1127  status->globalBeginDidSucceed();
1128  EventSetup const& es = esp_->eventSetup();
1129  if(looper_) {
1130  try {
1131  //make the services available
1133  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1134  }catch(...) {
1135  holder.doneWaiting(std::current_exception());
1136  return;
1137  }
1138  }
1140 
1141  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1142  streamQueues_[i].push([this,i,status,holder,ts,&es] () {
1143  streamQueues_[i].pause();
1144 
1145  auto eventTask = edm::make_waiting_task(tbb::task::allocate_root(),
1146  [this,i,h = holder](std::exception_ptr const* iPtr) mutable
1147  {
1148  if(iPtr) {
1149  h.doneWaiting(*iPtr);
1150  } else {
1152  }
1153  });
1154  auto& event = principalCache_.eventPrincipal(i);
1157  auto lp = status->lumiPrincipal();
1158  event.setLuminosityBlockPrincipal(lp.get());
1159  beginStreamTransitionAsync<Traits>(WaitingTaskHolder{eventTask},
1160  *schedule_,i,*lp,ts,es,
1162  });
1163  }
1164  }
1165  });
1166 
1167  //task to start the global begin lumi
1168  WaitingTaskHolder beginStreamsHolder{beginStreamsTask};
1169  EventSetup const& es = esp_->eventSetup();
1170  {
1172  beginGlobalTransitionAsync<Traits>(beginStreamsHolder,
1173  *schedule_,
1174  *(status->lumiPrincipal()),
1175  ts,
1176  es,
1177  serviceToken_,
1178  subProcesses_);
1179  }
1180  } catch(...) {
1181  iHolder.doneWaiting(std::current_exception());
1182  }
1183  });
1184  };
1185 
1186  //Safe to do check now since can not have multiple beginLumis at same time in this part of the code
1187  // because we do not attempt to read from the source again until we try to get the first event in a lumi
1188  if(espController_->isWithinValidityInterval(iSync)) {
1189  iovQueue_.pause();
1190  lumiQueue_->pushAndPause(std::move(lumiWork));
1191  } else {
1192  //If EventSetup fails, need beginStreamsHolder in order to pass back exception
1193  iovQueue_.push([this,iHolder,lumiWork,iSync]() mutable {
1194  try {
1195  SendSourceTerminationSignalIfException sentry(actReg_.get());
1196  espController_->eventSetupForInstance(iSync);
1197  sentry.completedSuccessfully();
1198  } catch(...) {
1199  iHolder.doneWaiting(std::current_exception());
1200  return;
1201  }
1202  iovQueue_.pause();
1203  lumiQueue_->pushAndPause(std::move(lumiWork));
1204  });
1205  }
1206  }
1207 
// NOTE(review): the line holding this function's name and parameter list (listing line 1209) is
// missing from this extract. The body uses iHolder, so this is presumably
// EventProcessor::continueLumiAsync(edm::WaitingTaskHolder iHolder) -- confirm against the real file.
// Visible behavior: calls needToContinueLumi() and startProcessingEvents() on the status stored
// for stream 0, then starts one task per stream.
1208  void
1210  {
1211  //all streams are sharing the same status at the moment
1212  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1213  status->needToContinueLumi();
1214  status->startProcessingEvents();
1215  }
1216 
1217  unsigned int streamIndex = 0;
// Every stream except the last gets a task handed to tbb::task::enqueue; each task holds its own
// copy of iHolder (captured as h) and calls handleNextEventForStreamAsync for that stream.
1218  for(; streamIndex< preallocations_.numberOfStreams()-1; ++streamIndex) {
1219  tbb::task::enqueue( *edm::make_functor_task(tbb::task::allocate_root(),
1220  [this,streamIndex,h = iHolder](){
1221  handleNextEventForStreamAsync(std::move(h), streamIndex);
1222  }) );
1223 
1224  }
// The last stream index (value of streamIndex after the loop) is started with tbb::task::spawn and
// takes over iHolder by move. NOTE(review): the lambda body (listing line 1226) is missing here.
1225  tbb::task::spawn( *edm::make_functor_task(tbb::task::allocate_root(),[this,streamIndex,h=std::move(iHolder)](){
1227  }) );
1228  }
1229 
// Starts the global end-of-luminosity-block transition for iLumiStatus.
// Two waiting tasks are created before the transition is launched:
//   t      - final cleanup: optional looper end-lumi call, resuming iovQueue_ and the global lumi
//            queue, releasing the status, then signalling iTask with the first exception seen.
//   writeT - passed to endGlobalTransitionAsync; forwards an exception to t, otherwise acts only
//            when the global begin succeeded.
// NOTE(review): several listing lines inside this function (1244, 1256, 1258, 1284, 1294) are
// missing from this extract, so the statement inside the didGlobalBeginSucceed() branch is not visible.
1230  void EventProcessor::globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1231  //Need to be sure iTask is always destroyed after iLumiStatus since iLumiStatus can cause endRun to start.
1232  auto t = edm::make_waiting_task(tbb::task::allocate_root(), [ items = std::make_pair(iLumiStatus,std::move(iTask)), this] (std::exception_ptr const* iPtr) mutable {
// ptr collects the first exception encountered; later failures do not overwrite it.
1233  std::exception_ptr ptr;
1234  //use an easier to remember variable name
1235  auto status = std::move(items.first);
1236  if(iPtr) {
1237  ptr = *iPtr;
1238  WaitingTaskHolder tmp(items.second);
1239  //set the exception early to prevent a beginLumi from running
1240  // we use a copy to keep t from resetting on doneWaiting call.
1241  tmp.doneWaiting(ptr);
1242  } else {
1243  try {
1245  if(looper_) {
1246  auto& lp = *(status->lumiPrincipal());
1247  EventSetup const& es = esp_->eventSetup();
1248  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1249  }
1250  }catch(...) {
1251  if(not ptr) {
1252  ptr = std::current_exception();
1253  }
1254  }
1255  }
1257  try {
1259  //release our hold on the IOV
1260  iovQueue_.resume();
1261  status->resumeGlobalLumiQueue();
1262  } catch(...) {
1263  if( not ptr) {
1264  ptr = std::current_exception();
1265  }
1266  }
// Dropping this reference is wrapped in its own try block; the comment below says it may trigger endRun.
1267  try {
1268  status.reset();
1269  } catch(...) {
1270  if( not ptr) {
1271  ptr = std::current_exception();
1272  }
1273  }
1274  //have to wait until reset is called since that could call endRun
1275  items.second.doneWaiting(ptr);
1276  });
1277 
1278  auto writeT = edm::make_waiting_task(tbb::task::allocate_root(), [this,status =iLumiStatus, task = WaitingTaskHolder(t)] (std::exception_ptr const* iExcept) mutable {
1279  if(iExcept) {
1280  task.doneWaiting(*iExcept);
1281  } else {
1282  //Only call writeLumi if beginLumi succeeded
1283  if(status->didGlobalBeginSucceed()) {
1285  }
1286  }
1287  });
1288  auto& lp = *(iLumiStatus->lumiPrincipal());
1289 
// NOTE(review): the sync value here is built with lp.beginTime(), while the stream-end transition
// in this file uses endTime() -- confirm that beginTime() is intended.
1290  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()),
1291  lp.beginTime());
1292 
1293 
1295  EventSetup const& es = esp_->eventSetup();
1296 
1297  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(writeT),
1298  *schedule_,
1299  lp,
1300  ts,
1301  es,
1302  serviceToken_,
1303  subProcesses_,
1304  iLumiStatus->cleaningUpAfterException());
1305  }
1306 
// NOTE(review): the first signature line (listing line 1307) is missing from this extract; the
// index entry for this file lists
// streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus).
// Visible behavior: builds a completion task for stream iStreamIndex, records the lumi end time,
// and, when the global begin succeeded, launches endStreamTransitionAsync with that task.
1308  unsigned int iStreamIndex,
1309  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus) {
1310 
// Completion task: clears this stream's status slot, resumes the stream queue, then signals iTask
// with the incoming exception (or a null exception_ptr when there was none).
1311  auto t =edm::make_waiting_task(tbb::task::allocate_root(), [this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1312  std::exception_ptr ptr;
1313  if(iPtr) {
1314  ptr = *iPtr;
1315  }
// Keep a local copy of the status so it can still be queried after the slot is reset.
1316  auto status =streamLumiStatus_[iStreamIndex];
1317  //reset status before releasing queue else get race condition
1318  streamLumiStatus_[iStreamIndex].reset();
1320  streamQueues_[iStreamIndex].resume();
1321 
1322  //are we the last one?
// NOTE(review): the statement inside this branch (listing line 1324) is missing from this extract.
1323  if( status->streamFinishedLumi()) {
1325  }
1326  iTask.doneWaiting(ptr);
1327  });
1328 
1329  edm::WaitingTaskHolder lumiDoneTask{t};
1330 
1331  iLumiStatus->setEndTime();
1332 
// The stream end transition is only started if the global begin for this lumi succeeded.
1333  if(iLumiStatus->didGlobalBeginSucceed()) {
1334  auto & lumiPrincipal = *iLumiStatus->lumiPrincipal();
1335  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1336  lumiPrincipal.endTime());
1337  EventSetup const& es = esp_->eventSetup();
1338 
1339  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1340 
1342  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1343  *schedule_,iStreamIndex,
1344  lumiPrincipal,ts,es,
1345  serviceToken_,
1346  subProcesses_,cleaningUpAfterException);
1347  }
1348  }
1349 
1350 
// NOTE(review): the signature line (listing line 1351) is missing from this extract; this is
// presumably EventProcessor::endUnfinishedLumi() -- confirm against the real file.
// Visible behavior: if any stream is still counted in streamLumiActive_, calls streamEndLumiAsync
// for every stream that has a status, blocks until they finish, and rethrows any stored exception.
1352  if(streamLumiActive_.load() > 0) {
1353  auto globalWaitTask = make_empty_waiting_task();
// Extra reference so wait_for_all() below can block on this task.
1354  globalWaitTask->increment_ref_count();
1355  {
// The holder is scoped so it is destroyed before wait_for_all() is reached.
1356  WaitingTaskHolder globalTaskHolder{globalWaitTask.get()};
1357  for(unsigned int i=0; i< preallocations_.numberOfStreams(); ++i) {
1358  if(streamLumiStatus_[i]) {
1359  streamEndLumiAsync(globalTaskHolder, i, streamLumiStatus_[i]);
1360  }
1361  }
1362  }
1363  globalWaitTask->wait_for_all();
1364  if(globalWaitTask->exceptionPtr() != nullptr) {
1365  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1366  }
1367  }
1368  }
1369 
// Creates a RunPrincipal from the input source's run auxiliary, has the source fill it,
// inserts it into principalCache_, and returns (reduced process history ID, run number).
// NOTE(review): listing lines 1371-1372 (the condition and throw that precede the message below)
// and 1378 (part of the RunPrincipal constructor arguments) are missing from this extract.
1370  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1373  << "EventProcessor::readRun\n"
1374  << "Illegal attempt to insert run into cache\n"
1375  << "Contact a Framework Developer\n";
1376  }
1377  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(),
1379  0, true, &mergeableRunProductProcesses_);
1380  {
// The sentry emits the early-termination signal from its destructor unless
// completedSuccessfully() is reached, i.e. when readRun throws.
1381  SendSourceTerminationSignalIfException sentry(actReg_.get());
1382  input_->readRun(*rp, *historyAppender_);
1383  sentry.completedSuccessfully();
1384  }
1385  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1386  principalCache_.insert(rp);
1387  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1388  }
1389 
// Merges the source's current run auxiliary into principalCache_, then has the input source
// read and merge into the cached run principal.
// Returns (reduced process history ID of that run principal, run number reported by the source).
1390  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1391  principalCache_.merge(input_->runAuxiliary(), preg());
1392  auto runPrincipal =principalCache_.runPrincipalPtr();
1393  {
// The sentry emits the early-termination signal from its destructor unless
// completedSuccessfully() is reached, i.e. when readAndMergeRun throws.
1394  SendSourceTerminationSignalIfException sentry(actReg_.get());
1395  input_->readAndMergeRun(*runPrincipal);
1396  sentry.completedSuccessfully();
1397  }
1398  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1399  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1400  }
1401 
// NOTE(review): the signature and the start of the error check (listing lines 1402-1404) and the
// line that obtains lbp (1410) are missing from this extract. The index entry lists
// void readLuminosityBlock(LuminosityBlockProcessingStatus &); the body refers to it as iStatus.
// Visible behavior: sets the source's lumi auxiliary on lbp, has the source read the lumi into it,
// attaches the cached run principal, and stores lbp in iStatus.
1405  << "EventProcessor::readLuminosityBlock\n"
1406  << "Illegal attempt to insert lumi into cache\n"
1407  << "Run is invalid\n"
1408  << "Contact a Framework Developer\n";
1409  }
1411  assert(lbp);
1412  lbp->setAux(*input_->luminosityBlockAuxiliary());
1413  {
// The sentry emits the early-termination signal from its destructor if the read throws.
1414  SendSourceTerminationSignalIfException sentry(actReg_.get());
1415  input_->readLuminosityBlock(*lbp, *historyAppender_);
1416  sentry.completedSuccessfully();
1417  }
1418  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1419  iStatus.lumiPrincipal() = std::move(lbp);
1420  }
1421 
// NOTE(review): the signature line (listing line 1422) is missing from this extract; the index
// entry lists int readAndMergeLumi(LuminosityBlockProcessingStatus &), referred to here as iStatus.
// Visible behavior: checks that the lumi already held by iStatus matches the source's current lumi
// (same identity, or equal reduced process history IDs), merges the source's auxiliary into it,
// has the source read and merge the data, and returns the source's luminosity block number.
1423  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1424  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1425  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) == input_->processHistoryRegistry().reducedProcessHistoryID(input_->luminosityBlockAuxiliary()->processHistoryID()));
1426  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1427  assert(lumiOK);
1428  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1429  {
// The sentry emits the early-termination signal from its destructor if the merge throws.
1430  SendSourceTerminationSignalIfException sentry(actReg_.get());
1431  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1432  sentry.completedSuccessfully();
1433  }
1434  return input_->luminosityBlock();
1435  }
1436 
// NOTE(review): the first signature line (listing line 1437) is missing from this extract; the
// index entry lists
// void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *).
// Visible behavior: asks schedule_ to write the run identified by (phid, run); the follow-up task
// subsT then either forwards an exception to task or calls writeRunAsync on every SubProcess.
// Listing lines 1444 and 1450 are also missing.
1438  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
1439  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,phid,run,task,mergeableRunProductMetadata]
1440  (std::exception_ptr const* iExcept) mutable {
1441  if(iExcept) {
1442  task.doneWaiting(*iExcept);
1443  } else {
1445  for(auto&s : subProcesses_) {
1446  s.writeRunAsync(task,phid,run,mergeableRunProductMetadata);
1447  }
1448  }
1449  });
1451  schedule_->writeRunAsync(WaitingTaskHolder(subsT), principalCache_.runPrincipal(phid, run),
1452  &processContext_, actReg_.get(), mergeableRunProductMetadata);
1453  }
1454 
// NOTE(review): the signature line (listing line 1455) is missing from this extract; the index
// entry lists void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run).
// Removes the run from principalCache_ and asks every SubProcess to do the same.
1456  principalCache_.deleteRun(phid, run);
1457  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1458  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1459  }
1460 
// Writes the luminosity block held by iStatus: first through schedule_, then (in the follow-up
// task subsT) through every SubProcess. An exception reaching subsT is forwarded to task instead.
// NOTE(review): listing lines 1466 and 1472 are missing from this extract.
1461  void EventProcessor::writeLumiAsync(WaitingTaskHolder task, std::shared_ptr<LuminosityBlockProcessingStatus> iStatus) {
1462  auto subsT = edm::make_waiting_task(tbb::task::allocate_root(), [this,task, iStatus](std::exception_ptr const* iExcept) mutable {
1463  if(iExcept) {
1464  task.doneWaiting(*iExcept);
1465  } else {
1467  for(auto&s : subProcesses_) {
1468  s.writeLumiAsync(task,*(iStatus->lumiPrincipal()));
1469  }
1470  }
1471  });
1473 
1474  std::shared_ptr<LuminosityBlockPrincipal> const& lumiPrincipal = iStatus->lumiPrincipal();
// Notify the run's mergeable-run-product metadata of this lumi number before the write is started.
1475  lumiPrincipal->runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal->luminosityBlock());
1476 
1477  schedule_->writeLumiAsync(WaitingTaskHolder{subsT}, *lumiPrincipal, &processContext_, actReg_.get());
1478  }
1479 
// NOTE(review): the signature line (listing line 1480) is missing from this extract; the index
// entry lists void deleteLumiFromCache(LuminosityBlockProcessingStatus &), referred to as iStatus.
// Asks every SubProcess to drop this lumi, then clears the lumi principal held by iStatus.
1481  for(auto& s: subProcesses_) { s.deleteLumiFromCache(*iStatus.lumiPrincipal());}
1482  iStatus.lumiPrincipal()->clearPrincipal();
1483  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1484  }
1485 
// Tries to read the next event for stream iStreamIndex.
// Returns true only when readEvent() was called and completed. Returns false when processing
// should stop, a deferred exception is already recorded, event processing was stopped on iStatus,
// the next item from the source is not an event, or an exception was thrown (the first such
// exception is stored in deferredExceptionPtr_).
// NOTE(review): the second signature line (listing line 1487) and line 1500 are missing from this
// extract; the index entry lists the second parameter as LuminosityBlockProcessingStatus &.
1486  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1488  if(shouldWeStop()) {
1489  return false;
1490  }
1491 
1492  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1493  return false;
1494  }
1495 
1496  if(iStatus.wasEventProcessingStopped()) {
1497  return false;
1498  }
1499 
1501  try {
1502  //need to use lock in addition to the serial task queue because
1503  // of delayed provenance reading and reading data in response to
1504  // edm::Refs etc
1505  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1506 
// When the status says the lumi is being continued, the item is treated as IsLumi without
// asking nextTransitionType().
1507  auto itemType = iStatus.continuingLumi()? InputSource::IsLumi : nextTransitionType();
1508  if(InputSource::IsLumi == itemType) {
1509  iStatus.haveContinuedLumi();
// Keep merging while the source presents the same run and lumi number as the one in progress.
1510  while(itemType == InputSource::IsLumi and
1511  iStatus.lumiPrincipal()->run() == input_->run() and
1512  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1513  readAndMergeLumi(iStatus);
1514  itemType = nextTransitionType();
1515  }
// Still IsLumi after the loop: record the sync value of the lumi the source is now positioned at.
1516  if(InputSource::IsLumi == itemType) {
1517  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1518  input_->luminosityBlockAuxiliary()->beginTime()));
1519  }
1520  }
1521  if(InputSource::IsEvent != itemType) {
1522  iStatus.stopProcessingEvents();
1523 
1524  //IsFile may continue processing the lumi and
1525  // looper_ can cause the input source to declare a new IsRun which is actually
1526  // just a continuation of the previous run
1527  if(InputSource::IsStop == itemType or
1528  InputSource::IsLumi == itemType or
1529  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1530  iStatus.endLumi();
1531  }
1532  return false;
1533  }
1534  readEvent(iStreamIndex);
1535  } catch (...) {
// Only the first exception is kept: the compare-exchange succeeds for a single caller.
1536  bool expected =false;
1537  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1538  deferredExceptionPtr_ = std::current_exception();
1539  }
1540  return false;
1541  }
1542  return true;
1543  }
1544 
// NOTE(review): the first signature line (listing line 1545) is missing from this extract; the
// index entry lists void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex).
// Visible behavior: pushes work onto the source's serial queue chain that reads the next event for
// the stream and, if one was read, processes it and then re-invokes this function. When no event
// was read, the stream either ends the lumi (possibly starting the next one) or signals iTask.
// Listing lines 1549 and 1583 are also missing.
1546  unsigned int iStreamIndex)
1547  {
1548  sourceResourcesAcquirer_.serialQueueChain().push([this,iTask,iStreamIndex]() mutable {
1550  auto& status = streamLumiStatus_[iStreamIndex];
1551  try {
1552  if(readNextEventForStream(iStreamIndex, *status) ) {
// Runs after the event has been processed: records/forwards a failure, or loops to the next event.
1553  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex](std::exception_ptr const* iPtr) mutable {
1554  if(iPtr) {
1555  bool expected = false;
// Only the first failure is stored and passed to iTask.
1556  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1557  deferredExceptionPtr_ = *iPtr;
1558  iTask.doneWaiting(*iPtr);
1559  }
1560  //the stream will stop now
1561  return;
1562  }
1563  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1564  });
1565 
1566  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1567  } else {
1568  //the stream will stop now
1569  if(status->isLumiEnding()) {
// The next lumi is started at most once: guarded by haveStartedNextLumi()/startNextLumi().
1570  if(lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1571  status->startNextLumi();
1572  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1573  }
1574  streamEndLumiAsync(std::move(iTask),iStreamIndex, status);
1575  } else {
1576  iTask.doneWaiting(std::exception_ptr{});
1577  }
1578  }
1579  } catch(...) {
1580  bool expected = false;
1581  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1582  auto e =std::current_exception();
1584  iTask.doneWaiting(e);
1585  }
1586  }
1587  });
1588  }
1589 
// Has the input source read the next event into the event principal of stream iStreamIndex,
// then passes the source's timestamp to that stream's lumi status via updateLastTimestamp().
1590  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1591  //TODO this will have to become per stream
1592  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1593  StreamContext streamContext(event.streamID(), &processContext_);
1594 
// The sentry emits the early-termination signal from its destructor unless
// completedSuccessfully() is reached; here that covers both the read and the timestamp update.
1595  SendSourceTerminationSignalIfException sentry(actReg_.get());
1596  input_->readEvent(event, streamContext);
1597 
1598  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1599  sentry.completedSuccessfully();
1600 
1601  FDEBUG(1) << "\treadEvent\n";
1602  }
1603 
// NOTE(review): the first signature line (listing line 1604) is missing from this extract; the
// index entry lists void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex).
// Spawns a TBB task that calls processEventAsyncImpl with by-value copies of both arguments.
1605  unsigned int iStreamIndex) {
1606  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1607  processEventAsyncImpl(iHolder, iStreamIndex);
1608  }) );
1609  }
1610 
// NOTE(review): the first signature line (listing line 1611) is missing from this extract; the
// index entry lists void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex).
// Visible behavior: processes the event held by the stream's event principal through schedule_,
// then through the SubProcesses (if any), then runs a finalize step that clears the event
// principal and signals iHolder. Listing lines 1615-1616 (where rng is declared) and 1629 are missing.
1612  unsigned int iStreamIndex) {
1613  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1614 
1617  if(rng.isAvailable()) {
1618  Event ev(*pep, ModuleDescription(), nullptr);
1619  rng->postEventRead(ev);
1620  }
1621 
// Last step of the chain: optional looper pass, clear the event principal, then report to iHolder.
1622  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1623  tbb::task::allocate_root(),
1624  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1625  {
1626 
1627  //NOTE: If we have a looper we only have one Stream
1628  if(looper_) {
1630  processEventWithLooper(*pep);
1631  }
1632 
1633  FDEBUG(1) << "\tprocessEvent\n";
1634  pep->clearEventPrincipal();
1635  if(iPtr) {
1636  iHolder.doneWaiting(*iPtr);
1637  } else {
1638  iHolder.doneWaiting(std::exception_ptr());
1639  }
1640  }
1641  )
1642  );
// afterProcessTask is what schedule_ signals when done: the finalize task itself when there are
// no SubProcesses, otherwise an intermediate task that runs the SubProcesses first.
1643  WaitingTaskHolder afterProcessTask;
1644  if(subProcesses_.empty()) {
1645  afterProcessTask = std::move(finalizeEventTask);
1646  } else {
1647  //Need to run SubProcesses after schedule has finished
1648  // with the event
1649  afterProcessTask = WaitingTaskHolder(
1650  make_waiting_task(tbb::task::allocate_root(),
1651  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1652  {
1653  if(not iPtr) {
1654  //when run with 1 thread, we want to the order to be what
1655  // it was before. This requires reversing the order since
1656  // tasks are run last one in first one out
1657  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1658  subProcess.doEventAsync(finalizeEventTask,*pep);
1659  }
1660  } else {
// On failure the SubProcesses are skipped and the exception goes straight to the finalize task.
1661  finalizeEventTask.doneWaiting(*iPtr);
1662  }
1663  })
1664  );
1665  }
1666 
1667  schedule_->processOneEventAsync(std::move(afterProcessTask),
1668  iStreamIndex,*pep, esp_->eventSetup(), serviceToken_);
1669 
1670  }
1671 
// NOTE(review): the signature line (listing line 1672) is missing from this extract; the index
// entry lists void processEventWithLooper(EventPrincipal &), referred to here as iPrincipal.
// Visible behavior: calls looper_->doDuringLoop for the event, repeating until the
// ProcessingController reports that the last operation succeeded; a status other than
// EDLooperBase::kContinue sets shouldWeStop_.
// Listing lines 1678 (declaration of status), 1686, 1688-1689 (conditions guarding the source
// repositioning calls) and 1697 are also missing, so the exact repositioning rules are not visible.
1673  bool randomAccess = input_->randomAccess();
1674  ProcessingController::ForwardState forwardState = input_->forwardState();
1675  ProcessingController::ReverseState reverseState = input_->reverseState();
1676  ProcessingController pc(forwardState, reverseState, randomAccess);
1677 
1679  do {
1680 
1681  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1682  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1683 
1684  bool succeeded = true;
// Repositioning of the source is only attempted when the source supports random access.
1685  if(randomAccess) {
1687  input_->skipEvents(-2);
1688  }
1690  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1691  }
1692  }
1693  pc.setLastOperationSucceeded(succeeded);
1694  } while(!pc.lastOperationSucceeded());
1695  if(status != EDLooperBase::kContinue) {
1696  shouldWeStop_ = true;
1698  }
1699  }
1700 
// NOTE(review): the signature line (listing line 1701) is missing from this extract; the index
// entry lists bool shouldWeStop() const.
// Returns true if shouldWeStop_ is set. Otherwise, when SubProcesses exist, the answer comes only
// from them (true if any SubProcess reports terminate()); schedule_->terminate() is consulted
// only when there are no SubProcesses.
1702  FDEBUG(1) << "\tshouldWeStop\n";
1703  if(shouldWeStop_) return true;
1704  if(!subProcesses_.empty()) {
1705  for(auto const& subProcess : subProcesses_) {
1706  if(subProcess.terminate()) {
1707  return true;
1708  }
1709  }
1710  return false;
1711  }
1712  return schedule_->terminate();
1713  }
1714 
// NOTE(review): the signature line (listing line 1715) is missing from this extract; the index
// entry lists void setExceptionMessageFiles(std::string &message).
// Stores a copy of message in exceptionMessageFiles_.
1716  exceptionMessageFiles_ = message;
1717  }
1718 
// NOTE(review): the signature line (listing line 1719) is missing from this extract; the index
// entry lists void setExceptionMessageRuns(std::string &message).
// Stores a copy of message in exceptionMessageRuns_.
1720  exceptionMessageRuns_ = message;
1721  }
1722 
// NOTE(review): the signature line (listing line 1723) is missing from this extract; the index
// entry lists void setExceptionMessageLumis(std::string &message).
// Stores a copy of message in exceptionMessageLumis_.
1724  exceptionMessageLumis_ = message;
1725  }
1726 
// Records iException as the deferred exception if none has been recorded yet.
// The flag deferredExceptionPtrIsSet_ is switched from false to true with compare_exchange_strong,
// so only the call that performs that switch stores the exception.
// Returns true if this call stored iException, false if the flag was already set.
1727  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1728  bool expected =false;
1729  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1730  deferredExceptionPtr_ = iException;
1731  return true;
1732  }
1733  return false;
1734  }
1735 
// NOTE(review): the signature line (listing line 1736) is missing from this extract; the index
// entry lists void warnAboutModulesRequiringLuminosityBLockSynchronization() const.
// Builds one "ModulesSynchingOnLumis" LogSystem message listing the module name and label of every
// worker for which both wantsGlobalLuminosityBlocks() and globalLuminosityBlocksQueue() are true.
1737  std::unique_ptr<LogSystem> s;
1738  for( auto worker: schedule_->allWorkers()) {
1739  if( worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
// The message object is created lazily, so nothing is logged when no worker matches.
1740  if(not s) {
1741  s = std::make_unique<LogSystem>("ModulesSynchingOnLumis");
1742  (*s) <<"The following modules require synchronizing on LuminosityBlock boundaries:";
1743  }
1744  (*s)<<"\n "<<worker->description().moduleName()<<" "<<worker->description().moduleLabel();
1745  }
1746  }
1747  }
1748 }
size
Write out results.
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void warnAboutModulesRequiringLuminosityBLockSynchronization() const
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
bool ev
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
bool hasRunPrincipal() const
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:65
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
bool alreadyPrinted() const
Definition: Exception.cc:251
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
void beginJob()
Definition: Breakpoints.cc:15
MergeableRunProductProcesses mergeableRunProductProcesses_
static std::string const input
Definition: EdmProvDump.cc:45
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:85
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
std::shared_ptr< LuminosityBlockPrincipal > & lumiPrincipal()
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
bool resume()
Resumes processing if the queue was paused.
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void setExceptionMessageLumis(std::string &message)
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:77
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:81
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
int totalEvents() const
void writeLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
InputSource::ItemType lastSourceTransition_
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:686
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
StatusCode asyncStopStatusCodeFromProcessingEvents_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:100
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
void push(const T &iAction)
asynchronously pushes functor iAction into queue
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::atomic< unsigned int > streamLumiActive_
void processEventWithLooper(EventPrincipal &)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool pause()
Pauses processing of additional tasks from the queue.
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31