CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
36 
38 
46 
51 
53 
65 
66 #include "MessageForSource.h"
67 #include "MessageForParent.h"
68 
69 #include "boost/range/adaptor/reversed.hpp"
70 
71 #include <exception>
72 #include <iomanip>
73 #include <iostream>
74 #include <utility>
75 #include <sstream>
76 
77 #include <sys/ipc.h>
78 #include <sys/msg.h>
79 
80 #include "tbb/task.h"
81 
82 //Used for CPU affinity
83 #ifndef __APPLE__
84 #include <sched.h>
85 #endif
86 
87 namespace {
88  //Sentry class to only send a signal if an
89  // exception occurs. An exception is identified
90  // by the destructor being called without first
91  // calling completedSuccessfully().
92  class SendSourceTerminationSignalIfException {
93  public:
94  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
95  reg_(iReg) {}
96  ~SendSourceTerminationSignalIfException() {
97  if(reg_) {
98  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
99  }
100  }
101  void completedSuccessfully() {
102  reg_ = nullptr;
103  }
104  private:
105  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
106  };
107 
108 }
109 
110 namespace edm {
111 
112  // ---------------------------------------------------------------
// Construct the single InputSource configured as "@main_input":
// validate its ParameterSet against the plugin-provided description,
// register the PSet, build a ModuleDescription labeled "source", create
// the source via InputSourceFactory, and connect its pre/post
// EventReadFromSource signals to the ActivityRegistry.
// NOTE(review): the leading numbers are doxygen-scrape residue; gaps in
// them (e.g. 114, 124, 132 — presumably the function head, the thrown
// Exception construction, and the description-filler factory call) are
// source lines dropped by the extraction, so this text is not
// compilable as-is. TODO: restore from the original file.
113  std::unique_ptr<InputSource>
115  CommonParams const& common,
116  std::shared_ptr<ProductRegistry> preg,
117  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
118  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
119  std::shared_ptr<ActivityRegistry> areg,
120  std::shared_ptr<ProcessConfiguration const> processConfiguration,
121  PreallocationConfiguration const& allocations) {
122  ParameterSet* main_input = params.getPSetForUpdate("@main_input")
123  if(main_input == nullptr) {
125  << "There must be exactly one source in the configuration.\n"
126  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
127  }
128 
129  std::string modtype(main_input->getParameter<std::string>("@module_type"));
130 
131  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
133  ConfigurationDescriptions descriptions(filler->baseType());
134  filler->fill(descriptions);
135 
136  try {
137  convertException::wrap([&]() {
138  descriptions.validate(*main_input, std::string("source"));
139  });
140  }
141  catch (cms::Exception & iException) {
// Add which source type failed validation, then rethrow.
142  std::ostringstream ost;
143  ost << "Validating configuration of input source of type " << modtype;
144  iException.addContext(ost.str());
145  throw;
146  }
147 
148  main_input->registerIt();
149 
150  // Fill in "ModuleDescription", in case the input source produces
151  // any EDProducts, which would be registered in the ProductRegistry.
152  // Also fill in the process history item for this process.
153  // There is no module label for the unnamed input source, so
154  // just use "source".
155  // Only the tracked parameters belong in the process configuration.
156  ModuleDescription md(main_input->id(),
157  main_input->getParameter<std::string>("@module_type"),
158  "source",
159  processConfiguration.get(),
160  ModuleDescription::getUniqueID());
161 
162  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
163  common.maxEventsInput_, common.maxLumisInput_,
164  common.maxSecondsUntilRampdown_, allocations);
165 
166  areg->preSourceConstructionSignal_(md);
167  std::unique_ptr<InputSource> input;
168  try {
169  //even if we have an exception, send the signal
170  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
171  convertException::wrap([&]() {
172  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
173  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
174  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
175  });
176  }
177  catch (cms::Exception& iException) {
178  std::ostringstream ost;
179  ost << "Constructing input source of type " << modtype;
180  iException.addContext(ost.str());
181  throw;
182  }
183  return input;
184  }
185 
186  // ---------------------------------------------------------------
// Create the configured EDLooper, if any. Returns a null shared_ptr when
// "@all_loopers" is empty; at most one looper is supported (asserted).
// NOTE(review): doxygen-scrape residue; dropped lines 188-189 presumably
// carried the fillLooper head with the parameters that declare
// `esController` and `cp` used below — TODO confirm against the original.
187  std::shared_ptr<EDLooperBase>
190  ParameterSet& params) {
191  std::shared_ptr<EDLooperBase> vLooper;
192 
193  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
194 
195  if(loopers.empty()) {
196  return vLooper;
197  }
198 
// Exactly one looper may be configured.
199  assert(1 == loopers.size());
200 
201  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
202  itName != itNameEnd;
203  ++itName) {
204 
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
208  cp,
209  *providerPSet);
210  }
211  return vLooper;
212  }
213 
214  // ---------------------------------------------------------------
// Constructor taking a python configuration string plus default/forced
// service lists; builds a ProcessDesc and delegates to init().
// NOTE(review): doxygen-scrape residue; dropped line 217 presumably
// carried the serviceregistry legacy parameter (`iLegacy` is used in the
// body) — TODO confirm against the original.
// NOTE(review): sourceResourcesAcquirer_ and sourceMutex_ are taken from
// two separate createAcquirerForSourceDelayedReader() calls — assumed to
// yield a matching acquirer/mutex pair; verify in SharedResourcesRegistry.
215  EventProcessor::EventProcessor(std::string const& config,
216  ServiceToken const& iToken,
218  std::vector<std::string> const& defaultServices,
219  std::vector<std::string> const& forcedServices) :
220  actReg_(),
221  preg_(),
222  branchIDListHelper_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(),
243  exceptionMessageLumis_(),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
249  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
250  processDesc->addServices(defaultServices, forcedServices);
251  init(processDesc, iToken, iLegacy);
252  }
253 
// Second constructor variant (python config + service lists).
// NOTE(review): doxygen-scrape residue; the constructor head (line 254)
// and many initializer-list lines (259, 265, 268, 271, 275-286, 291 —
// including, presumably, the init() call) were dropped by the extraction,
// so this text is incomplete. TODO: restore from the original file.
255  std::vector<std::string> const& defaultServices,
256  std::vector<std::string> const& forcedServices) :
257  actReg_(),
258  preg_(),
260  serviceToken_(),
261  input_(),
262  espController_(new eventsetup::EventSetupsController),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
287  {
288  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
289  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
290  processDesc->addServices(defaultServices, forcedServices);
292  }
293 
// Constructor taking an already-built ProcessDesc; delegates to init().
// NOTE(review): doxygen-scrape residue; dropped initializer-list lines
// (296, 299, 305, 308, 311, 315-326) make this text incomplete; `legacy`
// used in the body is presumably declared on dropped line 296. TODO:
// restore from the original file.
294  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
295  ServiceToken const& token,
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
327  {
328  init(processDesc, token, legacy);
329  }
330 
331 
// Constructor variant that accepts either a python configuration string
// or a prebuilt configuration, selected by `isPython` (head on dropped
// line 332).
// NOTE(review): doxygen-scrape residue; dropped lines 335, 341, 344, 347,
// 351-362, 367, 371 (initializer-list entries and, presumably, the init()
// calls inside both branches) make this text incomplete. TODO: restore
// from the original file.
333  actReg_(),
334  preg_(),
336  serviceToken_(),
337  input_(),
338  espController_(new eventsetup::EventSetupsController),
339  esp_(),
340  act_table_(),
342  schedule_(),
343  subProcesses_(),
345  fb_(),
346  looper_(),
348  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
349  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
350  principalCache_(),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
374 
// Shared initialization called by all constructors. Visible steps:
// pop SubProcess PSets out of the process PSet; parse the untracked
// "options" PSet (fileMode, forceEventSetupCacheClearOnNewRun,
// numberOfThreads, numberOfStreams, throwIfIllegalParameter,
// printDependencies); initialize services; build the EventSetup
// provider and optional looper (a looper forces 1 stream/lumi/run);
// record the PreallocationConfiguration; build the InputSource and
// Schedule; create one reusable EventPrincipal per stream; construct
// the SubProcesses.
// NOTE(review): doxygen-scrape residue; gaps in the embedded numbers
// (378, 383, 386, 461, 469, 472, 501, 504, 510, 513-516, 521, 525-526,
// 535-538, 541-542) are dropped source lines — `items`, `iLegacy`,
// `handler` and several call arguments come from those lines. This text
// is not compilable as-is; restore from the original file.
375  void
376  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
377  ServiceToken const& iToken,
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Now set some parameters specific to the main process.
395  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
396  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode", "FULLMERGE");
397  if(fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
398  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
399  << fileMode << ".\n"
400  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
401  } else {
402  fileModeNoMerge_ = (fileMode == "NOMERGE");
403  }
404  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
405  //threading
406  unsigned int nThreads=1;
407  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
408  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
409  if(nThreads == 0) {
410  nThreads = 1;
411  }
412  }
413  /* TODO: when we support having each stream run in a different thread use this default
414  unsigned int nStreams =nThreads;
415  */
416  unsigned int nStreams =1;
417  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
418  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if(nStreams==0) {
420  nStreams = nThreads;
421  }
422  }
423  if(nThreads >1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426 
427  /*
428  bool nRunsSet = false;
429  */
430  unsigned int nConcurrentRuns =1;
431  /*
432  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
433  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
434  }
435  */
436  unsigned int nConcurrentLumis =1;
437  /*
438  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
439  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
440  } else {
441  nConcurrentLumis = nConcurrentRuns;
442  }
443  */
444  //Check that relationships between threading parameters makes sense
445  /*
446  if(nThreads<nStreams) {
447  //bad
448  }
449  if(nConcurrentRuns>nStreams) {
450  //bad
451  }
452  if(nConcurrentRuns>nConcurrentLumis) {
453  //bad
454  }
455  */
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
459 
460  // Now do general initialization
462 
463  //initialize the services
464  auto& serviceSets = processDesc->getServicesPSets();
465  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
466  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
467 
468  //make the services available
470 
471  if(nStreams>1) {
473  handler->willBeUsingThreads();
474  }
475 
476  // intialize miscellaneous items
477  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
478 
479  // intialize the event setup provider
480  esp_ = espController_->makeProvider(*parameterSet);
481 
482  // initialize the looper, if any
483  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
484  if(looper_) {
485  looper_->setActionTable(items.act_table_.get());
486  looper_->attachTo(*items.actReg_);
487 
488  //For now loopers make us run only 1 transition at a time
489  nStreams=1;
490  nConcurrentLumis=1;
491  nConcurrentRuns=1;
492  }
493 
494  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
495 
496  // initialize the input source
497  input_ = makeInput(*parameterSet,
498  *common,
499  items.preg(),
500  items.branchIDListHelper(),
502  items.actReg_,
503  items.processConfiguration(),
505 
506  // intialize the Schedule
507  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
508 
509  // set the data members
511  actReg_ = items.actReg_;
512  preg_ = items.preg();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
527  }
528 
529  // fill the subprocesses, if there are any
530  subProcesses_.reserve(subProcessVParameterSet.size());
531  for(auto& subProcessPSet : subProcessVParameterSet) {
532  subProcesses_.emplace_back(subProcessPSet,
533  *parameterSet,
534  preg(),
539  *actReg_,
540  token,
543  &processContext_);
544  }
545  }
546 
// Destructor body (head on dropped line 547). Members that may need the
// services during destruction are released explicitly, in a controlled
// order, while a ServiceRegistry::Operate keeps the services available.
548  // Make the services available while everything is being deleted.
549  ServiceToken token = getToken();
550  ServiceRegistry::Operate op(token);
551 
552  // manually destroy all these thing that may need the services around
553  // propagate_const<T> has no reset() function
554  espController_ = nullptr;
555  esp_ = nullptr;
556  schedule_ = nullptr;
557  input_ = nullptr;
558  looper_ = nullptr;
559  actReg_ = nullptr;
560 
// NOTE(review): doxygen-scrape residue; lines 561-562 were dropped.
563  }
564 
// Begin-job sequence (head on dropped line 566), idempotent via
// beginJobCalled_: fires preallocate/preBeginJob signals, calls the
// source's doBeginJob (with exception context added on failure), then
// the Schedule's and SubProcesses' beginJob, the postBeginJob signal,
// and finally beginStream for every preallocated stream.
// NOTE(review): doxygen-scrape residue; gaps (566, 573, 575-578, 581,
// 584) are dropped source lines (e.g. the declaration of `bounds` used
// at line 579). Not compilable as-is.
565  void
567  if(beginJobCalled_) return;
568  beginJobCalled_=true;
569  bk::beginJob();
570 
571  // StateSentry toerror(this); // should we add this ?
572  //make the services available
574 
579  actReg_->preallocateSignal_(bounds);
580  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
582 
583  //NOTE: this may throw
585  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
586 
587  //NOTE: This implementation assumes 'Job' means one call
588  // the EventProcessor::run
589  // If it really means once per 'application' then this code will
590  // have to be changed.
591  // Also have to deal with case where have 'run' then new Module
592  // added and do 'run'
593  // again. In that case the newly added Module needs its 'beginJob'
594  // to be called.
595 
596  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
597  // For now we delay calling beginOfJob until first beginOfRun
598  //if(looper_) {
599  // looper_->beginOfJob(es);
600  //}
601  try {
602  convertException::wrap([&]() {
603  input_->doBeginJob();
604  });
605  }
606  catch(cms::Exception& ex) {
607  ex.addContext("Calling beginJob for the source");
608  throw;
609  }
610  schedule_->beginJob(*preg_);
611  // toerror.succeeded(); // should we add this?
612  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
613  actReg_->postBeginJobSignal_();
614 
615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
616  schedule_->beginStream(i);
617  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
618  }
619  }
620 
// End-job sequence (head on dropped line 622). Every step runs under an
// ExceptionCollector so all endStream/endJob hooks execute even if some
// throw; collected exceptions are rethrown at the end.
621  void
623  // Collects exceptions, so we don't throw before all operations are performed.
624  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
625 
626  //make the services available
628 
629  //NOTE: this really should go elsewhere in the future
630  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
631  c.call([this,i](){this->schedule_->endStream(i);});
632  for(auto& subProcess : subProcesses_) {
633  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
634  }
635  }
636  auto actReg = actReg_.get();
637  c.call([actReg](){actReg->preEndJobSignal_();});
638  schedule_->endJob(c);
639  for(auto& subProcess : subProcesses_) {
640  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
641  }
642  c.call(std::bind(&InputSource::doEndJob, input_.get()));
643  if(looper_) {
644  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
645  }
646  c.call([actReg](){actReg->postEndJobSignal_();});
647  if(c.hasThrown()) {
648  c.rethrow();
649  }
650  }
651 
// Thin forwarding accessors: getToken() returns the service token; the
// remainder delegate to the Schedule (module descriptions, event
// counters, end-path enabling, trigger report, counter clearing).
// NOTE(review): doxygen-scrape residue; every function head (lines 653,
// 658, 663, 668, 673, 678, 683, 688, 693) was dropped by the extraction.
654  return serviceToken_;
655  }
656 
657  std::vector<ModuleDescription const*>
659  return schedule_->getAllModuleDescriptions();
660  }
661 
662  int
664  return schedule_->totalEvents();
665  }
666 
667  int
669  return schedule_->totalEventsPassed();
670  }
671 
672  int
674  return schedule_->totalEventsFailed();
675  }
676 
677  void
679  schedule_->enableEndPaths(active);
680  }
681 
682  bool
684  return schedule_->endPathsEnabled();
685  }
686 
687  void
689  schedule_->getTriggerReport(rep);
690  }
691 
692  void
694  schedule_->clearCounters();
695  }
696 
697  namespace {
698 #include "TransitionProcessors.icc"
699  }
700 
// Poll the global shutdown flag (head on dropped line 702). Returns true
// and sets the status code to epSignal when an external shutdown signal
// has been delivered.
701  bool
703  bool returnValue = false;
704 
705  // Look for a shutdown signal
// acquire ordering pairs with the signal handler's store
706  if(shutdown_flag.load(std::memory_order_acquire)) {
707  returnValue = true;
708  returnCode = epSignal;
709  }
710  return returnValue;
711  }
712 
// Ask the source for the next transition (head on dropped lines 713-714),
// skipping IsSynchronize items, then honor any pending async stop
// request by reporting IsStop instead.
715  SendSourceTerminationSignalIfException sentry(actReg_.get());
716  InputSource::ItemType itemType;
717  //For now, do nothing with InputSource::IsSynchronize
718  do {
719  itemType = input_->nextItemType();
720  } while( itemType == InputSource::IsSynchronize);
721 
722  sentry.completedSuccessfully();
723 
// NOTE(review): dropped line 724 presumably declared `returnCode`.
725 
726  if(checkForAsyncStopRequest(returnCode)) {
727  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
728  return InputSource::IsStop;
729  }
730 
731  return itemType;
732  }
733 
// Identify the next run (reduced process-history ID + run number) and,
// below, the next luminosity block number, both straight from the source.
// NOTE(review): the heads on lines 735 and 739-740 were dropped by the
// doxygen extraction.
734  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
736  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
737  }
738 
741  return input_->luminosityBlock();
742  }
743 
746 
// Main processing loop (head on dropped lines 747-748): ensures beginJob
// ran, then drives a FilesProcessor over all input files, rewinding and
// starting a new loop while the looper requests another pass
// (endOfLoop() false). A deferred exception from a stream is rethrown
// after normal end; any unexpected (non-IsStop) final transition is an
// error. The catch block logs buffered lumi/run/file exception messages
// before rethrowing.
// NOTE(review): doxygen-scrape residue; gaps (753, 756-757, 765, 794,
// 800, 806) are dropped source lines. Not compilable as-is.
749  {
750  beginJob(); //make sure this was called
751 
752  // make the services available
754 
755 
758  try {
759  FilesProcessor fp(fileModeNoMerge_);
760 
761  convertException::wrap([&]() {
762  bool firstTime = true;
763  do {
764  if(not firstTime) {
766  rewindInput();
767  } else {
768  firstTime = false;
769  }
770  startingNewLoop();
771 
772  auto trans = fp.processFiles(*this);
773 
774  fp.normalEnd();
775 
776  if(deferredExceptionPtrIsSet_.load()) {
777  std::rethrow_exception(deferredExceptionPtr_);
778  }
779  if(trans != InputSource::IsStop) {
780  //problem with the source
781  doErrorStuff();
782 
783  throw cms::Exception("BadTransition")
784  << "Unexpected transition change "
785  << trans;
786 
787  }
788  } while(not endOfLoop());
789  }); // convertException::wrap
790 
791  } // Try block
792  catch (cms::Exception & e) {
793  if (!exceptionMessageLumis_.empty()) {
795  if (e.alreadyPrinted()) {
796  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
797  }
798  }
799  if (!exceptionMessageRuns_.empty()) {
801  if (e.alreadyPrinted()) {
802  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
803  }
804  }
805  if (!exceptionMessageFiles_.empty()) {
807  if (e.alreadyPrinted()) {
808  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
809  }
810  }
811  throw;
812  }
813  }
814 
815  return returnCode;
816  }
817 
// Read the next file from the source (head on dropped line 818). If the
// ProductRegistry grew during the read, or multiple parallel processes
// are in use, the new FileBlock is marked not fast-clonable.
// NOTE(review): dropped lines 825 and 827-829 carried the
// setNotFastClonable(ProductRegistry grew) call and the parallel-process
// condition — TODO confirm against the original.
819  FDEBUG(1) << " \treadFile\n";
820  size_t size = preg_->size();
821  SendSourceTerminationSignalIfException sentry(actReg_.get());
822 
823  fb_ = input_->readFile();
824  if(size < preg_->size()) {
826  }
830  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
831  }
832  sentry.completedSuccessfully();
833  }
834 
835  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
836  if (fb_.get() != nullptr) {
837  SendSourceTerminationSignalIfException sentry(actReg_.get());
838  input_->closeFile(fb_.get(), cleaningUpAfterException);
839  sentry.completedSuccessfully();
840  }
841  FDEBUG(1) << "\tcloseInputFile\n";
842  }
843 
// Four thin file-transition helpers; each guards on an open FileBlock and
// forwards to the Schedule and all SubProcesses:
//  - openOutputFiles / closeOutputFiles
//  - respondToOpenInputFile (also refreshes SubProcess BranchIDLists)
//  - respondToCloseInputFile
// NOTE(review): doxygen-scrape residue; the function heads (lines 844,
// 852, 860, 869) were dropped by the extraction.
845  if (fb_.get() != nullptr) {
846  schedule_->openOutputFiles(*fb_);
847  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
848  }
849  FDEBUG(1) << "\topenOutputFiles\n";
850  }
851 
853  if (fb_.get() != nullptr) {
854  schedule_->closeOutputFiles();
855  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
856  }
857  FDEBUG(1) << "\tcloseOutputFiles\n";
858  }
859 
861  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
862  if (fb_.get() != nullptr) {
863  schedule_->respondToOpenInputFile(*fb_);
864  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
865  }
866  FDEBUG(1) << "\trespondToOpenInputFile\n";
867  }
868 
870  if (fb_.get() != nullptr) {
871  schedule_->respondToCloseInputFile(*fb_);
872  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
873  }
874  FDEBUG(1) << "\trespondToCloseInputFile\n";
875  }
876 
// Looper / loop-control helpers. In order: startingNewLoop (reset stop
// flag, notify looper once its beginOfJob has run), endOfLoop (ask the
// looper whether to continue; true means stop), rewindInput (repeat +
// rewind the source), prepareForNextLoop, shouldWeCloseOutput (any
// SubProcess wins over the Schedule), and doErrorStuff (log the
// unexpected-state error).
// NOTE(review): doxygen-scrape residue; the function heads (lines 877,
// 887, 900, 906, 911, 924) were dropped by the extraction.
878  shouldWeStop_ = false;
879  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
880  // until after we've called beginOfJob
881  if(looper_ && looperBeginJobRun_) {
882  looper_->doStartingNewLoop();
883  }
884  FDEBUG(1) << "\tstartingNewLoop\n";
885  }
886 
888  if(looper_) {
889  ModuleChanger changer(schedule_.get(),preg_.get());
890  looper_->setModuleChanger(&changer);
891  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
892  looper_->setModuleChanger(nullptr);
893  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
894  else return false;
895  }
896  FDEBUG(1) << "\tendOfLoop\n";
897  return true;
898  }
899 
901  input_->repeat();
902  input_->rewind();
903  FDEBUG(1) << "\trewind\n";
904  }
905 
907  looper_->prepareForNextLoop(esp_.get());
908  FDEBUG(1) << "\tprepareForNextLoop\n";
909  }
910 
912  FDEBUG(1) << "\tshouldWeCloseOutput\n";
913  if(!subProcesses_.empty()) {
914  for(auto const& subProcess : subProcesses_) {
915  if(subProcess.shouldWeCloseOutput()) {
916  return true;
917  }
918  }
919  return false;
920  }
921  return schedule_->shouldWeCloseOutput();
922  }
923 
925  FDEBUG(1) << "\tdoErrorStuff\n";
926  LogError("StateMachine")
927  << "The EventProcessor state machine encountered an unexpected event\n"
928  << "and went to the error state\n"
929  << "Will attempt to terminate processing normally\n"
930  << "(IF using the looper the next loop will be attempted)\n"
931  << "This likely indicates a bug in an input module or corrupted input or both\n";
932  }
933 
// Begin-run transition (head on dropped line 934): let the source begin
// the run, sync the EventSetup to the run-start IOV (optionally forcing
// a cache clear), lazily run the looper's delayed beginOfJob, then
// execute the global beginRun followed by the per-stream beginRun, each
// via an async transition awaited with a waiting task whose exception
// pointer is rethrown.
// NOTE(review): doxygen-scrape residue; gaps (945, 961, 984, 988) are
// dropped source lines (e.g. the forceESCacheClearOnNewRun_ condition
// and the OccurrenceTraits typedefs for `Traits`). Not compilable as-is.
935  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
936  {
937  SendSourceTerminationSignalIfException sentry(actReg_.get());
938 
939  input_->doBeginRun(runPrincipal, &processContext_);
940  sentry.completedSuccessfully();
941  }
942 
943  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
944  runPrincipal.beginTime());
946  espController_->forceCacheClear();
947  }
948  {
949  SendSourceTerminationSignalIfException sentry(actReg_.get());
950  espController_->eventSetupForInstance(ts);
951  sentry.completedSuccessfully();
952  }
953  EventSetup const& es = esp_->eventSetup();
954  if(looper_ && looperBeginJobRun_== false) {
955  looper_->copyInfo(ScheduleInfo(schedule_.get()));
956  looper_->beginOfJob(es);
957  looperBeginJobRun_ = true;
958  looper_->doStartingNewLoop();
959  }
960  {
962  auto globalWaitTask = make_empty_waiting_task();
963  globalWaitTask->increment_ref_count();
964  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
965  *schedule_,
966  runPrincipal,
967  ts,
968  es,
969  subProcesses_);
970  globalWaitTask->wait_for_all();
971  if(globalWaitTask->exceptionPtr() != nullptr) {
972  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
973  }
974  }
975  FDEBUG(1) << "\tbeginRun " << run << "\n";
976  if(looper_) {
977  looper_->doBeginRun(runPrincipal, es, &processContext_);
978  }
979  {
980  //To wait, the ref count has to be 1+#streams
981  auto streamLoopWaitTask = make_empty_waiting_task();
982  streamLoopWaitTask->increment_ref_count();
983 
985 
986  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
987  *schedule_,
989  runPrincipal,
990  ts,
991  es,
992  subProcesses_);
993 
994  streamLoopWaitTask->wait_for_all();
995  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
996  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
997  }
998  }
999  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1000  if(looper_) {
1001  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1002  }
1003  }
1004 
// End-run transition: mark the principal at-end and reset per-process
// failures, let the source end the run (stamping end time and complete),
// sync the EventSetup to the run-end IOV, then run the per-stream endRun
// before the global endRun (mirror order of beginRun), rethrowing any
// exception captured by the waiting tasks.
// NOTE(review): doxygen-scrape residue; gaps (1021, 1034, 1038, 1058)
// are dropped source lines (e.g. the IOVSyncValue construction start and
// the OccurrenceTraits typedefs for `Traits`). Not compilable as-is.
1005  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool cleaningUpAfterException) {
1006  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1007  runPrincipal.setAtEndTransition(true);
1008  //We need to reset failed items since they might
1009  // be set this time around
1010  runPrincipal.resetFailedFromThisProcess();
1011 
1012  {
1013  SendSourceTerminationSignalIfException sentry(actReg_.get());
1014 
1015  runPrincipal.setEndTime(input_->timestamp());
1016  runPrincipal.setComplete();
1017  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1018  sentry.completedSuccessfully();
1019  }
1020 
1022  runPrincipal.endTime());
1023  {
1024  SendSourceTerminationSignalIfException sentry(actReg_.get());
1025  espController_->eventSetupForInstance(ts);
1026  sentry.completedSuccessfully();
1027  }
1028  EventSetup const& es = esp_->eventSetup();
1029  {
1030  //To wait, the ref count has to be 1+#streams
1031  auto streamLoopWaitTask = make_empty_waiting_task();
1032  streamLoopWaitTask->increment_ref_count();
1033 
1035 
1036  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1037  *schedule_,
1039  runPrincipal,
1040  ts,
1041  es,
1042  subProcesses_,
1043  cleaningUpAfterException);
1044 
1045  streamLoopWaitTask->wait_for_all();
1046  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1047  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1048  }
1049  }
1050  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1051  if(looper_) {
1052  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1053  }
1054  {
1055  auto globalWaitTask = make_empty_waiting_task();
1056  globalWaitTask->increment_ref_count();
1057 
1059  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1060  *schedule_,
1061  runPrincipal,
1062  ts,
1063  es,
1064  subProcesses_,
1065  cleaningUpAfterException);
1066  globalWaitTask->wait_for_all();
1067  if(globalWaitTask->exceptionPtr() != nullptr) {
1068  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1069  }
1070  }
1071  FDEBUG(1) << "\tendRun " << run << "\n";
1072  if(looper_) {
1073  looper_->doEndRun(runPrincipal, es, &processContext_);
1074  }
1075  }
1076 
// Begin-lumi transition (head on dropped line 1077): let the source begin
// the lumi, give the RandomNumberGenerator service (if available) its
// preBeginLumi hook, sync the EventSetup to the lumi-start IOV (event 0),
// then run the global beginLumi followed by the per-stream beginLumi via
// async transitions awaited with waiting tasks.
// NOTE(review): doxygen-scrape residue; gaps (1077, 1086, 1102, 1125,
// 1129) are dropped source lines (e.g. the `rng` service handle
// declaration and the OccurrenceTraits typedefs for `Traits`). Not
// compilable as-is.
1078  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1079  {
1080  SendSourceTerminationSignalIfException sentry(actReg_.get());
1081 
1082  input_->doBeginLumi(lumiPrincipal, &processContext_);
1083  sentry.completedSuccessfully();
1084  }
1085 
1087  if(rng.isAvailable()) {
1088  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1089  rng->preBeginLumi(lb);
1090  }
1091 
1092  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1093  // lumi blocks know their start and end times why not also start and end events?
1094  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1095  {
1096  SendSourceTerminationSignalIfException sentry(actReg_.get());
1097  espController_->eventSetupForInstance(ts);
1098  sentry.completedSuccessfully();
1099  }
1100  EventSetup const& es = esp_->eventSetup();
1101  {
1103  auto globalWaitTask = make_empty_waiting_task();
1104  globalWaitTask->increment_ref_count();
1105  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1106  *schedule_,
1107  lumiPrincipal,
1108  ts,
1109  es,
1110  subProcesses_);
1111  globalWaitTask->wait_for_all();
1112  if(globalWaitTask->exceptionPtr() != nullptr) {
1113  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1114  }
1115  }
1116  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1117  if(looper_) {
1118  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1119  }
1120  {
1121  //To wait, the ref count has to b 1+#streams
1122  auto streamLoopWaitTask = make_empty_waiting_task();
1123  streamLoopWaitTask->increment_ref_count();
1124 
1126 
1127  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1128  *schedule_,
1130  lumiPrincipal,
1131  ts,
1132  es,
1133  subProcesses_);
1134  streamLoopWaitTask->wait_for_all();
1135  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1136  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1137  }
1138  }
1139 
1140  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1141  if(looper_) {
1142  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1143  }
1144  }
1145 
// End-lumi transition: mirror of endRun at lumi granularity — mark the
// principal at-end and reset per-process failures, let the source end
// the lumi (stamping end time and complete), sync the EventSetup to the
// lumi-end IOV (max event number), then run per-stream endLumi before
// the global endLumi, rethrowing any exception captured by the waiting
// tasks.
// NOTE(review): doxygen-scrape residue; gaps (1176, 1180, 1199) are
// dropped source lines (presumably the OccurrenceTraits typedefs for
// `Traits` and a stream-transition argument). Not compilable as-is.
1146  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1147  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1148  lumiPrincipal.setAtEndTransition(true);
1149  //We need to reset failed items since they might
1150  // be set this time around
1151  lumiPrincipal.resetFailedFromThisProcess();
1152 
1153  {
1154  SendSourceTerminationSignalIfException sentry(actReg_.get());
1155 
1156  lumiPrincipal.setEndTime(input_->timestamp());
1157  lumiPrincipal.setComplete();
1158  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1159  sentry.completedSuccessfully();
1160  }
1161  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1162  // lumi blocks know their start and end times why not also start and end events?
1163  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1164  lumiPrincipal.endTime());
1165  {
1166  SendSourceTerminationSignalIfException sentry(actReg_.get());
1167  espController_->eventSetupForInstance(ts);
1168  sentry.completedSuccessfully();
1169  }
1170  EventSetup const& es = esp_->eventSetup();
1171  {
1172  //To wait, the ref count has to b 1+#streams
1173  auto streamLoopWaitTask = make_empty_waiting_task();
1174  streamLoopWaitTask->increment_ref_count();
1175 
1177 
1178  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1179  *schedule_,
1181  lumiPrincipal,
1182  ts,
1183  es,
1184  subProcesses_,
1185  cleaningUpAfterException);
1186  streamLoopWaitTask->wait_for_all();
1187  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1188  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1189  }
1190  }
1191  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1192  if(looper_) {
1193  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1194  }
1195  {
1196  auto globalWaitTask = make_empty_waiting_task();
1197  globalWaitTask->increment_ref_count();
1198 
1200  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1201  *schedule_,
1202  lumiPrincipal,
1203  ts,
1204  es,
1205  subProcesses_,
1206  cleaningUpAfterException);
1207  globalWaitTask->wait_for_all();
1208  if(globalWaitTask->exceptionPtr() != nullptr) {
1209  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1210  }
1211  }
1212  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1213  if(looper_) {
1214  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1215  }
1216  }
1217 
  // Read a new run from the input source into the principal cache and
  // return its (reduced process history ID, run number) pair.
  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
    // NOTE(review): the guard and the opening of the throw below (refusing
    // to insert a run when one is already cached) were dropped by the
    // documentation extraction — confirm against the original source.
      << "EventProcessor::readRun\n"
      << "Illegal attempt to insert run into cache\n"
      << "Contact a Framework Developer\n";
    }
    auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
    {
      // Notify listeners of early termination if the source throws here.
      SendSourceTerminationSignalIfException sentry(actReg_.get());
      input_->readRun(*rp, *historyAppender_);
      sentry.completedSuccessfully();
    }
    // The source and the freshly filled principal must agree on history.
    assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
    principalCache_.insert(rp);
    return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
  }
1235 
1236  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1237  principalCache_.merge(input_->runAuxiliary(), preg());
1238  auto runPrincipal =principalCache_.runPrincipalPtr();
1239  {
1240  SendSourceTerminationSignalIfException sentry(actReg_.get());
1241  input_->readAndMergeRun(*runPrincipal);
1242  sentry.completedSuccessfully();
1243  }
1244  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1245  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1246  }
1247 
  // NOTE(review): the opening of this function (readLuminosityBlock, per
  // the declaration index) was dropped by extraction — the fragments below
  // are the tails of two throw statements whose guard conditions (no run
  // principal cached / run invalid) are missing. Confirm against the
  // original source file.
      << "EventProcessor::readRun\n"
      << "Illegal attempt to insert lumi into cache\n"
      << "Contact a Framework Developer\n";
    }
      << "EventProcessor::readRun\n"
      << "Illegal attempt to insert lumi into cache\n"
      << "Run is invalid\n"
      << "Contact a Framework Developer\n";
    }
    // Build a fresh LuminosityBlockPrincipal from the source's auxiliary
    // and have the source fill it.
    auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
    {
      // Notify listeners of early termination if the source throws here.
      SendSourceTerminationSignalIfException sentry(actReg_.get());
      input_->readLuminosityBlock(*lbp, *historyAppender_);
      sentry.completedSuccessfully();
    }
    // Attach the lumi to the currently cached run, then cache the lumi.
    lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
    principalCache_.insert(lbp);
    return input_->luminosityBlock();
  }
1272 
  // NOTE(review): the signature line of this function (readAndMergeLumi,
  // per the declaration index) was dropped by extraction — confirm against
  // the original source file.
    // Merge the source's lumi auxiliary into the cached principal, then
    // let the source merge the lumi products in place.
    principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
    {
      // Notify listeners of early termination if the source throws here.
      SendSourceTerminationSignalIfException sentry(actReg_.get());
      input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
      sentry.completedSuccessfully();
    }
    return input_->luminosityBlock();
  }
1282 
  // NOTE(review): the signature line of writeRun(phid, run) was dropped by
  // extraction — confirm against the original source file.
    // Have the schedule write the cached run to output, then forward the
    // write to every SubProcess.
    schedule_->writeRun(principalCache_.runPrincipal(phid, run), &processContext_);
    for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.writeRun(phid, run); });
    FDEBUG(1) << "\twriteRun " << run << "\n";
  }
1288 
  // NOTE(review): the signature line of deleteRunFromCache(phid, run) was
  // dropped by extraction — confirm against the original source file.
    // Drop the run from this process's cache and from every SubProcess.
    principalCache_.deleteRun(phid, run);
    for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
    FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
  }
1294 
  // NOTE(review): the signature line of writeLumi(phid, run, lumi) was
  // dropped by extraction — confirm against the original source file.
    // Have the schedule write the cached lumi to output, then forward the
    // write to every SubProcess.
    schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
    for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
    FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
  }
1300 
  // NOTE(review): the signature line of deleteLumiFromCache(phid, run,
  // lumi) was dropped by extraction — confirm against the original source.
    // Drop the lumi from this process's cache and from every SubProcess.
    principalCache_.deleteLumi(phid, run, lumi);
    for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
    FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
  }
1306 
  // Try to obtain the next event for the given stream. Returns true when
  // an event was read and should be processed; returns false when the
  // stream should stop: a stop was requested, another stream recorded an
  // exception, the source ran out of events in this block, or reading
  // threw (the exception is then stored for deferred rethrow).
  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
                                              std::atomic<bool>* finishedProcessingEvents) {
    if(shouldWeStop()) {
      return false;
    }

    // Another stream already recorded an exception; stop pulling events.
    if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
      return false;
    }

    // Another stream already saw the end of this block of events.
    if(finishedProcessingEvents->load(std::memory_order_acquire)) {
      return false;
    }

    // NOTE(review): a line was dropped by extraction just before this try
    // block — confirm against the original source file.
    try {
      //need to use lock in addition to the serial task queue because
      // of delayed provenance reading and reading data in response to
      // edm::Refs etc
      std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
      if(not firstEventInBlock_) {
        //The state machine already called input_->nextItemType
        // and found an event. We can't call input_->nextItemType
        // again since it would move to the next transition
        InputSource::ItemType itemType = input_->nextItemType();
        if (InputSource::IsEvent !=itemType) {
          // NOTE(review): a line recording the non-event item type appears
          // to have been lost in extraction here.
          finishedProcessingEvents->store(true,std::memory_order_release);
          //std::cerr<<"next item type "<<itemType<<"\n";
          return false;
        }
        // NOTE(review): the condition opening this branch (an async-stop
        // check, judging by the signal emitted) was dropped by extraction —
        // confirm against the original source file.
          //std::cerr<<"task told to async stop\n";
          actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
          return false;
        }
      } else {
        firstEventInBlock_ = false;
      }
      readEvent(iStreamIndex);
    } catch (...) {
      // First exception wins; it is kept for rethrow after the event loop
      // has drained.
      bool expected =false;
      if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
        deferredExceptionPtr_ = std::current_exception();

      }
      return false;
    }
    return true;
  }
1357 
  // NOTE(review): the first line of this signature (taking WaitingTask*
  // iTask, per the declaration index) was dropped by extraction — confirm
  // against the original source file.
                                                    unsigned int iStreamIndex,
                                                    std::atomic<bool>* finishedProcessingEvents)
  {
    // Continuation task: when an event finishes, either recursively
    // schedule the next event for this stream, or record/propagate the
    // failure and stop the stream.
    auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
      if(iPtr) {
        // First failure wins; notify the waiting task of the exception.
        bool expected = false;
        if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
          deferredExceptionPtr_ = *iPtr;
          {
            WaitingTaskHolder h(iTask);
            h.doneWaiting(*iPtr);
          }
        }
        //the stream will stop now
        iTask->decrement_ref_count();
        return;
      }

      handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
    });

    // Reading from the source is serialized across streams via the chain.
    sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
      // NOTE(review): a line here was dropped by extraction — confirm
      // against the original source file.

      try {
        if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
          processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
        } else {
          //the stream will stop now
          tbb::task::destroy(*recursionTask);
          iTask->decrement_ref_count();
        }
      } catch(...) {
        WaitingTaskHolder h(recursionTask);
        h.doneWaiting(std::current_exception());
      }
    });
  }
1397 
  // NOTE(review): the signature of this function (readAndProcessEvents,
  // per the declaration index) was dropped by the documentation
  // extraction, along with the guard of the final rethrow and the return
  // statement at the bottom — confirm all of these against the original
  // source file.

    // Set by whichever stream first sees a non-event transition; tells the
    // other streams to stop pulling events.
    std::atomic<bool> finishedProcessingEvents{false};
    auto finishedProcessingEventsPtr = &finishedProcessingEvents;

    //The state machine already found the event so
    // we have to avoid looking again
    firstEventInBlock_ = true;

    //To wait, the ref count has to b 1+#streams
    auto eventLoopWaitTask = make_empty_waiting_task();
    auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
    eventLoopWaitTask->increment_ref_count();

    // Launch one event-handling task per stream: all but the last are
    // enqueued; the last is spawned here so this thread also waits.
    const unsigned int kNumStreams = preallocations_.numberOfStreams();
    unsigned int iStreamIndex = 0;
    for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
      eventLoopWaitTask->increment_ref_count();
      tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
        handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
      }) );
    }
    eventLoopWaitTask->increment_ref_count();
    eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
      handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
    }));

    //One of the processing threads saw an exception
    // NOTE(review): the condition guarding this rethrow (presumably a
    // check of deferredExceptionPtrIsSet_) was dropped by extraction.
      std::rethrow_exception(deferredExceptionPtr_);
    }
  }
1434  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1435  //TODO this will have to become per stream
1436  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1437  StreamContext streamContext(event.streamID(), &processContext_);
1438 
1439  SendSourceTerminationSignalIfException sentry(actReg_.get());
1440  input_->readEvent(event, streamContext);
1441  sentry.completedSuccessfully();
1442 
1443  FDEBUG(1) << "\treadEvent\n";
1444  }
1445 
  // NOTE(review): the first line of this signature (taking
  // WaitingTaskHolder iHolder, per the declaration index) was dropped by
  // extraction — confirm against the original source file.
                                        unsigned int iStreamIndex) {
    // Run the actual event processing as its own TBB task.
    tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
      processEventAsyncImpl(iHolder, iStreamIndex);
    }) );
  }
1452 
  // NOTE(review): the first line of this signature (taking
  // WaitingTaskHolder iHolder, per the declaration index) was dropped by
  // extraction — confirm against the original source file.
                                            unsigned int iStreamIndex) {
    auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
    pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());

    // NOTE(review): the declaration of `rng` used below (presumably the
    // random number generator service) was dropped by extraction.
    if(rng.isAvailable()) {
      Event ev(*pep, ModuleDescription(), nullptr);
      rng->postEventRead(ev);
    }
    // Sanity checks: the event must belong to the currently cached lumi.
    assert(pep->luminosityBlockPrincipalPtrValid());
    assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
    assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());

    // Final continuation: optionally run the looper, clear the event
    // principal for reuse, and report completion (or failure) to iHolder.
    WaitingTaskHolder finalizeEventTask( make_waiting_task(
      tbb::task::allocate_root(),
      [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
      {
        // NOTE(review): a line here was dropped by extraction — confirm
        // against the original source file.

        //NOTE: If we have a looper we only have one Stream
        if(looper_) {
          processEventWithLooper(*pep);
        }

        FDEBUG(1) << "\tprocessEvent\n";
        pep->clearEventPrincipal();
        if(iPtr) {
          iHolder.doneWaiting(*iPtr);
        } else {
          iHolder.doneWaiting(std::exception_ptr());
        }
      }
    )
    );
    WaitingTaskHolder afterProcessTask;
    if(subProcesses_.empty()) {
      afterProcessTask = std::move(finalizeEventTask);
    } else {
      //Need to run SubProcesses after schedule has finished
      // with the event
      afterProcessTask = WaitingTaskHolder(
        make_waiting_task(tbb::task::allocate_root(),
                          [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
        {
          if(not iPtr) {
            // NOTE(review): a line here was dropped by extraction —
            // confirm against the original source file.

            //when run with 1 thread, we want to the order to be what
            // it was before. This requires reversing the order since
            // tasks are run last one in first one out
            for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
              subProcess.doEventAsync(finalizeEventTask,*pep);
            }
          } else {
            finalizeEventTask.doneWaiting(*iPtr);
          }
        })
      );
    }

    // Kick off the schedule; afterProcessTask fires when it completes.
    schedule_->processOneEventAsync(std::move(afterProcessTask),
                                    iStreamIndex,*pep, esp_->eventSetup());

  }
1519 
  // NOTE(review): the signature line of this function
  // (processEventWithLooper(EventPrincipal&), per the declaration index)
  // was dropped by extraction — confirm against the original source file.
    bool randomAccess = input_->randomAccess();
    ProcessingController::ForwardState forwardState = input_->forwardState();
    ProcessingController::ReverseState reverseState = input_->reverseState();
    ProcessingController pc(forwardState, reverseState, randomAccess);

    // NOTE(review): the declaration of `status` used below was dropped by
    // extraction.
    do {

      // Hand the event to the looper; pc lets it request navigation.
      StreamContext streamContext(iPrincipal.streamID(), &processContext_);
      status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);

      bool succeeded = true;
      if(randomAccess) {
        // NOTE(review): the conditions selecting between the two
        // navigation branches below were dropped by extraction — confirm
        // against the original source file.
          input_->skipEvents(-2);
        }
          succeeded = input_->goToEvent(pc.specifiedEventTransition());
        }
      }
      pc.setLastOperationSucceeded(succeeded);
    } while(!pc.lastOperationSucceeded());
    // A non-continue status from the looper ends the event loop.
    if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
  }
1545 
  // NOTE(review): the signature line of this function (shouldWeStop()
  // const, per the declaration index) was dropped by extraction — confirm
  // against the original source file.
    FDEBUG(1) << "\tshouldWeStop\n";
    if(shouldWeStop_) return true;
    // With SubProcesses, stop only if one of them asks to terminate; the
    // top-level schedule's terminate() is consulted otherwise.
    if(!subProcesses_.empty()) {
      for(auto const& subProcess : subProcesses_) {
        if(subProcess.terminate()) {
          return true;
        }
      }
      return false;
    }
    return schedule_->terminate();
  }
1559 
1562  }
1563 
1566  }
1567 
1570  }
1571 
1572  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1573  bool expected =false;
1574  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1575  deferredExceptionPtr_ = iException;
1576  return true;
1577  }
1578  return false;
1579  }
1580 
1581 }
size
Write out results.
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
def destroy(e)
Definition: pyrootRender.py:13
void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setExceptionMessageFiles(std::string &message)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void writeRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool ev
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
bool hasRunPrincipal() const
Definition: config.py:1
RunNumber_t run() const
Definition: RunPrincipal.h:61
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void beginJob()
Definition: Breakpoints.cc:15
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool cleaningUpAfterException)
U second(std::pair< T, U > const &p)
config
Definition: looper.py:287
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run)
void setExceptionMessageLumis(std::string &message)
void setExceptionMessageRuns(std::string &message)
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:657
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
InputSource::ItemType readAndProcessEvents()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void resetFailedFromThisProcess()
Definition: Principal.cc:902
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31