CMS 3D CMS Logo

EventProcessor.cc
Go to the documentation of this file.
2 
9 
36 
38 
46 
51 
53 
65 
66 #include "MessageForSource.h"
67 #include "MessageForParent.h"
68 
69 #include "boost/range/adaptor/reversed.hpp"
70 
71 #include <exception>
72 #include <iomanip>
73 #include <iostream>
74 #include <utility>
75 #include <sstream>
76 
77 #include <sys/ipc.h>
78 #include <sys/msg.h>
79 
80 #include "tbb/task.h"
81 
82 //Used for CPU affinity
83 #ifndef __APPLE__
84 #include <sched.h>
85 #endif
86 
87 namespace {
88  //Sentry class to only send a signal if an
89  // exception occurs. An exception is identified
90  // by the destructor being called without first
91  // calling completedSuccessfully().
92  class SendSourceTerminationSignalIfException {
93  public:
94  SendSourceTerminationSignalIfException(edm::ActivityRegistry* iReg):
95  reg_(iReg) {}
96  ~SendSourceTerminationSignalIfException() {
97  if(reg_) {
98  reg_->preSourceEarlyTerminationSignal_(edm::TerminationOrigin::ExceptionFromThisContext);
99  }
100  }
101  void completedSuccessfully() {
102  reg_ = nullptr;
103  }
104  private:
105  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
106  };
107 
108 }
109 
110 namespace edm {
111 
112  // ---------------------------------------------------------------
// Build the single InputSource described by the "@main_input" PSet:
// validate its configuration, register the PSet, create a ModuleDescription
// labelled "source", construct the source via the InputSourceFactory, and
// connect its event-read signals to the ActivityRegistry.  Construction and
// validation failures are rethrown with added context.
// NOTE(review): this doxygen-extracted listing is missing original lines
// (e.g. 114, 124, 132 in the embedded numbering) — the function-name line
// and the exception throw are not visible here; do not treat as compilable.
113  std::unique_ptr<InputSource>
115  CommonParams const& common,
116  std::shared_ptr<ProductRegistry> preg,
117  std::shared_ptr<BranchIDListHelper> branchIDListHelper,
118  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
119  std::shared_ptr<ActivityRegistry> areg,
120  std::shared_ptr<ProcessConfiguration const> processConfiguration,
121  PreallocationConfiguration const& allocations) {
// A missing source PSet is a configuration error (throw is on the
// missing line 124).
122  ParameterSet* main_input = params.getPSetForUpdate("@main_input");
123  if(main_input == 0) {
125  << "There must be exactly one source in the configuration.\n"
126  << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
127  }
128 
129  std::string modtype(main_input->getParameter<std::string>("@module_type"));
130 
// Validate the source's parameters against its registered description.
131  std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
133  ConfigurationDescriptions descriptions(filler->baseType());
134  filler->fill(descriptions);
135 
136  try {
137  convertException::wrap([&]() {
138  descriptions.validate(*main_input, std::string("source"));
139  });
140  }
141  catch (cms::Exception & iException) {
142  std::ostringstream ost;
143  ost << "Validating configuration of input source of type " << modtype;
144  iException.addContext(ost.str());
145  throw;
146  }
147 
148  main_input->registerIt();
149 
150  // Fill in "ModuleDescription", in case the input source produces
151  // any EDProducts, which would be registered in the ProductRegistry.
152  // Also fill in the process history item for this process.
153  // There is no module label for the unnamed input source, so
154  // just use "source".
155  // Only the tracked parameters belong in the process configuration.
156  ModuleDescription md(main_input->id(),
157  main_input->getParameter<std::string>("@module_type"),
158  "source",
159  processConfiguration.get(),
160  ModuleDescription::getUniqueID());
161 
162  InputSourceDescription isdesc(md, preg, branchIDListHelper, thinnedAssociationsHelper, areg,
163  common.maxEventsInput_, common.maxLumisInput_,
164  common.maxSecondsUntilRampdown_, allocations);
165 
// Bracket construction with pre/post signals; the shared_ptr deleter
// guarantees the post signal fires even if construction throws.
166  areg->preSourceConstructionSignal_(md);
167  std::unique_ptr<InputSource> input;
168  try {
169  //even if we have an exception, send the signal
170  std::shared_ptr<int> sentry(nullptr,[areg,&md](void*){areg->postSourceConstructionSignal_(md);});
171  convertException::wrap([&]() {
172  input = std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
173  input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
174  input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
175  });
176  }
177  catch (cms::Exception& iException) {
178  std::ostringstream ost;
179  ost << "Constructing input source of type " << modtype;
180  iException.addContext(ost.str());
181  throw;
182  }
183  return input;
184  }
185 
186  // ---------------------------------------------------------------
// Create the (at most one) EDLooper listed in "@all_loopers" and register it
// with the LooperFactory/EventSetup controller.  Returns a null shared_ptr
// when no looper is configured; asserts that at most one is present.
// NOTE(review): lines 188-189 of the original (function name/first
// parameters) are missing from this extracted listing.
187  std::shared_ptr<EDLooperBase>
190  ParameterSet& params) {
191  std::shared_ptr<EDLooperBase> vLooper;
192 
193  std::vector<std::string> loopers = params.getParameter<std::vector<std::string> >("@all_loopers");
194 
195  if(loopers.size() == 0) {
196  return vLooper;
197  }
198 
// Exactly one looper is supported.
199  assert(1 == loopers.size());
200 
201  for(std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end();
202  itName != itNameEnd;
203  ++itName) {
204 
205  ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
206  providerPSet->registerIt();
207  vLooper = eventsetup::LooperFactory::get()->addTo(esController,
208  cp,
209  *providerPSet);
210  }
211  return vLooper;
212  }
213 
214  // ---------------------------------------------------------------
// Constructor taking a python configuration string plus a ServiceToken.
// Default-initializes all members, builds a ProcessDesc from the python
// config, adds default/forced services, then delegates to init().
// NOTE(review): line 217 (the serviceregistry legacy-mode parameter) is
// missing from this extracted listing.
215  EventProcessor::EventProcessor(std::string const& config,
216  ServiceToken const& iToken,
218  std::vector<std::string> const& defaultServices,
219  std::vector<std::string> const& forcedServices) :
220  actReg_(),
221  preg_(),
222  branchIDListHelper_(),
223  serviceToken_(),
224  input_(),
225  espController_(new eventsetup::EventSetupsController),
226  esp_(),
227  act_table_(),
228  processConfiguration_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
234  deferredExceptionPtrIsSet_(false),
// Both halves of one acquirer/mutex pair are created here; note each call to
// createAcquirerForSourceDelayedReader() creates a fresh pair.
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
241  exceptionMessageFiles_(),
242  exceptionMessageRuns_(),
243  exceptionMessageLumis_(),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
246  forceESCacheClearOnNewRun_(false),
247  eventSetupDataToExcludeFromPrefetching_() {
248  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
249  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
250  processDesc->addServices(defaultServices, forcedServices);
251  init(processDesc, iToken, iLegacy);
252  }
253 
// Three further EventProcessor constructor overloads.  All follow the same
// pattern as the first: default member initialization followed by building a
// ProcessDesc and calling init().
// NOTE(review): this extracted listing is heavily truncated here — each
// overload's signature line and several member initializers (embedded
// numbers 254, 259, 265, 268, 271, 275-286, 291, 296, 299, …, 332, 335,
// 341, 344, 347, 351-362, 367, 371) are missing.
255  std::vector<std::string> const& defaultServices,
256  std::vector<std::string> const& forcedServices) :
257  actReg_(),
258  preg_(),
260  serviceToken_(),
261  input_(),
262  espController_(new eventsetup::EventSetupsController),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
287  {
288  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
289  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
290  processDesc->addServices(defaultServices, forcedServices);
292  }
293 
// Overload taking an already-built ProcessDesc; delegates straight to init().
294  EventProcessor::EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
295  ServiceToken const& token,
297  actReg_(),
298  preg_(),
300  serviceToken_(),
301  input_(),
302  espController_(new eventsetup::EventSetupsController),
303  esp_(),
304  act_table_(),
306  schedule_(),
307  subProcesses_(),
309  fb_(),
310  looper_(),
312  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
313  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
314  principalCache_(),
327  {
328  init(processDesc, token, legacy);
329  }
330 
331 
// Overload selecting python-string vs. pre-parsed configuration via isPython.
333  actReg_(),
334  preg_(),
336  serviceToken_(),
337  input_(),
338  espController_(new eventsetup::EventSetupsController),
339  esp_(),
340  act_table_(),
342  schedule_(),
343  subProcesses_(),
345  fb_(),
346  looper_(),
348  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
349  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
350  principalCache_(),
363  {
364  if(isPython) {
365  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
366  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
368  }
369  else {
370  auto processDesc = std::make_shared<ProcessDesc>(config);
372  }
373  }
374 
// Central initialization: reads the process PSet, extracts subprocess PSets,
// interprets the "options" PSet (file mode, threading/stream counts, cache
// clearing), initializes services, the EventSetup provider, the optional
// looper, the input source, the schedule, stream EventPrincipals, and any
// SubProcesses.
// NOTE(review): several original lines are missing from this extracted
// listing (embedded numbers 378, 383, 386, 461, 469, 472, 501, 504, 510,
// 513-516, 521, 525-526, 535-538, 541-542) — including the legacy-mode
// parameter, ScheduleItems construction, and parts of the member assignments.
375  void
376  EventProcessor::init(std::shared_ptr<ProcessDesc>& processDesc,
377  ServiceToken const& iToken,
379 
380  //std::cerr << processDesc->dump() << std::endl;
381 
382  // register the empty parentage vector , once and for all
384 
385  // register the empty parameter set, once and for all.
387 
388  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
389 
390  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
391  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
392  bool const hasSubProcesses = !subProcessVParameterSet.empty();
393 
394  // Now set some parameters specific to the main process.
395  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
396  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode", "FULLMERGE");
397  if(fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
398  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
399  << fileMode << ".\n"
400  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
401  } else {
402  fileModeNoMerge_ = (fileMode == "NOMERGE");
403  }
404  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
405  //threading
// numberOfThreads == 0 is treated as "one thread".
406  unsigned int nThreads=1;
407  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
408  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
409  if(nThreads == 0) {
410  nThreads = 1;
411  }
412  }
413  /* TODO: when we support having each stream run in a different thread use this default
414  unsigned int nStreams =nThreads;
415  */
// numberOfStreams == 0 means "as many streams as threads".
416  unsigned int nStreams =1;
417  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
418  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
419  if(nStreams==0) {
420  nStreams = nThreads;
421  }
422  }
423  if(nThreads >1) {
424  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
425  }
426 
427  /*
428  bool nRunsSet = false;
429  */
// Concurrent runs/lumis are currently fixed at 1; the commented-out code
// below is the intended future configurability.
430  unsigned int nConcurrentRuns =1;
431  /*
432  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
433  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
434  }
435  */
436  unsigned int nConcurrentLumis =1;
437  /*
438  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
439  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
440  } else {
441  nConcurrentLumis = nConcurrentRuns;
442  }
443  */
444  //Check that relationships between threading parameters makes sense
445  /*
446  if(nThreads<nStreams) {
447  //bad
448  }
449  if(nConcurrentRuns>nStreams) {
450  //bad
451  }
452  if(nConcurrentRuns>nConcurrentLumis) {
453  //bad
454  }
455  */
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
459 
460  // Now do general initialization
462 
463  //initialize the services
464  auto& serviceSets = processDesc->getServicesPSets();
465  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
466  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
467 
468  //make the services available
470 
471  if(nStreams>1) {
473  handler->willBeUsingThreads();
474  }
475 
476  // intialize miscellaneous items
477  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
478 
479  // intialize the event setup provider
480  esp_ = espController_->makeProvider(*parameterSet);
481 
482  // initialize the looper, if any
483  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
484  if(looper_) {
485  looper_->setActionTable(items.act_table_.get());
486  looper_->attachTo(*items.actReg_);
487 
488  //For now loopers make us run only 1 transition at a time
489  nStreams=1;
490  nConcurrentLumis=1;
491  nConcurrentRuns=1;
492  }
493 
494  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
495 
496  // initialize the input source
497  input_ = makeInput(*parameterSet,
498  *common,
499  items.preg(),
500  items.branchIDListHelper(),
502  items.actReg_,
503  items.processConfiguration(),
505 
506  // intialize the Schedule
507  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
508 
509  // set the data members
511  actReg_ = items.actReg_;
512  preg_ = items.preg();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
// One reusable EventPrincipal per stream.
522  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
527  }
528 
529  // fill the subprocesses, if there are any
530  subProcesses_.reserve(subProcessVParameterSet.size());
531  for(auto& subProcessPSet : subProcessVParameterSet) {
532  subProcesses_.emplace_back(subProcessPSet,
533  *parameterSet,
534  preg(),
539  *actReg_,
540  token,
543  &processContext_);
544  }
545  }
546 
// Destructor body.  Members whose destruction may need framework services
// are nulled in a controlled order while a ServiceRegistry::Operate keeps
// the services alive.
// NOTE(review): the destructor's signature line (embedded number 547) and
// lines 561-562 are missing from this extracted listing.
548  // Make the services available while everything is being deleted.
549  ServiceToken token = getToken();
550  ServiceRegistry::Operate op(token);
551 
552  // manually destroy all these thing that may need the services around
553  // propagate_const<T> has no reset() function
554  espController_ = nullptr;
555  esp_ = nullptr;
556  schedule_ = nullptr;
557  input_ = nullptr;
558  looper_ = nullptr;
559  actReg_ = nullptr;
560 
563  }
564 
// One-time begin-of-job: idempotent via beginJobCalled_.  Fires preallocate
// and pre/postBeginJob signals, begins the source, schedule, subprocesses,
// and every stream.
// NOTE(review): lines 566 (signature), 573-578, 581 and 584 are missing
// from this extracted listing (service sentry, bounds construction, etc.).
565  void
567  if(beginJobCalled_) return;
568  beginJobCalled_=true;
569  bk::beginJob();
570 
571  // StateSentry toerror(this); // should we add this ?
572  //make the services available
574 
579  actReg_->preallocateSignal_(bounds);
580  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
582 
583  //NOTE: this may throw
585  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
586 
587  //NOTE: This implementation assumes 'Job' means one call
588  // the EventProcessor::run
589  // If it really means once per 'application' then this code will
590  // have to be changed.
591  // Also have to deal with case where have 'run' then new Module
592  // added and do 'run'
593  // again. In that case the newly added Module needs its 'beginJob'
594  // to be called.
595 
596  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
597  // For now we delay calling beginOfJob until first beginOfRun
598  //if(looper_) {
599  // looper_->beginOfJob(es);
600  //}
601  try {
602  convertException::wrap([&]() {
603  input_->doBeginJob();
604  });
605  }
606  catch(cms::Exception& ex) {
607  ex.addContext("Calling beginJob for the source");
608  throw;
609  }
610  schedule_->beginJob(*preg_);
611  // toerror.succeeded(); // should we add this?
612  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
613  actReg_->postBeginJobSignal_();
614 
// Begin each stream in the schedule and in every subprocess.
615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
616  schedule_->beginStream(i);
617  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
618  }
619  }
620 
// End-of-job: ends every stream, the schedule, subprocesses, the source and
// the looper.  An ExceptionCollector gathers all exceptions so every
// operation is attempted before any rethrow.
// NOTE(review): line 622 (signature) and 627 are missing from this
// extracted listing.
621  void
623  // Collects exceptions, so we don't throw before all operations are performed.
624  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
625 
626  //make the services available
628 
629  //NOTE: this really should go elsewhere in the future
630  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
631  c.call([this,i](){this->schedule_->endStream(i);});
632  for(auto& subProcess : subProcesses_) {
633  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
634  }
635  }
636  auto actReg = actReg_.get();
637  c.call([actReg](){actReg->preEndJobSignal_();});
638  schedule_->endJob(c);
639  for(auto& subProcess : subProcesses_) {
640  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
641  }
642  c.call(std::bind(&InputSource::doEndJob, input_.get()));
643  if(looper_) {
644  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
645  }
646  c.call([actReg](){actReg->postEndJobSignal_();});
// Rethrow only after every end-of-job action has been attempted.
647  if(c.hasThrown()) {
648  c.rethrow();
649  }
650  }
651 
// Small accessors delegating to the Schedule, plus the TransitionProcessors
// include and the async-stop check.
// NOTE(review): each accessor's signature line (embedded numbers 652-653,
// 658, 663, 668, 673, 678, 683, 688, 693, 702) is missing from this
// extracted listing; the bodies identify them as getToken,
// getAllModuleDescriptions, totalEvents, totalEventsPassed,
// totalEventsFailed, enableEndPaths, endPathsEnabled, getTriggerReport,
// clearCounters and checkForAsyncStopRequest.
654  return serviceToken_;
655  }
656 
657  std::vector<ModuleDescription const*>
659  return schedule_->getAllModuleDescriptions();
660  }
661 
662  int
664  return schedule_->totalEvents();
665  }
666 
667  int
669  return schedule_->totalEventsPassed();
670  }
671 
672  int
674  return schedule_->totalEventsFailed();
675  }
676 
677  void
679  schedule_->enableEndPaths(active);
680  }
681 
682  bool
684  return schedule_->endPathsEnabled();
685  }
686 
687  void
689  schedule_->getTriggerReport(rep);
690  }
691 
692  void
694  schedule_->clearCounters();
695  }
696 
// File-local transition-processing helpers generated into an anonymous
// namespace from the .icc file.
697  namespace {
698 #include "TransitionProcessors.icc"
699  }
700 
// Returns true (and sets returnCode to epSignal) when the global
// shutdown_flag has been raised asynchronously.
701  bool
703  bool returnValue = false;
704 
705  // Look for a shutdown signal
706  if(shutdown_flag.load(std::memory_order_acquire)) {
707  returnValue = true;
708  returnCode = epSignal;
709  }
710  return returnValue;
711  }
712 
// Ask the source for the next transition, skipping IsSynchronize items;
// an async stop request converts the answer to IsStop.  Followed by the
// one-line accessors nextRunID and nextLuminosityBlockID.
// NOTE(review): the signature lines (embedded numbers 713-714, 735,
// 739-740) are missing from this extracted listing.
715  SendSourceTerminationSignalIfException sentry(actReg_.get());
716  InputSource::ItemType itemType;
717  //For now, do nothing with InputSource::IsSynchronize
718  do {
719  itemType = input_->nextItemType();
720  } while( itemType == InputSource::IsSynchronize);
721 
722  sentry.completedSuccessfully();
723 
725 
// An external signal overrides whatever the source reported.
726  if(checkForAsyncStopRequest(returnCode)) {
727  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
728  return InputSource::IsStop;
729  }
730 
731  return itemType;
732  }
733 
734  std::pair<edm::ProcessHistoryID, edm::RunNumber_t>
736  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
737  }
738 
741  return input_->luminosityBlock();
742  }
743 
// Main processing loop: repeatedly processes files via FilesProcessor until
// the source reports IsStop (rewinding between looper iterations), rethrows
// any deferred exception, and on failure appends any stashed lumi/run/file
// exception messages before rethrowing.
// NOTE(review): the signature and several lines (embedded numbers 744-748,
// 753, 756-757, 765, 794, 800, 806) are missing from this extracted
// listing.
746 
749  {
750  beginJob(); //make sure this was called
751 
752  // make the services available
754 
755 
758  try {
759  FilesProcessor fp(fileModeNoMerge_);
760 
761  convertException::wrap([&]() {
762  bool firstTime = true;
763  do {
// Every pass after the first restarts the source from the beginning
// (looper-driven multi-pass processing).
764  if(not firstTime) {
766  rewindInput();
767  } else {
768  firstTime = false;
769  }
770  startingNewLoop();
771 
772  auto trans = fp.processFiles(*this);
773 
774  fp.normalEnd();
775 
776  if(deferredExceptionPtrIsSet_.load()) {
777  std::rethrow_exception(deferredExceptionPtr_);
778  }
779  if(trans != InputSource::IsStop) {
780  //problem with the source
781  doErrorStuff();
782 
783  throw cms::Exception("BadTransition")
784  << "Unexpected transition change "
785  << trans;
786 
787  }
788  } while(not endOfLoop());
789  }); // convertException::wrap
790 
791  } // Try block
// Attach any exception messages recorded during lumi/run/file handling
// before propagating the failure.
792  catch (cms::Exception & e) {
793  if (!exceptionMessageLumis_.empty()) {
795  if (e.alreadyPrinted()) {
796  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
797  }
798  }
799  if (!exceptionMessageRuns_.empty()) {
801  if (e.alreadyPrinted()) {
802  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
803  }
804  }
805  if (!exceptionMessageFiles_.empty()) {
807  if (e.alreadyPrinted()) {
808  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
809  }
810  }
811  throw;
812  }
813  }
814 
815  return returnCode;
816  }
817 
// Read the next file from the source into fb_, guarded by the termination
// sentry; marks the FileBlock not fast-clonable when running with multiple
// stream/parallel processes.
// NOTE(review): the signature (embedded number 818) and lines 825, 827-829
// (the branch taken when the ProductRegistry grew, and the
// numberOfStreams/parallel-processes condition) are missing from this
// extracted listing.
819  FDEBUG(1) << " \treadFile\n";
820  size_t size = preg_->size();
821  SendSourceTerminationSignalIfException sentry(actReg_.get());
822 
823  fb_ = input_->readFile();
// A grown registry means the new file added products (handling on the
// missing line 825).
824  if(size < preg_->size()) {
826  }
830  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
831  }
832  sentry.completedSuccessfully();
833  }
834 
835  void EventProcessor::closeInputFile(bool cleaningUpAfterException) {
836  if (fb_.get() != nullptr) {
837  SendSourceTerminationSignalIfException sentry(actReg_.get());
838  input_->closeFile(fb_.get(), cleaningUpAfterException);
839  sentry.completedSuccessfully();
840  }
841  FDEBUG(1) << "\tcloseInputFile\n";
842  }
843 
// Four file-lifecycle notifications, each forwarded to the schedule and all
// subprocesses when a file is open: open/close output files and
// respond-to-open/close input file.
// NOTE(review): the signature lines (embedded numbers 844, 852, 860, 869)
// are missing from this extracted listing; the FDEBUG strings identify the
// functions as openOutputFiles, closeOutputFiles, respondToOpenInputFile
// and respondToCloseInputFile.
845  if (fb_.get() != nullptr) {
846  schedule_->openOutputFiles(*fb_);
847  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
848  }
849  FDEBUG(1) << "\topenOutputFiles\n";
850  }
851 
853  if (fb_.get() != nullptr) {
854  schedule_->closeOutputFiles();
855  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
856  }
857  FDEBUG(1) << "\tcloseOutputFiles\n";
858  }
859 
// Propagate the (possibly updated) BranchIDLists to subprocesses before
// notifying them of the newly opened input file.
861  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
862  if (fb_.get() != nullptr) {
863  schedule_->respondToOpenInputFile(*fb_);
864  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
865  }
866  FDEBUG(1) << "\trespondToOpenInputFile\n";
867  }
868 
870  if (fb_.get() != nullptr) {
871  schedule_->respondToCloseInputFile(*fb_);
872  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
873  }
874  FDEBUG(1) << "\trespondToCloseInputFile\n";
875  }
876 
// Loop-control helpers: startingNewLoop, endOfLoop, rewindInput,
// prepareForNextLoop, shouldWeCloseOutput and doErrorStuff.
// NOTE(review): each function's signature line (embedded numbers 877, 887,
// 900, 906, 911, 924) is missing from this extracted listing; the FDEBUG
// strings identify them.
878  shouldWeStop_ = false;
879  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
880  // until after we've called beginOfJob
881  if(looper_ && looperBeginJobRun_) {
882  looper_->doStartingNewLoop();
883  }
884  FDEBUG(1) << "\tstartingNewLoop\n";
885  }
886 
// endOfLoop: without a looper, every loop is the last; with one, the looper
// decides (kContinue keeps going unless forceLooperToEnd_ is set).
888  if(looper_) {
889  ModuleChanger changer(schedule_.get(),preg_.get());
890  looper_->setModuleChanger(&changer);
891  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
892  looper_->setModuleChanger(nullptr);
893  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
894  else return false;
895  }
896  FDEBUG(1) << "\tendOfLoop\n";
897  return true;
898  }
899 
901  input_->repeat();
902  input_->rewind();
903  FDEBUG(1) << "\trewind\n";
904  }
905 
907  looper_->prepareForNextLoop(esp_.get());
908  FDEBUG(1) << "\tprepareForNextLoop\n";
909  }
910 
// shouldWeCloseOutput: with subprocesses, any subprocess wanting to close
// wins; otherwise ask the schedule.
912  FDEBUG(1) << "\tshouldWeCloseOutput\n";
913  if(!subProcesses_.empty()) {
914  for(auto const& subProcess : subProcesses_) {
915  if(subProcess.shouldWeCloseOutput()) {
916  return true;
917  }
918  }
919  return false;
920  }
921  return schedule_->shouldWeCloseOutput();
922  }
923 
925  FDEBUG(1) << "\tdoErrorStuff\n";
926  LogError("StateMachine")
927  << "The EventProcessor state machine encountered an unexpected event\n"
928  << "and went to the error state\n"
929  << "Will attempt to terminate processing normally\n"
930  << "(IF using the looper the next loop will be attempted)\n"
931  << "This likely indicates a bug in an input module or corrupted input or both\n";
932  }
933 
// Begin-run transition: lets the source begin the run, synchronizes the
// EventSetup to the run start, lazily performs the looper's beginOfJob,
// then runs the global and per-stream begin-run transitions (waiting on
// empty waiting-tasks and rethrowing any captured exception).
// NOTE(review): the signature (embedded number 934) and lines 945
// (forceESCacheClearOnNewRun_ condition), 961, 984 and 988 are missing
// from this extracted listing.
935  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
936  {
937  SendSourceTerminationSignalIfException sentry(actReg_.get());
938 
939  input_->doBeginRun(runPrincipal, &processContext_);
940  sentry.completedSuccessfully();
941  }
942 
943  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
944  runPrincipal.beginTime());
946  espController_->forceCacheClear();
947  }
948  {
949  SendSourceTerminationSignalIfException sentry(actReg_.get());
950  espController_->eventSetupForInstance(ts);
951  sentry.completedSuccessfully();
952  }
953  EventSetup const& es = esp_->eventSetup();
// Deferred looper beginOfJob: runs at the first begin-run, not at beginJob.
954  if(looper_ && looperBeginJobRun_== false) {
955  looper_->copyInfo(ScheduleInfo(schedule_.get()));
956  looper_->beginOfJob(es);
957  looperBeginJobRun_ = true;
958  looper_->doStartingNewLoop();
959  }
960  {
962  auto globalWaitTask = make_empty_waiting_task();
963  globalWaitTask->increment_ref_count();
964  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
965  *schedule_,
966  runPrincipal,
967  ts,
968  es,
969  subProcesses_);
970  globalWaitTask->wait_for_all();
971  if(globalWaitTask->exceptionPtr() != nullptr) {
972  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
973  }
974  }
975  FDEBUG(1) << "\tbeginRun " << run << "\n";
976  if(looper_) {
977  looper_->doBeginRun(runPrincipal, es, &processContext_);
978  }
979  {
980  //To wait, the ref count has to be 1+#streams
981  auto streamLoopWaitTask = make_empty_waiting_task();
982  streamLoopWaitTask->increment_ref_count();
983 
985 
986  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
987  *schedule_,
989  runPrincipal,
990  ts,
991  es,
992  subProcesses_);
993 
994  streamLoopWaitTask->wait_for_all();
995  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
996  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
997  }
998  }
999  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1000  if(looper_) {
1001  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1002  }
1003  }
1004 
// End-run transition (mirror of beginRun): the source ends the run, the
// EventSetup is synchronized to the run end time, then the per-stream and
// global end-run transitions run (stream before global), each waiting on
// an empty waiting-task and rethrowing any captured exception.
// NOTE(review): lines 1016 (IOVSyncValue EventID construction), 1029,
// 1033 and 1054 (Traits typedefs / preallocations argument) are missing
// from this extracted listing.
1005  void EventProcessor::endRun(ProcessHistoryID const& phid, RunNumber_t run, bool cleaningUpAfterException) {
1006  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1007  {
1008  SendSourceTerminationSignalIfException sentry(actReg_.get());
1009 
1010  runPrincipal.setEndTime(input_->timestamp());
1011  runPrincipal.setComplete();
1012  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1013  sentry.completedSuccessfully();
1014  }
1015 
1017  runPrincipal.endTime());
1018  {
1019  SendSourceTerminationSignalIfException sentry(actReg_.get());
1020  espController_->eventSetupForInstance(ts);
1021  sentry.completedSuccessfully();
1022  }
1023  EventSetup const& es = esp_->eventSetup();
1024  {
1025  //To wait, the ref count has to be 1+#streams
1026  auto streamLoopWaitTask = make_empty_waiting_task();
1027  streamLoopWaitTask->increment_ref_count();
1028 
1030 
1031  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1032  *schedule_,
1034  runPrincipal,
1035  ts,
1036  es,
1037  subProcesses_,
1038  cleaningUpAfterException);
1039 
1040  streamLoopWaitTask->wait_for_all();
1041  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1042  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1043  }
1044  }
1045  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1046  if(looper_) {
1047  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1048  }
1049  {
1050  auto globalWaitTask = make_empty_waiting_task();
1051  globalWaitTask->increment_ref_count();
1052 
1053  runPrincipal.setAtEndTransition(true);
1055  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1056  *schedule_,
1057  runPrincipal,
1058  ts,
1059  es,
1060  subProcesses_,
1061  cleaningUpAfterException);
1062  globalWaitTask->wait_for_all();
1063  if(globalWaitTask->exceptionPtr() != nullptr) {
1064  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1065  }
1066  }
1067  FDEBUG(1) << "\tendRun " << run << "\n";
1068  if(looper_) {
1069  looper_->doEndRun(runPrincipal, es, &processContext_);
1070  }
1071  }
1072 
// Begin-lumi transition: the source begins the lumi, the random-number
// service (if present) is notified, the EventSetup is synchronized to the
// lumi start, then the global and per-stream begin-lumi transitions run.
// NOTE(review): the signature (embedded number 1073) and lines 1082
// (RandomNumberGenerator service handle), 1098, 1121 and 1125 are missing
// from this extracted listing.
1074  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1075  {
1076  SendSourceTerminationSignalIfException sentry(actReg_.get());
1077 
1078  input_->doBeginLumi(lumiPrincipal, &processContext_);
1079  sentry.completedSuccessfully();
1080  }
1081 
1083  if(rng.isAvailable()) {
1084  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1085  rng->preBeginLumi(lb);
1086  }
1087 
1088  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1089  // lumi blocks know their start and end times why not also start and end events?
1090  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1091  {
1092  SendSourceTerminationSignalIfException sentry(actReg_.get());
1093  espController_->eventSetupForInstance(ts);
1094  sentry.completedSuccessfully();
1095  }
1096  EventSetup const& es = esp_->eventSetup();
1097  {
1099  auto globalWaitTask = make_empty_waiting_task();
1100  globalWaitTask->increment_ref_count();
1101  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1102  *schedule_,
1103  lumiPrincipal,
1104  ts,
1105  es,
1106  subProcesses_);
1107  globalWaitTask->wait_for_all();
1108  if(globalWaitTask->exceptionPtr() != nullptr) {
1109  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1110  }
1111  }
1112  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1113  if(looper_) {
1114  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1115  }
1116  {
1117  //To wait, the ref count has to b 1+#streams
1118  auto streamLoopWaitTask = make_empty_waiting_task();
1119  streamLoopWaitTask->increment_ref_count();
1120 
1122 
1123  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1124  *schedule_,
1126  lumiPrincipal,
1127  ts,
1128  es,
1129  subProcesses_);
1130  streamLoopWaitTask->wait_for_all();
1131  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1132  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1133  }
1134  }
1135 
1136  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1137  if(looper_) {
1138  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1139  }
1140  }
1141 
// End-lumi transition: the source ends the lumi, the EventSetup is
// synchronized to the lumi end (max event number), then the per-stream
// end-lumi transitions run asynchronously and the global end-lumi runs
// synchronously via processOneGlobal, followed by subprocesses and looper.
// NOTE(review): lines 1167, 1171 and 1188 (Traits typedefs /
// preallocations argument) are missing from this extracted listing.
1142  void EventProcessor::endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) {
1143  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1144  {
1145  SendSourceTerminationSignalIfException sentry(actReg_.get());
1146 
1147  lumiPrincipal.setEndTime(input_->timestamp());
1148  lumiPrincipal.setComplete();
1149  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1150  sentry.completedSuccessfully();
1151  }
1152  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1153  // lumi blocks know their start and end times why not also start and end events?
1154  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1155  lumiPrincipal.endTime());
1156  {
1157  SendSourceTerminationSignalIfException sentry(actReg_.get());
1158  espController_->eventSetupForInstance(ts);
1159  sentry.completedSuccessfully();
1160  }
1161  EventSetup const& es = esp_->eventSetup();
1162  {
1163  //To wait, the ref count has to b 1+#streams
1164  auto streamLoopWaitTask = make_empty_waiting_task();
1165  streamLoopWaitTask->increment_ref_count();
1166 
1168 
1169  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1170  *schedule_,
1172  lumiPrincipal,
1173  ts,
1174  es,
1175  subProcesses_,
1176  cleaningUpAfterException);
1177  streamLoopWaitTask->wait_for_all();
1178  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1179  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1180  }
1181  }
1182  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1183  if(looper_) {
1184  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1185  }
1186  {
1187  lumiPrincipal.setAtEndTransition(true);
1189  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1190  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1191  }
1192  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1193  if(looper_) {
1194  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1195  }
1196  }
1197 
// --- EventProcessor::readRun -------------------------------------------
// Reads a new Run from the input source into a fresh RunPrincipal and
// inserts it into the principal cache.  Returns the (reduced process
// history ID, run number) pair used as the cache key.
// NOTE(review): the Doxygen extraction dropped orig. lines 1199-1200,
// which appear to be the guard opening the exception whose message
// continues at 1201-1203 (likely `if(principalCache_.hasRunPrincipal())
// { throw ...` — cf. hasRunPrincipal in the member list); confirm
// against the original source.
1198  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readRun() {
1201  << "EventProcessor::readRun\n"
1202  << "Illegal attempt to insert run into cache\n"
1203  << "Contact a Framework Developer\n";
1204  }
// Final ctor argument 0: its meaning is not visible here — see the
// RunPrincipal constructor.
1205  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1206  {
// Emits the early-termination signal if the source throws mid-read.
1207  SendSourceTerminationSignalIfException sentry(actReg_.get());
1208  input_->readRun(*rp, *historyAppender_);
1209  sentry.completedSuccessfully();
1210  }
// Source and freshly filled principal must agree on the process history.
1211  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1212  principalCache_.insert(rp);
1213  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1214  }
1215 
1216  std::pair<ProcessHistoryID,RunNumber_t> EventProcessor::readAndMergeRun() {
1217  principalCache_.merge(input_->runAuxiliary(), preg());
1218  auto runPrincipal =principalCache_.runPrincipalPtr();
1219  {
1220  SendSourceTerminationSignalIfException sentry(actReg_.get());
1221  input_->readAndMergeRun(*runPrincipal);
1222  sentry.completedSuccessfully();
1223  }
1224  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1225  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1226  }
1227 
// --- EventProcessor::readLuminosityBlock (fragment) --------------------
// Reads a new LuminosityBlock from the source into a fresh
// LuminosityBlockPrincipal, attaches it to the cached RunPrincipal and
// inserts it into the principal cache; returns the source's current
// lumi number.
// NOTE(review): the function signature (orig. lines 1228-1230) and the
// second guard condition (orig. lines 1235-1236) were dropped by the
// Doxygen extraction; confirm against the original source.
// NOTE(review): both exception messages say "EventProcessor::readRun"
// although this is readLuminosityBlock — looks like a copy/paste slip;
// worth fixing upstream (string text deliberately left untouched here).
1231  << "EventProcessor::readRun\n"
1232  << "Illegal attempt to insert lumi into cache\n"
1233  << "Contact a Framework Developer\n";
1234  }
1237  << "EventProcessor::readRun\n"
1238  << "Illegal attempt to insert lumi into cache\n"
1239  << "Run is invalid\n"
1240  << "Contact a Framework Developer\n";
1241  }
// Final ctor argument 0: meaning not visible here — see the
// LuminosityBlockPrincipal constructor.
1242  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1243  {
// Emits the early-termination signal if the source throws mid-read.
1244  SendSourceTerminationSignalIfException sentry(actReg_.get());
1245  input_->readLuminosityBlock(*lbp, *historyAppender_);
1246  sentry.completedSuccessfully();
1247  }
1248  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1249  principalCache_.insert(lbp);
1250  return input_->luminosityBlock();
1251  }
1252 
// --- EventProcessor::readAndMergeLumi (fragment) -----------------------
// Merges the source's current LuminosityBlockAuxiliary into the cached
// lumi principal and lets the source merge its lumi products into it;
// returns the source's current lumi number.
// NOTE(review): the signature line (orig. 1253) was dropped by the
// Doxygen extraction — confirm the declared return type upstream.
1254  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1255  {
// Emits the early-termination signal if the source throws mid-merge.
1256  SendSourceTerminationSignalIfException sentry(actReg_.get());
1257  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1258  sentry.completedSuccessfully();
1259  }
1260  return input_->luminosityBlock();
1261  }
1262 
// --- EventProcessor::writeRun (fragment) -------------------------------
// Writes the identified run via the Schedule's output modules, then asks
// each SubProcess to do the same.
// NOTE(review): signature line (orig. 1263) dropped in extraction; the
// member list declares writeRun(ProcessHistoryID const&, RunNumber_t).
1264  schedule_->writeRun(principalCache_.runPrincipal(phid, run), &processContext_);
1265  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.writeRun(phid, run); });
1266  FDEBUG(1) << "\twriteRun " << run << "\n";
1267  }
1268 
// --- EventProcessor::deleteRunFromCache (fragment) ---------------------
// Drops the cached RunPrincipal for (phid, run) here and in every
// SubProcess.
// NOTE(review): signature line (orig. 1269) dropped in extraction.
1270  principalCache_.deleteRun(phid, run);
1271  for_all(subProcesses_, [run,phid](auto& subProcess){ subProcess.deleteRunFromCache(phid, run); });
1272  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1273  }
1274 
// --- EventProcessor::writeLumi (fragment) ------------------------------
// Writes the identified lumi via the Schedule's output modules, then asks
// each SubProcess to do the same.
// NOTE(review): signature line (orig. 1275) dropped in extraction.
1276  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi), &processContext_);
1277  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1278  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1279  }
1280 
// --- EventProcessor::deleteLumiFromCache (fragment) --------------------
// Drops the cached LuminosityBlockPrincipal for (phid, run, lumi) here
// and in every SubProcess.
// NOTE(review): signature line (orig. 1281) dropped in extraction.
1282  principalCache_.deleteLumi(phid, run, lumi);
1283  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1284  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1285  }
1286 
// --- EventProcessor::readNextEventForStream ----------------------------
// Tries to obtain the next event for stream iStreamIndex.  Returns true
// when an event was read into the stream's principal; false when the
// stream should stop (stop requested, a deferred exception is pending,
// the event block ran out, or the read itself threw).
// NOTE(review): Doxygen extraction dropped orig. lines 1301, 1313 and
// 1318.  1318 appears to open the async-stop `if` whose body is at
// 1319-1321 (cf. checkForAsyncStopRequest / asyncStopRequestedWhile-
// ProcessingEvents_ in the member list); confirm against the original.
1287  bool EventProcessor::readNextEventForStream(unsigned int iStreamIndex,
1288  std::atomic<bool>* finishedProcessingEvents) {
1289  if(shouldWeStop()) {
1290  return false;
1291  }
1292 
// Another stream already captured an exception: stop quietly.
1293  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1294  return false;
1295  }
1296 
// Another stream already saw the end of this event block.
1297  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1298  return false;
1299  }
1300 
1302  try {
1303  //need to use lock in addition to the serial task queue because
1304  // of delayed provenance reading and reading data in response to
1305  // edm::Refs etc
1306  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1307  if(not firstEventInBlock_) {
1308  //The state machine already called input_->nextItemType
1309  // and found an event. We can't call input_->nextItemType
1310  // again since it would move to the next transition
1311  InputSource::ItemType itemType = input_->nextItemType();
1312  if (InputSource::IsEvent !=itemType) {
1314  finishedProcessingEvents->store(true,std::memory_order_release);
1315  //std::cerr<<"next item type "<<itemType<<"\n";
1316  return false;
1317  }
1319  //std::cerr<<"task told to async stop\n";
1320  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1321  return false;
1322  }
1323  } else {
1324  firstEventInBlock_ = false;
1325  }
1326  readEvent(iStreamIndex);
1327  } catch (...) {
// Only the first exception across all streams is recorded; later ones
// are dropped.
1328  bool expected =false;
1329  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1330  deferredExceptionPtr_ = std::current_exception();
1331 
1332  }
1333  return false;
1334  }
1335  return true;
1336  }
1337 
// --- EventProcessor::handleNextEventForStreamAsync (fragment) ----------
// Drives one stream's event loop: reading goes through the source's
// serial task queue, and a recursion task re-invokes this function after
// each processed event.  iTask's ref count is decremented exactly once
// when this stream stops.
// NOTE(review): the first signature line (orig. 1338, `void
// EventProcessor::handleNextEventForStreamAsync(WaitingTask* iTask,` per
// the member list) and orig. line 1361 were dropped by the extraction.
1339  unsigned int iStreamIndex,
1340  std::atomic<bool>* finishedProcessingEvents)
1341  {
// Continuation run after each processed event: on error, record the
// first exception, release iTask's waiters, and stop this stream;
// otherwise recurse to fetch the next event.
1342  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
1343  if(iPtr) {
1344  bool expected = false;
1345  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1346  deferredExceptionPtr_ = *iPtr;
1347  {
1348  WaitingTaskHolder h(iTask);
1349  h.doneWaiting(*iPtr);
1350  }
1351  }
1352  //the stream will stop now
1353  iTask->decrement_ref_count();
1354  return;
1355  }
1356 
1357  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
1358  });
1359 
// Serialize all source access across streams via the queue chain.
1360  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
1362 
1363  try {
1364  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
1365  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1366  } else {
1367  //the stream will stop now
// No event was read, so the recursion task never runs: destroy it
// explicitly and release this stream's hold on iTask.
1368  tbb::task::destroy(*recursionTask);
1369  iTask->decrement_ref_count();
1370  }
1371  } catch(...) {
1372  WaitingTaskHolder h(recursionTask);
1373  h.doneWaiting(std::current_exception());
1374  }
1375  });
1376  }
1377 
1381 
// --- EventProcessor::readAndProcessEvents (fragment) -------------------
// Event loop for one block of events: fans out one
// handleNextEventForStreamAsync chain per stream, waits for all of them,
// then rethrows the first deferred exception if any stream failed.
// NOTE(review): the signature lines (orig. 1378-1380) plus orig. lines
// 1408 (presumably the `if(deferredExceptionPtrIsSet_...)` condition
// guarding the rethrow) and 1411 (presumably returning
// nextItemTypeFromProcessingEvents_, cf. the member list and the
// declared return type InputSource::ItemType) were dropped by the
// Doxygen extraction; confirm against the original source.
1382  std::atomic<bool> finishedProcessingEvents{false};
1383  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1384 
1385  //The state machine already found the event so
1386  // we have to avoid looking again
1387  firstEventInBlock_ = true;
1388 
1389  //To wait, the ref count has to be 1+#streams
1390  auto eventLoopWaitTask = make_empty_waiting_task();
1391  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
1392  eventLoopWaitTask->increment_ref_count();
1393 
1394  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1395  unsigned int iStreamIndex = 0;
// Enqueue all but the last stream; the last one is spawned below so the
// calling thread participates while waiting.
1396  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1397  eventLoopWaitTask->increment_ref_count();
1398  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
1399  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
1400  }) );
1401  }
1402  eventLoopWaitTask->increment_ref_count();
1403  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
1404  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
1405  }));
1406 
1407  //One of the processing threads saw an exception
1409  std::rethrow_exception(deferredExceptionPtr_);
1410  }
1412  }
1413 
1414  void EventProcessor::readEvent(unsigned int iStreamIndex) {
1415  //TODO this will have to become per stream
1416  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1417  StreamContext streamContext(event.streamID(), &processContext_);
1418 
1419  SendSourceTerminationSignalIfException sentry(actReg_.get());
1420  input_->readEvent(event, streamContext);
1421  sentry.completedSuccessfully();
1422 
1423  FDEBUG(1) << "\treadEvent\n";
1424  }
1425 
// --- EventProcessor::processEventAsync (fragment) ----------------------
// Runs one event through the Schedule asynchronously.  Builds a
// finalize task (looper handling + principal cleanup + notifying
// iHolder), optionally chains the SubProcesses in front of it, and hands
// the whole chain to the Schedule.
// NOTE(review): the first signature line (orig. 1426, `void
// EventProcessor::processEventAsync(WaitingTaskHolder iHolder,` per the
// member list) plus orig. lines 1430 (the declaration of `rng`, used
// below — presumably a Service<RandomNumberGenerator>), 1443 and 1471
// were dropped by the Doxygen extraction; confirm against the original.
1427  unsigned int iStreamIndex) {
1428  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1429  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1431  if(rng.isAvailable()) {
1432  Event ev(*pep, ModuleDescription(), nullptr);
1433  rng->postEventRead(ev);
1434  }
// Sanity checks: the event must sit inside the currently cached lumi.
1435  assert(pep->luminosityBlockPrincipalPtrValid());
1436  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1437  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1438 
// Runs after the Schedule (and SubProcesses, if any) finish the event.
1439  WaitingTaskHolder finalizeEventTask( make_waiting_task(
1440  tbb::task::allocate_root(),
1441  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
1442  {
1444 
1445  //NOTE: If we have a looper we only have one Stream
1446  if(looper_) {
1447  processEventWithLooper(*pep);
1448  }
1449 
1450  FDEBUG(1) << "\tprocessEvent\n";
1451  pep->clearEventPrincipal();
1452  if(iPtr) {
1453  iHolder.doneWaiting(*iPtr);
1454  } else {
1455  iHolder.doneWaiting(std::exception_ptr());
1456  }
1457  }
1458  )
1459  );
1460  WaitingTaskHolder afterProcessTask;
1461  if(subProcesses_.empty()) {
1462  afterProcessTask = std::move(finalizeEventTask);
1463  } else {
1464  //Need to run SubProcesses after schedule has finished
1465  // with the event
1466  afterProcessTask = WaitingTaskHolder(
1467  make_waiting_task(tbb::task::allocate_root(),
1468  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
1469  {
1470  if(not iPtr) {
1472 
1473  //when run with 1 thread, we want to the order to be what
1474  // it was before. This requires reversing the order since
1475  // tasks are run last one in first one out
1476  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
1477  subProcess.doEventAsync(finalizeEventTask,*pep);
1478  }
1479  } else {
1480  finalizeEventTask.doneWaiting(*iPtr);
1481  }
1482  })
1483  );
1484  }
1485 
1486  schedule_->processOneEventAsync(std::move(afterProcessTask),
1487  iStreamIndex,*pep, esp_->eventSetup());
1488 
1489  }
1490 
// --- EventProcessor::processEventWithLooper (fragment) -----------------
// Hands the event to the EDLooper and, based on the transition the
// looper requests through the ProcessingController, repositions the
// input source (skip back / go to a specified event) until the
// operation succeeds; sets shouldWeStop_ if the looper asks to stop.
// NOTE(review): orig. lines 1491 (signature), 1497 (presumably the
// declaration of `status`), 1505 and 1508 (the transition-selection
// conditions around the two repositioning calls) were dropped by the
// Doxygen extraction; confirm against the original source.
1492  bool randomAccess = input_->randomAccess();
1493  ProcessingController::ForwardState forwardState = input_->forwardState();
1494  ProcessingController::ReverseState reverseState = input_->reverseState();
1495  ProcessingController pc(forwardState, reverseState, randomAccess);
1496 
1498  do {
1499 
1500  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1501  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
1502 
1503  bool succeeded = true;
1504  if(randomAccess) {
// skipEvents(-2): step the source back — the exact transition condition
// that selects this branch was on a dropped line; see NOTE above.
1506  input_->skipEvents(-2);
1507  }
1509  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1510  }
1511  }
1512  pc.setLastOperationSucceeded(succeeded);
1513  } while(!pc.lastOperationSucceeded());
1514  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1515  }
1516 
// --- EventProcessor::shouldWeStop (fragment) ---------------------------
// True when a stop was already requested (shouldWeStop_), when any
// SubProcess wants to terminate, or — only in the absence of
// SubProcesses — when the top-level Schedule does.
// NOTE(review): the signature line (orig. 1517, `bool
// EventProcessor::shouldWeStop() const` per the member list) was
// dropped by the Doxygen extraction.
1518  FDEBUG(1) << "\tshouldWeStop\n";
1519  if(shouldWeStop_) return true;
1520  if(!subProcesses_.empty()) {
1521  for(auto const& subProcess : subProcesses_) {
1522  if(subProcess.terminate()) {
1523  return true;
1524  }
1525  }
// With SubProcesses present, the top-level schedule_->terminate() is
// deliberately not consulted here.
1526  return false;
1527  }
1528  return schedule_->terminate();
1529  }
1530 
// NOTE(review): the three closing braces below are the tails of
// setExceptionMessageFiles / setExceptionMessageRuns /
// setExceptionMessageLumis (see the member list); their one-line bodies
// (orig. lines 1531-1532, 1535-1536, 1539-1540 — presumably assignments
// to the matching exceptionMessage*_ members) were dropped by the
// Doxygen extraction.
1533  }
1534 
1537  }
1538 
1541  }
1542 
1543  bool EventProcessor::setDeferredException(std::exception_ptr iException) {
1544  bool expected =false;
1545  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1546  deferredExceptionPtr_ = iException;
1547  return true;
1548  }
1549  return false;
1550  }
1551 
1552 }
size
Write out results.
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
void insert(std::shared_ptr< RunPrincipal > rp)
T getParameter(std::string const &) const
void readEvent(unsigned int iStreamIndex)
T getUntrackedParameter(std::string const &, T const &) const
ProcessContext processContext_
void clear()
Not thread safe.
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
Timestamp const & beginTime() const
edm::EventID specifiedEventTransition() const
InputSource::ItemType nextTransitionType()
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:222
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
static PFTauRenderPlugin instance
def destroy(e)
Definition: pyrootRender.py:13
void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
ParameterSetID id() const
void setExceptionMessageFiles(std::string &message)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
volatile std::atomic< bool > shutdown_flag
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
Timestamp const & endTime() const
void clearCounters()
Clears counters used by trigger report.
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void writeRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool ev
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:323
bool hasRunPrincipal() const
Definition: config.py:1
RunNumber_t run() const
Definition: RunPrincipal.h:61
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool alreadyPrinted() const
Definition: Exception.cc:251
void beginJob()
Definition: Breakpoints.cc:15
static std::string const input
Definition: EdmProvDump.cc:44
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool cleaningUpAfterException)
U second(std::pair< T, U > const &p)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
void doneWaiting(std::exception_ptr iExcept)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< ProductRegistry const > preg() const
void beginRun(ProcessHistoryID const &phid, RunNumber_t run)
void setExceptionMessageLumis(std::string &message)
void setExceptionMessageRuns(std::string &message)
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
void setEndTime(Timestamp const &time)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
Timestamp const & beginTime() const
Definition: RunPrincipal.h:73
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
void setLastOperationSucceeded(bool value)
StreamID streamID() const
bool isAvailable() const
Definition: Service.h:46
void clear()
Not thread safe.
Definition: Registry.cc:44
Timestamp const & endTime() const
Definition: RunPrincipal.h:77
StatusCode runToCompletion()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
int totalEvents() const
std::shared_ptr< edm::ParameterSet > parameterSet() const
element_type const * get() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::string exceptionMessageFiles_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:784
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
StatusCode asyncStopStatusCodeFromProcessingEvents_
bool hasLumiPrincipal() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
InputSource::ItemType readAndProcessEvents()
bool shouldWeCloseOutput() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
void addContext(std::string const &context)
Definition: Exception.cc:227
ServiceToken getToken()
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
HLT enums.
void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
unsigned int RunNumber_t
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
void processEventWithLooper(EventPrincipal &)
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
T first(std::pair< T, U > const &p)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
static ParentageRegistry * instance()
bool setDeferredException(std::exception_ptr)
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
Transition requestedTransition() const
T get(const Candidate &c)
Definition: component.h:55
static Registry * instance()
Definition: Registry.cc:12
std::shared_ptr< EDLooperBase const > looper() const
Definition: event.py:1
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
bool shouldWeStop() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
ParameterSet * getPSetForUpdate(std::string const &name, bool &isTracked)
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()
int maxSecondsUntilRampdown_
Definition: CommonParams.h:31