CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemTypeInfo lastTransitionType () const
 
InputSource::ItemTypeInfo nextTransitionType ()
 
void nextTransitionTypeAsync (std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipalreadLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipalreadRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
bool needToCallNext () const
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
void setNeedToCallNext (bool val)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatusexceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemTypeInfo lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
 
bool needToCallNext_ = true
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueuerunQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 69 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 371 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 372 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 236 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 79 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 607 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, findAndChange::op, schedule_, and unpackBuffers-CaloStage2::token.

607  {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 632 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, HltBtagPostValidation_cff::c, edm::checkForModuleDependencyCorrectness(), ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, MainPageGenerator::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

632  {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
642  service::SystemBounds bounds(preallocations_.numberOfStreams(),
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules to avoid non-consumed non-run modules to keep the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
700  }
701  if (preallocations_.numberOfRuns() > 1) {
703  }
705 
706  //NOTE: This implementation assumes 'Job' means one call
707  // the EventProcessor::run
708  // If it really means once per 'application' then this code will
709  // have to be changed.
710  // Also have to deal with case where have 'run' then new Module
711  // added and do 'run'
712  // again. In that case the newly added Module needs its 'beginJob'
713  // to be called.
714 
715  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
716  // For now we delay calling beginOfJob until first beginOfRun
717  //if(looper_) {
718  // looper_->beginOfJob(es);
719  //}
720  espController_->finishConfiguration();
721  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
722  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
723  try {
724  convertException::wrap([&]() { input_->doBeginJob(); });
725  } catch (cms::Exception& ex) {
726  ex.addContext("Calling beginJob for the source");
727  throw;
728  }
729 
730  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
731  if (looper_) {
732  constexpr bool mustPrefetchMayGet = true;
733  auto const processBlockLookup = preg_->productLookup(InProcess);
734  auto const runLookup = preg_->productLookup(InRun);
735  auto const lumiLookup = preg_->productLookup(InLumi);
736  auto const eventLookup = preg_->productLookup(InEvent);
737  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
740  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
741  looper_->updateLookup(esp_->recordsToResolverIndices());
742  }
743  // toerror.succeeded(); // should we add this?
744  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
745  actReg_->postBeginJobSignal_();
746 
747  oneapi::tbb::task_group group;
749  using namespace edm::waiting_task::chain;
750  first([this](auto nextTask) {
751  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
752  first([i, this](auto nextTask) {
754  schedule_->beginStream(i);
755  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
757  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
758  }) | lastTask(nextTask);
759  }
761  last.wait();
762  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
int merge(int argc, char *argv[])
Definition: DiMuonVmerge.cc:27
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::string > modulesToIgnoreForDeleteEarly_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:169
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1667 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), dqmdumpme::first, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1669  {
1670  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1671 
1672  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1673  chain::first([this, &iSync, &status](auto nextTask) {
1674  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1675  nextTask,
1676  status->endIOVWaitingTasks(),
1677  status->eventSetupImpls(),
1679  actReg_.get(),
1680  serviceToken_);
1681  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1682  CMS_SA_ALLOW try {
1683  //the call to doneWaiting will cause the count to decrement
1684  if (iException) {
1685  WaitingTaskHolder copyHolder(nextTask);
1686  copyHolder.doneWaiting(*iException);
1687  }
1688 
1689  lumiQueue_->pushAndPause(
1690  *nextTask.group(),
1691  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1692  CMS_SA_ALLOW try {
1693  if (postLumiQueueTask.taskHasFailed()) {
1694  status->resetResources();
1696  endRunAsync(iRunStatus, postLumiQueueTask);
1697  return;
1698  }
1699 
1700  status->setResumer(std::move(iResumer));
1701 
1703  *postLumiQueueTask.group(),
1704  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1705  CMS_SA_ALLOW try {
1707 
1708  if (postSourceTask.taskHasFailed()) {
1709  status->resetResources();
1711  endRunAsync(iRunStatus, postSourceTask);
1712  return;
1713  }
1714 
1715  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1716 
1717  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1718  {
1719  SendSourceTerminationSignalIfException sentry(actReg_.get());
1720  input_->doBeginLumi(lumiPrincipal, &processContext_);
1721  sentry.completedSuccessfully();
1722  }
1723 
1724  Service<RandomNumberGenerator> rng;
1725  if (rng.isAvailable()) {
1726  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1727  rng->preBeginLumi(lb);
1728  }
1729 
1730  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1731 
1732  using namespace edm::waiting_task::chain;
1733  chain::first([this, status](auto nextTask) mutable {
1736  } else {
1737  setNeedToCallNext(true);
1738  }
1739  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1740  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1741  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1742  beginGlobalTransitionAsync<Traits>(
1743  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1744  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1745  looper_->prefetchAsync(
1746  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1747  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1748  status->globalBeginDidSucceed();
1749  ServiceRegistry::Operate operateLooper(serviceToken_);
1750  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1751  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1752  if (iException) {
1753  status->resetResources();
1755  WaitingTaskHolder copyHolder(holder);
1756  copyHolder.doneWaiting(*iException);
1757  endRunAsync(iRunStatus, holder);
1758  } else {
1759  if (not looper_) {
1760  status->globalBeginDidSucceed();
1761  }
1762 
1763  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1764 
1765  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1766  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1767 
1768  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1769  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1770  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1771  streamQueues_[i].pause();
1772 
1773  auto& event = principalCache_.eventPrincipal(i);
1774  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1775  // held by the container as this lambda may not finish executing before all the tasks it
1776  // spawns have already started to run.
1777  auto eventSetupImpls = &status->eventSetupImpls();
1778  auto lp = status->lumiPrincipal().get();
1781  event.setLuminosityBlockPrincipal(lp);
1782  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1783  using namespace edm::waiting_task::chain;
1784  chain::first([this, i, &transitionInfo](auto nextTask) {
1785  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1786  *schedule_,
1787  i,
1788  transitionInfo,
1789  serviceToken_,
1790  subProcesses_);
1791  }) |
1792  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1793  auto nextTask) {
1794  if (exceptionFromBeginStreamLumi) {
1795  WaitingTaskHolder copyHolder(nextTask);
1796  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1797  }
1799  }) |
1800  runLast(std::move(holder));
1801  });
1802  } // end for loop over streams
1803  });
1804  }
1805  }) | runLast(postSourceTask);
1806  } catch (...) {
1807  status->resetResources();
1809  WaitingTaskHolder copyHolder(postSourceTask);
1810  copyHolder.doneWaiting(std::current_exception());
1811  endRunAsync(iRunStatus, postSourceTask);
1812  }
1813  }); // task in sourceResourcesAcquirer
1814  } catch (...) {
1815  status->resetResources();
1817  WaitingTaskHolder copyHolder(postLumiQueueTask);
1818  copyHolder.doneWaiting(std::current_exception());
1819  endRunAsync(iRunStatus, postLumiQueueTask);
1820  }
1821  }); // task in lumiQueue
1822  } catch (...) {
1823  status->resetResources();
1825  WaitingTaskHolder copyHolder(nextTask);
1826  copyHolder.doneWaiting(std::current_exception());
1827  endRunAsync(iRunStatus, nextTask);
1828  }
1829  }) | chain::runLast(std::move(iHolder));
1830  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1109 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1109  {
1110  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1111  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1112 
1113  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1114  FinalWaitingTask globalWaitTask{taskGroup_};
1115 
1116  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1117  beginGlobalTransitionAsync<Traits>(
1118  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1119 
1120  globalWaitTask.wait();
1121  beginProcessBlockSucceeded = true;
1122  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1212 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, dqmdumpme::first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::InputSource::LastItemToBeMerged, lastTransitionType(), looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1212  {
1213  if (iHolder.taskHasFailed()) {
1214  return;
1215  }
1216 
1217  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1218 
1219  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1220 
1221  chain::first([this, &status, iSync](auto nextTask) {
1222  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1223  nextTask,
1224  status->endIOVWaitingTasks(),
1225  status->eventSetupImpls(),
1227  actReg_.get(),
1228  serviceToken_,
1230  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1231  CMS_SA_ALLOW try {
1232  if (iException) {
1233  WaitingTaskHolder copyHolder(nextTask);
1234  copyHolder.doneWaiting(*iException);
1235  // Finish handling the exception in the task pushed to runQueue_
1236  }
1238 
1239  runQueue_->pushAndPause(
1240  *nextTask.group(),
1241  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1242  CMS_SA_ALLOW try {
1243  if (postRunQueueTask.taskHasFailed()) {
1244  status->resetBeginResources();
1246  return;
1247  }
1248 
1249  status->setResumer(std::move(iResumer));
1250 
1252  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1253  CMS_SA_ALLOW try {
1255 
1256  if (postSourceTask.taskHasFailed()) {
1257  status->resetBeginResources();
1259  status->resumeGlobalRunQueue();
1260  return;
1261  }
1262 
1263  status->setRunPrincipal(readRun());
1264 
1265  RunPrincipal& runPrincipal = *status->runPrincipal();
1266  {
1267  SendSourceTerminationSignalIfException sentry(actReg_.get());
1268  input_->doBeginRun(runPrincipal, &processContext_);
1269  sentry.completedSuccessfully();
1270  }
1271 
1272  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1273  if (looper_ && looperBeginJobRun_ == false) {
1274  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1275 
1276  oneapi::tbb::task_group group;
1277  FinalWaitingTask waitTask{group};
1278  using namespace edm::waiting_task::chain;
1279  chain::first([this, &es](auto nextTask) {
1280  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1281  }) | then([this, &es](auto nextTask) mutable {
1282  looper_->beginOfJob(es);
1283  looperBeginJobRun_ = true;
1284  looper_->doStartingNewLoop();
1285  }) | runLast(WaitingTaskHolder(group, &waitTask));
1286  waitTask.wait();
1287  }
1288 
1289  using namespace edm::waiting_task::chain;
1290  chain::first([this, status](auto nextTask) mutable {
1291  CMS_SA_ALLOW try {
1294  } else {
1295  setNeedToCallNext(true);
1296  }
1297  } catch (...) {
1298  status->setStopBeforeProcessingRun(true);
1299  nextTask.doneWaiting(std::current_exception());
1300  }
1301  }) | then([this, status, &es](auto nextTask) {
1302  if (status->stopBeforeProcessingRun()) {
1303  return;
1304  }
1305  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1306  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1307  beginGlobalTransitionAsync<Traits>(
1308  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1309  }) | then([status](auto nextTask) mutable {
1310  if (status->stopBeforeProcessingRun()) {
1311  return;
1312  }
1313  status->globalBeginDidSucceed();
1314  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1315  if (status->stopBeforeProcessingRun()) {
1316  return;
1317  }
1318  looper_->prefetchAsync(
1319  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1320  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1321  if (status->stopBeforeProcessingRun()) {
1322  return;
1323  }
1324  ServiceRegistry::Operate operateLooper(serviceToken_);
1325  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1326  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1327  bool precedingTasksSucceeded = true;
1328  if (iException) {
1329  precedingTasksSucceeded = false;
1330  WaitingTaskHolder copyHolder(holder);
1331  copyHolder.doneWaiting(*iException);
1332  }
1333 
1334  if (status->stopBeforeProcessingRun()) {
1335  // We just quit now if there was a failure when merging runs
1336  status->resetBeginResources();
1338  status->resumeGlobalRunQueue();
1339  return;
1340  }
1341  CMS_SA_ALLOW try {
1342  // Under normal circumstances, this task runs after endRun has completed for all streams
1343  // and global endLumi has completed for all lumis contained in this run
1344  auto globalEndRunTask =
1345  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1346  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1347  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1349  });
1350  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1351  } catch (...) {
1352  status->resetBeginResources();
1354  status->resumeGlobalRunQueue();
1355  holder.doneWaiting(std::current_exception());
1356  return;
1357  }
1358 
1359  // After this point we are committed to end the run via endRunAsync
1360 
1362 
1363  // The only purpose of the pause is to cause stream begin run to execute before
1364  // global begin lumi in the single threaded case (maintains consistency with
1365  // the order that existed before concurrent runs were implemented).
1366  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1367 
1368  CMS_SA_ALLOW try {
1370  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1371  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1372  CMS_SA_ALLOW try {
1373  streamQueues_[i].push(
1374  *holder.group(),
1375  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1377  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1378  });
1379  } catch (...) {
1380  if (status->streamFinishedBeginRun()) {
1381  WaitingTaskHolder copyHolder(holder);
1382  copyHolder.doneWaiting(std::current_exception());
1383  status->resetBeginResources();
1386  }
1387  }
1388  }
1389  });
1390  } catch (...) {
1391  WaitingTaskHolder copyHolder(holder);
1392  copyHolder.doneWaiting(std::current_exception());
1393  status->resetBeginResources();
1396  }
1398  }) | runLast(postSourceTask);
1399  } catch (...) {
1400  status->resetBeginResources();
1402  status->resumeGlobalRunQueue();
1403  postSourceTask.doneWaiting(std::current_exception());
1404  }
1405  }); // task in sourceResourcesAcquirer
1406  } catch (...) {
1407  status->resetBeginResources();
1409  status->resumeGlobalRunQueue();
1410  postRunQueueTask.doneWaiting(std::current_exception());
1411  }
1412  }); // task in runQueue
1413  } catch (...) {
1414  status->resetBeginResources();
1416  nextTask.doneWaiting(std::current_exception());
1417  }
1418  }) | chain::runLast(std::move(iHolder));
1419  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 282 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

282  {
284  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 285 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 849 of file EventProcessor.cc.

References epSignal, edm::JobReport::reportShutdownSignal(), runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

849  {
850  bool returnValue = false;
851 
852  // Look for a shutdown signal
853  if (shutdown_flag.load(std::memory_order_acquire)) {
854  returnValue = true;
855  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
857  jr->reportShutdownSignal();
859  }
860  return returnValue;
861  }
Log< level::System, false > LogSystem
volatile std::atomic< bool > shutdown_flag
void reportShutdownSignal()
Definition: JobReport.cc:543

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 843 of file EventProcessor.cc.

References schedule_.

843 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2109 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2109  {
2110  for (auto& s : subProcesses_) {
2111  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2112  }
2113  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2114  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2115  iStatus.lumiPrincipal()->clearPrincipal();
2116  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus iStatus)

Definition at line 2084 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2084  {
2085  for (auto& s : subProcesses_) {
2086  s.clearRunPrincipal(*iStatus.runPrincipal());
2087  }
2088  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2089  iStatus.runPrincipal()->clearPrincipal();
2090  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 1009 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

1009  {
1010  if (fileBlockValid()) {
1011  SendSourceTerminationSignalIfException sentry(actReg_.get());
1012  input_->closeFile(fb_.get(), cleaningUpAfterException);
1013  sentry.completedSuccessfully();
1014  }
1015  FDEBUG(1) << "\tcloseInputFile\n";
1016  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 1026 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

1026  {
1027  schedule_->closeOutputFiles();
1028  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
1029  processBlockHelper_->clearAfterOutputFilesClose();
1030  FDEBUG(1) << "\tcloseOutputFiles\n";
1031  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1832 of file EventProcessor.cc.

References dqmdumpme::first, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1832  {
1833  chain::first([this](auto nextTask) {
1834  //all streams are sharing the same status at the moment
1835  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1837 
1839  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1842  }
1843  }) | chain::then([this](auto nextTask) mutable {
1844  unsigned int streamIndex = 0;
1845  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1846  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1847  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1848  }
1849  nextTask.group()->run(
1850  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1851  }) | chain::runLast(std::move(iHolder));
1852  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1100 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1100  {
1101  FDEBUG(1) << "\tdoErrorStuff\n";
1102  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1103  << "and went to the error state\n"
1104  << "Will attempt to terminate processing normally\n"
1105  << "(IF using the looper the next loop will be attempted)\n"
1106  << "This likely indicates a bug in an input module or corrupted input or both\n";
1107  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 764 of file EventProcessor.cc.

References actReg_, HltBtagPostValidation_cff::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, MainPageGenerator::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

764  {
765  // Collects exceptions, so we don't throw before all operations are performed.
766  ExceptionCollector c(
767  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
768 
769  //make the services available
771 
772  using namespace edm::waiting_task::chain;
773 
774  oneapi::tbb::task_group group;
775  edm::FinalWaitingTask waitTask{group};
776 
777  {
778  //handle endStream transitions
779  edm::WaitingTaskHolder taskHolder(group, &waitTask);
780  std::mutex collectorMutex;
781  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
782  first([this, i, &c, &collectorMutex](auto nextTask) {
783  std::exception_ptr ep;
784  try {
786  this->schedule_->endStream(i);
787  } catch (...) {
788  ep = std::current_exception();
789  }
790  if (ep) {
791  std::lock_guard<std::mutex> l(collectorMutex);
792  c.call([&ep]() { std::rethrow_exception(ep); });
793  }
794  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
795  for (auto& subProcess : subProcesses_) {
796  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
797  std::exception_ptr ep;
798  try {
800  subProcess.doEndStream(i);
801  } catch (...) {
802  ep = std::current_exception();
803  }
804  if (ep) {
805  std::lock_guard<std::mutex> l(collectorMutex);
806  c.call([&ep]() { std::rethrow_exception(ep); });
807  }
808  }) | lastTask(nextTask);
809  }
810  }) | lastTask(taskHolder);
811  }
812  }
813  waitTask.waitNoThrow();
814 
815  auto actReg = actReg_.get();
816  c.call([actReg]() { actReg->preEndJobSignal_(); });
817  schedule_->endJob(c);
818  for (auto& subProcess : subProcesses_) {
819  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
820  }
821  c.call(std::bind(&InputSource::doEndJob, input_.get()));
822  if (looper_) {
823  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
824  }
825  c.call([actReg]() { actReg->postEndJobSignal_(); });
826  if (c.hasThrown()) {
827  c.rethrow();
828  }
829  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:210
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1061 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1061  {
1062  if (looper_) {
1063  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1064  looper_->setModuleChanger(&changer);
1065  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1066  looper_->setModuleChanger(nullptr);
1068  return true;
1069  else
1070  return false;
1071  }
1072  FDEBUG(1) << "\tendOfLoop\n";
1073  return true;
1074  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1150 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1150  {
1151  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1152 
1153  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1154  FinalWaitingTask globalWaitTask{taskGroup_};
1155 
1156  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1157  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1158  *schedule_,
1159  transitionInfo,
1160  serviceToken_,
1161  subProcesses_,
1162  cleaningUpAfterException);
1163  globalWaitTask.wait();
1164 
1165  if (beginProcessBlockSucceeded) {
1166  FinalWaitingTask writeWaitTask{taskGroup_};
1168  writeWaitTask.wait();
1169  }
1170 
1171  processBlockPrincipal.clearPrincipal();
1172  for (auto& s : subProcesses_) {
1173  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1174  }
1175  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1462 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, dqmdumpme::first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1462  {
1463  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1464  iRunStatus->setEndTime();
1465  IOVSyncValue ts(
1466  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1467  runPrincipal.endTime());
1468  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1469  WaitingTaskHolder copyHolder(iHolder);
1470  copyHolder.doneWaiting(std::current_exception());
1471  }
1472 
1473  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1474  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1475  nextTask,
1476  iRunStatus->endIOVWaitingTasksEndRun(),
1477  iRunStatus->eventSetupImplsEndRun(),
1479  actReg_.get(),
1480  serviceToken_);
1481  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1482  if (iException) {
1483  iRunStatus->setEndingEventSetupSucceeded(false);
1484  handleEndRunExceptions(*iException, nextTask);
1485  }
1487  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1488  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1489  CMS_SA_ALLOW try {
1490  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1491  streamQueues_[i].pause();
1492  streamEndRunAsync(std::move(nextTask), i);
1493  });
1494  } catch (...) {
1495  WaitingTaskHolder copyHolder(nextTask);
1496  copyHolder.doneWaiting(std::current_exception());
1497  }
1498  }
1499  });
1500 
1502  CMS_SA_ALLOW try {
1503  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1504  } catch (...) {
1505  WaitingTaskHolder copyHolder(nextTask);
1506  copyHolder.doneWaiting(std::current_exception());
1507  }
1508  }
1509  }) | chain::runLast(std::move(iHolder));
1510  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
InputSource::ItemTypeInfo lastTransitionType() const
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1975 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1975  {
1976  if (streamRunActive_ == 0) {
1977  assert(streamLumiActive_ == 0);
1978  } else {
1980  if (streamLumiActive_ > 0) {
1981  FinalWaitingTask globalWaitTask{taskGroup_};
1983  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1984  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1985  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1986  }
1987  globalWaitTask.wait();
1988  }
1989  }
1990  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1653 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1653  {
1654  if (streamRunActive_ > 0) {
1655  FinalWaitingTask waitTask{taskGroup_};
1656 
1657  auto runStatus = streamRunStatus_[0].get();
1658  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1659  WaitingTaskHolder holder{taskGroup_, &waitTask};
1660  runStatus->setHolderOfTaskInProcessRuns(holder);
1662  endRunAsync(streamRunStatus_[0], std::move(holder));
1663  waitTask.wait();
1664  }
1665  }
InputSource::ItemTypeInfo lastSourceTransition_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 193 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

193 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 833 of file EventProcessor.cc.

References schedule_.

833  {
834  return schedule_->getAllModuleDescriptions();
835  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 831 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

831 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1863 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, dqmdumpme::first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1864  {
1865  // Get some needed info out of the status object before moving
1866  // it into finalTaskForThisLumi.
1867  auto& lp = *(iLumiStatus->lumiPrincipal());
1868  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1869  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1870  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1871  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1872 
1873  using namespace edm::waiting_task::chain;
1874  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1875  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1876 
1877  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1878  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1879  endGlobalTransitionAsync<Traits>(
1880  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1881  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1882  //Only call writeLumi if beginLumi succeeded
1883  if (didGlobalBeginSucceed) {
1884  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1885  }
1886  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1887  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1888  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1889  //any thrown exception auto propagates to nextTask via the chain
1891  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1892  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1893  if (iException) {
1894  handleEndLumiExceptions(*iException, nextTask);
1895  }
1897 
1898  std::exception_ptr ptr;
1899 
1900  // Try hard to clean up resources so the
1901  // process can terminate in a controlled
1902  // fashion even after exceptions have occurred.
1903  // Caught exception is passed to handleEndLumiExceptions()
1904  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1905  if (not ptr) {
1906  ptr = std::current_exception();
1907  }
1908  }
1909  // Caught exception is passed to handleEndLumiExceptions()
1910  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1911  if (not ptr) {
1912  ptr = std::current_exception();
1913  }
1914  }
1915  // Caught exception is passed to handleEndLumiExceptions()
1916  CMS_SA_ALLOW try {
1917  status->resetResources();
1918  status->globalEndRunHolderDoneWaiting();
1919  status.reset();
1920  } catch (...) {
1921  if (not ptr) {
1922  ptr = std::current_exception();
1923  }
1924  }
1925 
1926  if (ptr && !iException) {
1927  handleEndLumiExceptions(ptr, nextTask);
1928  }
1929  }) | runLast(std::move(iTask));
1930  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus iRunStatus 
)

Definition at line 1521 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, dqmdumpme::first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1521  {
1522  auto& runPrincipal = *(iRunStatus->runPrincipal());
1523  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1524  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1525  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1526  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1527  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1528 
1529  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1530  using namespace edm::waiting_task::chain;
1531  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1532  auto nextTask) {
1533  if (endingEventSetupSucceeded) {
1534  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1535  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1536  endGlobalTransitionAsync<Traits>(
1537  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1538  }
1539  }) |
1540  ifThen(looper_ && endingEventSetupSucceeded,
1541  [this, &runPrincipal, &es](auto nextTask) {
1542  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1543  }) |
1544  ifThen(looper_ && endingEventSetupSucceeded,
1545  [this, &runPrincipal, &es](auto nextTask) {
1547  looper_->doEndRun(runPrincipal, es, &processContext_);
1548  }) |
1549  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1550  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1551  mergeableRunProductMetadata->preWriteRun();
1552  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1553  }) |
1554  then([status = std::move(iRunStatus),
1555  this,
1556  didGlobalBeginSucceed,
1557  mergeableRunProductMetadata,
1558  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1559  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1560  mergeableRunProductMetadata->postWriteRun();
1561  }
1562  if (iException) {
1563  handleEndRunExceptions(*iException, nextTask);
1564  }
1566 
1567  std::exception_ptr ptr;
1568 
1569  // Try hard to clean up resources so the
1570  // process can terminate in a controlled
1571  // fashion even after exceptions have occurred.
1572  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1573  if (not ptr) {
1574  ptr = std::current_exception();
1575  }
1576  }
1577  CMS_SA_ALLOW try {
1578  status->resumeGlobalRunQueue();
1580  } catch (...) {
1581  if (not ptr) {
1582  ptr = std::current_exception();
1583  }
1584  }
1585  CMS_SA_ALLOW try {
1586  status->resetEndResources();
1587  status.reset();
1588  } catch (...) {
1589  if (not ptr) {
1590  ptr = std::current_exception();
1591  }
1592  }
1593 
1594  if (ptr && !iException) {
1595  handleEndRunExceptions(ptr, nextTask);
1596  }
1597  }) |
1598  runLast(std::move(iTask));
1599  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1854 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1854  {
1855  if (holder.taskHasFailed()) {
1857  } else {
1858  WaitingTaskHolder tmp(holder);
1859  tmp.doneWaiting(iException);
1860  }
1861  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1512 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1512  {
1513  if (holder.taskHasFailed()) {
1515  } else {
1516  WaitingTaskHolder tmp(holder);
1517  tmp.doneWaiting(iException);
1518  }
1519  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2280 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2280  {
2281  if (streamLumiStatus_[iStreamIndex]->haveStartedNextLumiOrEndedRun()) {
2282  streamEndLumiAsync(iTask, iStreamIndex);
2283  return;
2284  }
2285  auto group = iTask.group();
2286  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2287  CMS_SA_ALLOW try {
2288  auto status = streamLumiStatus_[iStreamIndex].get();
2290 
2291  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2292  auto recursionTask =
2293  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2294  if (iEventException) {
2295  WaitingTaskHolder copyHolder(iTask);
2296  copyHolder.doneWaiting(*iEventException);
2297  // Intentionally, we don't return here. The recursive call to
2298  // handleNextEvent takes care of immediately ending the run properly
2299  // using the same code it uses to end the run in other situations.
2300  }
2301  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2302  });
2303 
2304  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2305  } else {
2306  // the stream will stop processing this lumi now
2308  if (not status->haveStartedNextLumiOrEndedRun()) {
2309  status->startNextLumiOrEndRun();
2310  if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
2311  CMS_SA_ALLOW try {
2312  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2313  input_->luminosityBlockAuxiliary()->beginTime()),
2314  streamRunStatus_[iStreamIndex],
2315  iTask);
2316  } catch (...) {
2317  WaitingTaskHolder copyHolder(iTask);
2318  copyHolder.doneWaiting(std::current_exception());
2319  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2320  }
2321  } else {
2322  // If appropriate, this will also start the next run.
2323  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2324  }
2325  }
2326  streamEndLumiAsync(iTask, iStreamIndex);
2327  } else {
2328  assert(status->eventProcessingState() ==
2330  auto runStatus = streamRunStatus_[iStreamIndex].get();
2331 
2332  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2333  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2334  }
2335  }
2336  }
2337  } catch (...) {
2338  WaitingTaskHolder copyHolder(iTask);
2339  copyHolder.doneWaiting(std::current_exception());
2340  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2341  }
2342  });
2343  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2180 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), dqmdumpme::first, input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), needToCallNext(), nextTransitionTypeAsync(), edm::waiting_task::chain::runLast(), serviceToken_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync(), and processRuns().

2181  {
2182  chain::first([this, iRunStatus](auto nextTask) mutable {
2183  if (needToCallNext()) {
2184  nextTransitionTypeAsync(std::move(iRunStatus), std::move(nextTask));
2185  }
2186  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
2188  if (iException) {
2189  WaitingTaskHolder copyHolder(nextTask);
2190  copyHolder.doneWaiting(*iException);
2191  }
2193  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2194  return;
2195  }
2196  if (lastTransitionType() == InputSource::ItemType::IsLumi && !nextTask.taskHasFailed()) {
2197  CMS_SA_ALLOW try {
2198  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2199  input_->luminosityBlockAuxiliary()->beginTime()),
2200  iRunStatus,
2201  nextTask);
2202  return;
2203  } catch (...) {
2204  WaitingTaskHolder copyHolder(nextTask);
2205  copyHolder.doneWaiting(std::current_exception());
2206  }
2207  }
2208  // Note that endRunAsync will call beginRunAsync for the following run
2209  // if appropriate.
2210  endRunAsync(iRunStatus, std::move(nextTask));
2211  }) | chain::runLast(std::move(iHolder));
2212  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool needToCallNext() const
constexpr auto then(O &&iO)
Definition: chain_first.h:277
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DiMuonV_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, moduleTypeResolverMaker_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // intialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // intialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
495  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
519  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
585  SubProcessParentageHelper(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
std::multimap< std::string, std::string > referencesToBranches_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1124 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1124  {
1125  input_->fillProcessBlockHelper();
1126  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1127  while (input_->nextProcessBlock(processBlockPrincipal)) {
1128  readProcessBlock(processBlockPrincipal);
1129 
1130  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1131  FinalWaitingTask globalWaitTask{taskGroup_};
1132 
1133  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1134  beginGlobalTransitionAsync<Traits>(
1135  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1136 
1137  globalWaitTask.wait();
1138 
1139  FinalWaitingTask writeWaitTask{taskGroup_};
1141  writeWaitTask.wait();
1142 
1143  processBlockPrincipal.clearPrincipal();
1144  for (auto& s : subProcesses_) {
1145  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1146  }
1147  }
1148  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 292 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

292 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 293 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

293 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ needToCallNext()

bool edm::EventProcessor::needToCallNext ( ) const
inlineprivate

Definition at line 299 of file EventProcessor.h.

References needToCallNext_.

Referenced by handleNextItemAfterMergingRunEntries(), and readNextEventForStream().

299 { return needToCallNext_; }

◆ nextTransitionType()

InputSource::ItemTypeInfo edm::EventProcessor::nextTransitionType ( )

Definition at line 871 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), nextTransitionTypeAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

871  {
872  SendSourceTerminationSignalIfException sentry(actReg_.get());
873  InputSource::ItemTypeInfo itemTypeInfo;
874  {
875  SourceNextGuard guard(*actReg_.get());
876  //For now, do nothing with InputSource::IsSynchronize
877  do {
878  itemTypeInfo = input_->nextItemType();
879  } while (itemTypeInfo == InputSource::ItemType::IsSynchronize);
880  }
881  lastSourceTransition_ = itemTypeInfo;
882  sentry.completedSuccessfully();
883 
885 
887  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
889  }
890 
891  return lastSourceTransition_;
892  }
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::shared_ptr< ActivityRegistry > actReg_

◆ nextTransitionTypeAsync()

void edm::EventProcessor::nextTransitionTypeAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  nextTask 
)

Definition at line 894 of file EventProcessor.cc.

References CMS_SA_ALLOW, Exception, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), edm::errors::LogicError, eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by handleNextItemAfterMergingRunEntries().

895  {
896  auto group = nextTask.group();
898  *group, [this, runStatus = std::move(iRunStatus), nextHolder = std::move(nextTask)]() mutable {
899  CMS_SA_ALLOW try {
901  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
904  runStatus->runPrincipal()->run() == input_->run() &&
905  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
907  << "InputSource claimed previous Run Entry was last to be merged in this file,\n"
908  << "but the next entry has the same run number and reduced ProcessHistoryID.\n"
909  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
910  }
911  } catch (...) {
912  nextHolder.doneWaiting(std::current_exception());
913  }
914  });
915  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 1018 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1018  {
1019  if (fileBlockValid()) {
1020  schedule_->openOutputFiles(*fb_);
1021  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
1022  }
1023  FDEBUG(1) << "\topenOutputFiles\n";
1024  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 280 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

280 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 281 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

281 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1082 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1082  {
1083  looper_->prepareForNextLoop(esp_.get());
1084  FDEBUG(1) << "\tprepareForNextLoop\n";
1085  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 143 of file EventProcessor.h.

References processConfiguration_.

143 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2360 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2360  {
2361  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2362  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2376 of file EventProcessor.cc.

References actReg_, esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, dqmdumpme::first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), edm::StreamContext::kEvent, looper_, eostools::move(), principalCache_, processContext_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2376  {
2377  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2378 
2380  Service<RandomNumberGenerator> rng;
2381  if (rng.isAvailable()) {
2382  Event ev(*pep, ModuleDescription(), nullptr);
2383  rng->postEventRead(ev);
2384  }
2385 
2386  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2387  using namespace edm::waiting_task::chain;
2388  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2389  EventTransitionInfo info(*pep, es);
2390  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2391  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2392  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2393  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2394  }
2395  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2396  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2397  ServiceRegistry::Operate operateLooper(serviceToken_);
2398  processEventWithLooper(*pep, iStreamIndex);
2399  }) | then([this, pep](auto nextTask) {
2400  FDEBUG(1) << "\tprocessEvent\n";
2401  StreamContext streamContext(pep->streamID(),
2403  pep->id(),
2404  pep->runPrincipal().index(),
2405  pep->luminosityBlockPrincipal().index(),
2406  pep->time(),
2407  &processContext_);
2408  ClearEventGuard guard(*this->actReg_.get(), streamContext);
2409  pep->clearEventPrincipal();
2410  }) | runLast(iHolder);
2411  }
ProcessContext processContext_
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2413 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2413  {
2414  bool randomAccess = input_->randomAccess();
2415  ProcessingController::ForwardState forwardState = input_->forwardState();
2416  ProcessingController::ReverseState reverseState = input_->reverseState();
2417  ProcessingController pc(forwardState, reverseState, randomAccess);
2418 
2420  do {
2421  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2422  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2423  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2424 
2425  bool succeeded = true;
2426  if (randomAccess) {
2427  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2428  input_->skipEvents(-2);
2429  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2430  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2431  }
2432  }
2433  pc.setLastOperationSucceeded(succeeded);
2434  } while (!pc.lastOperationSucceeded());
2436  shouldWeStop_ = true;
2437  }
2438  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1177 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), setNeedToCallNext(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1177  {
1178  FinalWaitingTask waitTask{taskGroup_};
1180  if (streamRunActive_ == 0) {
1181  assert(streamLumiActive_ == 0);
1182 
1183  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1184  WaitingTaskHolder{taskGroup_, &waitTask});
1185  } else {
1187 
1188  auto runStatus = streamRunStatus_[0];
1189 
1191  runStatus->runPrincipal()->run() == input_->run() and
1192  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1193  readAndMergeRun(*runStatus);
1195  }
1196 
1197  setNeedToCallNext(false);
1198 
1199  WaitingTaskHolder holder{taskGroup_, &waitTask};
1200  runStatus->setHolderOfTaskInProcessRuns(holder);
1201  if (streamLumiActive_ > 0) {
1203  continueLumiAsync(std::move(holder));
1204  } else {
1206  }
1207  }
1208  waitTask.wait();
1209  return lastTransitionType();
1210  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
assert(be >=bs)
PreallocationConfiguration preallocations_
void setNeedToCallNext(bool val)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2037 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

2037  {
2038  auto& lumiPrincipal = *iStatus.lumiPrincipal();
2039  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
2040  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
2041  input_->processHistoryRegistry().reducedProcessHistoryID(
2042  input_->luminosityBlockAuxiliary()->processHistoryID()));
2043  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2044  assert(lumiOK);
2045  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2046  {
2047  SendSourceTerminationSignalIfException sentry(actReg_.get());
2048  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2049  sentry.completedSuccessfully();
2050  }
2051  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2152 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2153  {
2154  auto group = iHolder.group();
2156  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2157  CMS_SA_ALLOW try {
2159 
2160  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2161 
2163  setNeedToCallNext(false);
2164 
2166  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2167  readAndMergeLumi(*iLumiStatus);
2169  setNeedToCallNext(true);
2170  return;
2171  }
2173  }
2174  } catch (...) {
2175  holder.doneWaiting(std::current_exception());
2176  }
2177  });
2178  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus iStatus)

Definition at line 2011 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

2011  {
2012  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
2013 
2014  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
2015  assert(runOK);
2016  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
2017  {
2018  SendSourceTerminationSignalIfException sentry(actReg_.get());
2019  input_->readAndMergeRun(runPrincipal);
2020  sentry.completedSuccessfully();
2021  }
2022  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2118 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, edm::InputSource::LastItemToBeMerged, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, setNeedToCallNext(), sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2119  {
2120  auto group = iHolder.group();
2122  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2123  CMS_SA_ALLOW try {
2125 
2126  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2127 
2129  setNeedToCallNext(false);
2130 
2132  status->runPrincipal()->run() == input_->run() and
2133  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2134  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2135  status->setStopBeforeProcessingRun(true);
2136  return;
2137  }
2140  setNeedToCallNext(true);
2141  return;
2142  }
2144  }
2145  } catch (...) {
2146  status->setStopBeforeProcessingRun(true);
2147  holder.doneWaiting(std::current_exception());
2148  }
2149  });
2150  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
InputSource::ItemTypeInfo lastTransitionType() const
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2345 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2345  {
2346  //TODO this will have to become per stream
2347  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2348  StreamContext streamContext(event.streamID(), &processContext_);
2349 
2350  SendSourceTerminationSignalIfException sentry(actReg_.get());
2351  input_->readEvent(event, streamContext);
2352 
2353  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2354  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2355  sentry.completedSuccessfully();
2356 
2357  FDEBUG(1) << "\treadEvent\n";
2358  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 984 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

984  {
985  FDEBUG(1) << " \treadFile\n";
986  size_t size = preg_->size();
987  SendSourceTerminationSignalIfException sentry(actReg_.get());
988 
989  if (streamRunActive_ > 0) {
990  streamRunStatus_[0]->runPrincipal()->preReadFile();
991  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
992  }
993 
994  if (streamLumiActive_ > 0) {
995  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
996  }
997 
998  fb_ = input_->readFile();
999  if (size < preg_->size()) {
1001  }
1004  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1005  }
1006  sentry.completedSuccessfully();
1007  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal rp)

Definition at line 2024 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

2024  {
2026  assert(lbp);
2027  lbp->setAux(*input_->luminosityBlockAuxiliary());
2028  {
2029  SendSourceTerminationSignalIfException sentry(actReg_.get());
2030  input_->readLuminosityBlock(*lbp, *historyAppender_);
2031  sentry.completedSuccessfully();
2032  }
2033  lbp->setRunPrincipal(std::move(rp));
2034  return lbp;
2035  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2214 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), Exception, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), needToCallNext(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), setNeedToCallNext(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2216  {
2217  // This function returns true if it successfully reads an event for the stream and that
2218  // requires both that an event is next and there are no problems or requests to stop.
2219 
2220  if (iTask.taskHasFailed()) {
2221  // We want all streams to stop or all streams to pause. If we are already in the
2222  // middle of pausing streams, then finish pausing all of them and the lumi will be
2223  // ended later. Otherwise, just end it now.
2224  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2226  }
2227  return false;
2228  }
2229 
2230  // Did another stream already stop or pause this lumi?
2231  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2232  return false;
2233  }
2234 
2235  // Are output modules or the looper requesting we stop?
2236  if (shouldWeStop()) {
2239  return false;
2240  }
2241 
2243 
2244  // need to use lock in addition to the serial task queue because
2245  // of delayed provenance reading and reading data in response to
2246  // edm::Refs etc
2247  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2248 
2249  // If we didn't already call nextTransitionType while merging lumis, call it here.
2250  // This asks the input source what is next and also checks for signals.
2251 
2253  setNeedToCallNext(true);
2254 
2255  if (InputSource::ItemType::IsEvent != itemType) {
2256  // IsFile may continue processing the lumi and
2257  // looper_ can cause the input source to declare a new IsRun which is actually
2258  // just a continuation of the previous run
2260  (InputSource::ItemType::IsRun == itemType and
2261  (iStatus.lumiPrincipal()->run() != input_->run() or
2262  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2263  if (itemType == InputSource::ItemType::IsLumi &&
2264  iStatus.lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2266  << "InputSource claimed previous Lumi Entry was last to be merged in this file,\n"
2267  << "but the next lumi entry has the same lumi number.\n"
2268  << "This is probably a bug in the InputSource. Please report to the Core group.\n";
2269  }
2271  } else {
2273  }
2274  return false;
2275  }
2276  readEvent(iStreamIndex);
2277  return true;
2278  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemTypeInfo lastSourceTransition_
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemTypeInfo nextTransitionType()
bool needToCallNext() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void setNeedToCallNext(bool val)
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemTypeInfo lastTransitionType() const
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1992 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1992  {
1993  SendSourceTerminationSignalIfException sentry(actReg_.get());
1994  input_->readProcessBlock(processBlockPrincipal);
1995  sentry.completedSuccessfully();
1996  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 1998 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

1998  {
2000  assert(rp);
2001  rp->setAux(*input_->runAuxiliary());
2002  {
2003  SendSourceTerminationSignalIfException sentry(actReg_.get());
2004  input_->readRun(*rp, *historyAppender_);
2005  sentry.completedSuccessfully();
2006  }
2007  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
2008  return rp;
2009  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1453 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1453  {
1454  auto& status = streamRunStatus_[iStream];
1455  if (status->streamFinishedBeginRun()) {
1456  status->resetBeginResources();
1458  }
1459  streamQueues_[iStream].resume();
1460  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1043 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1043  {
1044  if (fileBlockValid()) {
1045  schedule_->respondToCloseInputFile(*fb_);
1046  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1047  }
1048  FDEBUG(1) << "\trespondToCloseInputFile\n";
1049  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 1033 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1033  {
1034  if (fileBlockValid()) {
1036  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1037  schedule_->respondToOpenInputFile(*fb_);
1038  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1039  }
1040  FDEBUG(1) << "\trespondToOpenInputFile\n";
1041  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1076 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1076  {
1077  input_->repeat();
1078  input_->rewind();
1079  FDEBUG(1) << "\trewind\n";
1080  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 382 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

382 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 917 of file EventProcessor.cc.

References actReg_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

917  {
918  beginJob(); //make sure this was called
919 
920  // make the services available
922  actReg_->beginProcessingSignal_();
923  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
924  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
925  try {
926  FilesProcessor fp(fileModeNoMerge_);
927 
928  convertException::wrap([&]() {
929  bool firstTime = true;
930  do {
931  if (not firstTime) {
933  rewindInput();
934  } else {
935  firstTime = false;
936  }
937  startingNewLoop();
938 
939  auto trans = fp.processFiles(*this);
940 
941  fp.normalEnd();
942 
943  if (deferredExceptionPtrIsSet_.load()) {
944  std::rethrow_exception(deferredExceptionPtr_);
945  }
946  if (trans != InputSource::ItemType::IsStop) {
947  //problem with the source
948  doErrorStuff();
949 
950  throw cms::Exception("BadTransition") << "Unexpected transition change " << static_cast<int>(trans);
951  }
952  } while (not endOfLoop());
953  }); // convertException::wrap
954 
955  } // Try block
956  catch (cms::Exception& e) {
958  std::string message(
959  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
960  e.addAdditionalInfo(message);
961  if (e.alreadyPrinted()) {
962  LogAbsolute("Additional Exceptions") << message;
963  }
964  }
965  if (exceptionMessageRuns_) {
966  std::string message(
967  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
968  e.addAdditionalInfo(message);
969  if (e.alreadyPrinted()) {
970  LogAbsolute("Additional Exceptions") << message;
971  }
972  }
973  if (!exceptionMessageFiles_.empty()) {
974  e.addAdditionalInfo(exceptionMessageFiles_);
975  if (e.alreadyPrinted()) {
976  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
977  }
978  }
979  throw;
980  }
981  return epSuccess;
982  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2461 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2461  {
2462  bool expected = false;
2463  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2464  deferredExceptionPtr_ = iException;
2465  return true;
2466  }
2467  return false;
2468  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2455 of file EventProcessor.cc.

References exceptionMessageFiles_.

2455 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2459 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2459 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2457 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2457 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ setNeedToCallNext()

void edm::EventProcessor::setNeedToCallNext ( bool  val)
inlineprivate

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1087 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1087  {
1088  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1089  if (!subProcesses_.empty()) {
1090  for (auto const& subProcess : subProcesses_) {
1091  if (subProcess.shouldWeCloseOutput()) {
1092  return true;
1093  }
1094  }
1095  return false;
1096  }
1097  return schedule_->shouldWeCloseOutput();
1098  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2440 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2440  {
2441  FDEBUG(1) << "\tshouldWeStop\n";
2442  if (shouldWeStop_)
2443  return true;
2444  if (!subProcesses_.empty()) {
2445  for (auto const& subProcess : subProcesses_) {
2446  if (subProcess.terminate()) {
2447  return true;
2448  }
2449  }
2450  return false;
2451  }
2452  return schedule_->terminate();
2453  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1051 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1051  {
1052  shouldWeStop_ = false;
1053  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1054  // until after we've called beginOfJob
1055  if (looper_ && looperBeginJobRun_) {
1056  looper_->doStartingNewLoop();
1057  }
1058  FDEBUG(1) << "\tstartingNewLoop\n";
1059  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus status,
bool  precedingTasksSucceeded,
WaitingTaskHolder  iHolder 
)

Definition at line 1421 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), dqmdumpme::first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1424  {
1425  // These shouldn't throw
1426  streamQueues_[iStream].pause();
1427  ++streamRunActive_;
1428  streamRunStatus_[iStream] = std::move(status);
1429 
1430  CMS_SA_ALLOW try {
1431  using namespace edm::waiting_task::chain;
1432  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1433  if (precedingTasksSucceeded) {
1434  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1435  RunTransitionInfo transitionInfo(
1436  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1437  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1438  beginStreamTransitionAsync<Traits>(
1439  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1440  }
1441  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1442  if (exceptionFromBeginStreamRun) {
1443  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1444  }
1445  releaseBeginRunResources(iStream);
1446  }) | runLast(iHolder);
1447  } catch (...) {
1448  releaseBeginRunResources(iStream);
1449  iHolder.doneWaiting(std::current_exception());
1450  }
1451  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1932 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1932  {
1933  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1934  auto status = streamLumiStatus_[iStreamIndex];
1935  if (iException) {
1936  handleEndLumiExceptions(*iException, iTask);
1937  }
1938 
1939  // reset status before releasing queue else get race condition
1940  streamLumiStatus_[iStreamIndex].reset();
1942  streamQueues_[iStreamIndex].resume();
1943 
1944  //are we the last one?
1945  if (status->streamFinishedLumi()) {
1947  }
1948  });
1949 
1950  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1951 
1952  // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1953  // therefore we do not want to hold the shared_ptr
1954  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1955  lumiStatus->setEndTime();
1956 
1957  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1958  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1959  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1960 
1961  if (lumiStatus->didGlobalBeginSucceed()) {
1962  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1963  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1964  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1965  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1966  *schedule_,
1967  iStreamIndex,
1968  transitionInfo,
1969  serviceToken_,
1970  subProcesses_,
1971  cleaningUpAfterException);
1972  }
1973  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1601 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1601  {
1602  CMS_SA_ALLOW try {
1603  if (!streamRunStatus_[iStreamIndex]) {
1604  if (exceptionRunStatus_->streamFinishedRun()) {
1605  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1606  exceptionRunStatus_.reset();
1607  }
1608  return;
1609  }
1610 
1611  auto runDoneTask =
1612  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1613  if (iException) {
1614  handleEndRunExceptions(*iException, iTask);
1615  }
1616 
1617  auto runStatus = streamRunStatus_[iStreamIndex];
1618 
1619  //reset status before releasing queue else get race condition
1620  if (runStatus->streamFinishedRun()) {
1621  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1622  }
1623  streamRunStatus_[iStreamIndex].reset();
1624  --streamRunActive_;
1625  streamQueues_[iStreamIndex].resume();
1626  });
1627 
1628  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1629 
1630  auto runStatus = streamRunStatus_[iStreamIndex].get();
1631 
1632  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1633  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1634  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1635  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1636 
1637  auto& runPrincipal = *runStatus->runPrincipal();
1638  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1639  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1640  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1641  *schedule_,
1642  iStreamIndex,
1643  transitionInfo,
1644  serviceToken_,
1645  subProcesses_,
1646  cleaningUpAfterException);
1647  }
1648  } catch (...) {
1649  handleEndRunExceptions(std::current_exception(), iTask);
1650  }
1651  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 625 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

625  {
628  task.waitNoThrow();
629  assert(task.done());
630  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

286  {
288  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 289 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

289  {
291  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2470 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2470  {
2471  cms::Exception ex("ModulesSynchingOnLumis");
2472  ex << "The framework is configured to use at least two streams, but the following modules\n"
2473  << "require synchronizing on LuminosityBlock boundaries:";
2474  bool found = false;
2475  for (auto worker : schedule_->allWorkers()) {
2476  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2477  found = true;
2478  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2479  }
2480  }
2481  if (found) {
2482  ex << "\n\nThe situation can be fixed by either\n"
2483  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2484  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2485  throw ex;
2486  }
2487  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 837 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

837 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 841 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

841 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 839 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

839 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2502 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2502  {
2503  std::unique_ptr<LogSystem> s;
2504  for (auto worker : schedule_->allWorkers()) {
2505  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2506  if (not s) {
2507  s = std::make_unique<LogSystem>("LegacyModules");
2508  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2509  "is going to end soon. These modules need to be converted to have type\n"
2510  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2511  }
2512  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2513  }
2514  }
2515  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2489 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2489  {
2490  std::unique_ptr<LogSystem> s;
2491  for (auto worker : schedule_->allWorkers()) {
2492  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2493  if (not s) {
2494  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2495  (*s) << "The following modules require synchronizing on Run boundaries:";
2496  }
2497  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2498  }
2499  }
2500  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2092 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), findAndChange::op, processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2092  {
2093  using namespace edm::waiting_task;
2094  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2095  chain::first([&](auto nextTask) {
2097 
2098  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2099  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2100  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2102  for (auto& s : subProcesses_) {
2103  s.writeLumiAsync(nextTask, lumiPrincipal);
2104  }
2106  }
2107  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2053 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), eostools::move(), findAndChange::op, principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2053  {
2054  using namespace edm::waiting_task;
2055  chain::first([&](auto nextTask) {
2057  schedule_->writeProcessBlockAsync(
2058  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2059  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2061  for (auto& s : subProcesses_) {
2062  s.writeProcessBlockAsync(nextTask, processBlockType);
2063  }
2064  }) | chain::runLast(std::move(task));
2065  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2067 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), findAndChange::op, processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2069  {
2070  using namespace edm::waiting_task;
2071  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2072  chain::first([&](auto nextTask) {
2074  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2075  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2077  for (auto& s : subProcesses_) {
2078  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2079  }
2080  }) | chain::runLast(std::move(task));
2081  }
2082  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 324 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 340 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 352 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 351 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 376 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 373 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 360 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 362 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 361 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 359 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 369 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 365 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 363 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 345 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemTypeInfo edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 364 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 333 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 328 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 342 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ moduleTypeResolverMaker_

edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const> > edm::EventProcessor::moduleTypeResolverMaker_
private

Definition at line 320 of file EventProcessor.h.

Referenced by init().

◆ needToCallNext_

bool edm::EventProcessor::needToCallNext_ = true
private

Definition at line 377 of file EventProcessor.h.

Referenced by needToCallNext(), and setNeedToCallNext().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 375 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 341 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 332 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 358 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 331 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().