CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.cc
Go to the documentation of this file.
2 
9 
36 
38 
46 
51 
53 
65 
66 #include "MessageForSource.h"
67 #include "MessageForParent.h"
68 
69 #include "boost/range/adaptor/reversed.hpp"
70 
71 #include <exception>
72 #include <iomanip>
73 #include <iostream>
74 #include <utility>
75 #include <sstream>
76 
77 #include <sys/ipc.h>
78 #include <sys/msg.h>
79 
80 #include "tbb/task.h"
81 
82 //Used for CPU affinity
83 #ifndef __APPLE__
84 #include <sched.h>
85 #endif
86 
87 namespace {
88  //Sentry class to only send a signal if an
89  // exception occurs. An exception is identified
90  // by the destructor being called without first
91  // calling completedSuccessfully().
92  class SendSourceTerminationSignalIfException {
93  public:
94  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
95  reg_(iReg) {}
96  ~SendSourceTerminationSignalIfException() {
97  if(reg_) {
98  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
99  }
100  }
101  void completedSuccessfully() {
102  reg_ = nullptr;
103  }
104  private:
105  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
106  };
107 
108 }
109 
110 namespace edm {
111 
112  // ---------------------------------------------------------------
113  std::unique_ptr<InputSource>
115  CommonParams const& common,
116  std::shared_ptr<ProductRegistry> preg,
117  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
118  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
119  std::shared_ptr<ActivityRegistry> areg,
120  std::shared_ptr<ProcessConfiguration const> processConfiguration,
121  PreallocationConfiguration const& allocations) {
122  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
123  if(main_input == nullptr) {
125  << "There must be exactly one source in the configuration.\n"
126  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
127  }
128 
129  std::string modtype(main_input->getParameter<std::string>("@module_type"));
130 
131  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
133  ConfigurationDescriptions descriptions(filler->baseType(), modtype);
134  filler->fill(descriptions);
135 
136  try {
137  convertException::wrap([&]() {
138  descriptions.validate(*main_input, std::string("source"));
139  });
140  }
141  catch (cms::Exception & iException) {
142  std::ostringstream ost;
143  ost << "Validating configuration of input source of type " << modtype;
144  iException.addContext(ost.str());
145  throw;
146  }
147 
148  main_input->registerIt();
149 
150  // Fill in "ModuleDescription", in case the input source produces
151  // any EDProducts, which would be registered in the ProductRegistry.
152  // Also fill in the process history item for this process.
153  // There is no module label for the unnamed input source, so
154  // just use "source".
155  // Only the tracked parameters belong in the process configuration.
156  ModuleDescription md(main_input->id(),
157  main_input->getParameter<std::string>("@module_type"),
158  "source",
159  processConfiguration.get(),
160  ModuleDescription::getUniqueID());
161 
162  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
163  common.maxEventsInput_, common.maxLumisInput_,
164  common.maxSecondsUntilRampdown_, allocations);
165 
166  areg->preSourceConstructionSignal_(md);
167  std::unique_ptr<InputSource> input;
168  try {
169  //even if we have an exception, send the signal
170  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
171  convertException::wrap([&]() {
172  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
173  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
174  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
175  });
176  }
177  catch (cms::Exception& iException) {
178  std::ostringstream ost;
179  ost << "Constructing input source of type " << modtype;
180  iException.addContext(ost.str());
181  throw;
182  }
183  return input;
184  }
185 
186  // ---------------------------------------------------------------
187  std::shared_ptr<EDLooperBase>
190  ParameterSet& params) {
191  std::shared_ptr<EDLooperBase> vLooper;
192 
193  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
194 
195  if(loopers.empty()) {
196  return vLooper;
197  }
198 
199  assert(1 == loopers.size());
200 
201  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
202  itName != itNameEnd;
203  ++itName) {
204 
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
208  cp,
209  *providerPSet);
210  }
211  return vLooper;
212  }
213 
214  // ---------------------------------------------------------------
215  EventProcessor::EventProcessor(std::string const& config,
216  ServiceToken const& iToken,
218  std::vector<std::string> const& defaultServices,
219  std::vector<std::string> const& forcedServices) :
220  actReg_(),
221  preg_(),
222  branchIDListHelper_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(),
243  exceptionMessageLumis_(),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
249  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
250  processDesc->addServices(defaultServices, forcedServices);
251  init(processDesc, iToken, iLegacy);
252  }
253 
255  std::vector<std::string> const& defaultServices,
256  std::vector<std::string> const& forcedServices) :
257  actReg_(),
258  preg_(),
260  serviceToken_(),
261  input_(),
262  espController_(new eventsetup::EventSetupsController),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
287  {
288  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
289  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
290  processDesc->addServices(defaultServices, forcedServices);
292  }
293 
294  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
295  ServiceToken const& token,
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
327  {
328  init(processDesc, token, legacy);
329  }
330 
331 
333  actReg_(),
334  preg_(),
336  serviceToken_(),
337  input_(),
338  espController_(new eventsetup::EventSetupsController),
339  esp_(),
340  act_table_(),
342  schedule_(),
343  subProcesses_(),
345  fb_(),
346  looper_(),
348  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
349  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
350  principalCache_(),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
374 
375  void
376  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
377  ServiceToken const& iToken,
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Now set some parameters specific to the main process.
395  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
396  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode", "FULLMERGE");
397  if(fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
398  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
399  << fileMode << ".\n"
400  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
401  } else {
402  fileModeNoMerge_ = (fileMode == "NOMERGE");
403  }
404  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
405  //threading
406  unsigned int nThreads=1;
407  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
408  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
409  if(nThreads == 0) {
410  nThreads = 1;
411  }
412  }
413  /* TODO: when we support having each stream run in a different thread use this default
414  unsigned int nStreams =nThreads;
415  */
416  unsigned int nStreams =1;
417  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
418  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if(nStreams==0) {
420  nStreams = nThreads;
421  }
422  }
423  if(nThreads >1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426 
427  /*
428  bool nRunsSet = false;
429  */
430  unsigned int nConcurrentRuns =1;
431  /*
432  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
433  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
434  }
435  */
436  unsigned int nConcurrentLumis =1;
437  /*
438  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
439  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
440  } else {
441  nConcurrentLumis = nConcurrentRuns;
442  }
443  */
444  //Check that relationships between threading parameters makes sense
445  /*
446  if(nThreads<nStreams) {
447  //bad
448  }
449  if(nConcurrentRuns>nStreams) {
450  //bad
451  }
452  if(nConcurrentRuns>nConcurrentLumis) {
453  //bad
454  }
455  */
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
459 
460  // Now do general initialization
462 
463  //initialize the services
464  auto& serviceSets = processDesc->getServicesPSets();
465  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
466  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
467 
468  //make the services available
470 
471  if(nStreams>1) {
473  handler->willBeUsingThreads();
474  }
475 
476  // intialize miscellaneous items
477  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
478 
479  // intialize the event setup provider
480  esp_ = espController_->makeProvider(*parameterSet);
481 
482  // initialize the looper, if any
483  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
484  if(looper_) {
485  looper_->setActionTable(items.act_table_.get());
486  looper_->attachTo(*items.actReg_);
487 
488  //For now loopers make us run only 1 transition at a time
489  nStreams=1;
490  nConcurrentLumis=1;
491  nConcurrentRuns=1;
492  }
493 
494  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
495 
496  // initialize the input source
497  input_ = makeInput(*parameterSet,
498  *common,
499  items.preg(),
500  items.branchIDListHelper(),
502  items.actReg_,
503  items.processConfiguration(),
505 
506  // intialize the Schedule
507  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
508 
509  // set the data members
511  actReg_ = items.actReg_;
512  preg_ = items.preg();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
527  }
528 
529  // fill the subprocesses, if there are any
530  subProcesses_.reserve(subProcessVParameterSet.size());
531  for(auto& subProcessPSet : subProcessVParameterSet) {
532  subProcesses_.emplace_back(subProcessPSet,
533  *parameterSet,
534  preg(),
539  *actReg_,
540  token,
543  &processContext_);
544  }
545  }
546 
548  // Make the services available while everything is being deleted.
549  ServiceToken token = getToken();
550  ServiceRegistry::Operate op(token);
551 
552  // manually destroy all these thing that may need the services around
553  // propagate_const<T> has no reset() function
554  espController_ = nullptr;
555  esp_ = nullptr;
556  schedule_ = nullptr;
557  input_ = nullptr;
558  looper_ = nullptr;
559  actReg_ = nullptr;
560 
563  }
564 
565  void
567  if(beginJobCalled_) return;
568  beginJobCalled_=true;
569  bk::beginJob();
570 
571  // StateSentry toerror(this); // should we add this ?
572  //make the services available
574 
579  actReg_->preallocateSignal_(bounds);
580  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
582 
583  //NOTE: this may throw
585  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
586 
587  //NOTE: This implementation assumes 'Job' means one call
588  // the EventProcessor::run
589  // If it really means once per 'application' then this code will
590  // have to be changed.
591  // Also have to deal with case where have 'run' then new Module
592  // added and do 'run'
593  // again. In that case the newly added Module needs its 'beginJob'
594  // to be called.
595 
596  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
597  // For now we delay calling beginOfJob until first beginOfRun
598  //if(looper_) {
599  // looper_->beginOfJob(es);
600  //}
601  try {
602  convertException::wrap([&]() {
603  input_->doBeginJob();
604  });
605  }
606  catch(cms::Exception& ex) {
607  ex.addContext("Calling beginJob for the source");
608  throw;
609  }
610  schedule_->beginJob(*preg_);
611  // toerror.succeeded(); // should we add this?
612  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
613  actReg_->postBeginJobSignal_();
614 
615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
616  schedule_->beginStream(i);
617  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
618  }
619  }
620 
621  void
623  // Collects exceptions, so we don't throw before all operations are performed.
624  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
625 
626  //make the services available
628 
629  //NOTE: this really should go elsewhere in the future
630  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
631  c.call([this,i](){this->schedule_->endStream(i);});
632  for(auto& subProcess : subProcesses_) {
633  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
634  }
635  }
636  auto actReg = actReg_.get();
637  c.call([actReg](){actReg->preEndJobSignal_();});
638  schedule_->endJob(c);
639  for(auto& subProcess : subProcesses_) {
640  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
641  }
642  c.call(std::bind(&InputSource::doEndJob, input_.get()));
643  if(looper_) {
644  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
645  }
646  c.call([actReg](){actReg->postEndJobSignal_();});
647  if(c.hasThrown()) {
648  c.rethrow();
649  }
650  }
651 
654  return serviceToken_;
655  }
656 
657  std::vector<ModuleDescription const*>
659  return schedule_->getAllModuleDescriptions();
660  }
661 
662  int
664  return schedule_->totalEvents();
665  }
666 
667  int
669  return schedule_->totalEventsPassed();
670  }
671 
672  int
674  return schedule_->totalEventsFailed();
675  }
676 
677  void
679  schedule_->enableEndPaths(active);
680  }
681 
682  bool
684  return schedule_->endPathsEnabled();
685  }
686 
687  void
689  schedule_->getTriggerReport(rep);
690  }
691 
692  void
694  schedule_->clearCounters();
695  }
696 
697  namespace {
698 #include "TransitionProcessors.icc"
699  }
700 
701  bool
703  bool returnValue = false;
704 
705  // Look for a shutdown signal
706  if(shutdown_flag.load(std::memory_order_acquire)) {
707  returnValue = true;
708  returnCode = epSignal;
709  }
710  return returnValue;
711  }
712 
715  SendSourceTerminationSignalIfException sentry(actReg_.get());
716  InputSource::ItemType itemType;
717  //For now, do nothing with InputSource::IsSynchronize
718  do {
719  itemType = input_->nextItemType();
720  } while( itemType == InputSource::IsSynchronize);
721 
722  sentry.completedSuccessfully();
723 
725 
726  if(checkForAsyncStopRequest(returnCode)) {
727  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
728  return InputSource::IsStop;
729  }
730 
731  return itemType;
732  }
733 
734  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
736  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
737  }
738 
741  return input_->luminosityBlock();
742  }
743 
746 
749  {
750  beginJob(); //make sure this was called
751 
752  // make the services available
754 
755 
758  try {
759  FilesProcessor fp(fileModeNoMerge_);
760 
761  convertException::wrap([&]() {
762  bool firstTime = true;
763  do {
764  if(not firstTime) {
766  rewindInput();
767  } else {
768  firstTime = false;
769  }
770  startingNewLoop();
771 
772  auto trans = fp.processFiles(*this);
773 
774  fp.normalEnd();
775 
776  if(deferredExceptionPtrIsSet_.load()) {
777  std::rethrow_exception(deferredExceptionPtr_);
778  }
779  if(trans != InputSource::IsStop) {
780  //problem with the source
781  doErrorStuff();
782 
783  throw cms::Exception("BadTransition")
784  << "Unexpected transition change "
785  << trans;
786 
787  }
788  } while(not endOfLoop());
789  }); // convertException::wrap
790 
791  } // Try block
792  catch (cms::Exception & e) {
793  if (!exceptionMessageLumis_.empty()) {
795  if (e.alreadyPrinted()) {
796  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
797  }
798  }
799  if (!exceptionMessageRuns_.empty()) {
801  if (e.alreadyPrinted()) {
802  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
803  }
804  }
805  if (!exceptionMessageFiles_.empty()) {
807  if (e.alreadyPrinted()) {
808  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
809  }
810  }
811  throw;
812  }
813  }
814 
815  return returnCode;
816  }
817 
819  FDEBUG(1) << " \treadFile\n";
820  size_t size = preg_->size();
821  SendSourceTerminationSignalIfException sentry(actReg_.get());
822 
823  fb_ = input_->readFile();
824  if(size < preg_->size()) {
826  }
830  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
831  }
832  sentry.completedSuccessfully();
833  }
834 
835  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
836  if (fb_.get() != nullptr) {
837  SendSourceTerminationSignalIfException sentry(actReg_.get());
838  input_->closeFile(fb_.get(), cleaningUpAfterException);
839  sentry.completedSuccessfully();
840  }
841  FDEBUG(1) << "\tcloseInputFile\n";
842  }
843 
845  if (fb_.get() != nullptr) {
846  schedule_->openOutputFiles(*fb_);
847  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
848  }
849  FDEBUG(1) << "\topenOutputFiles\n";
850  }
851 
853  if (fb_.get() != nullptr) {
854  schedule_->closeOutputFiles();
855  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
856  }
857  FDEBUG(1) << "\tcloseOutputFiles\n";
858  }
859 
861  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
862  if (fb_.get() != nullptr) {
863  schedule_->respondToOpenInputFile(*fb_);
864  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
865  }
866  FDEBUG(1) << "\trespondToOpenInputFile\n";
867  }
868 
870  if (fb_.get() != nullptr) {
871  schedule_->respondToCloseInputFile(*fb_);
872  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
873  }
874  FDEBUG(1) << "\trespondToCloseInputFile\n";
875  }
876 
878  shouldWeStop_ = false;
879  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
880  // until after we've called beginOfJob
881  if(looper_ && looperBeginJobRun_) {
882  looper_->doStartingNewLoop();
883  }
884  FDEBUG(1) << "\tstartingNewLoop\n";
885  }
886 
888  if(looper_) {
889  ModuleChanger changer(schedule_.get(),preg_.get());
890  looper_->setModuleChanger(&changer);
891  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
892  looper_->setModuleChanger(nullptr);
893  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
894  else return false;
895  }
896  FDEBUG(1) << "\tendOfLoop\n";
897  return true;
898  }
899 
901  input_->repeat();
902  input_->rewind();
903  FDEBUG(1) << "\trewind\n";
904  }
905 
907  looper_->prepareForNextLoop(esp_.get());
908  FDEBUG(1) << "\tprepareForNextLoop\n";
909  }
910 
912  FDEBUG(1) << "\tshouldWeCloseOutput\n";
913  if(!subProcesses_.empty()) {
914  for(auto const& subProcess : subProcesses_) {
915  if(subProcess.shouldWeCloseOutput()) {
916  return true;
917  }
918  }
919  return false;
920  }
921  return schedule_->shouldWeCloseOutput();
922  }
923 
925  FDEBUG(1) << "\tdoErrorStuff\n";
926  LogError("StateMachine")
927  << "The EventProcessor state machine encountered an unexpected event\n"
928  << "and went to the error state\n"
929  << "Will attempt to terminate processing normally\n"
930  << "(IF using the looper the next loop will be attempted)\n"
931  << "This likely indicates a bug in an input module or corrupted input or both\n";
932  }
933 
934  void EventProcessor::beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded) {
935  globalBeginSucceeded = false;
936  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
937  {
938  SendSourceTerminationSignalIfException sentry(actReg_.get());
939 
940  input_->doBeginRun(runPrincipal, &processContext_);
941  sentry.completedSuccessfully();
942  }
943 
944  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
945  runPrincipal.beginTime());
947  espController_->forceCacheClear();
948  }
949  {
950  SendSourceTerminationSignalIfException sentry(actReg_.get());
951  espController_->eventSetupForInstance(ts);
952  sentry.completedSuccessfully();
953  }
954  EventSetup const& es = esp_->eventSetup();
955  if(looper_ && looperBeginJobRun_== false) {
956  looper_->copyInfo(ScheduleInfo(schedule_.get()));
957  looper_->beginOfJob(es);
958  looperBeginJobRun_ = true;
959  looper_->doStartingNewLoop();
960  }
961  {
963  auto globalWaitTask = make_empty_waiting_task();
964  globalWaitTask->increment_ref_count();
965  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
966  *schedule_,
967  runPrincipal,
968  ts,
969  es,
970  subProcesses_);
971  globalWaitTask->wait_for_all();
972  if(globalWaitTask->exceptionPtr() != nullptr) {
973  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
974  }
975  }
976  globalBeginSucceeded = true;
977  FDEBUG(1) << "\tbeginRun " << run << "\n";
978  if(looper_) {
979  looper_->doBeginRun(runPrincipal, es, &processContext_);
980  }
981  {
982  //To wait, the ref count has to be 1+#streams
983  auto streamLoopWaitTask = make_empty_waiting_task();
984  streamLoopWaitTask->increment_ref_count();
985 
987 
988  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
989  *schedule_,
991  runPrincipal,
992  ts,
993  es,
994  subProcesses_);
995 
996  streamLoopWaitTask->wait_for_all();
997  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
998  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
999  }
1000  }
1001  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1002  if(looper_) {
1003  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1004  }
1005  }
1006 
1007  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1008  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1009  runPrincipal.setAtEndTransition(true);
1010  //We need to reset failed items since they might
1011  // be set this time around
1012  runPrincipal.resetFailedFromThisProcess();
1013 
1014  {
1015  SendSourceTerminationSignalIfException sentry(actReg_.get());
1016 
1017  runPrincipal.setEndTime(input_->timestamp());
1018  runPrincipal.setComplete();
1019  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1020  sentry.completedSuccessfully();
1021  }
1022 
1024  runPrincipal.endTime());
1025  {
1026  SendSourceTerminationSignalIfException sentry(actReg_.get());
1027  espController_->eventSetupForInstance(ts);
1028  sentry.completedSuccessfully();
1029  }
1030  EventSetup const& es = esp_->eventSetup();
1031  if(globalBeginSucceeded){
1032  //To wait, the ref count has to be 1+#streams
1033  auto streamLoopWaitTask = make_empty_waiting_task();
1034  streamLoopWaitTask->increment_ref_count();
1035 
1037 
1038  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1039  *schedule_,
1041  runPrincipal,
1042  ts,
1043  es,
1044  subProcesses_,
1045  cleaningUpAfterException);
1046 
1047  streamLoopWaitTask->wait_for_all();
1048  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1049  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1050  }
1051  }
1052  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1053  if(looper_) {
1054  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1055  }
1056  {
1057  auto globalWaitTask = make_empty_waiting_task();
1058  globalWaitTask->increment_ref_count();
1059 
1061  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1062  *schedule_,
1063  runPrincipal,
1064  ts,
1065  es,
1066  subProcesses_,
1067  cleaningUpAfterException);
1068  globalWaitTask->wait_for_all();
1069  if(globalWaitTask->exceptionPtr() != nullptr) {
1070  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1071  }
1072  }
1073  FDEBUG(1) << "\tendRun " << run << "\n";
1074  if(looper_) {
1075  looper_->doEndRun(runPrincipal, es, &processContext_);
1076  }
1077  }
1078 
1079  void EventProcessor::beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool& globalBeginSucceeded) {
1080  globalBeginSucceeded = false;
1081  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1082  {
1083  SendSourceTerminationSignalIfException sentry(actReg_.get());
1084 
1085  input_->doBeginLumi(lumiPrincipal, &processContext_);
1086  sentry.completedSuccessfully();
1087  }
1088 
1090  if(rng.isAvailable()) {
1091  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1092  rng->preBeginLumi(lb);
1093  }
1094 
1095  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1096  // lumi blocks know their start and end times why not also start and end events?
1097  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1098  {
1099  SendSourceTerminationSignalIfException sentry(actReg_.get());
1100  espController_->eventSetupForInstance(ts);
1101  sentry.completedSuccessfully();
1102  }
1103  EventSetup const& es = esp_->eventSetup();
1104  {
1106  auto globalWaitTask = make_empty_waiting_task();
1107  globalWaitTask->increment_ref_count();
1108  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1109  *schedule_,
1110  lumiPrincipal,
1111  ts,
1112  es,
1113  subProcesses_);
1114  globalWaitTask->wait_for_all();
1115  if(globalWaitTask->exceptionPtr() != nullptr) {
1116  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1117  }
1118  }
1119  globalBeginSucceeded=true;
1120  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1121  if(looper_) {
1122  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1123  }
1124  {
1125  //To wait, the ref count has to b 1+#streams
1126  auto streamLoopWaitTask = make_empty_waiting_task();
1127  streamLoopWaitTask->increment_ref_count();
1128 
1130 
1131  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1132  *schedule_,
1134  lumiPrincipal,
1135  ts,
1136  es,
1137  subProcesses_);
1138  streamLoopWaitTask->wait_for_all();
1139  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1140  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1141  }
1142  }
1143 
1144  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1145  if(looper_) {
1146  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1147  }
1148  }
1149 
1150  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool globalBeginSucceeded, bool cleaningUpAfterException) {
1151  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1152  lumiPrincipal.setAtEndTransition(true);
1153  //We need to reset failed items since they might
1154  // be set this time around
1155  lumiPrincipal.resetFailedFromThisProcess();
1156 
1157  {
1158  SendSourceTerminationSignalIfException sentry(actReg_.get());
1159 
1160  lumiPrincipal.setEndTime(input_->timestamp());
1161  lumiPrincipal.setComplete();
1162  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1163  sentry.completedSuccessfully();
1164  }
1165  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1166  // lumi blocks know their start and end times why not also start and end events?
1167  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1168  lumiPrincipal.endTime());
1169  {
1170  SendSourceTerminationSignalIfException sentry(actReg_.get());
1171  espController_->eventSetupForInstance(ts);
1172  sentry.completedSuccessfully();
1173  }
1174  EventSetup const& es = esp_->eventSetup();
1175  if(globalBeginSucceeded){
1176  //To wait, the ref count has to b 1+#streams
1177  auto streamLoopWaitTask = make_empty_waiting_task();
1178  streamLoopWaitTask->increment_ref_count();
1179 
1181 
1182  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1183  *schedule_,
1185  lumiPrincipal,
1186  ts,
1187  es,
1188  subProcesses_,
1189  cleaningUpAfterException);
1190  streamLoopWaitTask->wait_for_all();
1191  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1192  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1193  }
1194  }
1195  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1196  if(looper_) {
1197  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1198  }
1199  {
1200  auto globalWaitTask = make_empty_waiting_task();
1201  globalWaitTask->increment_ref_count();
1202 
1204  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1205  *schedule_,
1206  lumiPrincipal,
1207  ts,
1208  es,
1209  subProcesses_,
1210  cleaningUpAfterException);
1211  globalWaitTask->wait_for_all();
1212  if(globalWaitTask->exceptionPtr() != nullptr) {
1213  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1214  }
1215  }
1216  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1217  if(looper_) {
1218  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1219  }
1220  }
1221 
1222  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1225  << "EventProcessor::readRun\n"
1226  << "Illegal attempt to insert run into cache\n"
1227  << "Contact a Framework Developer\n";
1228  }
1229  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1230  {
1231  SendSourceTerminationSignalIfException sentry(actReg_.get());
1232  input_->readRun(*rp, *historyAppender_);
1233  sentry.completedSuccessfully();
1234  }
1235  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1236  principalCache_.insert(rp);
1237  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1238  }
1239 
1240  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1241  principalCache_.merge(input_->runAuxiliary(), preg());
1242  auto runPrincipal =principalCache_.runPrincipalPtr();
1243  {
1244  SendSourceTerminationSignalIfException sentry(actReg_.get());
1245  input_->readAndMergeRun(*runPrincipal);
1246  sentry.completedSuccessfully();
1247  }
1248  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1249  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1250  }
1251 
1255  << "EventProcessor::readRun\n"
1256  << "Illegal attempt to insert lumi into cache\n"
1257  << "Contact a Framework Developer\n";
1258  }
1261  << "EventProcessor::readRun\n"
1262  << "Illegal attempt to insert lumi into cache\n"
1263  << "Run is invalid\n"
1264  << "Contact a Framework Developer\n";
1265  }
1266  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1267  {
1268  SendSourceTerminationSignalIfException sentry(actReg_.get());
1269  input_->readLuminosityBlock(*lbp, *historyAppender_);
1270  sentry.completedSuccessfully();
1271  }
1272  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1273  principalCache_.insert(lbp);
1274  return input_->luminosityBlock();
1275  }
1276 
1278  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1279  {
1280  SendSourceTerminationSignalIfException sentry(actReg_.get());
1281  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1282  sentry.completedSuccessfully();
1283  }
1284  return input_->luminosityBlock();
1285  }
1286 
1288  schedule_->writeRun(principalCache_.runPrincipal(phid, run), &processContext_);
1289  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.writeRun(phid, run); });
1290  FDEBUG(1) << "\twriteRun " << run << "\n";
1291  }
1292 
1294  principalCache_.deleteRun(phid, run);
1295  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1296  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1297  }
1298 
1300  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1301  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1302  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1303  }
1304 
1306  principalCache_.deleteLumi(phid, run, lumi);
1307  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1308  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1309  }
1310 
1311  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1312  std::atomic<bool>* finishedProcessingEvents) {
1313  if(shouldWeStop()) {
1314  return false;
1315  }
1316 
1317  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1318  return false;
1319  }
1320 
1321  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1322  return false;
1323  }
1324 
1326  try {
1327  //need to use lock in addition to the serial task queue because
1328  // of delayed provenance reading and reading data in response to
1329  // edm::Refs etc
1330  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1331  if(not firstEventInBlock_) {
1332  //The state machine already called input_->nextItemType
1333  // and found an event. We can't call input_->nextItemType
1334  // again since it would move to the next transition
1335  InputSource::ItemType itemType = input_->nextItemType();
1336  if (InputSource::IsEvent !=itemType) {
1338  finishedProcessingEvents->store(true,std::memory_order_release);
1339  //std::cerr<<"next item type "<<itemType<<"\n";
1340  return false;
1341  }
1343  //std::cerr<<"task told to async stop\n";
1344  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1345  return false;
1346  }
1347  } else {
1348  firstEventInBlock_ = false;
1349  }
1350  readEvent(iStreamIndex);
1351  } catch (...) {
1352  bool expected =false;
1353  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1354  deferredExceptionPtr_ = std::current_exception();
1355 
1356  }
1357  return false;
1358  }
1359  return true;
1360  }
1361 
1363  unsigned int iStreamIndex,
1364  std::atomic<bool>* finishedProcessingEvents)
1365  {
1366  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,iTask,iStreamIndex]() mutable {
1368 
1369  try {
1370  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
1371  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) mutable {
1372  if(iPtr) {
1373  bool expected = false;
1374  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1375  deferredExceptionPtr_ = *iPtr;
1376  iTask.doneWaiting(*iPtr);
1377  }
1378  //the stream will stop now
1379  return;
1380  }
1381  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
1382  });
1383 
1384  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1385  } else {
1386  //the stream will stop now
1387  iTask.doneWaiting(std::exception_ptr{});
1388  }
1389  } catch(...) {
1390  bool expected = false;
1391  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1392  auto e =std::current_exception();
1394  iTask.doneWaiting(e);
1395  }
1396  }
1397  });
1398  }
1399 
1403 
1404  std::atomic<bool> finishedProcessingEvents{false};
1405  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1406 
1407  //The state machine already found the event so
1408  // we have to avoid looking again
1409  firstEventInBlock_ = true;
1410 
1411  //To wait, the ref count has to b 1+#streams
1412  auto eventLoopWaitTask = make_empty_waiting_task();
1413  eventLoopWaitTask->increment_ref_count();
1414 
1415  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1416  unsigned int iStreamIndex = 0;
1417  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1418  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),
1419  [this,iStreamIndex,finishedProcessingEventsPtr,h=WaitingTaskHolder{eventLoopWaitTask.get()}](std::exception_ptr const*){
1420  handleNextEventForStreamAsync(h,iStreamIndex,finishedProcessingEventsPtr);
1421  }) );
1422  }
1423  //need a temporary Task so that the temporary WaitingTaskHolder assigned to h will go out of scope
1424  // before the call to spawn_and_wait_for_all
1425  auto t = make_waiting_task(tbb::task::allocate_root(),
1426  [this,iStreamIndex,finishedProcessingEventsPtr,h=WaitingTaskHolder{eventLoopWaitTask.get()}](std::exception_ptr const*){
1427  handleNextEventForStreamAsync(h,iStreamIndex,finishedProcessingEventsPtr);
1428  });
1429  eventLoopWaitTask->spawn_and_wait_for_all( *t);
1430 
1431  //One of the processing threads saw an exception
1433  std::rethrow_exception(deferredExceptionPtr_);
1434  }
1435  return nextItemTypeFromProcessingEvents_.load();
1436  }
1437 
1438  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1439  //TODO this will have to become per stream
1440  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1441  StreamContext streamContext(event.streamID(), &processContext_);
1442 
1443  SendSourceTerminationSignalIfException sentry(actReg_.get());
1444  input_->readEvent(event, streamContext);
1445  sentry.completedSuccessfully();
1446 
1447  FDEBUG(1) << "\treadEvent\n";
1448  }
1449 
1451  unsigned int iStreamIndex) {
1452  tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
1453  processEventAsyncImpl(iHolder, iStreamIndex);
1454  }) );
1455  }
1456 
1458  unsigned int iStreamIndex) {
1459  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1460  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1461 
1464  if(rng.isAvailable()) {
1465  Event ev(*pep, ModuleDescription(), nullptr);
1466  rng->postEventRead(ev);
1467  }
1468  assert(pep->luminosityBlockPrincipalPtrValid());
1469  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1470  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1471 
1472  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1473  tbb::task::allocate_root(),
1474  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1475  {
1477 
1478  //NOTE: If we have a looper we only have one Stream
1479  if(looper_) {
1480  processEventWithLooper(*pep);
1481  }
1482 
1483  FDEBUG(1) << "\tprocessEvent\n";
1484  pep->clearEventPrincipal();
1485  if(iPtr) {
1486  iHolder.doneWaiting(*iPtr);
1487  } else {
1488  iHolder.doneWaiting(std::exception_ptr());
1489  }
1490  }
1491  )
1492  );
1493  WaitingTaskHolder afterProcessTask;
1494  if(subProcesses_.empty()) {
1495  afterProcessTask = std::move(finalizeEventTask);
1496  } else {
1497  //Need to run SubProcesses after schedule has finished
1498  // with the event
1499  afterProcessTask = WaitingTaskHolder(
1500  make_waiting_task(tbb::task::allocate_root(),
1501  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1502  {
1503  if(not iPtr) {
1505 
1506  //when run with 1 thread, we want to the order to be what
1507  // it was before. This requires reversing the order since
1508  // tasks are run last one in first one out
1509  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1510  subProcess.doEventAsync(finalizeEventTask,*pep);
1511  }
1512  } else {
1513  finalizeEventTask.doneWaiting(*iPtr);
1514  }
1515  })
1516  );
1517  }
1518 
1519  schedule_->processOneEventAsync(std::move(afterProcessTask),
1520  iStreamIndex,*pep, esp_->eventSetup());
1521 
1522  }
1523 
1525  bool randomAccess = input_->randomAccess();
1526  ProcessingController::ForwardState forwardState = input_->forwardState();
1527  ProcessingController::ReverseState reverseState = input_->reverseState();
1528  ProcessingController pc(forwardState, reverseState, randomAccess);
1529 
1531  do {
1532 
1533  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1534  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1535 
1536  bool succeeded = true;
1537  if(randomAccess) {
1539  input_->skipEvents(-2);
1540  }
1542  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1543  }
1544  }
1545  pc.setLastOperationSucceeded(succeeded);
1546  } while(!pc.lastOperationSucceeded());
1547  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1548  }
1549 
1551  FDEBUG(1) << "\tshouldWeStop\n";
1552  if(shouldWeStop_) return true;
1553  if(!subProcesses_.empty()) {
1554  for(auto const& subProcess : subProcesses_) {
1555  if(subProcess.terminate()) {
1556  return true;
1557  }
1558  }
1559  return false;
1560  }
1561  return schedule_->terminate();
1562  }
1563 
1566  }
1567 
1570  }
1571 
1574  }
1575 
1576  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1577  bool expected =false;
1578  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1579  deferredExceptionPtr_ = iException;
1580  return true;
1581  }
1582  return false;
1583  }
1584 
1585 }
size
Write out results.
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:219
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
ParameterSetID id() const
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded)
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void writeRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool ev
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:325
bool hasRunPrincipal() const
Definition: config.py:1
void push(T &&iAction)
asynchronously pushes functor iAction into queue
RunNumber_t run() const
Definition: RunPrincipal.h:61
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:20
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void beginJob()
Definition: Breakpoints.cc:15
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
U second(std::pair< T, U > const &p)
config
Definition: looper.py:287
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
void setExceptionMessageLumis(std::string &message)
void setExceptionMessageRuns(std::string &message)
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:45
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:657
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
InputSource::ItemType readAndProcessEvents()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
void resetFailedFromThisProcess()
Definition: Principal.cc:902
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
void processEventWithLooper(EventPrincipal &)
void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool globalBeginSucceeded, bool cleaningUpAfterException)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool &globalBeginSucceeded)
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:13
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31