CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::pair< ProcessHistoryID, RunNumber_t > readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 66 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 365 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 366 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 243 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 76 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 233 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

238  : actReg_(),
239  preg_(),
241  serviceToken_(),
242  input_(),
243  espController_(new eventsetup::EventSetupsController),
244  esp_(),
245  act_table_(),
247  schedule_(),
248  subProcesses_(),
249  historyAppender_(new HistoryAppender),
250  fb_(),
251  looper_(),
253  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
254  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
255  principalCache_(),
256  beginJobCalled_(false),
257  shouldWeStop_(false),
258  fileModeNoMerge_(false),
261  exceptionMessageLumis_(false),
262  forceLooperToEnd_(false),
263  looperBeginJobRun_(false),
266  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 271 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

274  : actReg_(),
275  preg_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
283  schedule_(),
284  subProcesses_(),
285  historyAppender_(new HistoryAppender),
286  fb_(),
287  looper_(),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
292  beginJobCalled_(false),
293  shouldWeStop_(false),
294  fileModeNoMerge_(false),
297  exceptionMessageLumis_(false),
298  forceLooperToEnd_(false),
299  looperBeginJobRun_(false),
303  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
304  processDesc->addServices(defaultServices, forcedServices);
306  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 308 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

311  : actReg_(),
312  preg_(),
314  serviceToken_(),
315  input_(),
316  espController_(new eventsetup::EventSetupsController),
317  esp_(),
318  act_table_(),
320  schedule_(),
321  subProcesses_(),
322  historyAppender_(new HistoryAppender),
323  fb_(),
324  looper_(),
326  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
327  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
328  principalCache_(),
329  beginJobCalled_(false),
330  shouldWeStop_(false),
331  fileModeNoMerge_(false),
334  exceptionMessageLumis_(false),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
340  init(processDesc, token, legacy);
341  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 579 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

579  {
580  // Make the services available while everything is being deleted.
583 
584  // manually destroy all these thing that may need the services around
585  // propagate_const<T> has no reset() function
586  espController_ = nullptr;
587  esp_ = nullptr;
588  schedule_ = nullptr;
589  input_ = nullptr;
590  looper_ = nullptr;
591  actReg_ = nullptr;
592 
595  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 604 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, edmLumisInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), and edm::convertException::wrap().

Referenced by runToCompletion().

604  {
605  if (beginJobCalled_)
606  return;
607  beginJobCalled_ = true;
608  bk::beginJob();
609 
610  // StateSentry toerror(this); // should we add this ?
611  //make the services available
613 
614  service::SystemBounds bounds(preallocations_.numberOfStreams(),
618  actReg_->preallocateSignal_(bounds);
619  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
621 
622  std::vector<ModuleProcessName> consumedBySubProcesses;
624  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
625  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
626  if (consumedBySubProcesses.empty()) {
627  consumedBySubProcesses = std::move(c);
628  } else if (not c.empty()) {
629  std::vector<ModuleProcessName> tmp;
630  tmp.reserve(consumedBySubProcesses.size() + c.size());
631  std::merge(consumedBySubProcesses.begin(),
632  consumedBySubProcesses.end(),
633  c.begin(),
634  c.end(),
635  std::back_inserter(tmp));
636  std::swap(consumedBySubProcesses, tmp);
637  }
638  });
639 
640  // Note: all these may throw
643  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
644  not unusedModules.empty()) {
646 
647  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
648  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
649  "and "
650  "therefore they are deleted before beginJob transition.";
651  for (auto const& description : unusedModules) {
652  l << "\n " << description->moduleLabel();
653  }
654  });
655  for (auto const& description : unusedModules) {
656  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
657  }
658  }
659  }
660  // Initialize after the deletion of non-consumed unscheduled
661  // modules to avoid non-consumed non-run modules to keep the
662  // products unnecessarily alive
663  if (not branchesToDeleteEarly_.empty()) {
664  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
666  }
667 
668  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
669 
672  }
674 
675  //NOTE: This implementation assumes 'Job' means one call
676  // the EventProcessor::run
677  // If it really means once per 'application' then this code will
678  // have to be changed.
679  // Also have to deal with case where have 'run' then new Module
680  // added and do 'run'
681  // again. In that case the newly added Module needs its 'beginJob'
682  // to be called.
683 
684  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
685  // For now we delay calling beginOfJob until first beginOfRun
686  //if(looper_) {
687  // looper_->beginOfJob(es);
688  //}
689  try {
690  convertException::wrap([&]() { input_->doBeginJob(); });
691  } catch (cms::Exception& ex) {
692  ex.addContext("Calling beginJob for the source");
693  throw;
694  }
695  espController_->finishConfiguration();
696  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
697  if (looper_) {
698  constexpr bool mustPrefetchMayGet = true;
699  auto const processBlockLookup = preg_->productLookup(InProcess);
700  auto const runLookup = preg_->productLookup(InRun);
701  auto const lumiLookup = preg_->productLookup(InLumi);
702  auto const eventLookup = preg_->productLookup(InEvent);
703  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
704  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
705  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
706  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
707  looper_->updateLookup(esp_->recordsToProxyIndices());
708  }
709  // toerror.succeeded(); // should we add this?
710  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
711  actReg_->postBeginJobSignal_();
712 
713  FinalWaitingTask last;
714  oneapi::tbb::task_group group;
715  using namespace edm::waiting_task::chain;
716  first([this](auto nextTask) {
717  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
718  first([i, this](auto nextTask) {
720  schedule_->beginStream(i);
721  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
723  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
724  }) | lastTask(nextTask);
725  }
727  group.wait();
728  if (last.exceptionPtr()) {
729  std::rethrow_exception(*last.exceptionPtr());
730  }
731  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:205

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1358 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::signalslot::Signal< T >::emit(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, watchdog::group, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::ActivityRegistry::preESSyncIOVSignal_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), createBeamHaloJobs::queue, queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, TrackValidation_cff::task, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1360  {
1361  if (iHolder.taskHasFailed()) {
1362  return;
1363  }
1364 
1365  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1366  // We must be careful with the status object here and in code this function calls. IF we want
1367  // endRun to be called, then we must call resetResources before the things waiting on
1368  // iHolder are allowed to proceed. Otherwise, there will be a race condition (possibly causing
1369  // endRun to be called much later than it should be, because status is holding iRunResource).
1370  // Note that this must be done explicitly. Relying on the destructor does not work well
1371  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1372  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1373  // destroyed inside this function and lumiWork.
1374  auto status =
1375  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1376  chain::first([&](auto nextTask) {
1377  auto asyncEventSetup = [](ActivityRegistry* actReg,
1378  auto* espController,
1379  auto& queue,
1381  auto& status,
1382  IOVSyncValue const& iSync) {
1383  queue.pause();
1384  CMS_SA_ALLOW try {
1385  SendSourceTerminationSignalIfException sentry(actReg);
1386  // Pass in iSync to let the EventSetup system know which run and lumi
1387  // need to be processed and prepare IOVs for it.
1388  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1389  // lumi is done and no longer needs its EventSetup IOVs.
1390  actReg->preESSyncIOVSignal_.emit(iSync);
1391  espController->eventSetupForInstanceAsync(
1392  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1393  sentry.completedSuccessfully();
1394  } catch (...) {
1395  task.doneWaiting(std::current_exception());
1396  }
1397  };
1398  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1399  // We only get here inside this block if there is an EventSetup
1400  // module not able to handle concurrent IOVs (usually an ESSource)
1401  // and the new sync value is outside the current IOV of that module.
1402  auto group = nextTask.group();
1404  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1405  asyncEventSetup(
1407  });
1408  } else {
1409  asyncEventSetup(
1410  actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1411  }
1412  }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1413  actReg_->postESSyncIOVSignal_.emit(iSync);
1414  //the call to doneWaiting will cause the count to decrement
1415  auto copyTask = nextTask;
1416  if (iPtr) {
1417  nextTask.doneWaiting(*iPtr);
1418  }
1419  auto group = copyTask.group();
1420  lumiQueue_->pushAndPause(
1421  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1422  if (task.taskHasFailed()) {
1423  status->resetResources();
1424  return;
1425  }
1426 
1427  status->setResumer(std::move(iResumer));
1428 
1429  auto group = task.group();
1431  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1432  //make the services available
1434  // Caught exception is propagated via WaitingTaskHolder
1435  CMS_SA_ALLOW try {
1437 
1438  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1439  {
1440  SendSourceTerminationSignalIfException sentry(actReg_.get());
1441 
1442  input_->doBeginLumi(lumiPrincipal, &processContext_);
1443  sentry.completedSuccessfully();
1444  }
1445 
1446  Service<RandomNumberGenerator> rng;
1447  if (rng.isAvailable()) {
1448  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1449  rng->preBeginLumi(lb);
1450  }
1451 
1452  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1453 
1454  using namespace edm::waiting_task::chain;
1455  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1456  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1457  {
1458  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1459  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1460  beginGlobalTransitionAsync<Traits>(
1461  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1462  }
1463  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1464  looper_->prefetchAsync(
1465  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1466  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1467  status->globalBeginDidSucceed();
1468  //make the services available
1469  ServiceRegistry::Operate operateLooper(serviceToken_);
1470  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1471  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1472  if (iPtr) {
1473  status->resetResources();
1474  holder.doneWaiting(*iPtr);
1475  } else {
1476  if (not looper_) {
1477  status->globalBeginDidSucceed();
1478  }
1479  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1480  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1481 
1482  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1483  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1484  streamQueues_[i].pause();
1485 
1486  auto& event = principalCache_.eventPrincipal(i);
1487  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1488  // held by the container as this lambda may not finish executing before all the tasks it
1489  // spawns have already started to run.
1490  auto eventSetupImpls = &status->eventSetupImpls();
1491  auto lp = status->lumiPrincipal().get();
1494  event.setLuminosityBlockPrincipal(lp);
1495  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1496  using namespace edm::waiting_task::chain;
1497  chain::first([this, i, &transitionInfo](auto nextTask) {
1498  beginStreamTransitionAsync<Traits>(
1499  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1500  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1501  if (exceptionFromBeginStreamLumi) {
1502  WaitingTaskHolder tmp(nextTask);
1503  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1504  streamEndLumiAsync(nextTask, i);
1505  } else {
1507  }
1508  }) | runLast(holder);
1509  });
1510  }
1511  }
1512  }) | runLast(postQueueTask);
1513 
1514  } catch (...) {
1515  status->resetResources();
1516  postQueueTask.doneWaiting(std::current_exception());
1517  }
1518  }); // task in sourceResourcesAcquirer
1519  });
1520  }) | chain::runLast(std::move(iHolder));
1521  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
tmp
align.sh
Definition: createJobs.py:716
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1048 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1048  {
1049  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1050  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1051 
1052  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1053  FinalWaitingTask globalWaitTask;
1054 
1055  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1056  beginGlobalTransitionAsync<Traits>(
1057  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1058 
1059  do {
1060  taskGroup_.wait();
1061  } while (not globalWaitTask.done());
1062 
1063  if (globalWaitTask.exceptionPtr() != nullptr) {
1064  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1065  }
1066  beginProcessBlockSucceeded = true;
1067  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1142 of file EventProcessor.cc.

References actReg_, edm::BeginRun, edm::RunPrincipal::beginTime(), edm::FinalWaitingTask::done(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, forceESCacheClearOnNewRun_, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), taskGroup_, and edm::waiting_task::chain::then().

1145  {
1146  globalBeginSucceeded = false;
1147  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginRun(runPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1155  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1157  espController_->forceCacheClear();
1158  }
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1161  actReg_->preESSyncIOVSignal_.emit(ts);
1163  actReg_->postESSyncIOVSignal_.emit(ts);
1164  eventSetupForInstanceSucceeded = true;
1165  sentry.completedSuccessfully();
1166  }
1167  auto const& es = esp_->eventSetupImpl();
1168  if (looper_ && looperBeginJobRun_ == false) {
1169  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1170 
1171  FinalWaitingTask waitTask;
1172  using namespace edm::waiting_task::chain;
1173  chain::first([this, &es](auto nextTask) {
1174  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1175  }) | then([this, &es](auto nextTask) mutable {
1176  looper_->beginOfJob(es);
1177  looperBeginJobRun_ = true;
1178  looper_->doStartingNewLoop();
1179  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1180 
1181  do {
1182  taskGroup_.wait();
1183  } while (not waitTask.done());
1184  if (waitTask.exceptionPtr() != nullptr) {
1185  std::rethrow_exception(*(waitTask.exceptionPtr()));
1186  }
1187  }
1188  {
1189  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1190  FinalWaitingTask globalWaitTask;
1191 
1192  using namespace edm::waiting_task::chain;
1193  chain::first([&runPrincipal, &es, this](auto waitTask) {
1194  RunTransitionInfo transitionInfo(runPrincipal, es);
1195  beginGlobalTransitionAsync<Traits>(
1196  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1197  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1198  globalBeginSucceeded = true;
1199  FDEBUG(1) << "\tbeginRun " << run << "\n";
1200  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1201  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1202  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1203  looper_->doBeginRun(runPrincipal, es, &processContext_);
1204  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1205 
1206  do {
1207  taskGroup_.wait();
1208  } while (not globalWaitTask.done());
1209  if (globalWaitTask.exceptionPtr() != nullptr) {
1210  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1211  }
1212  }
1213  {
1214  //To wait, the ref count has to be 1+#streams
1215  FinalWaitingTask streamLoopWaitTask;
1216 
1217  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1218 
1219  RunTransitionInfo transitionInfo(runPrincipal, es);
1220  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1221  *schedule_,
1223  transitionInfo,
1224  serviceToken_,
1225  subProcesses_);
1226  do {
1227  taskGroup_.wait();
1228  } while (not streamLoopWaitTask.done());
1229  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1230  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1231  }
1232  }
1233  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1234  if (looper_) {
1235  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1236  }
1237  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

287  {
289  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 818 of file EventProcessor.cc.

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

818  {
819  bool returnValue = false;
820 
821  // Look for a shutdown signal
822  if (shutdown_flag.load(std::memory_order_acquire)) {
823  returnValue = true;
825  }
826  return returnValue;
827  }
volatile std::atomic< bool > shutdown_flag

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 812 of file EventProcessor.cc.

References schedule_.

812 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 948 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

948  {
949  if (fileBlockValid()) {
950  SendSourceTerminationSignalIfException sentry(actReg_.get());
951  input_->closeFile(fb_.get(), cleaningUpAfterException);
952  sentry.completedSuccessfully();
953  }
954  FDEBUG(1) << "\tcloseInputFile\n";
955  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 965 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

965  {
966  schedule_->closeOutputFiles();
967  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
968  processBlockHelper_->clearAfterOutputFilesClose();
969  FDEBUG(1) << "\tcloseOutputFiles\n";
970  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1523 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), h, handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1523  {
1524  {
1525  //all streams are sharing the same status at the moment
1526  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1527  status->needToContinueLumi();
1528  status->startProcessingEvents();
1529  }
1530 
1531  unsigned int streamIndex = 0;
1532  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1533  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1534  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1535  }
1536  iHolder.group()->run(
1537  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1538  }
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
oneapi::tbb::task_group * group() const noexcept
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1821 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1821  {
1822  for (auto& s : subProcesses_) {
1823  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1824  }
1825  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1826  iStatus.lumiPrincipal()->clearPrincipal();
1827  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1828  }
std::vector< SubProcess > subProcesses_

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1798 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

1798  {
1799  principalCache_.deleteRun(phid, run);
1800  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1801  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1802  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1039 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1039  {
1040  FDEBUG(1) << "\tdoErrorStuff\n";
1041  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1042  << "and went to the error state\n"
1043  << "Will attempt to terminate processing normally\n"
1044  << "(IF using the looper the next loop will be attempted)\n"
1045  << "This likely indicates a bug in an input module or corrupted input or both\n";
1046  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 733 of file EventProcessor.cc.

References actReg_, c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, cmsLHEtoEOSManager::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

733  {
734  // Collects exceptions, so we don't throw before all operations are performed.
735  ExceptionCollector c(
736  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
737 
738  //make the services available
740 
741  using namespace edm::waiting_task::chain;
742 
743  edm::FinalWaitingTask waitTask;
744  oneapi::tbb::task_group group;
745 
746  {
747  //handle endStream transitions
748  edm::WaitingTaskHolder taskHolder(group, &waitTask);
749  std::mutex collectorMutex;
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([this, i, &c, &collectorMutex](auto nextTask) {
752  std::exception_ptr ep;
753  try {
755  this->schedule_->endStream(i);
756  } catch (...) {
757  ep = std::current_exception();
758  }
759  if (ep) {
760  std::lock_guard<std::mutex> l(collectorMutex);
761  c.call([&ep]() { std::rethrow_exception(ep); });
762  }
763  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
764  for (auto& subProcess : subProcesses_) {
765  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
766  std::exception_ptr ep;
767  try {
769  subProcess.doEndStream(i);
770  } catch (...) {
771  ep = std::current_exception();
772  }
773  if (ep) {
774  std::lock_guard<std::mutex> l(collectorMutex);
775  c.call([&ep]() { std::rethrow_exception(ep); });
776  }
777  }) | lastTask(nextTask);
778  }
779  }) | lastTask(taskHolder);
780  }
781  }
782  group.wait();
783 
784  auto actReg = actReg_.get();
785  c.call([actReg]() { actReg->preEndJobSignal_(); });
786  schedule_->endJob(c);
787  for (auto& subProcess : subProcesses_) {
788  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
789  }
790  c.call(std::bind(&InputSource::doEndJob, input_.get()));
791  if (looper_) {
792  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
793  }
794  c.call([actReg]() { actReg->postEndJobSignal_(); });
795  if (c.hasThrown()) {
796  c.rethrow();
797  }
798  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1000 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1000  {
1001  if (looper_) {
1002  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1003  looper_->setModuleChanger(&changer);
1004  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1005  looper_->setModuleChanger(nullptr);
1007  return true;
1008  else
1009  return false;
1010  }
1011  FDEBUG(1) << "\tendOfLoop\n";
1012  return true;
1013  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1105 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1105  {
1106  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1107 
1108  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1109  FinalWaitingTask globalWaitTask;
1110 
1111  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1112  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1113  *schedule_,
1114  transitionInfo,
1115  serviceToken_,
1116  subProcesses_,
1117  cleaningUpAfterException);
1118  do {
1119  taskGroup_.wait();
1120  } while (not globalWaitTask.done());
1121  if (globalWaitTask.exceptionPtr() != nullptr) {
1122  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1123  }
1124 
1125  if (beginProcessBlockSucceeded) {
1126  FinalWaitingTask writeWaitTask;
1128  do {
1129  taskGroup_.wait();
1130  } while (not writeWaitTask.done());
1131  if (writeWaitTask.exceptionPtr()) {
1132  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1133  }
1134  }
1135 
1136  processBlockPrincipal.clearPrincipal();
1137  for (auto& s : subProcesses_) {
1138  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1139  }
1140  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1268 of file EventProcessor.cc.

References actReg_, edm::FinalWaitingTask::done(), edm::EndRun, edm::RunPrincipal::endTime(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, edm::waiting_task::chain::ifThen(), input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

1271  {
1272  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1273  runPrincipal.setEndTime(input_->timestamp());
1274 
1275  IOVSyncValue ts(
1276  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1277  runPrincipal.endTime());
1278  {
1279  SendSourceTerminationSignalIfException sentry(actReg_.get());
1280  actReg_->preESSyncIOVSignal_.emit(ts);
1282  actReg_->postESSyncIOVSignal_.emit(ts);
1283  sentry.completedSuccessfully();
1284  }
1285  auto const& es = esp_->eventSetupImpl();
1286  if (globalBeginSucceeded) {
1287  //To wait, the ref count has to be 1+#streams
1288  FinalWaitingTask streamLoopWaitTask;
1289 
1290  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1291 
1292  RunTransitionInfo transitionInfo(runPrincipal, es);
1293  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1294  *schedule_,
1296  transitionInfo,
1297  serviceToken_,
1298  subProcesses_,
1299  cleaningUpAfterException);
1300  do {
1301  taskGroup_.wait();
1302  } while (not streamLoopWaitTask.done());
1303  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1304  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1305  }
1306  }
1307  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1308  if (looper_) {
1309  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1310  }
1311  {
1312  FinalWaitingTask globalWaitTask;
1313 
1314  using namespace edm::waiting_task::chain;
1315  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1316  RunTransitionInfo transitionInfo(runPrincipal, es);
1317  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1318  endGlobalTransitionAsync<Traits>(
1319  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1320  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1321  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1322  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1323  looper_->doEndRun(runPrincipal, es, &processContext_);
1324  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1325 
1326  do {
1327  taskGroup_.wait();
1328  } while (not globalWaitTask.done());
1329  if (globalWaitTask.exceptionPtr() != nullptr) {
1330  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1331  }
1332  }
1333  FDEBUG(1) << "\tendRun " << run << "\n";
1334  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1667 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

1667  {
1668  if (streamLumiActive_.load() > 0) {
1669  FinalWaitingTask globalWaitTask;
1670  {
1671  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1672  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1673  if (streamLumiStatus_[i]) {
1674  streamEndLumiAsync(globalTaskHolder, i);
1675  }
1676  }
1677  }
1678  do {
1679  taskGroup_.wait();
1680  } while (not globalWaitTask.done());
1681  if (globalWaitTask.exceptionPtr() != nullptr) {
1682  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1683  }
1684  }
1685  }
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::atomic< unsigned int > streamLumiActive_

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1239 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::RunPrincipal::kNo, edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), edm::RunPrincipal::shouldWriteRun(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

1243  {
1244  if (eventSetupForInstanceSucceeded) {
1245  //If we skip empty runs, this would be called conditionally
1246  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1247 
1248  if (globalBeginSucceeded) {
1249  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1250  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1251  FinalWaitingTask t;
1252  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1253  mergeableRunProductMetadata->preWriteRun();
1254  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1255  do {
1256  taskGroup_.wait();
1257  } while (not t.done());
1258  mergeableRunProductMetadata->postWriteRun();
1259  if (t.exceptionPtr()) {
1260  std::rethrow_exception(*t.exceptionPtr());
1261  }
1262  }
1263  }
1264  }
1265  deleteRunFromCache(phid, run);
1266  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 196 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

196 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 802 of file EventProcessor.cc.

References schedule_.

802  {
803  return schedule_->getAllModuleDescriptions();
804  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 800 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

800 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1549 of file EventProcessor.cc.

References CMS_SA_ALLOW, deleteLumiFromCache(), edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1550  {
1551  // Get some needed info out of the status object before moving
1552  // it into finalTaskForThisLumi.
1553  auto& lp = *(iLumiStatus->lumiPrincipal());
1554  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1555  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1556  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1557  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1558 
1559  using namespace edm::waiting_task::chain;
1560  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1561  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1562 
1563  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1564  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1565  endGlobalTransitionAsync<Traits>(
1566  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1567  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1568  //Only call writeLumi if beginLumi succeeded
1569  if (didGlobalBeginSucceed) {
1570  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1571  }
1572  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1573  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1574  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1575  //any thrown exception auto propagates to nextTask via the chain
1577  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1578  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1579  std::exception_ptr ptr;
1580  if (iPtr) {
1581  ptr = *iPtr;
1582  }
1584 
1585  // Try hard to clean up resources so the
1586  // process can terminate in a controlled
1587  // fashion even after exceptions have occurred.
1588  // Caught exception is passed to handleEndLumiExceptions()
1589  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1590  if (not ptr) {
1591  ptr = std::current_exception();
1592  }
1593  }
1594  // Caught exception is passed to handleEndLumiExceptions()
1595  CMS_SA_ALLOW try {
1596  status->resumeGlobalLumiQueue();
1598  } catch (...) {
1599  if (not ptr) {
1600  ptr = std::current_exception();
1601  }
1602  }
1603  // Caught exception is passed to handleEndLumiExceptions()
1604  CMS_SA_ALLOW try {
1605  // This call to status.resetResources() must occur before iTask is destroyed.
1606  // Otherwise there will be a data race which could result in endRun
1607  // being delayed until it is too late to successfully call it.
1608  status->resetResources();
1609  status.reset();
1610  } catch (...) {
1611  if (not ptr) {
1612  ptr = std::current_exception();
1613  }
1614  }
1615 
1616  if (ptr) {
1617  handleEndLumiExceptions(&ptr, nextTask);
1618  }
1619  }) | runLast(std::move(iTask));
1620  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1540 of file EventProcessor.cc.

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1540  {
1541  if (setDeferredException(*iPtr)) {
1542  WaitingTaskHolder tmp(holder);
1543  tmp.doneWaiting(*iPtr);
1544  } else {
1546  }
1547  }
tmp
align.sh
Definition: createJobs.py:716
bool setDeferredException(std::exception_ptr)

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1892 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1892  {
1893  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1895  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1896  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1897  auto status = streamLumiStatus_[iStreamIndex].get();
1898  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1899  CMS_SA_ALLOW try {
1900  if (readNextEventForStream(iStreamIndex, *status)) {
1901  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1902  if (iPtr) {
1903  // Try to end the stream properly even if an exception was
1904  // thrown on an event.
1905  bool expected = false;
1906  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1907  // This is the case where the exception in iPtr is the primary
1908  // exception and we want to see its message.
1909  deferredExceptionPtr_ = *iPtr;
1910  WaitingTaskHolder tempHolder(iTask);
1911  tempHolder.doneWaiting(*iPtr);
1912  }
1913  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1914  //the stream will stop now
1915  return;
1916  }
1917  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1918  });
1919 
1920  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1921  } else {
1922  //the stream will stop now
1923  if (status->isLumiEnding()) {
1924  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1925  status->startNextLumi();
1926  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1927  }
1928  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1929  } else {
1930  iTask.doneWaiting(std::exception_ptr{});
1931  }
1932  }
1933  } catch (...) {
1934  // It is unlikely we will ever get in here ...
1935  // But if we do try to clean up and propagate the exception
1936  if (streamLumiStatus_[iStreamIndex]) {
1937  streamEndLumiAsync(iTask, iStreamIndex);
1938  }
1939  bool expected = false;
1940  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1941  auto e = std::current_exception();
1943  iTask.doneWaiting(e);
1944  }
1945  }
1946  });
1947  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
InputSource::ItemType lastTransitionType() const
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 343 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

345  {
346  //std::cerr << processDesc->dump() << std::endl;
347 
348  // register the empty parentage vector , once and for all
350 
351  // register the empty parameter set, once and for all.
352  ParameterSet().registerIt();
353 
354  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
355 
356  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
357  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
358  bool const hasSubProcesses = !subProcessVParameterSet.empty();
359 
360  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
361  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
362  // set in here if the parameters were not explicitly set.
364 
365  // Now set some parameters specific to the main process.
366  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
367  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
368  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
369  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
370  << fileMode << ".\n"
371  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
372  } else {
373  fileModeNoMerge_ = (fileMode == "NOMERGE");
374  }
375  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
390  if (nConcurrentRuns != 1) {
391  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
392  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
393  }
394  unsigned int nConcurrentLumis =
395  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
396  if (nConcurrentLumis == 0) {
397  nConcurrentLumis = 2;
398  }
399  if (nConcurrentLumis > nStreams) {
400  nConcurrentLumis = nStreams;
401  }
402  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
403  if (!loopers.empty()) {
404  //For now loopers make us run only 1 transition at a time
405  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
407  "of concurrent runs, and the number of concurrent lumis "
408  "are all being reset to 1. Loopers cannot currently support "
409  "values greater than 1.";
410  nStreams = 1;
411  nConcurrentLumis = 1;
412  nConcurrentRuns = 1;
413  }
414  }
415  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
416  if (dumpOptions) {
417  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
418  } else {
419  if (nThreads > 1 or nStreams > 1) {
420  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
421  }
422  }
423  // The number of concurrent IOVs is configured individually for each record in
424  // the class NumberOfConcurrentIOVs to values less than or equal to this.
425  unsigned int maxConcurrentIOVs = nConcurrentLumis;
426 
427  //Check that relationships between threading parameters make sense
428  /*
429  if(nThreads<nStreams) {
430  //bad
431  }
432  if(nConcurrentRuns>nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nConcurrentLumis) {
436  //bad
437  }
438  */
439  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
440 
441  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
443  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
444  //for now, if have a subProcess, don't allow early delete
445  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
446  if (not hasSubProcesses) {
447  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
448  }
449 
450  // Now do general initialization
451  ScheduleItems items;
452 
453  //initialize the services
454  auto& serviceSets = processDesc->getServicesPSets();
455  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
456  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
457 
458  //make the services available
460 
461  CMS_SA_ALLOW try {
462  if (nStreams > 1) {
464  handler->willBeUsingThreads();
465  }
466 
467  // initialize miscellaneous items
468  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
469 
470  // initialize the event setup provider
471  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
472  esp_ = espController_->makeProvider(
473  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
474 
475  // initialize the looper, if any
476  if (!loopers.empty()) {
478  looper_->setActionTable(items.act_table_.get());
479  looper_->attachTo(*items.actReg_);
480 
481  // in presence of looper do not delete modules
483  }
484 
485  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
486 
487  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
488  streamQueues_.resize(nStreams);
489  streamLumiStatus_.resize(nStreams);
490 
491  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
492 
493  // initialize the input source
495  *common,
496  items.preg(),
497  items.branchIDListHelper(),
499  items.thinnedAssociationsHelper(),
500  items.actReg_,
501  items.processConfiguration(),
503 
504  // initialize the Schedule
505  schedule_ =
506  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
507 
508  // set the data members
509  act_table_ = std::move(items.act_table_);
510  actReg_ = items.actReg_;
511  preg_ = items.preg();
513  branchIDListHelper_ = items.branchIDListHelper();
514  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
515  processConfiguration_ = items.processConfiguration();
517  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
518 
519  FDEBUG(2) << parameterSet << std::endl;
520 
522  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
523  // Reusable event principal
524  auto ep = std::make_shared<EventPrincipal>(preg(),
528  historyAppender_.get(),
529  index,
530  true /*primary process*/,
533  }
534 
535  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
536  auto lp =
537  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
539  }
540 
541  {
542  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
544 
545  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
547  }
548 
549  // fill the subprocesses, if there are any
550  subProcesses_.reserve(subProcessVParameterSet.size());
551  for (auto& subProcessPSet : subProcessVParameterSet) {
552  subProcesses_.emplace_back(subProcessPSet,
553  *parameterSet,
554  preg(),
558  SubProcessParentageHelper(),
560  *actReg_,
561  token,
564  &processContext_);
565  }
566  } catch (...) {
567  //in case of an exception, make sure Services are available
568  // during the following destructors
569  espController_ = nullptr;
570  esp_ = nullptr;
571  schedule_ = nullptr;
572  input_ = nullptr;
573  looper_ = nullptr;
574  actReg_ = nullptr;
575  throw;
576  }
577  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1069 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1069  {
1070  input_->fillProcessBlockHelper();
1071  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1072  while (input_->nextProcessBlock(processBlockPrincipal)) {
1073  readProcessBlock(processBlockPrincipal);
1074 
1075  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1076  FinalWaitingTask globalWaitTask;
1077 
1078  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1079  beginGlobalTransitionAsync<Traits>(
1080  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1081 
1082  do {
1083  taskGroup_.wait();
1084  } while (not globalWaitTask.done());
1085  if (globalWaitTask.exceptionPtr() != nullptr) {
1086  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1087  }
1088 
1089  FinalWaitingTask writeWaitTask;
1091  do {
1092  taskGroup_.wait();
1093  } while (not writeWaitTask.done());
1094  if (writeWaitTask.exceptionPtr()) {
1095  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1096  }
1097 
1098  processBlockPrincipal.clearPrincipal();
1099  for (auto& s : subProcesses_) {
1100  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1101  }
1102  }
1103  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 186 of file EventProcessor.h.

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

186  {
188  return InputSource::IsStop;
189  }
190  return lastSourceTransition_;
191  }
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inline private

Definition at line 297 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

297 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inline private

Definition at line 298 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

298 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 859 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

859 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 855 of file EventProcessor.cc.

References input_.

855  {
856  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
857  }
edm::propagate_const< std::unique_ptr< InputSource > > input_

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 829 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

829  {
830  if (deferredExceptionPtrIsSet_.load()) {
832  return InputSource::IsStop;
833  }
834 
835  SendSourceTerminationSignalIfException sentry(actReg_.get());
836  InputSource::ItemType itemType;
837  //For now, do nothing with InputSource::IsSynchronize
838  do {
839  itemType = input_->nextItemType();
840  } while (itemType == InputSource::IsSynchronize);
841 
842  lastSourceTransition_ = itemType;
843  sentry.completedSuccessfully();
844 
846 
848  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
850  }
851 
852  return lastSourceTransition_;
853  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 957 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

957  {
958  if (fileBlockValid()) {
959  schedule_->openOutputFiles(*fb_);
960  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
961  }
962  FDEBUG(1) << "\topenOutputFiles\n";
963  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inline private

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

285 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inline private

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

286 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1021 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1021  {
1022  looper_->prepareForNextLoop(esp_.get());
1023  FDEBUG(1) << "\tprepareForNextLoop\n";
1024  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 140 of file EventProcessor.h.

References processConfiguration_.

140 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1963 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1963  {
1964  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1965  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1967 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

1967  {
1968  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1969 
1971  Service<RandomNumberGenerator> rng;
1972  if (rng.isAvailable()) {
1973  Event ev(*pep, ModuleDescription(), nullptr);
1974  rng->postEventRead(ev);
1975  }
1976 
1977  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1978  using namespace edm::waiting_task::chain;
1979  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1980  EventTransitionInfo info(*pep, es);
1981  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1982  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1983  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1984  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1985  }
1986  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1987  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1988  ServiceRegistry::Operate operateLooper(serviceToken_);
1989  processEventWithLooper(*pep, iStreamIndex);
1990  }) | then([pep](auto nextTask) {
1991  FDEBUG(1) << "\tprocessEvent\n";
1992  pep->clearEventPrincipal();
1993  }) | runLast(iHolder);
1994  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1996 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1996  {
1997  bool randomAccess = input_->randomAccess();
1998  ProcessingController::ForwardState forwardState = input_->forwardState();
1999  ProcessingController::ReverseState reverseState = input_->reverseState();
2000  ProcessingController pc(forwardState, reverseState, randomAccess);
2001 
2003  do {
2004  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2005  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2006  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2007 
2008  bool succeeded = true;
2009  if (randomAccess) {
2010  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2011  input_->skipEvents(-2);
2012  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2013  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2014  }
2015  }
2016  pc.setLastOperationSucceeded(succeeded);
2017  } while (!pc.lastOperationSucceeded());
2019  shouldWeStop_ = true;
2021  }
2022  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1336 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

1336  {
1337  FinalWaitingTask waitTask;
1338  if (streamLumiActive_ > 0) {
1340  // Continue after opening a new input file
1342  } else {
1343  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1344  input_->luminosityBlockAuxiliary()->beginTime()),
1345  iRunResource,
1346  WaitingTaskHolder{taskGroup_, &waitTask});
1347  }
1348  do {
1349  taskGroup_.wait();
1350  } while (not waitTask.done());
1351 
1352  if (waitTask.exceptionPtr() != nullptr) {
1353  std::rethrow_exception(*(waitTask.exceptionPtr()));
1354  }
1355  return lastTransitionType();
1356  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
PreallocationConfiguration preallocations_
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
oneapi::tbb::task_group taskGroup_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1747 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1747  {
1748  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1749  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1750  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1751  input_->processHistoryRegistry().reducedProcessHistoryID(
1752  input_->luminosityBlockAuxiliary()->processHistoryID()));
1753  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1754  assert(lumiOK);
1755  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1756  {
1757  SendSourceTerminationSignalIfException sentry(actReg_.get());
1758  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1759  sentry.completedSuccessfully();
1760  }
1761  return input_->luminosityBlock();
1762  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1716 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1716  {
1717  principalCache_.merge(input_->runAuxiliary(), preg());
1718  auto runPrincipal = principalCache_.runPrincipalPtr();
1719  {
1720  SendSourceTerminationSignalIfException sentry(actReg_.get());
1721  input_->readAndMergeRun(*runPrincipal);
1722  sentry.completedSuccessfully();
1723  }
1724  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1725  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1726  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1949 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1949  {
1950  //TODO this will have to become per stream
1951  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1952  StreamContext streamContext(event.streamID(), &processContext_);
1953 
1954  SendSourceTerminationSignalIfException sentry(actReg_.get());
1955  input_->readEvent(event, streamContext);
1956 
1957  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1958  sentry.completedSuccessfully();
1959 
1960  FDEBUG(1) << "\treadEvent\n";
1961  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 930 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

930  {
931  FDEBUG(1) << " \treadFile\n";
932  size_t size = preg_->size();
933  SendSourceTerminationSignalIfException sentry(actReg_.get());
934 
936 
937  fb_ = input_->readFile();
938  if (size < preg_->size()) {
940  }
943  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
944  }
945  sentry.completedSuccessfully();
946  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1728 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1728  {
1730  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1731  << "Illegal attempt to insert lumi into cache\n"
1732  << "Run is invalid\n"
1733  << "Contact a Framework Developer\n";
1734  }
1736  assert(lbp);
1737  lbp->setAux(*input_->luminosityBlockAuxiliary());
1738  {
1739  SendSourceTerminationSignalIfException sentry(actReg_.get());
1740  input_->readLuminosityBlock(*lbp, *historyAppender_);
1741  sentry.completedSuccessfully();
1742  }
1743  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1744  iStatus.lumiPrincipal() = std::move(lbp);
1745  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1830 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1830  {
1831  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1832  iStatus.endLumi();
1833  return false;
1834  }
1835 
1836  if (iStatus.wasEventProcessingStopped()) {
1837  return false;
1838  }
1839 
1840  if (shouldWeStop()) {
1842  iStatus.stopProcessingEvents();
1843  iStatus.endLumi();
1844  return false;
1845  }
1846 
1848  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1849  CMS_SA_ALLOW try {
1850  //need to use lock in addition to the serial task queue because
1851  // of delayed provenance reading and reading data in response to
1852  // edm::Refs etc
1853  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1854 
1855  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1856  if (InputSource::IsLumi == itemType) {
1857  iStatus.haveContinuedLumi();
1858  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1859  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1860  readAndMergeLumi(iStatus);
1861  itemType = nextTransitionType();
1862  }
1863  if (InputSource::IsLumi == itemType) {
1864  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1865  input_->luminosityBlockAuxiliary()->beginTime()));
1866  }
1867  }
1868  if (InputSource::IsEvent != itemType) {
1869  iStatus.stopProcessingEvents();
1870 
1871  //IsFile may continue processing the lumi and
1872  // looper_ can cause the input source to declare a new IsRun which is actually
1873  // just a continuation of the previous run
1874  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1875  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1876  iStatus.endLumi();
1877  }
1878  return false;
1879  }
1880  readEvent(iStreamIndex);
1881  } catch (...) {
1882  bool expected = false;
1883  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1884  deferredExceptionPtr_ = std::current_exception();
1885  iStatus.endLumi();
1886  }
1887  return false;
1888  }
1889  return true;
1890  }
void readEvent(unsigned int iStreamIndex)
#define CMS_SA_ALLOW
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
bool shouldWeStop() const
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1687 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689  input_->readProcessBlock(processBlockPrincipal);
1690  sentry.completedSuccessfully();
1691  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1693 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1693  {
1695  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1696  << "Illegal attempt to insert run into cache\n"
1697  << "Contact a Framework Developer\n";
1698  }
1699  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1700  preg(),
1702  historyAppender_.get(),
1703  0,
1704  true,
1706  {
1707  SendSourceTerminationSignalIfException sentry(actReg_.get());
1708  input_->readRun(*rp, *historyAppender_);
1709  sentry.completedSuccessfully();
1710  }
1711  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1712  principalCache_.insert(rp);
1713  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1714  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 982 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

982  {
983  if (fileBlockValid()) {
984  schedule_->respondToCloseInputFile(*fb_);
985  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
986  }
987  FDEBUG(1) << "\trespondToCloseInputFile\n";
988  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 972 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

972  {
973  if (fileBlockValid()) {
975  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
976  schedule_->respondToOpenInputFile(*fb_);
977  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
978  }
979  FDEBUG(1) << "\trespondToOpenInputFile\n";
980  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1015 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1015  {
1016  input_->repeat();
1017  input_->rewind();
1018  FDEBUG(1) << "\trewind\n";
1019  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 861 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

861  {
864  {
865  beginJob(); //make sure this was called
866 
867  // make the services available
869 
871  try {
872  FilesProcessor fp(fileModeNoMerge_);
873 
874  convertException::wrap([&]() {
875  bool firstTime = true;
876  do {
877  if (not firstTime) {
879  rewindInput();
880  } else {
881  firstTime = false;
882  }
883  startingNewLoop();
884 
885  auto trans = fp.processFiles(*this);
886 
887  fp.normalEnd();
888 
889  if (deferredExceptionPtrIsSet_.load()) {
890  std::rethrow_exception(deferredExceptionPtr_);
891  }
892  if (trans != InputSource::IsStop) {
893  //problem with the source
894  doErrorStuff();
895 
896  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
897  }
898  } while (not endOfLoop());
899  }); // convertException::wrap
900 
901  } // Try block
902  catch (cms::Exception& e) {
904  std::string message(
905  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
906  e.addAdditionalInfo(message);
907  if (e.alreadyPrinted()) {
908  LogAbsolute("Additional Exceptions") << message;
909  }
910  }
911  if (!exceptionMessageRuns_.empty()) {
912  e.addAdditionalInfo(exceptionMessageRuns_);
913  if (e.alreadyPrinted()) {
914  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
915  }
916  }
917  if (!exceptionMessageFiles_.empty()) {
918  e.addAdditionalInfo(exceptionMessageFiles_);
919  if (e.alreadyPrinted()) {
920  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
921  }
922  }
923  throw;
924  }
925  }
926 
927  return returnCode;
928  }
std::atomic< bool > exceptionMessageLumis_
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2045 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

2045  {
2046  bool expected = false;
2047  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2048  deferredExceptionPtr_ = iException;
2049  return true;
2050  }
2051  return false;
2052  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2039 of file EventProcessor.cc.

References exceptionMessageFiles_.

2039 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2043 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2043 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 2041 of file EventProcessor.cc.

References exceptionMessageRuns_.

2041 { exceptionMessageRuns_ = message; }
std::string exceptionMessageRuns_

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1026 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1026  {
1027  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1028  if (!subProcesses_.empty()) {
1029  for (auto const& subProcess : subProcesses_) {
1030  if (subProcess.shouldWeCloseOutput()) {
1031  return true;
1032  }
1033  }
1034  return false;
1035  }
1036  return schedule_->shouldWeCloseOutput();
1037  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2024 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2024  {
2025  FDEBUG(1) << "\tshouldWeStop\n";
2026  if (shouldWeStop_)
2027  return true;
2028  if (!subProcesses_.empty()) {
2029  for (auto const& subProcess : subProcesses_) {
2030  if (subProcess.terminate()) {
2031  return true;
2032  }
2033  }
2034  return false;
2035  }
2036  return schedule_->terminate();
2037  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 990 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

990  {
991  shouldWeStop_ = false;
992  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
993  // until after we've called beginOfJob
994  if (looper_ && looperBeginJobRun_) {
995  looper_->doStartingNewLoop();
996  }
997  FDEBUG(1) << "\tstartingNewLoop\n";
998  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1622 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1622  {
1623  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1624  if (iPtr) {
1625  handleEndLumiExceptions(iPtr, iTask);
1626  }
1627  auto status = streamLumiStatus_[iStreamIndex];
1628  //reset status before releasing queue else get race condition
1629  streamLumiStatus_[iStreamIndex].reset();
1631  streamQueues_[iStreamIndex].resume();
1632 
1633  //are we the last one?
1634  if (status->streamFinishedLumi()) {
1636  }
1637  });
1638 
1639  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1640 
1641  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1642  // therefore we do not want to hold the shared_ptr
1643  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1644  lumiStatus->setEndTime();
1645 
1646  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1647 
1648  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1649  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1650 
1651  if (lumiStatus->didGlobalBeginSucceed()) {
1652  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1653  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1654  lumiPrincipal.endTime());
1655  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1656  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1657  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1658  *schedule_,
1659  iStreamIndex,
1660  transitionInfo,
1661  serviceToken_,
1662  subProcesses_,
1663  cleaningUpAfterException);
1664  }
1665  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 597 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

597  {
600  taskGroup_.wait();
601  assert(task.done());
602  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

291  {
293  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

294  {
296  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2054 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2054  {
2055  cms::Exception ex("ModulesSynchingOnLumis");
2056  ex << "The framework is configured to use at least two streams, but the following modules\n"
2057  << "require synchronizing on LuminosityBlock boundaries:";
2058  bool found = false;
2059  for (auto worker : schedule_->allWorkers()) {
2060  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2061  found = true;
2062  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2063  }
2064  }
2065  if (found) {
2066  ex << "\n\nThe situation can be fixed by either\n"
2067  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2068  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2069  throw ex;
2070  }
2071  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 806 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

806 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 810 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

810 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 808 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

808 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2073 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2073  {
2074  std::unique_ptr<LogSystem> s;
2075  for (auto worker : schedule_->allWorkers()) {
2076  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2077  if (not s) {
2078  s = std::make_unique<LogSystem>("LegacyModules");
2079  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2080  "is going to end soon. These modules need to be converted to have type\n"
2081  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2082  }
2083  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2084  }
2085  }
2086  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1804 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1804  {
1805  using namespace edm::waiting_task;
1806  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1807  chain::first([&](auto nextTask) {
1809 
1810  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1811  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1812  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1814  for (auto& s : subProcesses_) {
1815  s.writeLumiAsync(nextTask, lumiPrincipal);
1816  }
1818  }
1819  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1764 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

1764  {
1765  using namespace edm::waiting_task;
1766  chain::first([&](auto nextTask) {
1768  schedule_->writeProcessBlockAsync(
1769  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1770  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1772  for (auto& s : subProcesses_) {
1773  s.writeProcessBlockAsync(nextTask, processBlockType);
1774  }
1775  }) | chain::runLast(std::move(task));
1776  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1778 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, processContext_, run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

1781  {
1782  using namespace edm::waiting_task;
1783  chain::first([&](auto nextTask) {
1785  schedule_->writeRunAsync(nextTask,
1787  &processContext_,
1788  actReg_.get(),
1789  mergeableRunProductMetadata);
1790  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1792  for (auto& s : subProcesses_) {
1793  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1794  }
1795  }) | chain::runLast(std::move(task));
1796  }
ProcessContext processContext_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ asyncStopRequestedWhileProcessingEvents_

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 361 of file EventProcessor.h.

Referenced by runToCompletion().

◆ asyncStopStatusCodeFromProcessingEvents_

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 362 of file EventProcessor.h.

Referenced by runToCompletion().

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 349 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 367 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 352 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 353 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 351 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 363 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 355 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 337 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 356 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 327 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 369 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), processConfiguration(), and readRun().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 350 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 347 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 346 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().