CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipalreadLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipalreadRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatusexceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool firstItemAfterLumiMerge_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueuerunQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 69 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 366 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 367 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 235 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 79 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 607 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, findAndChange::op, schedule_, and unpackBuffers-CaloStage2::token.

607  {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 632 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, HltBtagPostValidation_cff::c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, MainPageGenerator::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

632  {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
642  service::SystemBounds bounds(preallocations_.numberOfStreams(),
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules to avoid non-consumed non-run modules to keep the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
700  }
701  if (preallocations_.numberOfRuns() > 1) {
703  }
705 
706  //NOTE: This implementation assumes 'Job' means one call
707  // the EventProcessor::run
708  // If it really means once per 'application' then this code will
709  // have to be changed.
710  // Also have to deal with case where have 'run' then new Module
711  // added and do 'run'
712  // again. In that case the newly added Module needs its 'beginJob'
713  // to be called.
714 
715  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
716  // For now we delay calling beginOfJob until first beginOfRun
717  //if(looper_) {
718  // looper_->beginOfJob(es);
719  //}
720  espController_->finishConfiguration();
721  actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
722  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
723  try {
724  convertException::wrap([&]() { input_->doBeginJob(); });
725  } catch (cms::Exception& ex) {
726  ex.addContext("Calling beginJob for the source");
727  throw;
728  }
729 
730  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
731  if (looper_) {
732  constexpr bool mustPrefetchMayGet = true;
733  auto const processBlockLookup = preg_->productLookup(InProcess);
734  auto const runLookup = preg_->productLookup(InRun);
735  auto const lumiLookup = preg_->productLookup(InLumi);
736  auto const eventLookup = preg_->productLookup(InEvent);
737  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
740  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
741  looper_->updateLookup(esp_->recordsToResolverIndices());
742  }
743  // toerror.succeeded(); // should we add this?
744  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
745  actReg_->postBeginJobSignal_();
746 
747  oneapi::tbb::task_group group;
749  using namespace edm::waiting_task::chain;
750  first([this](auto nextTask) {
751  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
752  first([i, this](auto nextTask) {
754  schedule_->beginStream(i);
755  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
757  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
758  }) | lastTask(nextTask);
759  }
761  last.wait();
762  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::string > modulesToIgnoreForDeleteEarly_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:169
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1625 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), dqmdumpme::first, firstItemAfterLumiMerge_, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1627  {
1628  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1629 
1630  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1631  chain::first([this, &iSync, &status](auto nextTask) {
1632  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1633  nextTask,
1634  status->endIOVWaitingTasks(),
1635  status->eventSetupImpls(),
1637  actReg_.get(),
1638  serviceToken_);
1639  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1640  CMS_SA_ALLOW try {
1641  //the call to doneWaiting will cause the count to decrement
1642  if (iException) {
1643  WaitingTaskHolder copyHolder(nextTask);
1644  copyHolder.doneWaiting(*iException);
1645  }
1646 
1647  lumiQueue_->pushAndPause(
1648  *nextTask.group(),
1649  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1650  CMS_SA_ALLOW try {
1651  if (postLumiQueueTask.taskHasFailed()) {
1652  status->resetResources();
1654  endRunAsync(iRunStatus, postLumiQueueTask);
1655  return;
1656  }
1657 
1658  status->setResumer(std::move(iResumer));
1659 
1661  *postLumiQueueTask.group(),
1662  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1663  CMS_SA_ALLOW try {
1665 
1666  if (postSourceTask.taskHasFailed()) {
1667  status->resetResources();
1669  endRunAsync(iRunStatus, postSourceTask);
1670  return;
1671  }
1672 
1673  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1674 
1675  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1676  {
1677  SendSourceTerminationSignalIfException sentry(actReg_.get());
1678  input_->doBeginLumi(lumiPrincipal, &processContext_);
1679  sentry.completedSuccessfully();
1680  }
1681 
1682  Service<RandomNumberGenerator> rng;
1683  if (rng.isAvailable()) {
1684  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1685  rng->preBeginLumi(lb);
1686  }
1687 
1688  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1689 
1690  using namespace edm::waiting_task::chain;
1691  chain::first([this, status](auto nextTask) mutable {
1693  firstItemAfterLumiMerge_ = true;
1694  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1695  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1696  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1697  beginGlobalTransitionAsync<Traits>(
1698  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1699  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1700  looper_->prefetchAsync(
1701  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1702  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1703  status->globalBeginDidSucceed();
1704  ServiceRegistry::Operate operateLooper(serviceToken_);
1705  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1706  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1707  if (iException) {
1708  status->resetResources();
1710  WaitingTaskHolder copyHolder(holder);
1711  copyHolder.doneWaiting(*iException);
1712  endRunAsync(iRunStatus, holder);
1713  } else {
1714  if (not looper_) {
1715  status->globalBeginDidSucceed();
1716  }
1717 
1718  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1719 
1720  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1721  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1722 
1723  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1724  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1725  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1726  streamQueues_[i].pause();
1727 
1728  auto& event = principalCache_.eventPrincipal(i);
1729  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1730  // held by the container as this lambda may not finish executing before all the tasks it
1731  // spawns have already started to run.
1732  auto eventSetupImpls = &status->eventSetupImpls();
1733  auto lp = status->lumiPrincipal().get();
1736  event.setLuminosityBlockPrincipal(lp);
1737  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1738  using namespace edm::waiting_task::chain;
1739  chain::first([this, i, &transitionInfo](auto nextTask) {
1740  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1741  *schedule_,
1742  i,
1743  transitionInfo,
1744  serviceToken_,
1745  subProcesses_);
1746  }) |
1747  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1748  auto nextTask) {
1749  if (exceptionFromBeginStreamLumi) {
1750  WaitingTaskHolder copyHolder(nextTask);
1751  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1752  }
1754  }) |
1755  runLast(std::move(holder));
1756  });
1757  } // end for loop over streams
1758  });
1759  }
1760  }) | runLast(postSourceTask);
1761  } catch (...) {
1762  status->resetResources();
1764  WaitingTaskHolder copyHolder(postSourceTask);
1765  copyHolder.doneWaiting(std::current_exception());
1766  endRunAsync(iRunStatus, postSourceTask);
1767  }
1768  }); // task in sourceResourcesAcquirer
1769  } catch (...) {
1770  status->resetResources();
1772  WaitingTaskHolder copyHolder(postLumiQueueTask);
1773  copyHolder.doneWaiting(std::current_exception());
1774  endRunAsync(iRunStatus, postLumiQueueTask);
1775  }
1776  }); // task in lumiQueue
1777  } catch (...) {
1778  status->resetResources();
1780  WaitingTaskHolder copyHolder(nextTask);
1781  copyHolder.doneWaiting(std::current_exception());
1782  endRunAsync(iRunStatus, nextTask);
1783  }
1784  }) | chain::runLast(std::move(iHolder));
1785  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1076 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1076  {
1077  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1078  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1079 
1080  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1081  FinalWaitingTask globalWaitTask{taskGroup_};
1082 
1083  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1084  beginGlobalTransitionAsync<Traits>(
1085  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1086 
1087  globalWaitTask.wait();
1088  beginProcessBlockSucceeded = true;
1089  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1176 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, dqmdumpme::first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1176  {
1177  if (iHolder.taskHasFailed()) {
1178  return;
1179  }
1180 
1181  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1182 
1183  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1184 
1185  chain::first([this, &status, iSync](auto nextTask) {
1186  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1187  nextTask,
1188  status->endIOVWaitingTasks(),
1189  status->eventSetupImpls(),
1191  actReg_.get(),
1192  serviceToken_,
1194  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1195  CMS_SA_ALLOW try {
1196  if (iException) {
1197  WaitingTaskHolder copyHolder(nextTask);
1198  copyHolder.doneWaiting(*iException);
1199  // Finish handling the exception in the task pushed to runQueue_
1200  }
1202 
1203  runQueue_->pushAndPause(
1204  *nextTask.group(),
1205  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1206  CMS_SA_ALLOW try {
1207  if (postRunQueueTask.taskHasFailed()) {
1208  status->resetBeginResources();
1210  return;
1211  }
1212 
1213  status->setResumer(std::move(iResumer));
1214 
1216  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1217  CMS_SA_ALLOW try {
1219 
1220  if (postSourceTask.taskHasFailed()) {
1221  status->resetBeginResources();
1223  status->resumeGlobalRunQueue();
1224  return;
1225  }
1226 
1227  status->setRunPrincipal(readRun());
1228 
1229  RunPrincipal& runPrincipal = *status->runPrincipal();
1230  {
1231  SendSourceTerminationSignalIfException sentry(actReg_.get());
1232  input_->doBeginRun(runPrincipal, &processContext_);
1233  sentry.completedSuccessfully();
1234  }
1235 
1236  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1237  if (looper_ && looperBeginJobRun_ == false) {
1238  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1239 
1240  oneapi::tbb::task_group group;
1241  FinalWaitingTask waitTask{group};
1242  using namespace edm::waiting_task::chain;
1243  chain::first([this, &es](auto nextTask) {
1244  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1245  }) | then([this, &es](auto nextTask) mutable {
1246  looper_->beginOfJob(es);
1247  looperBeginJobRun_ = true;
1248  looper_->doStartingNewLoop();
1249  }) | runLast(WaitingTaskHolder(group, &waitTask));
1250  waitTask.wait();
1251  }
1252 
1253  using namespace edm::waiting_task::chain;
1254  chain::first([this, status](auto nextTask) mutable {
1255  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1256  status->setStopBeforeProcessingRun(true);
1257  nextTask.doneWaiting(std::current_exception());
1258  }
1259  }) | then([this, status, &es](auto nextTask) {
1260  if (status->stopBeforeProcessingRun()) {
1261  return;
1262  }
1263  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1264  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1265  beginGlobalTransitionAsync<Traits>(
1266  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1267  }) | then([status](auto nextTask) mutable {
1268  if (status->stopBeforeProcessingRun()) {
1269  return;
1270  }
1271  status->globalBeginDidSucceed();
1272  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1273  if (status->stopBeforeProcessingRun()) {
1274  return;
1275  }
1276  looper_->prefetchAsync(
1277  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1278  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1279  if (status->stopBeforeProcessingRun()) {
1280  return;
1281  }
1282  ServiceRegistry::Operate operateLooper(serviceToken_);
1283  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1284  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1285  bool precedingTasksSucceeded = true;
1286  if (iException) {
1287  precedingTasksSucceeded = false;
1288  WaitingTaskHolder copyHolder(holder);
1289  copyHolder.doneWaiting(*iException);
1290  }
1291 
1292  if (status->stopBeforeProcessingRun()) {
1293  // We just quit now if there was a failure when merging runs
1294  status->resetBeginResources();
1296  status->resumeGlobalRunQueue();
1297  return;
1298  }
1299  CMS_SA_ALLOW try {
1300  // Under normal circumstances, this task runs after endRun has completed for all streams
1301  // and global endLumi has completed for all lumis contained in this run
1302  auto globalEndRunTask =
1303  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1304  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1305  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1307  });
1308  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1309  } catch (...) {
1310  status->resetBeginResources();
1312  status->resumeGlobalRunQueue();
1313  holder.doneWaiting(std::current_exception());
1314  return;
1315  }
1316 
1317  // After this point we are committed to end the run via endRunAsync
1318 
1320 
1321  // The only purpose of the pause is to cause stream begin run to execute before
1322  // global begin lumi in the single threaded case (maintains consistency with
1323  // the order that existed before concurrent runs were implemented).
1324  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1325 
1326  CMS_SA_ALLOW try {
1328  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1329  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1330  CMS_SA_ALLOW try {
1331  streamQueues_[i].push(
1332  *holder.group(),
1333  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1335  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1336  });
1337  } catch (...) {
1338  if (status->streamFinishedBeginRun()) {
1339  WaitingTaskHolder copyHolder(holder);
1340  copyHolder.doneWaiting(std::current_exception());
1341  status->resetBeginResources();
1344  }
1345  }
1346  }
1347  });
1348  } catch (...) {
1349  WaitingTaskHolder copyHolder(holder);
1350  copyHolder.doneWaiting(std::current_exception());
1351  status->resetBeginResources();
1354  }
1356  }) | runLast(postSourceTask);
1357  } catch (...) {
1358  status->resetBeginResources();
1360  status->resumeGlobalRunQueue();
1361  postSourceTask.doneWaiting(std::current_exception());
1362  }
1363  }); // task in sourceResourcesAcquirer
1364  } catch (...) {
1365  status->resetBeginResources();
1367  status->resumeGlobalRunQueue();
1368  postRunQueueTask.doneWaiting(std::current_exception());
1369  }
1370  }); // task in runQueue
1371  } catch (...) {
1372  status->resetBeginResources();
1374  nextTask.doneWaiting(std::current_exception());
1375  }
1376  }) | chain::runLast(std::move(iHolder));
1377  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 281 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

281  {
283  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 284 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 849 of file EventProcessor.cc.

References epSignal, edm::JobReport::reportShutdownSignal(), runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

849  {
850  bool returnValue = false;
851 
852  // Look for a shutdown signal
853  if (shutdown_flag.load(std::memory_order_acquire)) {
854  returnValue = true;
855  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
857  jr->reportShutdownSignal();
859  }
860  return returnValue;
861  }
Log< level::System, false > LogSystem
volatile std::atomic< bool > shutdown_flag
void reportShutdownSignal()
Definition: JobReport.cc:543

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 843 of file EventProcessor.cc.

References schedule_.

843 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2065 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2065  {
2066  for (auto& s : subProcesses_) {
2067  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2068  }
2069  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2070  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2071  iStatus.lumiPrincipal()->clearPrincipal();
2072  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus iStatus)

Definition at line 2040 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2040  {
2041  for (auto& s : subProcesses_) {
2042  s.clearRunPrincipal(*iStatus.runPrincipal());
2043  }
2044  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2045  iStatus.runPrincipal()->clearPrincipal();
2046  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 976 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

976  {
977  if (fileBlockValid()) {
978  SendSourceTerminationSignalIfException sentry(actReg_.get());
979  input_->closeFile(fb_.get(), cleaningUpAfterException);
980  sentry.completedSuccessfully();
981  }
982  FDEBUG(1) << "\tcloseInputFile\n";
983  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 993 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

993  {
994  schedule_->closeOutputFiles();
995  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
996  processBlockHelper_->clearAfterOutputFilesClose();
997  FDEBUG(1) << "\tcloseOutputFiles\n";
998  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1787 of file EventProcessor.cc.

References dqmdumpme::first, firstItemAfterLumiMerge_, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1787  {
1788  chain::first([this](auto nextTask) {
1789  //all streams are sharing the same status at the moment
1790  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1792 
1793  while (lastTransitionType() == InputSource::IsLumi and
1794  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1797  }
1798  firstItemAfterLumiMerge_ = true;
1799  }) | chain::then([this](auto nextTask) mutable {
1800  unsigned int streamIndex = 0;
1801  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1802  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1803  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1804  }
1805  nextTask.group()->run(
1806  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1807  }) | chain::runLast(std::move(iHolder));
1808  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1067 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1067  {
1068  FDEBUG(1) << "\tdoErrorStuff\n";
1069  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1070  << "and went to the error state\n"
1071  << "Will attempt to terminate processing normally\n"
1072  << "(IF using the looper the next loop will be attempted)\n"
1073  << "This likely indicates a bug in an input module or corrupted input or both\n";
1074  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 764 of file EventProcessor.cc.

References actReg_, HltBtagPostValidation_cff::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, MainPageGenerator::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

764  {
765  // Collects exceptions, so we don't throw before all operations are performed.
766  ExceptionCollector c(
767  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
768 
769  //make the services available
771 
772  using namespace edm::waiting_task::chain;
773 
774  oneapi::tbb::task_group group;
775  edm::FinalWaitingTask waitTask{group};
776 
777  {
778  //handle endStream transitions
779  edm::WaitingTaskHolder taskHolder(group, &waitTask);
780  std::mutex collectorMutex;
781  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
782  first([this, i, &c, &collectorMutex](auto nextTask) {
783  std::exception_ptr ep;
784  try {
786  this->schedule_->endStream(i);
787  } catch (...) {
788  ep = std::current_exception();
789  }
790  if (ep) {
791  std::lock_guard<std::mutex> l(collectorMutex);
792  c.call([&ep]() { std::rethrow_exception(ep); });
793  }
794  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
795  for (auto& subProcess : subProcesses_) {
796  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
797  std::exception_ptr ep;
798  try {
800  subProcess.doEndStream(i);
801  } catch (...) {
802  ep = std::current_exception();
803  }
804  if (ep) {
805  std::lock_guard<std::mutex> l(collectorMutex);
806  c.call([&ep]() { std::rethrow_exception(ep); });
807  }
808  }) | lastTask(nextTask);
809  }
810  }) | lastTask(taskHolder);
811  }
812  }
813  waitTask.waitNoThrow();
814 
815  auto actReg = actReg_.get();
816  c.call([actReg]() { actReg->preEndJobSignal_(); });
817  schedule_->endJob(c);
818  for (auto& subProcess : subProcesses_) {
819  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
820  }
821  c.call(std::bind(&InputSource::doEndJob, input_.get()));
822  if (looper_) {
823  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
824  }
825  c.call([actReg]() { actReg->postEndJobSignal_(); });
826  if (c.hasThrown()) {
827  c.rethrow();
828  }
829  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1028 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1028  {
1029  if (looper_) {
1030  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1031  looper_->setModuleChanger(&changer);
1032  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1033  looper_->setModuleChanger(nullptr);
1035  return true;
1036  else
1037  return false;
1038  }
1039  FDEBUG(1) << "\tendOfLoop\n";
1040  return true;
1041  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1117 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1117  {
1118  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1119 
1120  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1121  FinalWaitingTask globalWaitTask{taskGroup_};
1122 
1123  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1124  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1125  *schedule_,
1126  transitionInfo,
1127  serviceToken_,
1128  subProcesses_,
1129  cleaningUpAfterException);
1130  globalWaitTask.wait();
1131 
1132  if (beginProcessBlockSucceeded) {
1133  FinalWaitingTask writeWaitTask{taskGroup_};
1135  writeWaitTask.wait();
1136  }
1137 
1138  processBlockPrincipal.clearPrincipal();
1139  for (auto& s : subProcesses_) {
1140  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1141  }
1142  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1420 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, dqmdumpme::first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1420  {
1421  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1422  iRunStatus->setEndTime();
1423  IOVSyncValue ts(
1424  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1425  runPrincipal.endTime());
1426  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1427  WaitingTaskHolder copyHolder(iHolder);
1428  copyHolder.doneWaiting(std::current_exception());
1429  }
1430 
1431  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1432  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1433  nextTask,
1434  iRunStatus->endIOVWaitingTasksEndRun(),
1435  iRunStatus->eventSetupImplsEndRun(),
1437  actReg_.get(),
1438  serviceToken_);
1439  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1440  if (iException) {
1441  iRunStatus->setEndingEventSetupSucceeded(false);
1442  handleEndRunExceptions(*iException, nextTask);
1443  }
1445  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1446  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1447  CMS_SA_ALLOW try {
1448  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1449  streamQueues_[i].pause();
1450  streamEndRunAsync(std::move(nextTask), i);
1451  });
1452  } catch (...) {
1453  WaitingTaskHolder copyHolder(nextTask);
1454  copyHolder.doneWaiting(std::current_exception());
1455  }
1456  }
1457  });
1458 
1460  CMS_SA_ALLOW try {
1461  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1462  } catch (...) {
1463  WaitingTaskHolder copyHolder(nextTask);
1464  copyHolder.doneWaiting(std::current_exception());
1465  }
1466  }
1467  }) | chain::runLast(std::move(iHolder));
1468  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1931 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1931  {
1932  if (streamRunActive_ == 0) {
1933  assert(streamLumiActive_ == 0);
1934  } else {
1936  if (streamLumiActive_ > 0) {
1937  FinalWaitingTask globalWaitTask{taskGroup_};
1939  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1940  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1941  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1942  }
1943  globalWaitTask.wait();
1944  }
1945  }
1946  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1611 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1611  {
1612  if (streamRunActive_ > 0) {
1613  FinalWaitingTask waitTask{taskGroup_};
1614 
1615  auto runStatus = streamRunStatus_[0].get();
1616  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1617  WaitingTaskHolder holder{taskGroup_, &waitTask};
1618  runStatus->setHolderOfTaskInProcessRuns(holder);
1620  endRunAsync(streamRunStatus_[0], std::move(holder));
1621  waitTask.wait();
1622  }
1623  }
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
InputSource::ItemType lastSourceTransition_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 192 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

192 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 833 of file EventProcessor.cc.

References schedule_.

833  {
834  return schedule_->getAllModuleDescriptions();
835  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 831 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

831 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1819 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, dqmdumpme::first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1820  {
1821  // Get some needed info out of the status object before moving
1822  // it into finalTaskForThisLumi.
1823  auto& lp = *(iLumiStatus->lumiPrincipal());
1824  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1825  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1826  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1827  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1828 
1829  using namespace edm::waiting_task::chain;
1830  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1831  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1832 
1833  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1834  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1835  endGlobalTransitionAsync<Traits>(
1836  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1837  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1838  //Only call writeLumi if beginLumi succeeded
1839  if (didGlobalBeginSucceed) {
1840  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1841  }
1842  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1843  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1844  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1845  //any thrown exception auto propagates to nextTask via the chain
1847  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1848  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1849  if (iException) {
1850  handleEndLumiExceptions(*iException, nextTask);
1851  }
1853 
1854  std::exception_ptr ptr;
1855 
1856  // Try hard to clean up resources so the
1857  // process can terminate in a controlled
1858  // fashion even after exceptions have occurred.
1859  // Caught exception is passed to handleEndLumiExceptions()
1860  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1861  if (not ptr) {
1862  ptr = std::current_exception();
1863  }
1864  }
1865  // Caught exception is passed to handleEndLumiExceptions()
1866  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1867  if (not ptr) {
1868  ptr = std::current_exception();
1869  }
1870  }
1871  // Caught exception is passed to handleEndLumiExceptions()
1872  CMS_SA_ALLOW try {
1873  status->resetResources();
1874  status->globalEndRunHolderDoneWaiting();
1875  status.reset();
1876  } catch (...) {
1877  if (not ptr) {
1878  ptr = std::current_exception();
1879  }
1880  }
1881 
1882  if (ptr && !iException) {
1883  handleEndLumiExceptions(ptr, nextTask);
1884  }
1885  }) | runLast(std::move(iTask));
1886  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus iRunStatus 
)

Definition at line 1479 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, dqmdumpme::first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1479  {
1480  auto& runPrincipal = *(iRunStatus->runPrincipal());
1481  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1482  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1483  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1484  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1485  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1486 
1487  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1488  using namespace edm::waiting_task::chain;
1489  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1490  auto nextTask) {
1491  if (endingEventSetupSucceeded) {
1492  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1493  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1494  endGlobalTransitionAsync<Traits>(
1495  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1496  }
1497  }) |
1498  ifThen(looper_ && endingEventSetupSucceeded,
1499  [this, &runPrincipal, &es](auto nextTask) {
1500  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1501  }) |
1502  ifThen(looper_ && endingEventSetupSucceeded,
1503  [this, &runPrincipal, &es](auto nextTask) {
1505  looper_->doEndRun(runPrincipal, es, &processContext_);
1506  }) |
1507  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1508  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1509  mergeableRunProductMetadata->preWriteRun();
1510  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1511  }) |
1512  then([status = std::move(iRunStatus),
1513  this,
1514  didGlobalBeginSucceed,
1515  mergeableRunProductMetadata,
1516  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1517  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1518  mergeableRunProductMetadata->postWriteRun();
1519  }
1520  if (iException) {
1521  handleEndRunExceptions(*iException, nextTask);
1522  }
1524 
1525  std::exception_ptr ptr;
1526 
1527  // Try hard to clean up resources so the
1528  // process can terminate in a controlled
1529  // fashion even after exceptions have occurred.
1530  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1531  if (not ptr) {
1532  ptr = std::current_exception();
1533  }
1534  }
1535  CMS_SA_ALLOW try {
1536  status->resumeGlobalRunQueue();
1538  } catch (...) {
1539  if (not ptr) {
1540  ptr = std::current_exception();
1541  }
1542  }
1543  CMS_SA_ALLOW try {
1544  status->resetEndResources();
1545  status.reset();
1546  } catch (...) {
1547  if (not ptr) {
1548  ptr = std::current_exception();
1549  }
1550  }
1551 
1552  if (ptr && !iException) {
1553  handleEndRunExceptions(ptr, nextTask);
1554  }
1555  }) |
1556  runLast(std::move(iTask));
1557  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1810 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1810  {
1811  if (holder.taskHasFailed()) {
1813  } else {
1814  WaitingTaskHolder tmp(holder);
1815  tmp.doneWaiting(iException);
1816  }
1817  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1470 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1470  {
1471  if (holder.taskHasFailed()) {
1473  } else {
1474  WaitingTaskHolder tmp(holder);
1475  tmp.doneWaiting(iException);
1476  }
1477  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2205 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2205  {
2206  auto group = iTask.group();
2207  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2208  CMS_SA_ALLOW try {
2209  auto status = streamLumiStatus_[iStreamIndex].get();
2211 
2212  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2213  auto recursionTask =
2214  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2215  if (iEventException) {
2216  WaitingTaskHolder copyHolder(iTask);
2217  copyHolder.doneWaiting(*iEventException);
2218  // Intentionally, we don't return here. The recursive call to
2219  // handleNextEvent takes care of immediately ending the run properly
2220  // using the same code it uses to end the run in other situations.
2221  }
2222  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2223  });
2224 
2225  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2226  } else {
2227  // the stream will stop processing this lumi now
2229  if (not status->haveStartedNextLumiOrEndedRun()) {
2230  status->startNextLumiOrEndRun();
2231  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2232  CMS_SA_ALLOW try {
2233  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2234  input_->luminosityBlockAuxiliary()->beginTime()),
2235  streamRunStatus_[iStreamIndex],
2236  iTask);
2237  } catch (...) {
2238  WaitingTaskHolder copyHolder(iTask);
2239  copyHolder.doneWaiting(std::current_exception());
2240  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2241  }
2242  } else {
2243  // If appropriate, this will also start the next run.
2244  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2245  }
2246  }
2247  streamEndLumiAsync(iTask, iStreamIndex);
2248  } else {
2249  assert(status->eventProcessingState() ==
2251  auto runStatus = streamRunStatus_[iStreamIndex].get();
2252 
2253  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2254  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2255  }
2256  }
2257  }
2258  } catch (...) {
2259  WaitingTaskHolder copyHolder(iTask);
2260  copyHolder.doneWaiting(std::current_exception());
2261  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2262  }
2263  });
2264  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2123 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginRunAsync(), and processRuns().

2124  {
2126  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2127  iHolder.doneWaiting(std::exception_ptr{});
2128  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2129  CMS_SA_ALLOW try {
2130  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2131  input_->luminosityBlockAuxiliary()->beginTime()),
2132  iRunStatus,
2133  iHolder);
2134  } catch (...) {
2135  WaitingTaskHolder copyHolder(iHolder);
2136  iHolder.doneWaiting(std::current_exception());
2137  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2138  }
2139  } else {
2140  // Note that endRunAsync will call beginRunAsync for the following run
2141  // if appropriate.
2142  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2143  }
2144  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DMR_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, moduleTypeResolverMaker_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // intialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // intialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
495  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
519  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
585  SubProcessParentageHelper(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
std::multimap< std::string, std::string > referencesToBranches_
fileMode
Definition: DMR_cfg.py:72
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1091 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1091  {
1092  input_->fillProcessBlockHelper();
1093  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1094  while (input_->nextProcessBlock(processBlockPrincipal)) {
1095  readProcessBlock(processBlockPrincipal);
1096 
1097  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1098  FinalWaitingTask globalWaitTask{taskGroup_};
1099 
1100  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1101  beginGlobalTransitionAsync<Traits>(
1102  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1103 
1104  globalWaitTask.wait();
1105 
1106  FinalWaitingTask writeWaitTask{taskGroup_};
1108  writeWaitTask.wait();
1109 
1110  processBlockPrincipal.clearPrincipal();
1111  for (auto& s : subProcesses_) {
1112  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1113  }
1114  }
1115  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

291 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 292 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

292 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 863 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

863  {
864  SendSourceTerminationSignalIfException sentry(actReg_.get());
865  InputSource::ItemType itemType;
866  //For now, do nothing with InputSource::IsSynchronize
867  do {
868  itemType = input_->nextItemType();
869  } while (itemType == InputSource::IsSynchronize);
870 
871  lastSourceTransition_ = itemType;
872  sentry.completedSuccessfully();
873 
875 
877  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
879  }
880 
881  return lastSourceTransition_;
882  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 985 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

985  {
986  if (fileBlockValid()) {
987  schedule_->openOutputFiles(*fb_);
988  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
989  }
990  FDEBUG(1) << "\topenOutputFiles\n";
991  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 279 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

279 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 280 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

280 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1049 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1049  {
1050  looper_->prepareForNextLoop(esp_.get());
1051  FDEBUG(1) << "\tprepareForNextLoop\n";
1052  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 143 of file EventProcessor.h.

References processConfiguration_.

143 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2281 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2281  {
2282  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2283  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2285 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, dqmdumpme::first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2285  {
2286  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2287 
2289  Service<RandomNumberGenerator> rng;
2290  if (rng.isAvailable()) {
2291  Event ev(*pep, ModuleDescription(), nullptr);
2292  rng->postEventRead(ev);
2293  }
2294 
2295  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2296  using namespace edm::waiting_task::chain;
2297  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2298  EventTransitionInfo info(*pep, es);
2299  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2300  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2301  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2302  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2303  }
2304  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2305  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2306  ServiceRegistry::Operate operateLooper(serviceToken_);
2307  processEventWithLooper(*pep, iStreamIndex);
2308  }) | then([pep](auto nextTask) {
2309  FDEBUG(1) << "\tprocessEvent\n";
2310  pep->clearEventPrincipal();
2311  }) | runLast(iHolder);
2312  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2314 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2314  {
2315  bool randomAccess = input_->randomAccess();
2316  ProcessingController::ForwardState forwardState = input_->forwardState();
2317  ProcessingController::ReverseState reverseState = input_->reverseState();
2318  ProcessingController pc(forwardState, reverseState, randomAccess);
2319 
2321  do {
2322  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2323  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2324  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2325 
2326  bool succeeded = true;
2327  if (randomAccess) {
2328  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2329  input_->skipEvents(-2);
2330  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2331  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2332  }
2333  }
2334  pc.setLastOperationSucceeded(succeeded);
2335  } while (!pc.lastOperationSucceeded());
2337  shouldWeStop_ = true;
2338  }
2339  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1144 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1144  {
1145  FinalWaitingTask waitTask{taskGroup_};
1147  if (streamRunActive_ == 0) {
1148  assert(streamLumiActive_ == 0);
1149 
1150  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1151  WaitingTaskHolder{taskGroup_, &waitTask});
1152  } else {
1154 
1155  auto runStatus = streamRunStatus_[0];
1156 
1157  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1158  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1159  readAndMergeRun(*runStatus);
1161  }
1162 
1163  WaitingTaskHolder holder{taskGroup_, &waitTask};
1164  runStatus->setHolderOfTaskInProcessRuns(holder);
1165  if (streamLumiActive_ > 0) {
1167  continueLumiAsync(std::move(holder));
1168  } else {
1170  }
1171  }
1172  waitTask.wait();
1173  return lastTransitionType();
1174  }
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
PreallocationConfiguration preallocations_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1993 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

1993  {
1994  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1995  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1996  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1997  input_->processHistoryRegistry().reducedProcessHistoryID(
1998  input_->luminosityBlockAuxiliary()->processHistoryID()));
1999  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
2000  assert(lumiOK);
2001  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2002  {
2003  SendSourceTerminationSignalIfException sentry(actReg_.get());
2004  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2005  sentry.completedSuccessfully();
2006  }
2007  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2101 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2102  {
2103  auto group = iHolder.group();
2105  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2106  CMS_SA_ALLOW try {
2108 
2109  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2110 
2112  while (lastTransitionType() == InputSource::IsLumi and
2113  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2114  readAndMergeLumi(*iLumiStatus);
2116  }
2117  } catch (...) {
2118  holder.doneWaiting(std::current_exception());
2119  }
2120  });
2121  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus iStatus)

Definition at line 1967 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

1967  {
1968  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1969 
1970  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1971  assert(runOK);
1972  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1973  {
1974  SendSourceTerminationSignalIfException sentry(actReg_.get());
1975  input_->readAndMergeRun(runPrincipal);
1976  sentry.completedSuccessfully();
1977  }
1978  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2074 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2075  {
2076  auto group = iHolder.group();
2078  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2079  CMS_SA_ALLOW try {
2081 
2082  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2083 
2085  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2086  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2087  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2088  status->setStopBeforeProcessingRun(true);
2089  return;
2090  }
2093  }
2094  } catch (...) {
2095  status->setStopBeforeProcessingRun(true);
2096  holder.doneWaiting(std::current_exception());
2097  }
2098  });
2099  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2266 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2266  {
2267  //TODO this will have to become per stream
2268  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2269  StreamContext streamContext(event.streamID(), &processContext_);
2270 
2271  SendSourceTerminationSignalIfException sentry(actReg_.get());
2272  input_->readEvent(event, streamContext);
2273 
2274  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2275  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2276  sentry.completedSuccessfully();
2277 
2278  FDEBUG(1) << "\treadEvent\n";
2279  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 951 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

951  {
952  FDEBUG(1) << " \treadFile\n";
953  size_t size = preg_->size();
954  SendSourceTerminationSignalIfException sentry(actReg_.get());
955 
956  if (streamRunActive_ > 0) {
957  streamRunStatus_[0]->runPrincipal()->preReadFile();
958  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
959  }
960 
961  if (streamLumiActive_ > 0) {
962  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
963  }
964 
965  fb_ = input_->readFile();
966  if (size < preg_->size()) {
968  }
971  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
972  }
973  sentry.completedSuccessfully();
974  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal rp)

Definition at line 1980 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

1980  {
1982  assert(lbp);
1983  lbp->setAux(*input_->luminosityBlockAuxiliary());
1984  {
1985  SendSourceTerminationSignalIfException sentry(actReg_.get());
1986  input_->readLuminosityBlock(*lbp, *historyAppender_);
1987  sentry.completedSuccessfully();
1988  }
1989  lbp->setRunPrincipal(std::move(rp));
1990  return lbp;
1991  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2146 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), firstItemAfterLumiMerge_, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2148  {
2149  // This function returns true if it successfully reads an event for the stream and that
2150  // requires both that an event is next and there are no problems or requests to stop.
2151 
2152  if (iTask.taskHasFailed()) {
2153  // We want all streams to stop or all streams to pause. If we are already in the
2154  // middle of pausing streams, then finish pausing all of them and the lumi will be
2155  // ended later. Otherwise, just end it now.
2156  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2158  }
2159  return false;
2160  }
2161 
2162  // Did another stream already stop or pause this lumi?
2163  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2164  return false;
2165  }
2166 
2167  // Are output modules or the looper requesting we stop?
2168  if (shouldWeStop()) {
2171  return false;
2172  }
2173 
2175 
2176  // need to use lock in addition to the serial task queue because
2177  // of delayed provenance reading and reading data in response to
2178  // edm::Refs etc
2179  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2180 
2181  // If we didn't already call nextTransitionType while merging lumis, call it here.
2182  // This asks the input source what is next and also checks for signals.
2183 
2185  firstItemAfterLumiMerge_ = false;
2186 
2187  if (InputSource::IsEvent != itemType) {
2188  // IsFile may continue processing the lumi and
2189  // looper_ can cause the input source to declare a new IsRun which is actually
2190  // just a continuation of the previous run
2191  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2192  (InputSource::IsRun == itemType and
2193  (iStatus.lumiPrincipal()->run() != input_->run() or
2194  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2196  } else {
2198  }
2199  return false;
2200  }
2201  readEvent(iStreamIndex);
2202  return true;
2203  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1948 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1948  {
1949  SendSourceTerminationSignalIfException sentry(actReg_.get());
1950  input_->readProcessBlock(processBlockPrincipal);
1951  sentry.completedSuccessfully();
1952  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 1954 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

1954  {
1956  assert(rp);
1957  rp->setAux(*input_->runAuxiliary());
1958  {
1959  SendSourceTerminationSignalIfException sentry(actReg_.get());
1960  input_->readRun(*rp, *historyAppender_);
1961  sentry.completedSuccessfully();
1962  }
1963  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1964  return rp;
1965  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1411 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1411  {
1412  auto& status = streamRunStatus_[iStream];
1413  if (status->streamFinishedBeginRun()) {
1414  status->resetBeginResources();
1416  }
1417  streamQueues_[iStream].resume();
1418  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1010 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1010  {
1011  if (fileBlockValid()) {
1012  schedule_->respondToCloseInputFile(*fb_);
1013  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1014  }
1015  FDEBUG(1) << "\trespondToCloseInputFile\n";
1016  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 1000 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1000  {
1001  if (fileBlockValid()) {
1003  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1004  schedule_->respondToOpenInputFile(*fb_);
1005  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1006  }
1007  FDEBUG(1) << "\trespondToOpenInputFile\n";
1008  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1043 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1043  {
1044  input_->repeat();
1045  input_->rewind();
1046  FDEBUG(1) << "\trewind\n";
1047  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 377 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

377 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 884 of file EventProcessor.cc.

References actReg_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

884  {
885  beginJob(); //make sure this was called
886 
887  // make the services available
889  actReg_->beginProcessingSignal_();
890  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
891  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
892  try {
893  FilesProcessor fp(fileModeNoMerge_);
894 
895  convertException::wrap([&]() {
896  bool firstTime = true;
897  do {
898  if (not firstTime) {
900  rewindInput();
901  } else {
902  firstTime = false;
903  }
904  startingNewLoop();
905 
906  auto trans = fp.processFiles(*this);
907 
908  fp.normalEnd();
909 
910  if (deferredExceptionPtrIsSet_.load()) {
911  std::rethrow_exception(deferredExceptionPtr_);
912  }
913  if (trans != InputSource::IsStop) {
914  //problem with the source
915  doErrorStuff();
916 
917  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
918  }
919  } while (not endOfLoop());
920  }); // convertException::wrap
921 
922  } // Try block
923  catch (cms::Exception& e) {
925  std::string message(
926  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
927  e.addAdditionalInfo(message);
928  if (e.alreadyPrinted()) {
929  LogAbsolute("Additional Exceptions") << message;
930  }
931  }
932  if (exceptionMessageRuns_) {
933  std::string message(
934  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
935  e.addAdditionalInfo(message);
936  if (e.alreadyPrinted()) {
937  LogAbsolute("Additional Exceptions") << message;
938  }
939  }
940  if (!exceptionMessageFiles_.empty()) {
941  e.addAdditionalInfo(exceptionMessageFiles_);
942  if (e.alreadyPrinted()) {
943  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
944  }
945  }
946  throw;
947  }
948  return epSuccess;
949  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2362 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2362  {
2363  bool expected = false;
2364  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2365  deferredExceptionPtr_ = iException;
2366  return true;
2367  }
2368  return false;
2369  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2356 of file EventProcessor.cc.

References exceptionMessageFiles_.

2356 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2360 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2360 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2358 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2358 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1054 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1054  {
1055  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1056  if (!subProcesses_.empty()) {
1057  for (auto const& subProcess : subProcesses_) {
1058  if (subProcess.shouldWeCloseOutput()) {
1059  return true;
1060  }
1061  }
1062  return false;
1063  }
1064  return schedule_->shouldWeCloseOutput();
1065  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2341 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2341  {
2342  FDEBUG(1) << "\tshouldWeStop\n";
2343  if (shouldWeStop_)
2344  return true;
2345  if (!subProcesses_.empty()) {
2346  for (auto const& subProcess : subProcesses_) {
2347  if (subProcess.terminate()) {
2348  return true;
2349  }
2350  }
2351  return false;
2352  }
2353  return schedule_->terminate();
2354  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1018 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1018  {
1019  shouldWeStop_ = false;
1020  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1021  // until after we've called beginOfJob
1022  if (looper_ && looperBeginJobRun_) {
1023  looper_->doStartingNewLoop();
1024  }
1025  FDEBUG(1) << "\tstartingNewLoop\n";
1026  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus status,
bool  precedingTasksSucceeded,
WaitingTaskHolder  iHolder 
)

Definition at line 1379 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), dqmdumpme::first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1382  {
1383  // These shouldn't throw
1384  streamQueues_[iStream].pause();
1385  ++streamRunActive_;
1386  streamRunStatus_[iStream] = std::move(status);
1387 
1388  CMS_SA_ALLOW try {
1389  using namespace edm::waiting_task::chain;
1390  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1391  if (precedingTasksSucceeded) {
1392  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1393  RunTransitionInfo transitionInfo(
1394  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1395  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1396  beginStreamTransitionAsync<Traits>(
1397  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1398  }
1399  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1400  if (exceptionFromBeginStreamRun) {
1401  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1402  }
1403  releaseBeginRunResources(iStream);
1404  }) | runLast(iHolder);
1405  } catch (...) {
1406  releaseBeginRunResources(iStream);
1407  iHolder.doneWaiting(std::current_exception());
1408  }
1409  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1888 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1888  {
1889  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1890  auto status = streamLumiStatus_[iStreamIndex];
1891  if (iException) {
1892  handleEndLumiExceptions(*iException, iTask);
1893  }
1894 
1895  // reset status before releasing queue else get race condition
1896  streamLumiStatus_[iStreamIndex].reset();
1898  streamQueues_[iStreamIndex].resume();
1899 
1900  //are we the last one?
1901  if (status->streamFinishedLumi()) {
1903  }
1904  });
1905 
1906  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1907 
1908  // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1909  // therefore we do not want to hold the shared_ptr
1910  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1911  lumiStatus->setEndTime();
1912 
1913  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1914  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1915  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1916 
1917  if (lumiStatus->didGlobalBeginSucceed()) {
1918  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1919  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1920  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1921  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1922  *schedule_,
1923  iStreamIndex,
1924  transitionInfo,
1925  serviceToken_,
1926  subProcesses_,
1927  cleaningUpAfterException);
1928  }
1929  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1559 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1559  {
1560  CMS_SA_ALLOW try {
1561  if (!streamRunStatus_[iStreamIndex]) {
1562  if (exceptionRunStatus_->streamFinishedRun()) {
1563  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1564  exceptionRunStatus_.reset();
1565  }
1566  return;
1567  }
1568 
1569  auto runDoneTask =
1570  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1571  if (iException) {
1572  handleEndRunExceptions(*iException, iTask);
1573  }
1574 
1575  auto runStatus = streamRunStatus_[iStreamIndex];
1576 
1577  //reset status before releasing queue else get race condition
1578  if (runStatus->streamFinishedRun()) {
1579  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1580  }
1581  streamRunStatus_[iStreamIndex].reset();
1582  --streamRunActive_;
1583  streamQueues_[iStreamIndex].resume();
1584  });
1585 
1586  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1587 
1588  auto runStatus = streamRunStatus_[iStreamIndex].get();
1589 
1590  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1591  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1592  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1593  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1594 
1595  auto& runPrincipal = *runStatus->runPrincipal();
1596  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1597  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1598  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1599  *schedule_,
1600  iStreamIndex,
1601  transitionInfo,
1602  serviceToken_,
1603  subProcesses_,
1604  cleaningUpAfterException);
1605  }
1606  } catch (...) {
1607  handleEndRunExceptions(std::current_exception(), iTask);
1608  }
1609  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 625 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

625  {
628  task.waitNoThrow();
629  assert(task.done());
630  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

285  {
287  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 288 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

288  {
290  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2371 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2371  {
2372  cms::Exception ex("ModulesSynchingOnLumis");
2373  ex << "The framework is configured to use at least two streams, but the following modules\n"
2374  << "require synchronizing on LuminosityBlock boundaries:";
2375  bool found = false;
2376  for (auto worker : schedule_->allWorkers()) {
2377  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2378  found = true;
2379  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2380  }
2381  }
2382  if (found) {
2383  ex << "\n\nThe situation can be fixed by either\n"
2384  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2385  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2386  throw ex;
2387  }
2388  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 837 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

837 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 841 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

841 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 839 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

839 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2403 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2403  {
2404  std::unique_ptr<LogSystem> s;
2405  for (auto worker : schedule_->allWorkers()) {
2406  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2407  if (not s) {
2408  s = std::make_unique<LogSystem>("LegacyModules");
2409  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2410  "is going to end soon. These modules need to be converted to have type\n"
2411  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2412  }
2413  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2414  }
2415  }
2416  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2390 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2390  {
2391  std::unique_ptr<LogSystem> s;
2392  for (auto worker : schedule_->allWorkers()) {
2393  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2394  if (not s) {
2395  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2396  (*s) << "The following modules require synchronizing on Run boundaries:";
2397  }
2398  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2399  }
2400  }
2401  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2048 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), findAndChange::op, processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2048  {
2049  using namespace edm::waiting_task;
2050  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2051  chain::first([&](auto nextTask) {
2053 
2054  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2055  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2056  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2058  for (auto& s : subProcesses_) {
2059  s.writeLumiAsync(nextTask, lumiPrincipal);
2060  }
2062  }
2063  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2009 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), eostools::move(), findAndChange::op, principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2009  {
2010  using namespace edm::waiting_task;
2011  chain::first([&](auto nextTask) {
2013  schedule_->writeProcessBlockAsync(
2014  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2015  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2017  for (auto& s : subProcesses_) {
2018  s.writeProcessBlockAsync(nextTask, processBlockType);
2019  }
2020  }) | chain::runLast(std::move(task));
2021  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2023 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), findAndChange::op, processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2025  {
2026  using namespace edm::waiting_task;
2027  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2028  chain::first([&](auto nextTask) {
2030  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2031  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2033  for (auto& s : subProcesses_) {
2034  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2035  }
2036  }) | chain::runLast(std::move(task));
2037  }
2038  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 319 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 352 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 309 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 346 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 371 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 368 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 357 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 356 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 354 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 364 of file EventProcessor.h.

◆ firstItemAfterLumiMerge_

bool edm::EventProcessor::firstItemAfterLumiMerge_ = true
private

Definition at line 372 of file EventProcessor.h.

Referenced by beginLumiAsync(), continueLumiAsync(), and readNextEventForStream().

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 360 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 358 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 340 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_ = InputSource::IsInvalid
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 359 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 337 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ moduleTypeResolverMaker_

edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const> > edm::EventProcessor::moduleTypeResolverMaker_
private

Definition at line 315 of file EventProcessor.h.

Referenced by init().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 308 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 310 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 320 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 336 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 353 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 311 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().