CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void beginStreams ()
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endStreams (ExceptionCollector &) noexcept
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemTypeInfo lastTransitionType () const
 
InputSource::ItemTypeInfo nextTransitionType ()
 
void nextTransitionTypeAsync (std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipal > readRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
bool needToCallNext () const
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
void setNeedToCallNext (bool val)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool beginJobCalled_
 
bool beginJobStartedModules_ = false
 
bool beginJobSucceeded_ = false
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemTypeInfo lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
 
bool needToCallNext_ = true
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcess > subProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 72 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 376 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 377 of file EventProcessor.h.

◆ ProcessBlockType

using edm::EventProcessor::ProcessBlockType = PrincipalCache::ProcessBlockType

Definition at line 240 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 82 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 607 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, findAndChange::op, schedule_, and unpackBuffers-CaloStage2::token.

607  {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these things that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 632 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, beginJobStartedModules_, beginJobSucceeded_, beginStreams(), branchesToDeleteEarly_, DummyCfis::c, edm::checkForModuleDependencyCorrectness(), CMS_SA_ALLOW, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::for_all(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, MainPageGenerator::l, looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), schedule_, serviceToken_, subProcesses_, edm::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

632  {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
639 
640  service::SystemBounds bounds(preallocations_.numberOfStreams(),
644  actReg_->preallocateSignal_(bounds);
645  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
647 
648  std::vector<ModuleProcessName> consumedBySubProcesses;
650  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
651  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
652  if (consumedBySubProcesses.empty()) {
653  consumedBySubProcesses = std::move(c);
654  } else if (not c.empty()) {
655  std::vector<ModuleProcessName> tmp;
656  tmp.reserve(consumedBySubProcesses.size() + c.size());
657  std::merge(consumedBySubProcesses.begin(),
658  consumedBySubProcesses.end(),
659  c.begin(),
660  c.end(),
661  std::back_inserter(tmp));
662  std::swap(consumedBySubProcesses, tmp);
663  }
664  });
665 
666  // Note: all these may throw
669  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
670  not unusedModules.empty()) {
672 
673  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
674  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
675  "and "
676  "therefore they are deleted before beginJob transition.";
677  for (auto const& description : unusedModules) {
678  l << "\n " << description->moduleLabel();
679  }
680  });
681  for (auto const& description : unusedModules) {
682  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
683  }
684  }
685  }
686  // Initialize after the deletion of non-consumed unscheduled
687  // modules to avoid non-consumed non-run modules to keep the
688  // products unnecessarily alive
689  if (not branchesToDeleteEarly_.empty()) {
690  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
691  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
692  auto referencesToBranches = std::move(referencesToBranches_);
693  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
694  }
695 
698  }
699  if (preallocations_.numberOfRuns() > 1) {
701  }
702 
703  //NOTE: This implementation assumes 'Job' means one call
704  // the EventProcessor::run
705  // If it really means once per 'application' then this code will
706  // have to be changed.
707  // Also have to deal with case where have 'run' then new Module
708  // added and do 'run'
709  // again. In that case the newly added Module needs its 'beginJob'
710  // to be called.
711 
712  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
713  // For now we delay calling beginOfJob until first beginOfRun
714  //if(looper_) {
715  // looper_->beginOfJob(es);
716  //}
717  espController_->finishConfiguration();
718  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
719  try {
720  convertException::wrap([&]() { input_->doBeginJob(); });
721  } catch (cms::Exception& ex) {
722  ex.addContext("Calling beginJob for the source");
723  throw;
724  }
725 
727 
728  // If we execute the beginJob transition for any module then we execute it
729  // for all of the modules. We save the first exception and rethrow that
730  // after they all complete.
731  std::exception_ptr firstException;
732  CMS_SA_ALLOW try {
733  schedule_->beginJob(
734  *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, pathsAndConsumesOfModules_, processContext_);
735  } catch (...) {
736  firstException = std::current_exception();
737  }
738  if (looper_ && !firstException) {
739  CMS_SA_ALLOW try {
740  constexpr bool mustPrefetchMayGet = true;
741  auto const processBlockLookup = preg_->productLookup(InProcess);
742  auto const runLookup = preg_->productLookup(InRun);
743  auto const lumiLookup = preg_->productLookup(InLumi);
744  auto const eventLookup = preg_->productLookup(InEvent);
745  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
746  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
747  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
748  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
749  looper_->updateLookup(esp_->recordsToResolverIndices());
750  } catch (...) {
751  firstException = std::current_exception();
752  }
753  }
754  for (auto& subProcess : subProcesses_) {
755  CMS_SA_ALLOW try { subProcess.doBeginJob(); } catch (...) {
756  if (!firstException) {
757  firstException = std::current_exception();
758  }
759  }
760  }
761  if (firstException) {
762  std::rethrow_exception(firstException);
763  }
764 
765  beginJobSucceeded_ = true;
766  beginStreams();
767  }
ProcessContext processContext_
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
int merge(int argc, char *argv[])
Definition: DiMuonVmerge.cc:28
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:169
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus >  iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1679 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), dqmdumpme::first, globalEndLumiAsync(), handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1681  {
1682  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1683 
1684  auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1685  chain::first([this, &iSync, &status](auto nextTask) {
1686  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1687  nextTask,
1688  status->endIOVWaitingTasks(),
1689  status->eventSetupImpls(),
1691  actReg_.get(),
1692  serviceToken_);
1693  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1694  CMS_SA_ALLOW try {
1695  //the call to doneWaiting will cause the count to decrement
1696  if (iException) {
1697  WaitingTaskHolder copyHolder(nextTask);
1698  copyHolder.doneWaiting(*iException);
1699  }
1700 
1701  lumiQueue_->pushAndPause(
1702  *nextTask.group(),
1703  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1704  CMS_SA_ALLOW try {
1705  if (postLumiQueueTask.taskHasFailed()) {
1706  status->resetResources();
1708  endRunAsync(iRunStatus, postLumiQueueTask);
1709  return;
1710  }
1711 
1712  status->setResumer(std::move(iResumer));
1713 
1715  *postLumiQueueTask.group(),
1716  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1717  CMS_SA_ALLOW try {
1719 
1720  if (postSourceTask.taskHasFailed()) {
1721  status->resetResources();
1723  endRunAsync(iRunStatus, postSourceTask);
1724  return;
1725  }
1726 
1727  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1728 
1729  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1730  {
1731  SendSourceTerminationSignalIfException sentry(actReg_.get());
1732  input_->doBeginLumi(lumiPrincipal, &processContext_);
1733  sentry.completedSuccessfully();
1734  }
1735 
1736  Service<RandomNumberGenerator> rng;
1737  if (rng.isAvailable()) {
1738  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1739  rng->preBeginLumi(lb);
1740  }
1741 
1742  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1743 
1744  using namespace edm::waiting_task::chain;
1745  chain::first([this, status](auto nextTask) mutable {
1748  } else {
1749  setNeedToCallNext(true);
1750  }
1751  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1752  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1753  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1754  beginGlobalTransitionAsync<Traits>(
1755  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1756  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1757  looper_->prefetchAsync(
1758  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1759  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1760  ServiceRegistry::Operate operateLooper(serviceToken_);
1761  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1762  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1763  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1764 
1765  if (iException) {
1766  WaitingTaskHolder copyHolder(holder);
1767  copyHolder.doneWaiting(*iException);
1768  globalEndLumiAsync(holder, status);
1769  endRunAsync(iRunStatus, holder);
1770  } else {
1771  status->globalBeginDidSucceed();
1772 
1773  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1774  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1775 
1776  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1777  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1778  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1779  if (!status->shouldStreamStartLumi()) {
1780  return;
1781  }
1782  streamQueues_[i].pause();
1783 
1784  auto& event = principalCache_.eventPrincipal(i);
1785  auto eventSetupImpls = &status->eventSetupImpls();
1786  auto lp = status->lumiPrincipal().get();
1789  event.setLuminosityBlockPrincipal(lp);
1790  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1791  using namespace edm::waiting_task::chain;
1792  chain::first([this, i, &transitionInfo](auto nextTask) {
1793  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1794  *schedule_,
1795  i,
1796  transitionInfo,
1797  serviceToken_,
1798  subProcesses_);
1799  }) |
1800  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1801  auto nextTask) {
1802  if (exceptionFromBeginStreamLumi) {
1803  WaitingTaskHolder copyHolder(nextTask);
1804  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1805  }
1807  }) |
1808  runLast(std::move(holder));
1809  });
1810  } // end for loop over streams
1811  });
1812  }
1813  }) | runLast(postSourceTask);
1814  } catch (...) {
1815  status->resetResources();
1817  WaitingTaskHolder copyHolder(postSourceTask);
1818  copyHolder.doneWaiting(std::current_exception());
1819  endRunAsync(iRunStatus, postSourceTask);
1820  }
1821  }); // task in sourceResourcesAcquirer
1822  } catch (...) {
1823  status->resetResources();
1825  WaitingTaskHolder copyHolder(postLumiQueueTask);
1826  copyHolder.doneWaiting(std::current_exception());
1827  endRunAsync(iRunStatus, postLumiQueueTask);
1828  }
1829  }); // task in lumiQueue
1830  } catch (...) {
1831  status->resetResources();
1833  WaitingTaskHolder copyHolder(nextTask);
1834  copyHolder.doneWaiting(std::current_exception());
1835  endRunAsync(iRunStatus, nextTask);
1836  }
1837  }) | chain::runLast(std::move(iHolder));
1838  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1131 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1131  {
1132  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1133  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1134 
1135  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1136  FinalWaitingTask globalWaitTask{taskGroup_};
1137 
1138  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1139  beginGlobalTransitionAsync<Traits>(
1140  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1141 
1142  globalWaitTask.wait();
1143  beginProcessBlockSucceeded = true;
1144  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1234 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, dqmdumpme::first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1234  {
1235  if (iHolder.taskHasFailed()) {
1236  return;
1237  }
1238 
1239  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1240 
1241  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1242 
1243  chain::first([this, &status, iSync](auto nextTask) {
1244  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1245  nextTask,
1246  status->endIOVWaitingTasks(),
1247  status->eventSetupImpls(),
1249  actReg_.get(),
1250  serviceToken_,
1252  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1253  CMS_SA_ALLOW try {
1254  if (iException) {
1255  WaitingTaskHolder copyHolder(nextTask);
1256  copyHolder.doneWaiting(*iException);
1257  // Finish handling the exception in the task pushed to runQueue_
1258  }
1260 
1261  runQueue_->pushAndPause(
1262  *nextTask.group(),
1263  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1264  CMS_SA_ALLOW try {
1265  if (postRunQueueTask.taskHasFailed()) {
1266  status->resetBeginResources();
1268  return;
1269  }
1270 
1271  status->setResumer(std::move(iResumer));
1272 
1274  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1275  CMS_SA_ALLOW try {
1277 
1278  if (postSourceTask.taskHasFailed()) {
1279  status->resetBeginResources();
1281  status->resumeGlobalRunQueue();
1282  return;
1283  }
1284 
1285  status->setRunPrincipal(readRun());
1286 
1287  RunPrincipal& runPrincipal = *status->runPrincipal();
1288  {
1289  SendSourceTerminationSignalIfException sentry(actReg_.get());
1290  input_->doBeginRun(runPrincipal, &processContext_);
1291  sentry.completedSuccessfully();
1292  }
1293 
1294  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1295  if (looper_ && looperBeginJobRun_ == false) {
1296  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1297 
1298  oneapi::tbb::task_group group;
1299  FinalWaitingTask waitTask{group};
1300  using namespace edm::waiting_task::chain;
1301  chain::first([this, &es](auto nextTask) {
1302  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1303  }) | then([this, &es](auto nextTask) mutable {
1304  looper_->beginOfJob(es);
1305  looperBeginJobRun_ = true;
1306  looper_->doStartingNewLoop();
1307  }) | runLast(WaitingTaskHolder(group, &waitTask));
1308  waitTask.wait();
1309  }
1310 
1311  using namespace edm::waiting_task::chain;
1312  chain::first([this, status](auto nextTask) mutable {
1313  CMS_SA_ALLOW try {
1316  } else {
1317  setNeedToCallNext(true);
1318  }
1319  } catch (...) {
1320  status->setStopBeforeProcessingRun(true);
1321  nextTask.doneWaiting(std::current_exception());
1322  }
1323  }) | then([this, status, &es](auto nextTask) {
1324  if (status->stopBeforeProcessingRun()) {
1325  return;
1326  }
1327  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1328  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1329  beginGlobalTransitionAsync<Traits>(
1330  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1331  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1332  if (status->stopBeforeProcessingRun()) {
1333  return;
1334  }
1335  looper_->prefetchAsync(
1336  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1337  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1338  if (status->stopBeforeProcessingRun()) {
1339  return;
1340  }
1341  ServiceRegistry::Operate operateLooper(serviceToken_);
1342  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1343  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1344  if (iException) {
1345  WaitingTaskHolder copyHolder(holder);
1346  copyHolder.doneWaiting(*iException);
1347  } else {
1348  status->globalBeginDidSucceed();
1349  }
1350 
1351  if (status->stopBeforeProcessingRun()) {
1352  // We just quit now if there was a failure when merging runs
1353  status->resetBeginResources();
1355  status->resumeGlobalRunQueue();
1356  return;
1357  }
1358  CMS_SA_ALLOW try {
1359  // Under normal circumstances, this task runs after endRun has completed for all streams
1360  // and global endLumi has completed for all lumis contained in this run
1361  auto globalEndRunTask =
1362  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1363  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1364  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1366  });
1367  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1368  } catch (...) {
1369  status->resetBeginResources();
1371  status->resumeGlobalRunQueue();
1372  holder.doneWaiting(std::current_exception());
1373  return;
1374  }
1375 
1376  // After this point we are committed to end the run via endRunAsync
1377 
1379 
1380  // The only purpose of the pause is to cause stream begin run to execute before
1381  // global begin lumi in the single threaded case (maintains consistency with
1382  // the order that existed before concurrent runs were implemented).
1383  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1384 
1385  CMS_SA_ALLOW try {
1386  streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1387  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1388  CMS_SA_ALLOW try {
1389  streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1391  });
1392  } catch (...) {
1393  if (status->streamFinishedBeginRun()) {
1394  WaitingTaskHolder copyHolder(holder);
1395  copyHolder.doneWaiting(std::current_exception());
1396  status->resetBeginResources();
1399  }
1400  }
1401  }
1402  });
1403  } catch (...) {
1404  WaitingTaskHolder copyHolder(holder);
1405  copyHolder.doneWaiting(std::current_exception());
1406  status->resetBeginResources();
1409  }
1411  }) | runLast(postSourceTask);
1412  } catch (...) {
1413  status->resetBeginResources();
1415  status->resumeGlobalRunQueue();
1416  postSourceTask.doneWaiting(std::current_exception());
1417  }
1418  }); // task in sourceResourcesAcquirer
1419  } catch (...) {
1420  status->resetBeginResources();
1422  status->resumeGlobalRunQueue();
1423  postRunQueueTask.doneWaiting(std::current_exception());
1424  }
1425  }); // task in runQueue
1426  } catch (...) {
1427  status->resetBeginResources();
1429  nextTask.doneWaiting(std::current_exception());
1430  }
1431  }) | chain::runLast(std::move(iHolder));
1432  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginStreams()

void edm::EventProcessor::beginStreams ( )

Definition at line 769 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::first(), watchdog::group, mps_fire::i, edm::waiting_task::chain::lastTask(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by beginJob().

769  {
770  // This will process streams concurrently, but not modules in the
771  // same stream or SubProcesses.
772  oneapi::tbb::task_group group;
773  FinalWaitingTask finalWaitingTask{group};
774  using namespace edm::waiting_task::chain;
775  {
776  WaitingTaskHolder taskHolder(group, &finalWaitingTask);
777  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
778  first([this, i](auto nextTask) {
779  std::exception_ptr exceptionPtr;
780  {
782  CMS_SA_ALLOW try { schedule_->beginStream(i); } catch (...) {
783  exceptionPtr = std::current_exception();
784  }
785  for (auto& subProcess : subProcesses_) {
786  CMS_SA_ALLOW try { subProcess.doBeginStream(i); } catch (...) {
787  if (!exceptionPtr) {
788  exceptionPtr = std::current_exception();
789  }
790  }
791  }
792  }
793  nextTask.doneWaiting(exceptionPtr);
794  }) | lastTask(taskHolder);
795  }
796  }
797  finalWaitingTask.wait();
798  }
#define CMS_SA_ALLOW
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
T first(std::pair< T, U > const &p)

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 286 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

286  {
288  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 289 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode &  returnCode)
private

Definition at line 871 of file EventProcessor.cc.

References epSignal, edm::JobReport::reportShutdownSignal(), runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

871  {
872  bool returnValue = false;
873 
874  // Look for a shutdown signal
875  if (shutdown_flag.load(std::memory_order_acquire)) {
876  returnValue = true;
877  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
879  jr->reportShutdownSignal();
881  }
882  return returnValue;
883  }
Log< level::System, false > LogSystem
volatile std::atomic< bool > shutdown_flag
void reportShutdownSignal()
Definition: JobReport.cc:543

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 865 of file EventProcessor.cc.

References schedule_.

865 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 2116 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2116  {
2117  for (auto& s : subProcesses_) {
2118  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2119  }
2120  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2121  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2122  iStatus.lumiPrincipal()->clearPrincipal();
2123  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus &  iStatus)

Definition at line 2091 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2091  {
2092  for (auto& s : subProcesses_) {
2093  s.clearRunPrincipal(*iStatus.runPrincipal());
2094  }
2095  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2096  iStatus.runPrincipal()->clearPrincipal();
2097  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 1031 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

1031  {
1032  if (fileBlockValid()) {
1033  SendSourceTerminationSignalIfException sentry(actReg_.get());
1034  input_->closeFile(fb_.get(), cleaningUpAfterException);
1035  sentry.completedSuccessfully();
1036  }
1037  FDEBUG(1) << "\tcloseInputFile\n";
1038  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 1048 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

1048  {
1049  schedule_->closeOutputFiles();
1050  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1051  processBlockHelper_->clearAfterOutputFilesClose();
1052  FDEBUG(1) << "\tcloseOutputFiles\n";
1053  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1840 of file EventProcessor.cc.

References dqmdumpme::first, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1840  {
1841  chain::first([this](auto nextTask) {
1842  //all streams are sharing the same status at the moment
1843  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1845 
1847  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1850  }
1851  }) | chain::then([this](auto nextTask) mutable {
1852  unsigned int streamIndex = 0;
1853  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1854  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1855  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1856  }
1857  nextTask.group()->run(
1858  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1859  }) | chain::runLast(std::move(iHolder));
1860  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1122 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1122  {
1123  FDEBUG(1) << "\tdoErrorStuff\n";
1124  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1125  << "and went to the error state\n"
1126  << "Will attempt to terminate processing normally\n"
1127  << "(IF using the looper the next loop will be attempted)\n"
1128  << "This likely indicates a bug in an input module or corrupted input or both\n";
1129  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 825 of file EventProcessor.cc.

References beginJobStartedModules_, beginJobSucceeded_, DummyCfis::c, edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), endStreams(), input_, looper(), looper_, schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

825  {
826  // Collects exceptions, so we don't throw before all operations are performed.
827  ExceptionCollector c(
828  "Multiple exceptions were thrown while executing endStream and endJob. An exception message follows for "
829  "each.\n");
830 
831  //make the services available
833 
834  if (beginJobSucceeded_) {
835  endStreams(c);
836  }
837 
839  schedule_->endJob(c);
840  for (auto& subProcess : subProcesses_) {
841  subProcess.doEndJob(c);
842  }
843  c.call(std::bind(&InputSource::doEndJob, input_.get()));
844  if (looper_) {
845  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
846  }
847  if (c.hasThrown()) {
848  c.rethrow();
849  }
850  }
851  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:210
std::shared_ptr< EDLooperBase const > looper() const
void endStreams(ExceptionCollector &) noexcept
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1083 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1083  {
1084  if (looper_) {
1085  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1086  looper_->setModuleChanger(&changer);
1087  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1088  looper_->setModuleChanger(nullptr);
1090  return true;
1091  else
1092  return false;
1093  }
1094  FDEBUG(1) << "\tendOfLoop\n";
1095  return true;
1096  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1172 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1172  {
1173  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1174 
1175  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1176  FinalWaitingTask globalWaitTask{taskGroup_};
1177 
1178  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1179  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1180  *schedule_,
1181  transitionInfo,
1182  serviceToken_,
1183  subProcesses_,
1184  cleaningUpAfterException);
1185  globalWaitTask.wait();
1186 
1187  if (beginProcessBlockSucceeded) {
1188  FinalWaitingTask writeWaitTask{taskGroup_};
1190  writeWaitTask.wait();
1191  }
1192 
1193  processBlockPrincipal.clearPrincipal();
1194  for (auto& s : subProcesses_) {
1195  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1196  }
1197  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus >  iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1474 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, dqmdumpme::first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1474  {
1475  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1476  iRunStatus->setEndTime();
1477  IOVSyncValue ts(
1478  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1479  runPrincipal.endTime());
1480  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1481  WaitingTaskHolder copyHolder(iHolder);
1482  copyHolder.doneWaiting(std::current_exception());
1483  }
1484 
1485  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1486  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1487  nextTask,
1488  iRunStatus->endIOVWaitingTasksEndRun(),
1489  iRunStatus->eventSetupImplsEndRun(),
1491  actReg_.get(),
1492  serviceToken_);
1493  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1494  if (iException) {
1495  iRunStatus->setEndingEventSetupSucceeded(false);
1496  handleEndRunExceptions(*iException, nextTask);
1497  }
1499  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1500  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1501  CMS_SA_ALLOW try {
1502  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1503  streamQueues_[i].pause();
1504  streamEndRunAsync(std::move(nextTask), i);
1505  });
1506  } catch (...) {
1507  WaitingTaskHolder copyHolder(nextTask);
1508  copyHolder.doneWaiting(std::current_exception());
1509  }
1510  }
1511  });
1512 
1514  CMS_SA_ALLOW try {
1515  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1516  } catch (...) {
1517  WaitingTaskHolder copyHolder(nextTask);
1518  copyHolder.doneWaiting(std::current_exception());
1519  }
1520  }
1521  }) | chain::runLast(std::move(iHolder));
1522  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endStreams()

void edm::EventProcessor::endStreams ( ExceptionCollector &  collector)
noexcept

Definition at line 800 of file EventProcessor.cc.

References edm::first(), watchdog::group, mps_fire::i, edm::waiting_task::chain::lastTask(), mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, and subProcesses_.

Referenced by endJob().

800  {
801  std::mutex collectorMutex;
802 
803  // This will process streams concurrently, but not modules in the
804  // same stream or SubProcesses.
805  oneapi::tbb::task_group group;
806  FinalWaitingTask finalWaitingTask{group};
807  using namespace edm::waiting_task::chain;
808  {
809  WaitingTaskHolder taskHolder(group, &finalWaitingTask);
810  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
811  first([this, i, &collector, &collectorMutex](auto nextTask) {
812  {
814  schedule_->endStream(i, collector, collectorMutex);
815  for (auto& subProcess : subProcesses_) {
816  subProcess.doEndStream(i, collector, collectorMutex);
817  }
818  }
819  }) | lastTask(taskHolder);
820  }
821  }
822  finalWaitingTask.waitNoThrow();
823  }
static std::mutex mutex
Definition: Proxy.cc:8
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
T first(std::pair< T, U > const &p)

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1981 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1981  {
1982  if (streamRunActive_ == 0) {
1983  assert(streamLumiActive_ == 0);
1984  } else {
1986  if (streamLumiActive_ > 0) {
1987  FinalWaitingTask globalWaitTask{taskGroup_};
1989  streamLumiStatus_[0]->noMoreEventsInLumi();
1990  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1991  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1992  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1993  }
1994  globalWaitTask.wait();
1995  }
1996  }
1997  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1665 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1665  {
1666  if (streamRunActive_ > 0) {
1667  FinalWaitingTask waitTask{taskGroup_};
1668 
1669  auto runStatus = streamRunStatus_[0].get();
1670  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1671  WaitingTaskHolder holder{taskGroup_, &waitTask};
1672  runStatus->setHolderOfTaskInProcessRuns(holder);
1674  endRunAsync(streamRunStatus_[0], std::move(holder));
1675  waitTask.wait();
1676  }
1677  }
InputSource::ItemTypeInfo lastSourceTransition_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 200 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

200 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 855 of file EventProcessor.cc.

References schedule_.

855  {
856  return schedule_->getAllModuleDescriptions();
857  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 853 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

853 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1871 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, dqmdumpme::first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by beginLumiAsync(), and streamEndLumiAsync().

1872  {
1873  // Get some needed info out of the status object before moving
1874  // it into finalTaskForThisLumi.
1875  auto& lp = *(iLumiStatus->lumiPrincipal());
1876  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1877  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1878  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1879  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1880 
1881  using namespace edm::waiting_task::chain;
1882  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1883  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1884 
1885  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1886  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1887  endGlobalTransitionAsync<Traits>(
1888  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1889  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1890  //Only call writeLumi if beginLumi succeeded
1891  if (didGlobalBeginSucceed) {
1892  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1893  }
1894  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1895  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1896  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1897  //any thrown exception auto propagates to nextTask via the chain
1899  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1900  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1901  if (iException) {
1902  handleEndLumiExceptions(*iException, nextTask);
1903  }
1905 
1906  std::exception_ptr ptr;
1907 
1908  // Try hard to clean up resources so the
1909  // process can terminate in a controlled
1910  // fashion even after exceptions have occurred.
1911  // Caught exception is passed to handleEndLumiExceptions()
1912  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1913  if (not ptr) {
1914  ptr = std::current_exception();
1915  }
1916  }
1917  // Caught exception is passed to handleEndLumiExceptions()
1918  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1919  if (not ptr) {
1920  ptr = std::current_exception();
1921  }
1922  }
1923  // Caught exception is passed to handleEndLumiExceptions()
1924  CMS_SA_ALLOW try {
1925  status->resetResources();
1926  status->globalEndRunHolderDoneWaiting();
1927  status.reset();
1928  } catch (...) {
1929  if (not ptr) {
1930  ptr = std::current_exception();
1931  }
1932  }
1933 
1934  if (ptr && !iException) {
1935  handleEndLumiExceptions(ptr, nextTask);
1936  }
1937  }) | runLast(std::move(iTask));
1938  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus >  iRunStatus 
)

Definition at line 1533 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, dqmdumpme::first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1533  {
1534  auto& runPrincipal = *(iRunStatus->runPrincipal());
1535  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1536  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1537  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1538  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1539  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1540 
1541  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1542  using namespace edm::waiting_task::chain;
1543  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1544  auto nextTask) {
1545  if (endingEventSetupSucceeded) {
1546  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1547  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1548  endGlobalTransitionAsync<Traits>(
1549  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1550  }
1551  }) |
1552  ifThen(looper_ && endingEventSetupSucceeded,
1553  [this, &runPrincipal, &es](auto nextTask) {
1554  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1555  }) |
1556  ifThen(looper_ && endingEventSetupSucceeded,
1557  [this, &runPrincipal, &es](auto nextTask) {
1559  looper_->doEndRun(runPrincipal, es, &processContext_);
1560  }) |
1561  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1562  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1563  mergeableRunProductMetadata->preWriteRun();
1564  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1565  }) |
1566  then([status = std::move(iRunStatus),
1567  this,
1568  didGlobalBeginSucceed,
1569  mergeableRunProductMetadata,
1570  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1571  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1572  mergeableRunProductMetadata->postWriteRun();
1573  }
1574  if (iException) {
1575  handleEndRunExceptions(*iException, nextTask);
1576  }
1578 
1579  std::exception_ptr ptr;
1580 
1581  // Try hard to clean up resources so the
1582  // process can terminate in a controlled
1583  // fashion even after exceptions have occurred.
1584  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1585  if (not ptr) {
1586  ptr = std::current_exception();
1587  }
1588  }
1589  CMS_SA_ALLOW try {
1590  status->resumeGlobalRunQueue();
1592  } catch (...) {
1593  if (not ptr) {
1594  ptr = std::current_exception();
1595  }
1596  }
1597  CMS_SA_ALLOW try {
1598  status->resetEndResources();
1599  status.reset();
1600  } catch (...) {
1601  if (not ptr) {
1602  ptr = std::current_exception();
1603  }
1604  }
1605 
1606  if (ptr && !iException) {
1607  handleEndRunExceptions(ptr, nextTask);
1608  }
1609  }) |
1610  runLast(std::move(iTask));
1611  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1862 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1862  {
1863  if (holder.taskHasFailed()) {
1865  } else {
1866  WaitingTaskHolder tmp(holder);
1867  tmp.doneWaiting(iException);
1868  }
1869  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1524 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1524  {
1525  if (holder.taskHasFailed()) {
1527  } else {
1528  WaitingTaskHolder tmp(holder);
1529  tmp.doneWaiting(iException);
1530  }
1531  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2287 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2287  {
2288  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2289  streamEndLumiAsync(iTask, iStreamIndex);
2290  return;
2291  }
2292  auto group = iTask.group();
2293  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2294  CMS_SA_ALLOW try {
2295  auto status = streamLumiStatus_[iStreamIndex].get();
2297 
2298  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2299  auto recursionTask =
2300  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2301  if (iEventException) {
2302  WaitingTaskHolder copyHolder(iTask);
2303  copyHolder.doneWaiting(*iEventException);
2304  // Intentionally, we don't return here. The recursive call to
2305  // handleNextEvent takes care of immediately ending the run properly
2306  // using the same code it uses to end the run in other situations.
2307  }
2308  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2309  });
2310 
2311  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2312  } else {
2313  // the stream will stop processing this lumi now
2315  if (not status->haveStartedNextLumiOrEndedRun()) {
2316  status->noMoreEventsInLumi();
2317  status->startNextLumiOrEndRun();
2318  if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2319  CMS_SA_ALLOW try {
2320  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2321  input_->luminosityBlockAuxiliary()->beginTime()),
2322  streamRunStatus_[iStreamIndex],
2323  iTask);
2324  } catch (...) {
2325  WaitingTaskHolder copyHolder(iTask);
2326  copyHolder.doneWaiting(std::current_exception());
2327  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2328  }
2329  } else {
2330  // If appropriate, this will also start the next run.
2331  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2332  }
2333  }
2334  streamEndLumiAsync(iTask, iStreamIndex);
2335  } else {
2336  assert(status->eventProcessingState() ==
2338  auto runStatus = streamRunStatus_[iStreamIndex].get();
2339 
2340  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2341  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2342  }
2343  }
2344  }
2345  } catch (...) {
2346  WaitingTaskHolder copyHolder(iTask);
2347  copyHolder.doneWaiting(std::current_exception());
2348  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2349  }
2350  });
2351  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus >  iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2187 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), dqmdumpme::first, input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), needToCallNext(), nextTransitionTypeAsync(), edm::waiting_task::chain::runLast(), serviceToken_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync(), and processRuns().

2188  {
2189  chain::first([this, iRunStatus](auto nextTask) mutable {
2190  if (needToCallNext()) {
2191  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2192  }
2193  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2195  if (iException) {
2196  WaitingTaskHolder copyHolder(nextTask);
2197  copyHolder.doneWaiting(*iException);
2198  }
2200  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2201  return;
2202  }
2203  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2204  CMS_SA_ALLOW try {
2205  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2206  input_->luminosityBlockAuxiliary()->beginTime()),
2207  iRunStatus,
2208  nextTask);
2209  return;
2210  } catch (...) {
2211  WaitingTaskHolder copyHolder(nextTask);
2212  copyHolder.doneWaiting(std::current_exception());
2213  }
2214  }
2215  // Note that endRunAsync will call beginRunAsync for the following run
2216  // if appropriate.
2217  endRunAsync(iRunStatus, std::move(nextTask));
2218  }) | chain::runLast(std::move(iHolder));
2219  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool needToCallNext() const
constexpr auto then(O &&iO)
Definition: chain_first.h:277
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DiMuonV_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, moduleTypeResolverMaker_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, 
unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal to nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
495  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
519  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
585  SubProcessParentageHelper(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
std::multimap< std::string, std::string > referencesToBranches_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:837
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1146 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1146  {
1147  input_->fillProcessBlockHelper();
1148  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1149  while (input_->nextProcessBlock(processBlockPrincipal)) {
1150  readProcessBlock(processBlockPrincipal);
1151 
1152  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1153  FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156  beginGlobalTransitionAsync<Traits>(
1157  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1158 
1159  globalWaitTask.wait();
1160 
1161  FinalWaitingTask writeWaitTask{taskGroup_};
1163  writeWaitTask.wait();
1164 
1165  processBlockPrincipal.clearPrincipal();
1166  for (auto& s : subProcesses_) {
1167  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1168  }
1169  }
1170  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 296 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

296 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 297 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

297 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ needToCallNext()

bool edm::EventProcessor::needToCallNext ( ) const
inlineprivate

Definition at line 302 of file EventProcessor.h.

References needToCallNext_.

Referenced by handleNextItemAfterMergingRunEntries(), and readNextEventForStream().

302 { return needToCallNext_; }

◆ nextTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::nextTransitionType ( )

Definition at line 893 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), nextTransitionTypeAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

893  {
894  SendSourceTerminationSignalIfException sentry(actReg_.get());
895  InputSource::ItemTypeInfo itemTypeInfo;
896  {
897  SourceNextGuard guard(*actReg_.get());
898  //For now, do nothing with InputSource::IsSynchronize
899  do {
900  itemTypeInfo = input_->nextItemType();
901  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
902  }
903  lastSourceTransition_ = itemTypeInfo;
904  sentry.completedSuccessfully();
905 
907 
909  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
911  }
912 
913  return lastSourceTransition_;
914  }
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::shared_ptr< ActivityRegistry > actReg_

◆ nextTransitionTypeAsync()

void edm::EventProcessor::nextTransitionTypeAsync ( std::shared_ptr< RunProcessingStatus > iRunStatus,
WaitingTaskHolder  nextTask 
)

Definition at line 916 of file EventProcessor.cc.

References CMS_SA_ALLOW, Exception, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), edm::errors::LogicError, eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by handleNextItemAfterMergingRunEntries().

917  {
918  auto group = nextTask.group();
920  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
921  CMS_SA_ALLOW try {
923  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
926  runStatus->runPrincipal()->run() == input_->run() &&
927  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
929  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
930  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
931  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
932  }
933  } catch (...) {
934  nextHolder.doneWaiting(std::current_exception());
935  }
936  });
937  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 1040 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1040  {
1041  if (fileBlockValid()) {
1042  schedule_->openOutputFiles(*fb_);
1043  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1044  }
1045  FDEBUG(1) << "\topenOutputFiles\n";
1046  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 284 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

284 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

285 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1104 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1104  {
1105  looper_->prepareForNextLoop(esp_.get());
1106  FDEBUG(1) << "\tprepareForNextLoop\n";
1107  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 150 of file EventProcessor.h.

References processConfiguration_.

150 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2368 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2368  {
2369  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2370  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2384 of file EventProcessor.cc.

References actReg_, esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, dqmdumpme::first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), edm::StreamContext::kEvent, looper_, eostools::move(), principalCache_, processContext_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2384  {
2385  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2386 
2388  Service<RandomNumberGenerator> rng;
2389  if (rng.isAvailable()) {
2390  Event ev(*pep, ModuleDescription(), nullptr);
2391  rng->postEventRead(ev);
2392  }
2393 
2394  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2395  using namespace edm::waiting_task::chain;
2396  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2397  EventTransitionInfo info(*pep, es);
2398  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2399  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2400  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2401  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2402  }
2403  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2404  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2405  ServiceRegistry::Operate operateLooper(serviceToken_);
2406  processEventWithLooper(*pep, iStreamIndex);
2407  }) | then([this, pep](auto nextTask) {
2408  FDEBUG(1) << "\tprocessEvent\n";
2409  StreamContext streamContext(pep->streamID(),
2411  pep->id(),
2412  pep->runPrincipal().index(),
2413  pep->luminosityBlockPrincipal().index(),
2414  pep->time(),
2415  &processContext_);
2416  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2417  pep->clearEventPrincipal();
2418  }) | runLast(iHolder);
2419  }
ProcessContext processContext_
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal & iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2421 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2421  {
2422  bool randomAccess = input_->randomAccess();
2423  ProcessingController::ForwardState forwardState = input_->forwardState();
2424  ProcessingController::ReverseState reverseState = input_->reverseState();
2425  ProcessingController pc(forwardState, reverseState, randomAccess);
2426 
2428  do {
2429  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2430  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2431  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2432 
2433  bool succeeded = true;
2434  if (randomAccess) {
2435  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2436  input_->skipEvents(-2);
2437  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2438  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2439  }
2440  }
2441  pc.setLastOperationSucceeded(succeeded);
2442  } while (!pc.lastOperationSucceeded());
2444  shouldWeStop_ = true;
2445  }
2446  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1199 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), setNeedToCallNext(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1199  {
1200  FinalWaitingTask waitTask{taskGroup_};
1202  if (streamRunActive_ == 0) {
1203  assert(streamLumiActive_ == 0);
1204 
1205  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1206  WaitingTaskHolder{taskGroup_, &waitTask});
1207  } else {
1209 
1210  auto runStatus = streamRunStatus_[0];
1211 
1213  runStatus->runPrincipal()->run() == input_->run() and
1214  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1215  readAndMergeRun(*runStatus);
1217  }
1218 
1219  setNeedToCallNext(false);
1220 
1221  WaitingTaskHolder holder{taskGroup_, &waitTask};
1222  runStatus->setHolderOfTaskInProcessRuns(holder);
1223  if (streamLumiActive_ > 0) {
1225  continueLumiAsync(std::move(holder));
1226  } else {
1228  }
1229  }
1230  waitTask.wait();
1231  return lastTransitionType();
1232  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
assert(be >=bs)
PreallocationConfiguration preallocations_
void setNeedToCallNext(bool val)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus & iStatus)

Definition at line 2044 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

2044  {
2045  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2046  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2047  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2048  input_->processHistoryRegistry().reducedProcessHistoryID(
2049  input_->luminosityBlockAuxiliary()->processHistoryID()));
2050  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2051  assert(lumiOK);
2052  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2053  {
2054  SendSourceTerminationSignalIfException sentry(actReg_.get());
2055  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2056  sentry.completedSuccessfully();
2057  }
2058  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2159 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2160  {
2161  auto group = iHolder.group();
2163  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2164  CMS_SA_ALLOW try {
2166 
2167  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2168 
2170  setNeedToCallNext(false);
2171 
2173  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2174  readAndMergeLumi(*iLumiStatus);
2176  setNeedToCallNext(true);
2177  return;
2178  }
2180  }
2181  } catch (...) {
2182  holder.doneWaiting(std::current_exception());
2183  }
2184  });
2185  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus & iStatus)

Definition at line 2018 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

2018  {
2019  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
2020 
2021  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
2022  assert(runOK);
2023  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2024  {
2025  SendSourceTerminationSignalIfException sentry(actReg_.get());
2026  input_->readAndMergeRun(runPrincipal);
2027  sentry.completedSuccessfully();
2028  }
2029  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus > iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2125 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2126  {
2127  auto group = iHolder.group();
2129  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2130  CMS_SA_ALLOW try {
2132 
2133  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2134 
2136  setNeedToCallNext(false);
2137 
2139  status->runPrincipal()->run() == input_->run() and
2140  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2141  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2142  status->setStopBeforeProcessingRun(true);
2143  return;
2144  }
2147  setNeedToCallNext(true);
2148  return;
2149  }
2151  }
2152  } catch (...) {
2153  status->setStopBeforeProcessingRun(true);
2154  holder.doneWaiting(std::current_exception());
2155  }
2156  });
2157  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2353 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2353  {
2354  //TODO this will have to become per stream
2355  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2356  StreamContext streamContext(event.streamID(), &processContext_);
2357 
2358  SendSourceTerminationSignalIfException sentry(actReg_.get());
2359  input_->readEvent(event, streamContext);
2360 
2361  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2362  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2363  sentry.completedSuccessfully();
2364 
2365  FDEBUG(1) << "\treadEvent\n";
2366  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 1006 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

1006  {
1007  FDEBUG(1) << " \treadFile\n";
1008  size_t size = preg_->size();
1009  SendSourceTerminationSignalIfException sentry(actReg_.get());
1010 
1011  if (streamRunActive_ > 0) {
1012  streamRunStatus_[0]->runPrincipal()->preReadFile();
1013  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
1014  }
1015 
1016  if (streamLumiActive_ > 0) {
1017  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
1018  }
1019 
1020  fb_ = input_->readFile();
1021  if (size < preg_->size()) {
1023  }
1026  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1027  }
1028  sentry.completedSuccessfully();
1029  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal > rp)

Definition at line 2031 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

2031  {
2033  assert(lbp);
2034  lbp->setAux(*input_->luminosityBlockAuxiliary());
2035  {
2036  SendSourceTerminationSignalIfException sentry(actReg_.get());
2037  input_->readLuminosityBlock(*lbp, *historyAppender_);
2038  sentry.completedSuccessfully();
2039  }
2040  lbp->setRunPrincipal(std::move(rp));
2041  return lbp;
2042  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2221 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), Exception, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), needToCallNext(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), setNeedToCallNext(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2223  {
2224  // This function returns true if it successfully reads an event for the stream and that
2225  // requires both that an event is next and there are no problems or requests to stop.
2226 
2227  if (iTask.taskHasFailed()) {
2228  // We want all streams to stop or all streams to pause. If we are already in the
2229  // middle of pausing streams, then finish pausing all of them and the lumi will be
2230  // ended later. Otherwise, just end it now.
2231  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2233  }
2234  return false;
2235  }
2236 
2237  // Did another stream already stop or pause this lumi?
2238  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2239  return false;
2240  }
2241 
2242  // Are output modules or the looper requesting we stop?
2243  if (shouldWeStop()) {
2246  return false;
2247  }
2248 
2250 
2251  // need to use lock in addition to the serial task queue because
2252  // of delayed provenance reading and reading data in response to
2253  // edm::Refs etc
2254  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2255 
2256  // If we didn't already call nextTransitionType while merging lumis, call it here.
2257  // This asks the input source what is next and also checks for signals.
2258 
2260  setNeedToCallNext(true);
2261 
2262  if (InputSource::ItemType::IsEvent != itemType) {
2263  // IsFile may continue processing the lumi and
2264  // looper_ can cause the input source to declare a new IsRun which is actually
2265  // just a continuation of the previous run
2267  (InputSource::ItemType::IsRun == itemType and
2268  (iStatus.lumiPrincipal()->run() != input_->run() or
2269  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2270  if (itemType == InputSource::ItemType::IsLumi &&
2271  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2273  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2274  << "but the next lumi entry has the same lumi number.\n"
2275  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2276  }
2278  } else {
2280  }
2281  return false;
2282  }
2283  readEvent(iStreamIndex);
2284  return true;
2285  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
bool needToCallNext() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1999 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1999  {
2000  SendSourceTerminationSignalIfException sentry(actReg_.get());
2001  input_->readProcessBlock(processBlockPrincipal);
2002  sentry.completedSuccessfully();
2003  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 2005 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

2005  {
2007  assert(rp);
2008  rp->setAux(*input_->runAuxiliary());
2009  {
2010  SendSourceTerminationSignalIfException sentry(actReg_.get());
2011  input_->readRun(*rp, *historyAppender_);
2012  sentry.completedSuccessfully();
2013  }
2014  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2015  return rp;
2016  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1465 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1465  {
1466  auto& status = streamRunStatus_[iStream];
1467  if (status->streamFinishedBeginRun()) {
1468  status->resetBeginResources();
1470  }
1471  streamQueues_[iStream].resume();
1472  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1065 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1065  {
1066  if (fileBlockValid()) {
1067  schedule_->respondToCloseInputFile(*fb_);
1068  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1069  }
1070  FDEBUG(1) << "\trespondToCloseInputFile\n";
1071  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 1055 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1055  {
1056  if (fileBlockValid()) {
1058  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1059  schedule_->respondToOpenInputFile(*fb_);
1060  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1061  }
1062  FDEBUG(1) << "\trespondToOpenInputFile\n";
1063  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1098 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1098  {
1099  input_->repeat();
1100  input_->rewind();
1101  FDEBUG(1) << "\trewind\n";
1102  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 387 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

387 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 939 of file EventProcessor.cc.

References actReg_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

939  {
940  beginJob(); //make sure this was called
941 
942  // make the services available
944  actReg_->beginProcessingSignal_();
945  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
946  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
947  try {
948  FilesProcessor fp(fileModeNoMerge_);
949 
950  convertException::wrap([&]() {
951  bool firstTime = true;
952  do {
953  if (not firstTime) {
955  rewindInput();
956  } else {
957  firstTime = false;
958  }
959  startingNewLoop();
960 
961  auto trans = fp.processFiles(*this);
962 
963  fp.normalEnd();
964 
965  if (deferredExceptionPtrIsSet_.load()) {
966  std::rethrow_exception(deferredExceptionPtr_);
967  }
968  if (trans != InputSource::ItemType::IsStop) {
969  //problem with the source
970  doErrorStuff();
971 
972  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
973  }
974  } while (not endOfLoop());
975  }); // convertException::wrap
976 
977  } // Try block
978  catch (cms::Exception& e) {
980  std::string message(
981  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
982  e.addAdditionalInfo(message);
983  if (e.alreadyPrinted()) {
984  LogAbsolute("Additional Exceptions") << message;
985  }
986  }
987  if (exceptionMessageRuns_) {
988  std::string message(
989  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
990  e.addAdditionalInfo(message);
991  if (e.alreadyPrinted()) {
992  LogAbsolute("Additional Exceptions") << message;
993  }
994  }
995  if (!exceptionMessageFiles_.empty()) {
996  e.addAdditionalInfo(exceptionMessageFiles_);
997  if (e.alreadyPrinted()) {
998  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
999  }
1000  }
1001  throw;
1002  }
1003  return epSuccess;
1004  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2469 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2469  {
2470  bool expected = false;
2471  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2472  deferredExceptionPtr_ = iException;
2473  return true;
2474  }
2475  return false;
2476  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2463 of file EventProcessor.cc.

References exceptionMessageFiles_.

2463 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2467 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2467 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2465 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2465 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ setNeedToCallNext()

void edm::EventProcessor::setNeedToCallNext ( bool  val)
inlineprivate

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1109 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1109  {
1110  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1111  if (!subProcesses_.empty()) {
1112  for (auto const& subProcess : subProcesses_) {
1113  if (subProcess.shouldWeCloseOutput()) {
1114  return true;
1115  }
1116  }
1117  return false;
1118  }
1119  return schedule_->shouldWeCloseOutput();
1120  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2448 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2448  {
2449  FDEBUG(1) << "\tshouldWeStop\n";
2450  if (shouldWeStop_)
2451  return true;
2452  if (!subProcesses_.empty()) {
2453  for (auto const& subProcess : subProcesses_) {
2454  if (subProcess.terminate()) {
2455  return true;
2456  }
2457  }
2458  return false;
2459  }
2460  return schedule_->terminate();
2461  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1073 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1073  {
1074  shouldWeStop_ = false;
1075  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1076  // until after we've called beginOfJob
1077  if (looper_ && looperBeginJobRun_) {
1078  looper_->doStartingNewLoop();
1079  }
1080  FDEBUG(1) << "\tstartingNewLoop\n";
1081  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus > status,
WaitingTaskHolder  iHolder 
)
noexcept

Definition at line 1434 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::RunProcessingStatus::didGlobalBeginSucceed(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), dqmdumpme::first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1436  {
1437  // These shouldn't throw
1438  streamQueues_[iStream].pause();
1439  ++streamRunActive_;
1440  streamRunStatus_[iStream] = std::move(status);
1441 
1442  CMS_SA_ALLOW try {
1443  using namespace edm::waiting_task::chain;
1444  chain::first([this, iStream](auto nextTask) {
1445  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1446  if (rs.didGlobalBeginSucceed()) {
1447  RunTransitionInfo transitionInfo(
1448  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1449  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1450  beginStreamTransitionAsync<Traits>(
1451  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1452  }
1453  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1454  if (exceptionFromBeginStreamRun) {
1455  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1456  }
1457  releaseBeginRunResources(iStream);
1458  }) | runLast(iHolder);
1459  } catch (...) {
1460  releaseBeginRunResources(iStream);
1461  iHolder.doneWaiting(std::current_exception());
1462  }
1463  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1940 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1940  {
1941  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1942  auto status = streamLumiStatus_[iStreamIndex];
1943  if (iException) {
1944  handleEndLumiExceptions(*iException, iTask);
1945  }
1946 
1947  // reset status before releasing queue else get race condition
1948  streamLumiStatus_[iStreamIndex].reset();
1950  streamQueues_[iStreamIndex].resume();
1951 
1952  //are we the last one?
1953  if (status->streamFinishedLumi()) {
1955  }
1956  });
1957 
1958  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1959 
1960  // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1961  // therefore we do not want to hold the shared_ptr
1962  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1963  lumiStatus->setEndTime();
1964 
1965  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1966  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1967  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1968 
1969  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1970  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1971  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1972  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1973  *schedule_,
1974  iStreamIndex,
1975  transitionInfo,
1976  serviceToken_,
1977  subProcesses_,
1978  cleaningUpAfterException);
1979  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1613 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1613  {
1614  CMS_SA_ALLOW try {
1615  if (!streamRunStatus_[iStreamIndex]) {
1616  if (exceptionRunStatus_->streamFinishedRun()) {
1617  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1618  exceptionRunStatus_.reset();
1619  }
1620  return;
1621  }
1622 
1623  auto runDoneTask =
1624  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1625  if (iException) {
1626  handleEndRunExceptions(*iException, iTask);
1627  }
1628 
1629  auto runStatus = streamRunStatus_[iStreamIndex];
1630 
1631  //reset status before releasing queue else get race condition
1632  if (runStatus->streamFinishedRun()) {
1633  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1634  }
1635  streamRunStatus_[iStreamIndex].reset();
1636  --streamRunActive_;
1637  streamQueues_[iStreamIndex].resume();
1638  });
1639 
1640  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1641 
1642  auto runStatus = streamRunStatus_[iStreamIndex].get();
1643 
1644  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1645  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1646  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1647  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1648 
1649  auto& runPrincipal = *runStatus->runPrincipal();
1650  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1651  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1652  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1653  *schedule_,
1654  iStreamIndex,
1655  transitionInfo,
1656  serviceToken_,
1657  subProcesses_,
1658  cleaningUpAfterException);
1659  }
1660  } catch (...) {
1661  handleEndRunExceptions(std::current_exception(), iTask);
1662  }
1663  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 625 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

625  {
628  task.waitNoThrow();
629  assert(task.done());
630  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 290 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

290  {
292  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 293 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

293  {
295  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2478 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2478  {
2479  cms::Exception ex("ModulesSynchingOnLumis");
2480  ex << "The framework is configured to use at least two streams, but the following modules\n"
2481  << "require synchronizing on LuminosityBlock boundaries:";
2482  bool found = false;
2483  for (auto worker : schedule_->allWorkers()) {
2484  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2485  found = true;
2486  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2487  }
2488  }
2489  if (found) {
2490  ex << "\n\nThe situation can be fixed by either\n"
2491  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2492  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2493  throw ex;
2494  }
2495  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 859 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

859 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 863 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

863 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 861 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

861 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2497 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2497  {
2498  std::unique_ptr<LogSystem> s;
2499  for (auto worker : schedule_->allWorkers()) {
2500  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2501  if (not s) {
2502  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2503  (*s) << "The following modules require synchronizing on Run boundaries:";
2504  }
2505  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2506  }
2507  }
2508  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2099 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), findAndChange::op, processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2099  {
2100  using namespace edm::waiting_task;
2101  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2102  chain::first([&](auto nextTask) {
2104 
2105  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2106  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2107  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2109  for (auto& s : subProcesses_) {
2110  s.writeLumiAsync(nextTask, lumiPrincipal);
2111  }
2113  }
2114  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2060 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), eostools::move(), findAndChange::op, principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2060  {
2061  using namespace edm::waiting_task;
2062  chain::first([&](auto nextTask) {
2064  schedule_->writeProcessBlockAsync(
2065  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2066  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2068  for (auto& s : subProcesses_) {
2069  s.writeProcessBlockAsync(nextTask, processBlockType);
2070  }
2071  }) | chain::runLast(std::move(task));
2072  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2074 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), findAndChange::op, processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2076  {
2077  using namespace edm::waiting_task;
2078  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2079  chain::first([&](auto nextTask) {
2081  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2082  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2084  for (auto& s : subProcesses_) {
2085  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2086  }
2087  }) | chain::runLast(std::move(task));
2088  }
2089  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 327 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 360 of file EventProcessor.h.

Referenced by beginJob().

◆ beginJobStartedModules_

bool edm::EventProcessor::beginJobStartedModules_ = false
private

Definition at line 361 of file EventProcessor.h.

Referenced by beginJob(), and endJob().

◆ beginJobSucceeded_

bool edm::EventProcessor::beginJobSucceeded_ = false
private

Definition at line 362 of file EventProcessor.h.

Referenced by beginJob(), and endJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 343 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 317 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 381 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 378 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 365 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 367 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 366 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 338 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 364 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 374 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 368 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 348 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemTypeInfo edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 369 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 336 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 331 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 345 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ moduleTypeResolverMaker_

edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const> > edm::EventProcessor::moduleTypeResolverMaker_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

◆ needToCallNext_

bool edm::EventProcessor::needToCallNext_ = true
private

Definition at line 382 of file EventProcessor.h.

Referenced by needToCallNext(), and setNeedToCallNext().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 316 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 380 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 318 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 344 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 363 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 319 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().