CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemTypeInfo lastTransitionType () const
 
InputSource::ItemTypeInfo nextTransitionType ()
 
void nextTransitionTypeAsync (std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipalreadLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipalreadRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
bool needToCallNext () const
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
void setNeedToCallNext (bool val)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatusexceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemTypeInfo lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
 
bool needToCallNext_ = true
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueuerunQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 69 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 367 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 368 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 233 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 79 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 607 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, findAndChange::op, schedule_, and unpackBuffers-CaloStage2::token.

607  {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 632 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, DummyCfis::c, edm::checkForModuleDependencyCorrectness(), ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, MainPageGenerator::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

632  {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
642  service::SystemBounds bounds(preallocations_.numberOfStreams(),
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules to avoid non-consumed non-run modules to keep the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
700  }
701  if (preallocations_.numberOfRuns() > 1) {
703  }
704 
705  //NOTE: This implementation assumes 'Job' means one call
706  // the EventProcessor::run
707  // If it really means once per 'application' then this code will
708  // have to be changed.
709  // Also have to deal with case where have 'run' then new Module
710  // added and do 'run'
711  // again. In that case the newly added Module needs its 'beginJob'
712  // to be called.
713 
714  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
715  // For now we delay calling beginOfJob until first beginOfRun
716  //if(looper_) {
717  // looper_->beginOfJob(es);
718  //}
719  espController_->finishConfiguration();
720  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
721  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728 
729  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToResolverIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
int merge(int argc, char *argv[])
Definition: DiMuonVmerge.cc:28
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::string > modulesToIgnoreForDeleteEarly_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:169
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1656 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), dqmdumpme::first, globalEndLumiAsync(), handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1658  {
1659  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1660 
1661  auto status = std::make_shared<LuminosityBlockProcessingStatus>();
1662  chain::first([this, &iSync, &status](auto nextTask) {
1663  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1664  nextTask,
1665  status->endIOVWaitingTasks(),
1666  status->eventSetupImpls(),
1668  actReg_.get(),
1669  serviceToken_);
1670  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1671  CMS_SA_ALLOW try {
1672  //the call to doneWaiting will cause the count to decrement
1673  if (iException) {
1674  WaitingTaskHolder copyHolder(nextTask);
1675  copyHolder.doneWaiting(*iException);
1676  }
1677 
1678  lumiQueue_->pushAndPause(
1679  *nextTask.group(),
1680  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1681  CMS_SA_ALLOW try {
1682  if (postLumiQueueTask.taskHasFailed()) {
1683  status->resetResources();
1685  endRunAsync(iRunStatus, postLumiQueueTask);
1686  return;
1687  }
1688 
1689  status->setResumer(std::move(iResumer));
1690 
1692  *postLumiQueueTask.group(),
1693  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1694  CMS_SA_ALLOW try {
1696 
1697  if (postSourceTask.taskHasFailed()) {
1698  status->resetResources();
1700  endRunAsync(iRunStatus, postSourceTask);
1701  return;
1702  }
1703 
1704  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1705 
1706  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1707  {
1708  SendSourceTerminationSignalIfException sentry(actReg_.get());
1709  input_->doBeginLumi(lumiPrincipal, &processContext_);
1710  sentry.completedSuccessfully();
1711  }
1712 
1713  Service<RandomNumberGenerator> rng;
1714  if (rng.isAvailable()) {
1715  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1716  rng->preBeginLumi(lb);
1717  }
1718 
1719  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1720 
1721  using namespace edm::waiting_task::chain;
1722  chain::first([this, status](auto nextTask) mutable {
1725  } else {
1726  setNeedToCallNext(true);
1727  }
1728  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1729  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1730  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1731  beginGlobalTransitionAsync<Traits>(
1732  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1733  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1734  looper_->prefetchAsync(
1735  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1736  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1737  ServiceRegistry::Operate operateLooper(serviceToken_);
1738  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1739  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1740  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1741 
1742  if (iException) {
1743  WaitingTaskHolder copyHolder(holder);
1744  copyHolder.doneWaiting(*iException);
1745  globalEndLumiAsync(holder, status);
1746  endRunAsync(iRunStatus, holder);
1747  } else {
1748  status->globalBeginDidSucceed();
1749 
1750  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1751  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1752 
1753  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1754  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1755  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1756  if (!status->shouldStreamStartLumi()) {
1757  return;
1758  }
1759  streamQueues_[i].pause();
1760 
1761  auto& event = principalCache_.eventPrincipal(i);
1762  auto eventSetupImpls = &status->eventSetupImpls();
1763  auto lp = status->lumiPrincipal().get();
1766  event.setLuminosityBlockPrincipal(lp);
1767  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1768  using namespace edm::waiting_task::chain;
1769  chain::first([this, i, &transitionInfo](auto nextTask) {
1770  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1771  *schedule_,
1772  i,
1773  transitionInfo,
1774  serviceToken_,
1775  subProcesses_);
1776  }) |
1777  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1778  auto nextTask) {
1779  if (exceptionFromBeginStreamLumi) {
1780  WaitingTaskHolder copyHolder(nextTask);
1781  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1782  }
1784  }) |
1785  runLast(std::move(holder));
1786  });
1787  } // end for loop over streams
1788  });
1789  }
1790  }) | runLast(postSourceTask);
1791  } catch (...) {
1792  status->resetResources();
1794  WaitingTaskHolder copyHolder(postSourceTask);
1795  copyHolder.doneWaiting(std::current_exception());
1796  endRunAsync(iRunStatus, postSourceTask);
1797  }
1798  }); // task in sourceResourcesAcquirer
1799  } catch (...) {
1800  status->resetResources();
1802  WaitingTaskHolder copyHolder(postLumiQueueTask);
1803  copyHolder.doneWaiting(std::current_exception());
1804  endRunAsync(iRunStatus, postLumiQueueTask);
1805  }
1806  }); // task in lumiQueue
1807  } catch (...) {
1808  status->resetResources();
1810  WaitingTaskHolder copyHolder(nextTask);
1811  copyHolder.doneWaiting(std::current_exception());
1812  endRunAsync(iRunStatus, nextTask);
1813  }
1814  }) | chain::runLast(std::move(iHolder));
1815  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1108 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1108  {
1109  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1110  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1111 
1112  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1113  FinalWaitingTask globalWaitTask{taskGroup_};
1114 
1115  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1116  beginGlobalTransitionAsync<Traits>(
1117  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1118 
1119  globalWaitTask.wait();
1120  beginProcessBlockSucceeded = true;
1121  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1211 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, dqmdumpme::first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1211  {
1212  if (iHolder.taskHasFailed()) {
1213  return;
1214  }
1215 
1216  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1217 
1218  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1219 
1220  chain::first([this, &status, iSync](auto nextTask) {
1221  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1222  nextTask,
1223  status->endIOVWaitingTasks(),
1224  status->eventSetupImpls(),
1226  actReg_.get(),
1227  serviceToken_,
1229  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1230  CMS_SA_ALLOW try {
1231  if (iException) {
1232  WaitingTaskHolder copyHolder(nextTask);
1233  copyHolder.doneWaiting(*iException);
1234  // Finish handling the exception in the task pushed to runQueue_
1235  }
1237 
1238  runQueue_->pushAndPause(
1239  *nextTask.group(),
1240  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1241  CMS_SA_ALLOW try {
1242  if (postRunQueueTask.taskHasFailed()) {
1243  status->resetBeginResources();
1245  return;
1246  }
1247 
1248  status->setResumer(std::move(iResumer));
1249 
1251  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1252  CMS_SA_ALLOW try {
1254 
1255  if (postSourceTask.taskHasFailed()) {
1256  status->resetBeginResources();
1258  status->resumeGlobalRunQueue();
1259  return;
1260  }
1261 
1262  status->setRunPrincipal(readRun());
1263 
1264  RunPrincipal& runPrincipal = *status->runPrincipal();
1265  {
1266  SendSourceTerminationSignalIfException sentry(actReg_.get());
1267  input_->doBeginRun(runPrincipal, &processContext_);
1268  sentry.completedSuccessfully();
1269  }
1270 
1271  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1272  if (looper_ && looperBeginJobRun_ == false) {
1273  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1274 
1275  oneapi::tbb::task_group group;
1276  FinalWaitingTask waitTask{group};
1277  using namespace edm::waiting_task::chain;
1278  chain::first([this, &es](auto nextTask) {
1279  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1280  }) | then([this, &es](auto nextTask) mutable {
1281  looper_->beginOfJob(es);
1282  looperBeginJobRun_ = true;
1283  looper_->doStartingNewLoop();
1284  }) | runLast(WaitingTaskHolder(group, &waitTask));
1285  waitTask.wait();
1286  }
1287 
1288  using namespace edm::waiting_task::chain;
1289  chain::first([this, status](auto nextTask) mutable {
1290  CMS_SA_ALLOW try {
1293  } else {
1294  setNeedToCallNext(true);
1295  }
1296  } catch (...) {
1297  status->setStopBeforeProcessingRun(true);
1298  nextTask.doneWaiting(std::current_exception());
1299  }
1300  }) | then([this, status, &es](auto nextTask) {
1301  if (status->stopBeforeProcessingRun()) {
1302  return;
1303  }
1304  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1305  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1306  beginGlobalTransitionAsync<Traits>(
1307  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1308  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1309  if (status->stopBeforeProcessingRun()) {
1310  return;
1311  }
1312  looper_->prefetchAsync(
1313  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1314  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1315  if (status->stopBeforeProcessingRun()) {
1316  return;
1317  }
1318  ServiceRegistry::Operate operateLooper(serviceToken_);
1319  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1320  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1321  if (iException) {
1322  WaitingTaskHolder copyHolder(holder);
1323  copyHolder.doneWaiting(*iException);
1324  } else {
1325  status->globalBeginDidSucceed();
1326  }
1327 
1328  if (status->stopBeforeProcessingRun()) {
1329  // We just quit now if there was a failure when merging runs
1330  status->resetBeginResources();
1332  status->resumeGlobalRunQueue();
1333  return;
1334  }
1335  CMS_SA_ALLOW try {
1336  // Under normal circumstances, this task runs after endRun has completed for all streams
1337  // and global endLumi has completed for all lumis contained in this run
1338  auto globalEndRunTask =
1339  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1340  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1341  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1343  });
1344  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1345  } catch (...) {
1346  status->resetBeginResources();
1348  status->resumeGlobalRunQueue();
1349  holder.doneWaiting(std::current_exception());
1350  return;
1351  }
1352 
1353  // After this point we are committed to end the run via endRunAsync
1354 
1356 
1357  // The only purpose of the pause is to cause stream begin run to execute before
1358  // global begin lumi in the single threaded case (maintains consistency with
1359  // the order that existed before concurrent runs were implemented).
1360  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1361 
1362  CMS_SA_ALLOW try {
1363  streamQueuesInserter_.push(*holder.group(), [this, status, holder]() mutable {
1364  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1365  CMS_SA_ALLOW try {
1366  streamQueues_[i].push(*holder.group(), [this, i, status, holder]() mutable {
1368  });
1369  } catch (...) {
1370  if (status->streamFinishedBeginRun()) {
1371  WaitingTaskHolder copyHolder(holder);
1372  copyHolder.doneWaiting(std::current_exception());
1373  status->resetBeginResources();
1376  }
1377  }
1378  }
1379  });
1380  } catch (...) {
1381  WaitingTaskHolder copyHolder(holder);
1382  copyHolder.doneWaiting(std::current_exception());
1383  status->resetBeginResources();
1386  }
1388  }) | runLast(postSourceTask);
1389  } catch (...) {
1390  status->resetBeginResources();
1392  status->resumeGlobalRunQueue();
1393  postSourceTask.doneWaiting(std::current_exception());
1394  }
1395  }); // task in sourceResourcesAcquirer
1396  } catch (...) {
1397  status->resetBeginResources();
1399  status->resumeGlobalRunQueue();
1400  postRunQueueTask.doneWaiting(std::current_exception());
1401  }
1402  }); // task in runQueue
1403  } catch (...) {
1404  status->resetBeginResources();
1406  nextTask.doneWaiting(std::current_exception());
1407  }
1408  }) | chain::runLast(std::move(iHolder));
1409  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 279 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

279  {
281  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 282 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 848 of file EventProcessor.cc.

References epSignal, edm::JobReport::reportShutdownSignal(), runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

848  {
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
854  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
856  jr->reportShutdownSignal();
858  }
859  return returnValue;
860  }
Log< level::System, false > LogSystem
volatile std::atomic< bool > shutdown_flag
void reportShutdownSignal()
Definition: JobReport.cc:543

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 842 of file EventProcessor.cc.

References schedule_.

842 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2093 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2093  {
2094  for (auto& s : subProcesses_) {
2095  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2096  }
2097  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2098  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2099  iStatus.lumiPrincipal()->clearPrincipal();
2100  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus iStatus)

Definition at line 2068 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2068  {
2069  for (auto& s : subProcesses_) {
2070  s.clearRunPrincipal(*iStatus.runPrincipal());
2071  }
2072  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2073  iStatus.runPrincipal()->clearPrincipal();
2074  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 1008 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

1008  {
1009  if (fileBlockValid()) {
1010  SendSourceTerminationSignalIfException sentry(actReg_.get());
1011  input_->closeFile(fb_.get(), cleaningUpAfterException);
1012  sentry.completedSuccessfully();
1013  }
1014  FDEBUG(1) << "\tcloseInputFile\n";
1015  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 1025 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

1025  {
1026  schedule_->closeOutputFiles();
1027  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1028  processBlockHelper_->clearAfterOutputFilesClose();
1029  FDEBUG(1) << "\tcloseOutputFiles\n";
1030  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1817 of file EventProcessor.cc.

References dqmdumpme::first, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1817  {
1818  chain::first([this](auto nextTask) {
1819  //all streams are sharing the same status at the moment
1820  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1822 
1824  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1827  }
1828  }) | chain::then([this](auto nextTask) mutable {
1829  unsigned int streamIndex = 0;
1830  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1831  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1832  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1833  }
1834  nextTask.group()->run(
1835  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1836  }) | chain::runLast(std::move(iHolder));
1837  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1099 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1099  {
1100  FDEBUG(1) << "\tdoErrorStuff\n";
1101  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1102  << "and went to the error state\n"
1103  << "Will attempt to terminate processing normally\n"
1104  << "(IF using the looper the next loop will be attempted)\n"
1105  << "This likely indicates a bug in an input module or corrupted input or both\n";
1106  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 763 of file EventProcessor.cc.

References actReg_, DummyCfis::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, MainPageGenerator::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

763  {
764  // Collects exceptions, so we don't throw before all operations are performed.
765  ExceptionCollector c(
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:210
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1060 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1060  {
1061  if (looper_) {
1062  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1063  looper_->setModuleChanger(&changer);
1064  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1065  looper_->setModuleChanger(nullptr);
1067  return true;
1068  else
1069  return false;
1070  }
1071  FDEBUG(1) << "\tendOfLoop\n";
1072  return true;
1073  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1149 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1149  {
1150  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1151 
1152  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1153  FinalWaitingTask globalWaitTask{taskGroup_};
1154 
1155  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1156  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1157  *schedule_,
1158  transitionInfo,
1159  serviceToken_,
1160  subProcesses_,
1161  cleaningUpAfterException);
1162  globalWaitTask.wait();
1163 
1164  if (beginProcessBlockSucceeded) {
1165  FinalWaitingTask writeWaitTask{taskGroup_};
1167  writeWaitTask.wait();
1168  }
1169 
1170  processBlockPrincipal.clearPrincipal();
1171  for (auto& s : subProcesses_) {
1172  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1173  }
1174  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1451 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, dqmdumpme::first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1451  {
1452  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1453  iRunStatus->setEndTime();
1454  IOVSyncValue ts(
1455  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1456  runPrincipal.endTime());
1457  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1458  WaitingTaskHolder copyHolder(iHolder);
1459  copyHolder.doneWaiting(std::current_exception());
1460  }
1461 
1462  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1463  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1464  nextTask,
1465  iRunStatus->endIOVWaitingTasksEndRun(),
1466  iRunStatus->eventSetupImplsEndRun(),
1468  actReg_.get(),
1469  serviceToken_);
1470  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1471  if (iException) {
1472  iRunStatus->setEndingEventSetupSucceeded(false);
1473  handleEndRunExceptions(*iException, nextTask);
1474  }
1476  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1477  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1478  CMS_SA_ALLOW try {
1479  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1480  streamQueues_[i].pause();
1481  streamEndRunAsync(std::move(nextTask), i);
1482  });
1483  } catch (...) {
1484  WaitingTaskHolder copyHolder(nextTask);
1485  copyHolder.doneWaiting(std::current_exception());
1486  }
1487  }
1488  });
1489 
1491  CMS_SA_ALLOW try {
1492  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1493  } catch (...) {
1494  WaitingTaskHolder copyHolder(nextTask);
1495  copyHolder.doneWaiting(std::current_exception());
1496  }
1497  }
1498  }) | chain::runLast(std::move(iHolder));
1499  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1958 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1958  {
1959  if (streamRunActive_ == 0) {
1960  assert(streamLumiActive_ == 0);
1961  } else {
1963  if (streamLumiActive_ > 0) {
1964  FinalWaitingTask globalWaitTask{taskGroup_};
1966  streamLumiStatus_[0]->noMoreEventsInLumi();
1967  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1968  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1969  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1970  }
1971  globalWaitTask.wait();
1972  }
1973  }
1974  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1642 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1642  {
1643  if (streamRunActive_ > 0) {
1644  FinalWaitingTask waitTask{taskGroup_};
1645 
1646  auto runStatus = streamRunStatus_[0].get();
1647  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1648  WaitingTaskHolder holder{taskGroup_, &waitTask};
1649  runStatus->setHolderOfTaskInProcessRuns(holder);
1651  endRunAsync(streamRunStatus_[0], std::move(holder));
1652  waitTask.wait();
1653  }
1654  }
InputSource::ItemTypeInfo lastSourceTransition_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 193 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

193 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 832 of file EventProcessor.cc.

References schedule_.

832  {
833  return schedule_->getAllModuleDescriptions();
834  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 830 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

830 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1848 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, dqmdumpme::first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by beginLumiAsync(), and streamEndLumiAsync().

1849  {
1850  // Get some needed info out of the status object before moving
1851  // it into finalTaskForThisLumi.
1852  auto& lp = *(iLumiStatus->lumiPrincipal());
1853  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1854  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1855  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1856  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1857 
1858  using namespace edm::waiting_task::chain;
1859  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1860  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1861 
1862  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1863  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1864  endGlobalTransitionAsync<Traits>(
1865  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1866  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1867  //Only call writeLumi if beginLumi succeeded
1868  if (didGlobalBeginSucceed) {
1869  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1870  }
1871  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1872  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1873  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1874  //any thrown exception auto propagates to nextTask via the chain
1876  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1877  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1878  if (iException) {
1879  handleEndLumiExceptions(*iException, nextTask);
1880  }
1882 
1883  std::exception_ptr ptr;
1884 
1885  // Try hard to clean up resources so the
1886  // process can terminate in a controlled
1887  // fashion even after exceptions have occurred.
1888  // Caught exception is passed to handleEndLumiExceptions()
1889  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1890  if (not ptr) {
1891  ptr = std::current_exception();
1892  }
1893  }
1894  // Caught exception is passed to handleEndLumiExceptions()
1895  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1896  if (not ptr) {
1897  ptr = std::current_exception();
1898  }
1899  }
1900  // Caught exception is passed to handleEndLumiExceptions()
1901  CMS_SA_ALLOW try {
1902  status->resetResources();
1903  status->globalEndRunHolderDoneWaiting();
1904  status.reset();
1905  } catch (...) {
1906  if (not ptr) {
1907  ptr = std::current_exception();
1908  }
1909  }
1910 
1911  if (ptr && !iException) {
1912  handleEndLumiExceptions(ptr, nextTask);
1913  }
1914  }) | runLast(std::move(iTask));
1915  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus iRunStatus 
)

Definition at line 1510 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, dqmdumpme::first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1510  {
1511  auto& runPrincipal = *(iRunStatus->runPrincipal());
1512  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1513  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1514  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1515  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1516  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1517 
1518  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1519  using namespace edm::waiting_task::chain;
1520  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1521  auto nextTask) {
1522  if (endingEventSetupSucceeded) {
1523  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1524  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1525  endGlobalTransitionAsync<Traits>(
1526  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1527  }
1528  }) |
1529  ifThen(looper_ && endingEventSetupSucceeded,
1530  [this, &runPrincipal, &es](auto nextTask) {
1531  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1532  }) |
1533  ifThen(looper_ && endingEventSetupSucceeded,
1534  [this, &runPrincipal, &es](auto nextTask) {
1536  looper_->doEndRun(runPrincipal, es, &processContext_);
1537  }) |
1538  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1539  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1540  mergeableRunProductMetadata->preWriteRun();
1541  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1542  }) |
1543  then([status = std::move(iRunStatus),
1544  this,
1545  didGlobalBeginSucceed,
1546  mergeableRunProductMetadata,
1547  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1548  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1549  mergeableRunProductMetadata->postWriteRun();
1550  }
1551  if (iException) {
1552  handleEndRunExceptions(*iException, nextTask);
1553  }
1555 
1556  std::exception_ptr ptr;
1557 
1558  // Try hard to clean up resources so the
1559  // process can terminate in a controlled
1560  // fashion even after exceptions have occurred.
1561  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1562  if (not ptr) {
1563  ptr = std::current_exception();
1564  }
1565  }
1566  CMS_SA_ALLOW try {
1567  status->resumeGlobalRunQueue();
1569  } catch (...) {
1570  if (not ptr) {
1571  ptr = std::current_exception();
1572  }
1573  }
1574  CMS_SA_ALLOW try {
1575  status->resetEndResources();
1576  status.reset();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582 
1583  if (ptr && !iException) {
1584  handleEndRunExceptions(ptr, nextTask);
1585  }
1586  }) |
1587  runLast(std::move(iTask));
1588  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1839 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1839  {
1840  if (holder.taskHasFailed()) {
1842  } else {
1843  WaitingTaskHolder tmp(holder);
1844  tmp.doneWaiting(iException);
1845  }
1846  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1501 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1501  {
1502  if (holder.taskHasFailed()) {
1504  } else {
1505  WaitingTaskHolder tmp(holder);
1506  tmp.doneWaiting(iException);
1507  }
1508  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2264 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2264  {
2265  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2266  streamEndLumiAsync(iTask, iStreamIndex);
2267  return;
2268  }
2269  auto group = iTask.group();
2270  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2271  CMS_SA_ALLOW try {
2272  auto status = streamLumiStatus_[iStreamIndex].get();
2274 
2275  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2276  auto recursionTask =
2277  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2278  if (iEventException) {
2279  WaitingTaskHolder copyHolder(iTask);
2280  copyHolder.doneWaiting(*iEventException);
2281  // Intentionally, we don't return here. The recursive call to
2282  // handleNextEvent takes care of immediately ending the run properly
2283  // using the same code it uses to end the run in other situations.
2284  }
2285  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2286  });
2287 
2288  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2289  } else {
2290  // the stream will stop processing this lumi now
2292  if (not status->haveStartedNextLumiOrEndedRun()) {
2293  status->noMoreEventsInLumi();
2294  status->startNextLumiOrEndRun();
2295  if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2296  CMS_SA_ALLOW try {
2297  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2298  input_->luminosityBlockAuxiliary()->beginTime()),
2299  streamRunStatus_[iStreamIndex],
2300  iTask);
2301  } catch (...) {
2302  WaitingTaskHolder copyHolder(iTask);
2303  copyHolder.doneWaiting(std::current_exception());
2304  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2305  }
2306  } else {
2307  // If appropriate, this will also start the next run.
2308  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2309  }
2310  }
2311  streamEndLumiAsync(iTask, iStreamIndex);
2312  } else {
2313  assert(status->eventProcessingState() ==
2315  auto runStatus = streamRunStatus_[iStreamIndex].get();
2316 
2317  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2318  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2319  }
2320  }
2321  }
2322  } catch (...) {
2323  WaitingTaskHolder copyHolder(iTask);
2324  copyHolder.doneWaiting(std::current_exception());
2325  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2326  }
2327  });
2328  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2164 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), dqmdumpme::first, input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), needToCallNext(), nextTransitionTypeAsync(), edm::waiting_task::chain::runLast(), serviceToken_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync(), and processRuns().

2165  {
2166  chain::first([this, iRunStatus](auto nextTask) mutable {
2167  if (needToCallNext()) {
2168  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2169  }
2170  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2172  if (iException) {
2173  WaitingTaskHolder copyHolder(nextTask);
2174  copyHolder.doneWaiting(*iException);
2175  }
2177  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2178  return;
2179  }
2180  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2181  CMS_SA_ALLOW try {
2182  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2183  input_->luminosityBlockAuxiliary()->beginTime()),
2184  iRunStatus,
2185  nextTask);
2186  return;
2187  } catch (...) {
2188  WaitingTaskHolder copyHolder(nextTask);
2189  copyHolder.doneWaiting(std::current_exception());
2190  }
2191  }
2192  // Note that endRunAsync will call beginRunAsync for the following run
2193  // if appropriate.
2194  endRunAsync(iRunStatus, std::move(nextTask));
2195  }) | chain::runLast(std::move(iHolder));
2196  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool needToCallNext() const
constexpr auto then(O &&iO)
Definition: chain_first.h:277
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DiMuonV_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, moduleTypeResolverMaker_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // intialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // intialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
495  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
519  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
585  SubProcessParentageHelper(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
std::multimap< std::string, std::string > referencesToBranches_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1123 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1123  {
1124  input_->fillProcessBlockHelper();
1125  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1126  while (input_->nextProcessBlock(processBlockPrincipal)) {
1127  readProcessBlock(processBlockPrincipal);
1128 
1129  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1130  FinalWaitingTask globalWaitTask{taskGroup_};
1131 
1132  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1133  beginGlobalTransitionAsync<Traits>(
1134  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1135 
1136  globalWaitTask.wait();
1137 
1138  FinalWaitingTask writeWaitTask{taskGroup_};
1140  writeWaitTask.wait();
1141 
1142  processBlockPrincipal.clearPrincipal();
1143  for (auto& s : subProcesses_) {
1144  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1145  }
1146  }
1147  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 289 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

289 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

290 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ needToCallNext()

bool edm::EventProcessor::needToCallNext ( ) const
inlineprivate

Definition at line 295 of file EventProcessor.h.

References needToCallNext_.

Referenced by handleNextItemAfterMergingRunEntries(), and readNextEventForStream().

295 { return needToCallNext_; }

◆ nextTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::nextTransitionType ( )

Definition at line 870 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), nextTransitionTypeAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

870  {
871  SendSourceTerminationSignalIfException sentry(actReg_.get());
872  InputSource::ItemTypeInfo itemTypeInfo;
873  {
874  SourceNextGuard guard(*actReg_.get());
875  //For now, do nothing with InputSource::IsSynchronize
876  do {
877  itemTypeInfo = input_->nextItemType();
878  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
879  }
880  lastSourceTransition_ = itemTypeInfo;
881  sentry.completedSuccessfully();
882 
884 
886  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
888  }
889 
890  return lastSourceTransition_;
891  }
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::shared_ptr< ActivityRegistry > actReg_

◆ nextTransitionTypeAsync()

void edm::EventProcessor::nextTransitionTypeAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  nextTask 
)

Definition at line 893 of file EventProcessor.cc.

References CMS_SA_ALLOW, Exception, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), edm::errors::LogicError, eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by handleNextItemAfterMergingRunEntries().

894  {
895  auto group = nextTask.group();
897  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
898  CMS_SA_ALLOW try {
900  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
903  runStatus->runPrincipal()->run() == input_->run() &&
904  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
906  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
907  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
908  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
909  }
910  } catch (...) {
911  nextHolder.doneWaiting(std::current_exception());
912  }
913  });
914  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 1017 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1017  {
1018  if (fileBlockValid()) {
1019  schedule_->openOutputFiles(*fb_);
1020  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1021  }
1022  FDEBUG(1) << "\topenOutputFiles\n";
1023  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 277 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

277 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 278 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

278 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1081 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1081  {
1082  looper_->prepareForNextLoop(esp_.get());
1083  FDEBUG(1) << "\tprepareForNextLoop\n";
1084  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 143 of file EventProcessor.h.

References processConfiguration_.

143 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2345 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2345  {
2346  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2347  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2361 of file EventProcessor.cc.

References actReg_, esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, dqmdumpme::first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), edm::StreamContext::kEvent, looper_, eostools::move(), principalCache_, processContext_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2361  {
2362  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2363 
2365  Service<RandomNumberGenerator> rng;
2366  if (rng.isAvailable()) {
2367  Event ev(*pep, ModuleDescription(), nullptr);
2368  rng->postEventRead(ev);
2369  }
2370 
2371  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2372  using namespace edm::waiting_task::chain;
2373  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2374  EventTransitionInfo info(*pep, es);
2375  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2376  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2377  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2378  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2379  }
2380  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2381  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2382  ServiceRegistry::Operate operateLooper(serviceToken_);
2383  processEventWithLooper(*pep, iStreamIndex);
2384  }) | then([this, pep](auto nextTask) {
2385  FDEBUG(1) << "\tprocessEvent\n";
2386  StreamContext streamContext(pep->streamID(),
2388  pep->id(),
2389  pep->runPrincipal().index(),
2390  pep->luminosityBlockPrincipal().index(),
2391  pep->time(),
2392  &processContext_);
2393  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2394  pep->clearEventPrincipal();
2395  }) | runLast(iHolder);
2396  }
ProcessContext processContext_
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2398 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2398  {
2399  bool randomAccess = input_->randomAccess();
2400  ProcessingController::ForwardState forwardState = input_->forwardState();
2401  ProcessingController::ReverseState reverseState = input_->reverseState();
2402  ProcessingController pc(forwardState, reverseState, randomAccess);
2403 
2405  do {
2406  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2407  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2408  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2409 
2410  bool succeeded = true;
2411  if (randomAccess) {
2412  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2413  input_->skipEvents(-2);
2414  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2415  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2416  }
2417  }
2418  pc.setLastOperationSucceeded(succeeded);
2419  } while (!pc.lastOperationSucceeded());
2421  shouldWeStop_ = true;
2422  }
2423  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1176 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), setNeedToCallNext(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1176  {
1177  FinalWaitingTask waitTask{taskGroup_};
1179  if (streamRunActive_ == 0) {
1180  assert(streamLumiActive_ == 0);
1181 
1182  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1183  WaitingTaskHolder{taskGroup_, &waitTask});
1184  } else {
1186 
1187  auto runStatus = streamRunStatus_[0];
1188 
1190  runStatus->runPrincipal()->run() == input_->run() and
1191  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1192  readAndMergeRun(*runStatus);
1194  }
1195 
1196  setNeedToCallNext(false);
1197 
1198  WaitingTaskHolder holder{taskGroup_, &waitTask};
1199  runStatus->setHolderOfTaskInProcessRuns(holder);
1200  if (streamLumiActive_ > 0) {
1202  continueLumiAsync(std::move(holder));
1203  } else {
1205  }
1206  }
1207  waitTask.wait();
1208  return lastTransitionType();
1209  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
assert(be >=bs)
PreallocationConfiguration preallocations_
void setNeedToCallNext(bool val)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2021 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

2021  {
2022  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2023  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2024  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2025  input_->processHistoryRegistry().reducedProcessHistoryID(
2026  input_->luminosityBlockAuxiliary()->processHistoryID()));
2027  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2028  assert(lumiOK);
2029  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2030  {
2031  SendSourceTerminationSignalIfException sentry(actReg_.get());
2032  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2033  sentry.completedSuccessfully();
2034  }
2035  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2136 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2137  {
2138  auto group = iHolder.group();
2140  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2141  CMS_SA_ALLOW try {
2143 
2144  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2145 
2147  setNeedToCallNext(false);
2148 
2150  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2151  readAndMergeLumi(*iLumiStatus);
2153  setNeedToCallNext(true);
2154  return;
2155  }
2157  }
2158  } catch (...) {
2159  holder.doneWaiting(std::current_exception());
2160  }
2161  });
2162  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus iStatus)

Definition at line 1995 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

1995  {
1996  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1997 
1998  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1999  assert(runOK);
2000  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2001  {
2002  SendSourceTerminationSignalIfException sentry(actReg_.get());
2003  input_->readAndMergeRun(runPrincipal);
2004  sentry.completedSuccessfully();
2005  }
2006  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2102 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2103  {
2104  auto group = iHolder.group();
2106  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2107  CMS_SA_ALLOW try {
2109 
2110  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2111 
2113  setNeedToCallNext(false);
2114 
2116  status->runPrincipal()->run() == input_->run() and
2117  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2118  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2119  status->setStopBeforeProcessingRun(true);
2120  return;
2121  }
2124  setNeedToCallNext(true);
2125  return;
2126  }
2128  }
2129  } catch (...) {
2130  status->setStopBeforeProcessingRun(true);
2131  holder.doneWaiting(std::current_exception());
2132  }
2133  });
2134  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2330 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2330  {
2331  //TODO this will have to become per stream
2332  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2333  StreamContext streamContext(event.streamID(), &processContext_);
2334 
2335  SendSourceTerminationSignalIfException sentry(actReg_.get());
2336  input_->readEvent(event, streamContext);
2337 
2338  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2339  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2340  sentry.completedSuccessfully();
2341 
2342  FDEBUG(1) << "\treadEvent\n";
2343  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 983 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

983  {
984  FDEBUG(1) << " \treadFile\n";
985  size_t size = preg_->size();
986  SendSourceTerminationSignalIfException sentry(actReg_.get());
987 
988  if (streamRunActive_ > 0) {
989  streamRunStatus_[0]->runPrincipal()->preReadFile();
990  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
991  }
992 
993  if (streamLumiActive_ > 0) {
994  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
995  }
996 
997  fb_ = input_->readFile();
998  if (size < preg_->size()) {
1000  }
1003  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1004  }
1005  sentry.completedSuccessfully();
1006  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal rp)

Definition at line 2008 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

2008  {
2010  assert(lbp);
2011  lbp->setAux(*input_->luminosityBlockAuxiliary());
2012  {
2013  SendSourceTerminationSignalIfException sentry(actReg_.get());
2014  input_->readLuminosityBlock(*lbp, *historyAppender_);
2015  sentry.completedSuccessfully();
2016  }
2017  lbp->setRunPrincipal(std::move(rp));
2018  return lbp;
2019  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2198 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), Exception, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), needToCallNext(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), setNeedToCallNext(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2200  {
2201  // This function returns true if it successfully reads an event for the stream and that
2202  // requires both that an event is next and there are no problems or requests to stop.
2203 
2204  if (iTask.taskHasFailed()) {
2205  // We want all streams to stop or all streams to pause. If we are already in the
2206  // middle of pausing streams, then finish pausing all of them and the lumi will be
2207  // ended later. Otherwise, just end it now.
2208  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2210  }
2211  return false;
2212  }
2213 
2214  // Did another stream already stop or pause this lumi?
2215  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2216  return false;
2217  }
2218 
2219  // Are output modules or the looper requesting we stop?
2220  if (shouldWeStop()) {
2223  return false;
2224  }
2225 
2227 
2228  // need to use lock in addition to the serial task queue because
2229  // of delayed provenance reading and reading data in response to
2230  // edm::Refs etc
2231  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2232 
2233  // If we didn't already call nextTransitionType while merging lumis, call it here.
2234  // This asks the input source what is next and also checks for signals.
2235 
2237  setNeedToCallNext(true);
2238 
2239  if (InputSource::ItemType::IsEvent != itemType) {
2240  // IsFile may continue processing the lumi and
2241  // looper_ can cause the input source to declare a new IsRun which is actually
2242  // just a continuation of the previous run
2244  (InputSource::ItemType::IsRun == itemType and
2245  (iStatus.lumiPrincipal()->run() != input_->run() or
2246  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2247  if (itemType == InputSource::ItemType::IsLumi &&
2248  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2250  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2251  << "but the next lumi entry has the same lumi number.\n"
2252  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2253  }
2255  } else {
2257  }
2258  return false;
2259  }
2260  readEvent(iStreamIndex);
2261  return true;
2262  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
bool needToCallNext() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1976 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1976  {
1977  SendSourceTerminationSignalIfException sentry(actReg_.get());
1978  input_->readProcessBlock(processBlockPrincipal);
1979  sentry.completedSuccessfully();
1980  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 1982 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

1982  {
1984  assert(rp);
1985  rp->setAux(*input_->runAuxiliary());
1986  {
1987  SendSourceTerminationSignalIfException sentry(actReg_.get());
1988  input_->readRun(*rp, *historyAppender_);
1989  sentry.completedSuccessfully();
1990  }
1991  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1992  return rp;
1993  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1442 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1442  {
1443  auto& status = streamRunStatus_[iStream];
1444  if (status->streamFinishedBeginRun()) {
1445  status->resetBeginResources();
1447  }
1448  streamQueues_[iStream].resume();
1449  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1042 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1042  {
1043  if (fileBlockValid()) {
1044  schedule_->respondToCloseInputFile(*fb_);
1045  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1046  }
1047  FDEBUG(1) << "\trespondToCloseInputFile\n";
1048  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 1032 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1032  {
1033  if (fileBlockValid()) {
1035  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1036  schedule_->respondToOpenInputFile(*fb_);
1037  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1038  }
1039  FDEBUG(1) << "\trespondToOpenInputFile\n";
1040  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1075 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1075  {
1076  input_->repeat();
1077  input_->rewind();
1078  FDEBUG(1) << "\trewind\n";
1079  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 378 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

378 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 916 of file EventProcessor.cc.

References actReg_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

916  {
917  beginJob(); //make sure this was called
918 
919  // make the services available
921  actReg_->beginProcessingSignal_();
922  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
923  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
924  try {
925  FilesProcessor fp(fileModeNoMerge_);
926 
927  convertException::wrap([&]() {
928  bool firstTime = true;
929  do {
930  if (not firstTime) {
932  rewindInput();
933  } else {
934  firstTime = false;
935  }
936  startingNewLoop();
937 
938  auto trans = fp.processFiles(*this);
939 
940  fp.normalEnd();
941 
942  if (deferredExceptionPtrIsSet_.load()) {
943  std::rethrow_exception(deferredExceptionPtr_);
944  }
945  if (trans != InputSource::ItemType::IsStop) {
946  //problem with the source
947  doErrorStuff();
948 
949  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
950  }
951  } while (not endOfLoop());
952  }); // convertException::wrap
953 
954  } // Try block
955  catch (cms::Exception& e) {
957  std::string message(
958  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
959  e.addAdditionalInfo(message);
960  if (e.alreadyPrinted()) {
961  LogAbsolute("Additional Exceptions") << message;
962  }
963  }
964  if (exceptionMessageRuns_) {
965  std::string message(
966  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
967  e.addAdditionalInfo(message);
968  if (e.alreadyPrinted()) {
969  LogAbsolute("Additional Exceptions") << message;
970  }
971  }
972  if (!exceptionMessageFiles_.empty()) {
973  e.addAdditionalInfo(exceptionMessageFiles_);
974  if (e.alreadyPrinted()) {
975  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
976  }
977  }
978  throw;
979  }
980  return epSuccess;
981  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2446 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2446  {
2447  bool expected = false;
2448  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2449  deferredExceptionPtr_ = iException;
2450  return true;
2451  }
2452  return false;
2453  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2440 of file EventProcessor.cc.

References exceptionMessageFiles_.

2440 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2444 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2444 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2442 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2442 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ setNeedToCallNext()

void edm::EventProcessor::setNeedToCallNext ( bool  val)
inlineprivate

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1086 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1086  {
1087  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1088  if (!subProcesses_.empty()) {
1089  for (auto const& subProcess : subProcesses_) {
1090  if (subProcess.shouldWeCloseOutput()) {
1091  return true;
1092  }
1093  }
1094  return false;
1095  }
1096  return schedule_->shouldWeCloseOutput();
1097  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2425 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2425  {
2426  FDEBUG(1) << "\tshouldWeStop\n";
2427  if (shouldWeStop_)
2428  return true;
2429  if (!subProcesses_.empty()) {
2430  for (auto const& subProcess : subProcesses_) {
2431  if (subProcess.terminate()) {
2432  return true;
2433  }
2434  }
2435  return false;
2436  }
2437  return schedule_->terminate();
2438  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1050 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1050  {
1051  shouldWeStop_ = false;
1052  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1053  // until after we've called beginOfJob
1054  if (looper_ && looperBeginJobRun_) {
1055  looper_->doStartingNewLoop();
1056  }
1057  FDEBUG(1) << "\tstartingNewLoop\n";
1058  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus status,
WaitingTaskHolder  iHolder 
)
noexcept

Definition at line 1411 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::RunProcessingStatus::didGlobalBeginSucceed(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), dqmdumpme::first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1413  {
1414  // These shouldn't throw
1415  streamQueues_[iStream].pause();
1416  ++streamRunActive_;
1417  streamRunStatus_[iStream] = std::move(status);
1418 
1419  CMS_SA_ALLOW try {
1420  using namespace edm::waiting_task::chain;
1421  chain::first([this, iStream](auto nextTask) {
1422  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1423  if (rs.didGlobalBeginSucceed()) {
1424  RunTransitionInfo transitionInfo(
1425  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1426  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1427  beginStreamTransitionAsync<Traits>(
1428  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1429  }
1430  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1431  if (exceptionFromBeginStreamRun) {
1432  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1433  }
1434  releaseBeginRunResources(iStream);
1435  }) | runLast(iHolder);
1436  } catch (...) {
1437  releaseBeginRunResources(iStream);
1438  iHolder.doneWaiting(std::current_exception());
1439  }
1440  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1917 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1917  {
1918  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1919  auto status = streamLumiStatus_[iStreamIndex];
1920  if (iException) {
1921  handleEndLumiExceptions(*iException, iTask);
1922  }
1923 
1924  // reset status before releasing queue else get race condition
1925  streamLumiStatus_[iStreamIndex].reset();
1927  streamQueues_[iStreamIndex].resume();
1928 
1929  //are we the last one?
1930  if (status->streamFinishedLumi()) {
1932  }
1933  });
1934 
1935  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1936 
1937  // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1938  // therefore we do not want to hold the shared_ptr
1939  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1940  lumiStatus->setEndTime();
1941 
1942  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1943  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1944  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1945 
1946  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1947  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1948  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1949  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1950  *schedule_,
1951  iStreamIndex,
1952  transitionInfo,
1953  serviceToken_,
1954  subProcesses_,
1955  cleaningUpAfterException);
1956  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1590 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1590  {
1591  CMS_SA_ALLOW try {
1592  if (!streamRunStatus_[iStreamIndex]) {
1593  if (exceptionRunStatus_->streamFinishedRun()) {
1594  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1595  exceptionRunStatus_.reset();
1596  }
1597  return;
1598  }
1599 
1600  auto runDoneTask =
1601  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1602  if (iException) {
1603  handleEndRunExceptions(*iException, iTask);
1604  }
1605 
1606  auto runStatus = streamRunStatus_[iStreamIndex];
1607 
1608  //reset status before releasing queue else get race condition
1609  if (runStatus->streamFinishedRun()) {
1610  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1611  }
1612  streamRunStatus_[iStreamIndex].reset();
1613  --streamRunActive_;
1614  streamQueues_[iStreamIndex].resume();
1615  });
1616 
1617  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1618 
1619  auto runStatus = streamRunStatus_[iStreamIndex].get();
1620 
1621  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1622  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1623  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1624  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1625 
1626  auto& runPrincipal = *runStatus->runPrincipal();
1627  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1628  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1629  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1630  *schedule_,
1631  iStreamIndex,
1632  transitionInfo,
1633  serviceToken_,
1634  subProcesses_,
1635  cleaningUpAfterException);
1636  }
1637  } catch (...) {
1638  handleEndRunExceptions(std::current_exception(), iTask);
1639  }
1640  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 625 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

625  {
628  task.waitNoThrow();
629  assert(task.done());
630  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 283 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

283  {
285  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

286  {
288  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2455 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2455  {
2456  cms::Exception ex("ModulesSynchingOnLumis");
2457  ex << "The framework is configured to use at least two streams, but the following modules\n"
2458  << "require synchronizing on LuminosityBlock boundaries:";
2459  bool found = false;
2460  for (auto worker : schedule_->allWorkers()) {
2461  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2462  found = true;
2463  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2464  }
2465  }
2466  if (found) {
2467  ex << "\n\nThe situation can be fixed by either\n"
2468  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2469  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2470  throw ex;
2471  }
2472  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 836 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

836 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 840 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

840 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 838 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

838 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2474 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2474  {
2475  std::unique_ptr<LogSystem> s;
2476  for (auto worker : schedule_->allWorkers()) {
2477  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2478  if (not s) {
2479  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2480  (*s) << "The following modules require synchronizing on Run boundaries:";
2481  }
2482  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2483  }
2484  }
2485  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2076 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), findAndChange::op, processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2076  {
2077  using namespace edm::waiting_task;
2078  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2079  chain::first([&](auto nextTask) {
2081 
2082  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2083  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2084  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2086  for (auto& s : subProcesses_) {
2087  s.writeLumiAsync(nextTask, lumiPrincipal);
2088  }
2090  }
2091  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2037 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), eostools::move(), findAndChange::op, principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2037  {
2038  using namespace edm::waiting_task;
2039  chain::first([&](auto nextTask) {
2041  schedule_->writeProcessBlockAsync(
2042  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2043  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2045  for (auto& s : subProcesses_) {
2046  s.writeProcessBlockAsync(nextTask, processBlockType);
2047  }
2048  }) | chain::runLast(std::move(task));
2049  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2051 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), findAndChange::op, processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2053  {
2054  using namespace edm::waiting_task;
2055  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2056  chain::first([&](auto nextTask) {
2058  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2059  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2061  for (auto& s : subProcesses_) {
2062  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2063  }
2064  }) | chain::runLast(std::move(task));
2065  }
2066  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 320 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 353 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 336 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 310 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 348 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 372 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 369 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 356 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 358 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 357 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 331 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 355 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 365 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 361 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 359 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 341 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemTypeInfo edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 360 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 338 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ moduleTypeResolverMaker_

edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const> > edm::EventProcessor::moduleTypeResolverMaker_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init().

◆ needToCallNext_

bool edm::EventProcessor::needToCallNext_ = true
private

Definition at line 373 of file EventProcessor.h.

Referenced by needToCallNext(), and setNeedToCallNext().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 323 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 309 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 371 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 321 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 337 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 354 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 312 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().