CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipal > readRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool firstItemAfterLumiMerge_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcess > subProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 69 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 366 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 367 of file EventProcessor.h.

◆ ProcessBlockType

using edm::EventProcessor::ProcessBlockType = PrincipalCache::ProcessBlockType

Definition at line 235 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 79 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 214 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

219  : actReg_(),
220  preg_(),
222  serviceToken_(),
223  input_(),
225  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
226  esp_(),
227  act_table_(),
229  schedule_(),
230  subProcesses_(),
231  historyAppender_(new HistoryAppender),
232  fb_(),
233  looper_(),
235  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
236  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
237  principalCache_(),
238  beginJobCalled_(false),
239  shouldWeStop_(false),
240  fileModeNoMerge_(false),
242  exceptionMessageRuns_(false),
243  exceptionMessageLumis_(false),
244  forceLooperToEnd_(false),
245  looperBeginJobRun_(false),
248  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
249  processDesc->addServices(defaultServices, forcedServices);
250  init(processDesc, iToken, iLegacy);
251  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet > parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 253 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

256  : actReg_(),
257  preg_(),
259  serviceToken_(),
260  input_(),
262  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
263  esp_(),
264  act_table_(),
266  schedule_(),
267  subProcesses_(),
268  historyAppender_(new HistoryAppender),
269  fb_(),
270  looper_(),
272  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
273  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
274  principalCache_(),
275  beginJobCalled_(false),
276  shouldWeStop_(false),
277  fileModeNoMerge_(false),
279  exceptionMessageRuns_(false),
280  exceptionMessageLumis_(false),
281  forceLooperToEnd_(false),
282  looperBeginJobRun_(false),
285  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
286  processDesc->addServices(defaultServices, forcedServices);
288  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 290 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

293  : actReg_(),
294  preg_(),
296  serviceToken_(),
297  input_(),
298  moduleTypeResolverMaker_(makeModuleTypeResolverMaker(*processDesc->getProcessPSet())),
299  espController_(std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get())),
300  esp_(),
301  act_table_(),
303  schedule_(),
304  subProcesses_(),
305  historyAppender_(new HistoryAppender),
306  fb_(),
307  looper_(),
309  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
310  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
311  principalCache_(),
312  beginJobCalled_(false),
313  shouldWeStop_(false),
314  fileModeNoMerge_(false),
316  exceptionMessageRuns_(false),
317  exceptionMessageLumis_(false),
318  forceLooperToEnd_(false),
319  looperBeginJobRun_(false),
322  init(processDesc, token, legacy);
323  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< edm::ModuleTypeResolverMaker const > makeModuleTypeResolverMaker(edm::ParameterSet const &pset)
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 607 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, findAndChange::op, schedule_, and unpackBuffers-CaloStage2::token.

607  {
608  // Make the services available while everything is being deleted.
611 
612  // manually destroy all these thing that may need the services around
613  // propagate_const<T> has no reset() function
614  espController_ = nullptr;
615  esp_ = nullptr;
616  schedule_ = nullptr;
617  input_ = nullptr;
618  looper_ = nullptr;
619  actReg_ = nullptr;
620 
623  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 632 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, HltBtagPostValidation_cff::c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, MainPageGenerator::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

632  {
633  if (beginJobCalled_)
634  return;
635  beginJobCalled_ = true;
636  bk::beginJob();
637 
638  // StateSentry toerror(this); // should we add this ?
639  //make the services available
641 
642  service::SystemBounds bounds(preallocations_.numberOfStreams(),
646  actReg_->preallocateSignal_(bounds);
647  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
649 
650  std::vector<ModuleProcessName> consumedBySubProcesses;
652  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
653  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
654  if (consumedBySubProcesses.empty()) {
655  consumedBySubProcesses = std::move(c);
656  } else if (not c.empty()) {
657  std::vector<ModuleProcessName> tmp;
658  tmp.reserve(consumedBySubProcesses.size() + c.size());
659  std::merge(consumedBySubProcesses.begin(),
660  consumedBySubProcesses.end(),
661  c.begin(),
662  c.end(),
663  std::back_inserter(tmp));
664  std::swap(consumedBySubProcesses, tmp);
665  }
666  });
667 
668  // Note: all these may throw
671  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
672  not unusedModules.empty()) {
674 
675  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
676  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
677  "and "
678  "therefore they are deleted before beginJob transition.";
679  for (auto const& description : unusedModules) {
680  l << "\n " << description->moduleLabel();
681  }
682  });
683  for (auto const& description : unusedModules) {
684  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
685  }
686  }
687  }
688  // Initialize after the deletion of non-consumed unscheduled
689  // modules to avoid non-consumed non-run modules to keep the
690  // products unnecessarily alive
691  if (not branchesToDeleteEarly_.empty()) {
692  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
693  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
694  auto referencesToBranches = std::move(referencesToBranches_);
695  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
696  }
697 
698  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
699 
702  }
703  if (preallocations_.numberOfRuns() > 1) {
705  }
707 
708  //NOTE: This implementation assumes 'Job' means one call
709  // the EventProcessor::run
710  // If it really means once per 'application' then this code will
711  // have to be changed.
712  // Also have to deal with case where have 'run' then new Module
713  // added and do 'run'
714  // again. In that case the newly added Module needs its 'beginJob'
715  // to be called.
716 
717  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
718  // For now we delay calling beginOfJob until first beginOfRun
719  //if(looper_) {
720  // looper_->beginOfJob(es);
721  //}
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728  espController_->finishConfiguration();
729  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToResolverIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:169
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus >  iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1624 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), dqmdumpme::first, firstItemAfterLumiMerge_, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1626  {
1627  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1628 
1629  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1630  chain::first([this, &iSync, &status](auto nextTask) {
1631  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1632  nextTask,
1633  status->endIOVWaitingTasks(),
1634  status->eventSetupImpls(),
1636  actReg_.get(),
1637  serviceToken_);
1638  }) | chain::then([this, status, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1639  CMS_SA_ALLOW try {
1640  //the call to doneWaiting will cause the count to decrement
1641  if (iException) {
1642  WaitingTaskHolder copyHolder(nextTask);
1643  copyHolder.doneWaiting(*iException);
1644  }
1645 
1646  lumiQueue_->pushAndPause(
1647  *nextTask.group(),
1648  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1649  CMS_SA_ALLOW try {
1650  if (postLumiQueueTask.taskHasFailed()) {
1651  status->resetResources();
1653  endRunAsync(iRunStatus, postLumiQueueTask);
1654  return;
1655  }
1656 
1657  status->setResumer(std::move(iResumer));
1658 
1660  *postLumiQueueTask.group(),
1661  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1662  CMS_SA_ALLOW try {
1664 
1665  if (postSourceTask.taskHasFailed()) {
1666  status->resetResources();
1668  endRunAsync(iRunStatus, postSourceTask);
1669  return;
1670  }
1671 
1672  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1673 
1674  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1675  {
1676  SendSourceTerminationSignalIfException sentry(actReg_.get());
1677  input_->doBeginLumi(lumiPrincipal, &processContext_);
1678  sentry.completedSuccessfully();
1679  }
1680 
1681  Service<RandomNumberGenerator> rng;
1682  if (rng.isAvailable()) {
1683  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1684  rng->preBeginLumi(lb);
1685  }
1686 
1687  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1688 
1689  using namespace edm::waiting_task::chain;
1690  chain::first([this, status](auto nextTask) mutable {
1692  firstItemAfterLumiMerge_ = true;
1693  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1694  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1695  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1696  beginGlobalTransitionAsync<Traits>(
1697  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1698  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1699  looper_->prefetchAsync(
1700  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1701  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1702  status->globalBeginDidSucceed();
1703  ServiceRegistry::Operate operateLooper(serviceToken_);
1704  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1705  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1706  if (iException) {
1707  status->resetResources();
1709  WaitingTaskHolder copyHolder(holder);
1710  copyHolder.doneWaiting(*iException);
1711  endRunAsync(iRunStatus, holder);
1712  } else {
1713  if (not looper_) {
1714  status->globalBeginDidSucceed();
1715  }
1716 
1717  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1718 
1719  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1720  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1721 
1722  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1723  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1724  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1725  streamQueues_[i].pause();
1726 
1727  auto& event = principalCache_.eventPrincipal(i);
1728  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1729  // held by the container as this lambda may not finish executing before all the tasks it
1730  // spawns have already started to run.
1731  auto eventSetupImpls = &status->eventSetupImpls();
1732  auto lp = status->lumiPrincipal().get();
1735  event.setLuminosityBlockPrincipal(lp);
1736  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1737  using namespace edm::waiting_task::chain;
1738  chain::first([this, i, &transitionInfo](auto nextTask) {
1739  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1740  *schedule_,
1741  i,
1742  transitionInfo,
1743  serviceToken_,
1744  subProcesses_);
1745  }) |
1746  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1747  auto nextTask) {
1748  if (exceptionFromBeginStreamLumi) {
1749  WaitingTaskHolder copyHolder(nextTask);
1750  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1751  }
1753  }) |
1754  runLast(std::move(holder));
1755  });
1756  } // end for loop over streams
1757  });
1758  }
1759  }) | runLast(postSourceTask);
1760  } catch (...) {
1761  status->resetResources();
1763  WaitingTaskHolder copyHolder(postSourceTask);
1764  copyHolder.doneWaiting(std::current_exception());
1765  endRunAsync(iRunStatus, postSourceTask);
1766  }
1767  }); // task in sourceResourcesAcquirer
1768  } catch (...) {
1769  status->resetResources();
1771  WaitingTaskHolder copyHolder(postLumiQueueTask);
1772  copyHolder.doneWaiting(std::current_exception());
1773  endRunAsync(iRunStatus, postLumiQueueTask);
1774  }
1775  }); // task in lumiQueue
1776  } catch (...) {
1777  status->resetResources();
1779  WaitingTaskHolder copyHolder(nextTask);
1780  copyHolder.doneWaiting(std::current_exception());
1781  endRunAsync(iRunStatus, nextTask);
1782  }
1783  }) | chain::runLast(std::move(iHolder));
1784  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1075 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1075  {
1076  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1077  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1078 
1079  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1080  FinalWaitingTask globalWaitTask{taskGroup_};
1081 
1082  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1083  beginGlobalTransitionAsync<Traits>(
1084  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1085 
1086  globalWaitTask.wait();
1087  beginProcessBlockSucceeded = true;
1088  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1175 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, dqmdumpme::first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1175  {
1176  if (iHolder.taskHasFailed()) {
1177  return;
1178  }
1179 
1180  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1181 
1182  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1183 
1184  chain::first([this, &status, iSync](auto nextTask) {
1185  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1186  nextTask,
1187  status->endIOVWaitingTasks(),
1188  status->eventSetupImpls(),
1190  actReg_.get(),
1191  serviceToken_,
1193  }) | chain::then([this, status](std::exception_ptr const* iException, auto nextTask) {
1194  CMS_SA_ALLOW try {
1195  if (iException) {
1196  WaitingTaskHolder copyHolder(nextTask);
1197  copyHolder.doneWaiting(*iException);
1198  // Finish handling the exception in the task pushed to runQueue_
1199  }
1201 
1202  runQueue_->pushAndPause(
1203  *nextTask.group(),
1204  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1205  CMS_SA_ALLOW try {
1206  if (postRunQueueTask.taskHasFailed()) {
1207  status->resetBeginResources();
1209  return;
1210  }
1211 
1212  status->setResumer(std::move(iResumer));
1213 
1215  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1216  CMS_SA_ALLOW try {
1218 
1219  if (postSourceTask.taskHasFailed()) {
1220  status->resetBeginResources();
1222  status->resumeGlobalRunQueue();
1223  return;
1224  }
1225 
1226  status->setRunPrincipal(readRun());
1227 
1228  RunPrincipal& runPrincipal = *status->runPrincipal();
1229  {
1230  SendSourceTerminationSignalIfException sentry(actReg_.get());
1231  input_->doBeginRun(runPrincipal, &processContext_);
1232  sentry.completedSuccessfully();
1233  }
1234 
1235  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1236  if (looper_ && looperBeginJobRun_ == false) {
1237  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1238 
1239  oneapi::tbb::task_group group;
1240  FinalWaitingTask waitTask{group};
1241  using namespace edm::waiting_task::chain;
1242  chain::first([this, &es](auto nextTask) {
1243  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1244  }) | then([this, &es](auto nextTask) mutable {
1245  looper_->beginOfJob(es);
1246  looperBeginJobRun_ = true;
1247  looper_->doStartingNewLoop();
1248  }) | runLast(WaitingTaskHolder(group, &waitTask));
1249  waitTask.wait();
1250  }
1251 
1252  using namespace edm::waiting_task::chain;
1253  chain::first([this, status](auto nextTask) mutable {
1254  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1255  status->setStopBeforeProcessingRun(true);
1256  nextTask.doneWaiting(std::current_exception());
1257  }
1258  }) | then([this, status, &es](auto nextTask) {
1259  if (status->stopBeforeProcessingRun()) {
1260  return;
1261  }
1262  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1263  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1264  beginGlobalTransitionAsync<Traits>(
1265  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1266  }) | then([status](auto nextTask) mutable {
1267  if (status->stopBeforeProcessingRun()) {
1268  return;
1269  }
1270  status->globalBeginDidSucceed();
1271  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1272  if (status->stopBeforeProcessingRun()) {
1273  return;
1274  }
1275  looper_->prefetchAsync(
1276  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1277  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1278  if (status->stopBeforeProcessingRun()) {
1279  return;
1280  }
1281  ServiceRegistry::Operate operateLooper(serviceToken_);
1282  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1283  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1284  bool precedingTasksSucceeded = true;
1285  if (iException) {
1286  precedingTasksSucceeded = false;
1287  WaitingTaskHolder copyHolder(holder);
1288  copyHolder.doneWaiting(*iException);
1289  }
1290 
1291  if (status->stopBeforeProcessingRun()) {
1292  // We just quit now if there was a failure when merging runs
1293  status->resetBeginResources();
1295  status->resumeGlobalRunQueue();
1296  return;
1297  }
1298  CMS_SA_ALLOW try {
1299  // Under normal circumstances, this task runs after endRun has completed for all streams
1300  // and global endLumi has completed for all lumis contained in this run
1301  auto globalEndRunTask =
1302  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1303  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1304  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1306  });
1307  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1308  } catch (...) {
1309  status->resetBeginResources();
1311  status->resumeGlobalRunQueue();
1312  holder.doneWaiting(std::current_exception());
1313  return;
1314  }
1315 
1316  // After this point we are committed to end the run via endRunAsync
1317 
1319 
1320  // The only purpose of the pause is to cause stream begin run to execute before
1321  // global begin lumi in the single threaded case (maintains consistency with
1322  // the order that existed before concurrent runs were implemented).
1323  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1324 
1325  CMS_SA_ALLOW try {
1327  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1328  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1329  CMS_SA_ALLOW try {
1330  streamQueues_[i].push(
1331  *holder.group(),
1332  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1334  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1335  });
1336  } catch (...) {
1337  if (status->streamFinishedBeginRun()) {
1338  WaitingTaskHolder copyHolder(holder);
1339  copyHolder.doneWaiting(std::current_exception());
1340  status->resetBeginResources();
1343  }
1344  }
1345  }
1346  });
1347  } catch (...) {
1348  WaitingTaskHolder copyHolder(holder);
1349  copyHolder.doneWaiting(std::current_exception());
1350  status->resetBeginResources();
1353  }
1355  }) | runLast(postSourceTask);
1356  } catch (...) {
1357  status->resetBeginResources();
1359  status->resumeGlobalRunQueue();
1360  postSourceTask.doneWaiting(std::current_exception());
1361  }
1362  }); // task in sourceResourcesAcquirer
1363  } catch (...) {
1364  status->resetBeginResources();
1366  status->resumeGlobalRunQueue();
1367  postRunQueueTask.doneWaiting(std::current_exception());
1368  }
1369  }); // task in runQueue
1370  } catch (...) {
1371  status->resetBeginResources();
1373  nextTask.doneWaiting(std::current_exception());
1374  }
1375  }) | chain::runLast(std::move(iHolder));
1376  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 281 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

281  {
283  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 284 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode &  returnCode)
private

Definition at line 848 of file EventProcessor.cc.

References epSignal, edm::JobReport::reportShutdownSignal(), runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

848  {
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
854  edm::LogSystem("ShutdownSignal") << "an external signal was sent to shutdown the job early.";
856  jr->reportShutdownSignal();
858  }
859  return returnValue;
860  }
Log< level::System, false > LogSystem
volatile std::atomic< bool > shutdown_flag
void reportShutdownSignal()
Definition: JobReport.cc:543

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 842 of file EventProcessor.cc.

References schedule_.

842 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 2064 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2064  {
2065  for (auto& s : subProcesses_) {
2066  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2067  }
2068  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2069  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2070  iStatus.lumiPrincipal()->clearPrincipal();
2071  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus &  iStatus)

Definition at line 2039 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2039  {
2040  for (auto& s : subProcesses_) {
2041  s.clearRunPrincipal(*iStatus.runPrincipal());
2042  }
2043  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2044  iStatus.runPrincipal()->clearPrincipal();
2045  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 975 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

975  {
976  if (fileBlockValid()) {
977  SendSourceTerminationSignalIfException sentry(actReg_.get());
978  input_->closeFile(fb_.get(), cleaningUpAfterException);
979  sentry.completedSuccessfully();
980  }
981  FDEBUG(1) << "\tcloseInputFile\n";
982  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 992 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

992  {
993  schedule_->closeOutputFiles();
994  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
995  processBlockHelper_->clearAfterOutputFilesClose();
996  FDEBUG(1) << "\tcloseOutputFiles\n";
997  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1786 of file EventProcessor.cc.

References dqmdumpme::first, firstItemAfterLumiMerge_, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1786  {
1787  chain::first([this](auto nextTask) {
1788  //all streams are sharing the same status at the moment
1789  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1791 
1792  while (lastTransitionType() == InputSource::IsLumi and
1793  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1796  }
1797  firstItemAfterLumiMerge_ = true;
1798  }) | chain::then([this](auto nextTask) mutable {
1799  unsigned int streamIndex = 0;
1800  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1801  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1802  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1803  }
1804  nextTask.group()->run(
1805  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1806  }) | chain::runLast(std::move(iHolder));
1807  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1066 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1066  {
1067  FDEBUG(1) << "\tdoErrorStuff\n";
1068  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1069  << "and went to the error state\n"
1070  << "Will attempt to terminate processing normally\n"
1071  << "(IF using the looper the next loop will be attempted)\n"
1072  << "This likely indicates a bug in an input module or corrupted input or both\n";
1073  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 763 of file EventProcessor.cc.

References actReg_, HltBtagPostValidation_cff::c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, MainPageGenerator::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

763  {
764  // Collects exceptions, so we don't throw before all operations are performed.
765  ExceptionCollector c(
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1027 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1027  {
1028  if (looper_) {
1029  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToResolverIndices());
1030  looper_->setModuleChanger(&changer);
1031  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1032  looper_->setModuleChanger(nullptr);
1034  return true;
1035  else
1036  return false;
1037  }
1038  FDEBUG(1) << "\tendOfLoop\n";
1039  return true;
1040  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1116 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1116  {
1117  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1118 
1119  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1120  FinalWaitingTask globalWaitTask{taskGroup_};
1121 
1122  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1123  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1124  *schedule_,
1125  transitionInfo,
1126  serviceToken_,
1127  subProcesses_,
1128  cleaningUpAfterException);
1129  globalWaitTask.wait();
1130 
1131  if (beginProcessBlockSucceeded) {
1132  FinalWaitingTask writeWaitTask{taskGroup_};
1134  writeWaitTask.wait();
1135  }
1136 
1137  processBlockPrincipal.clearPrincipal();
1138  for (auto& s : subProcesses_) {
1139  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1140  }
1141  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus >  iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1419 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, dqmdumpme::first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1419  {
1420  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1421  iRunStatus->setEndTime();
1422  IOVSyncValue ts(
1423  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1424  runPrincipal.endTime());
1425  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1426  WaitingTaskHolder copyHolder(iHolder);
1427  copyHolder.doneWaiting(std::current_exception());
1428  }
1429 
1430  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1431  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1432  nextTask,
1433  iRunStatus->endIOVWaitingTasksEndRun(),
1434  iRunStatus->eventSetupImplsEndRun(),
1436  actReg_.get(),
1437  serviceToken_);
1438  }) | chain::then([this, iRunStatus](std::exception_ptr const* iException, auto nextTask) {
1439  if (iException) {
1440  iRunStatus->setEndingEventSetupSucceeded(false);
1441  handleEndRunExceptions(*iException, nextTask);
1442  }
1444  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1445  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1446  CMS_SA_ALLOW try {
1447  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1448  streamQueues_[i].pause();
1449  streamEndRunAsync(std::move(nextTask), i);
1450  });
1451  } catch (...) {
1452  WaitingTaskHolder copyHolder(nextTask);
1453  copyHolder.doneWaiting(std::current_exception());
1454  }
1455  }
1456  });
1457 
1459  CMS_SA_ALLOW try {
1460  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1461  } catch (...) {
1462  WaitingTaskHolder copyHolder(nextTask);
1463  copyHolder.doneWaiting(std::current_exception());
1464  }
1465  }
1466  }) | chain::runLast(std::move(iHolder));
1467  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1930 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1930  {
1931  if (streamRunActive_ == 0) {
1932  assert(streamLumiActive_ == 0);
1933  } else {
1935  if (streamLumiActive_ > 0) {
1936  FinalWaitingTask globalWaitTask{taskGroup_};
1938  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1939  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1940  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1941  }
1942  globalWaitTask.wait();
1943  }
1944  }
1945  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1610 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1610  {
1611  if (streamRunActive_ > 0) {
1612  FinalWaitingTask waitTask{taskGroup_};
1613 
1614  auto runStatus = streamRunStatus_[0].get();
1615  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1616  WaitingTaskHolder holder{taskGroup_, &waitTask};
1617  runStatus->setHolderOfTaskInProcessRuns(holder);
1619  endRunAsync(streamRunStatus_[0], std::move(holder));
1620  waitTask.wait();
1621  }
1622  }
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
InputSource::ItemType lastSourceTransition_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 192 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

192 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 832 of file EventProcessor.cc.

References schedule_.

832  {
833  return schedule_->getAllModuleDescriptions();
834  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 830 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

830 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus >  iLumiStatus 
)

Definition at line 1818 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, dqmdumpme::first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1819  {
1820  // Get some needed info out of the status object before moving
1821  // it into finalTaskForThisLumi.
1822  auto& lp = *(iLumiStatus->lumiPrincipal());
1823  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1824  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1825  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1826  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1827 
1828  using namespace edm::waiting_task::chain;
1829  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1830  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1831 
1832  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1833  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1834  endGlobalTransitionAsync<Traits>(
1835  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1836  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1837  //Only call writeLumi if beginLumi succeeded
1838  if (didGlobalBeginSucceed) {
1839  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1840  }
1841  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1842  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1843  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1844  //any thrown exception auto propagates to nextTask via the chain
1846  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1847  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1848  if (iException) {
1849  handleEndLumiExceptions(*iException, nextTask);
1850  }
1852 
1853  std::exception_ptr ptr;
1854 
1855  // Try hard to clean up resources so the
1856  // process can terminate in a controlled
1857  // fashion even after exceptions have occurred.
1858  // Caught exception is passed to handleEndLumiExceptions()
1859  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1860  if (not ptr) {
1861  ptr = std::current_exception();
1862  }
1863  }
1864  // Caught exception is passed to handleEndLumiExceptions()
1865  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1866  if (not ptr) {
1867  ptr = std::current_exception();
1868  }
1869  }
1870  // Caught exception is passed to handleEndLumiExceptions()
1871  CMS_SA_ALLOW try {
1872  status->resetResources();
1873  status->globalEndRunHolderDoneWaiting();
1874  status.reset();
1875  } catch (...) {
1876  if (not ptr) {
1877  ptr = std::current_exception();
1878  }
1879  }
1880 
1881  if (ptr && !iException) {
1882  handleEndLumiExceptions(ptr, nextTask);
1883  }
1884  }) | runLast(std::move(iTask));
1885  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus >  iRunStatus 
)

Definition at line 1478 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, dqmdumpme::first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1478  {
1479  auto& runPrincipal = *(iRunStatus->runPrincipal());
1480  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1481  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1482  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1483  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1484  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1485 
1486  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1487  using namespace edm::waiting_task::chain;
1488  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1489  auto nextTask) {
1490  if (endingEventSetupSucceeded) {
1491  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1492  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1493  endGlobalTransitionAsync<Traits>(
1494  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1495  }
1496  }) |
1497  ifThen(looper_ && endingEventSetupSucceeded,
1498  [this, &runPrincipal, &es](auto nextTask) {
1499  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1500  }) |
1501  ifThen(looper_ && endingEventSetupSucceeded,
1502  [this, &runPrincipal, &es](auto nextTask) {
1504  looper_->doEndRun(runPrincipal, es, &processContext_);
1505  }) |
1506  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1507  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1508  mergeableRunProductMetadata->preWriteRun();
1509  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1510  }) |
1511  then([status = std::move(iRunStatus),
1512  this,
1513  didGlobalBeginSucceed,
1514  mergeableRunProductMetadata,
1515  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1516  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1517  mergeableRunProductMetadata->postWriteRun();
1518  }
1519  if (iException) {
1520  handleEndRunExceptions(*iException, nextTask);
1521  }
1523 
1524  std::exception_ptr ptr;
1525 
1526  // Try hard to clean up resources so the
1527  // process can terminate in a controlled
1528  // fashion even after exceptions have occurred.
1529  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1530  if (not ptr) {
1531  ptr = std::current_exception();
1532  }
1533  }
1534  CMS_SA_ALLOW try {
1535  status->resumeGlobalRunQueue();
1537  } catch (...) {
1538  if (not ptr) {
1539  ptr = std::current_exception();
1540  }
1541  }
1542  CMS_SA_ALLOW try {
1543  status->resetEndResources();
1544  status.reset();
1545  } catch (...) {
1546  if (not ptr) {
1547  ptr = std::current_exception();
1548  }
1549  }
1550 
1551  if (ptr && !iException) {
1552  handleEndRunExceptions(ptr, nextTask);
1553  }
1554  }) |
1555  runLast(std::move(iTask));
1556  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1809 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1809  {
1810  if (holder.taskHasFailed()) {
1812  } else {
1813  WaitingTaskHolder tmp(holder);
1814  tmp.doneWaiting(iException);
1815  }
1816  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1469 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1469  {
1470  if (holder.taskHasFailed()) {
1472  } else {
1473  WaitingTaskHolder tmp(holder);
1474  tmp.doneWaiting(iException);
1475  }
1476  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2204 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2204  {
2205  auto group = iTask.group();
2206  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2207  CMS_SA_ALLOW try {
2208  auto status = streamLumiStatus_[iStreamIndex].get();
2210 
2211  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2212  auto recursionTask =
2213  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2214  if (iEventException) {
2215  WaitingTaskHolder copyHolder(iTask);
2216  copyHolder.doneWaiting(*iEventException);
2217  // Intentionally, we don't return here. The recursive call to
2218  // handleNextEvent takes care of immediately ending the run properly
2219  // using the same code it uses to end the run in other situations.
2220  }
2221  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2222  });
2223 
2224  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2225  } else {
2226  // the stream will stop processing this lumi now
2228  if (not status->haveStartedNextLumiOrEndedRun()) {
2229  status->startNextLumiOrEndRun();
2230  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2231  CMS_SA_ALLOW try {
2232  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2233  input_->luminosityBlockAuxiliary()->beginTime()),
2234  streamRunStatus_[iStreamIndex],
2235  iTask);
2236  } catch (...) {
2237  WaitingTaskHolder copyHolder(iTask);
2238  copyHolder.doneWaiting(std::current_exception());
2239  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2240  }
2241  } else {
2242  // If appropriate, this will also start the next run.
2243  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2244  }
2245  }
2246  streamEndLumiAsync(iTask, iStreamIndex);
2247  } else {
2248  assert(status->eventProcessingState() ==
2250  auto runStatus = streamRunStatus_[iStreamIndex].get();
2251 
2252  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2253  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2254  }
2255  }
2256  }
2257  } catch (...) {
2258  WaitingTaskHolder copyHolder(iTask);
2259  copyHolder.doneWaiting(std::current_exception());
2260  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2261  }
2262  });
2263  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus >  iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2122 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginRunAsync(), and processRuns().

2123  {
2125  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2126  iHolder.doneWaiting(std::exception_ptr{});
2127  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2128  CMS_SA_ALLOW try {
2129  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2130  input_->luminosityBlockAuxiliary()->beginTime()),
2131  iRunStatus,
2132  iHolder);
2133  } catch (...) {
2134  WaitingTaskHolder copyHolder(iHolder);
2135  iHolder.doneWaiting(std::current_exception());
2136  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2137  }
2138  } else {
2139  // Note that endRunAsync will call beginRunAsync for the following run
2140  // if appropriate.
2141  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2142  }
2143  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 325 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DMR_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, moduleTypeResolverMaker_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, 
unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

327  {
328  //std::cerr << processDesc->dump() << std::endl;
329 
330  // register the empty parentage vector , once and for all
332 
333  // register the empty parameter set, once and for all.
334  ParameterSet().registerIt();
335 
336  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
337 
338  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
339  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
340  bool const hasSubProcesses = !subProcessVParameterSet.empty();
341 
342  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
343  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
344  // set in here if the parameters were not explicitly set.
346 
347  // Now set some parameters specific to the main process.
348  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
349  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
350  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
351  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
352  << fileMode << ".\n"
353  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
354  } else {
355  fileModeNoMerge_ = (fileMode == "NOMERGE");
356  }
357  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
359 
360  //threading
361  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
362 
363  // Even if numberOfThreads was set to zero in the Python configuration, the code
364  // in cmsRun.cpp should have reset it to something else.
365  assert(nThreads != 0);
366 
367  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
368  if (nStreams == 0) {
369  nStreams = nThreads;
370  }
371  unsigned int nConcurrentLumis =
372  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
373  if (nConcurrentLumis == 0) {
374  nConcurrentLumis = 2;
375  }
376  if (nConcurrentLumis > nStreams) {
377  nConcurrentLumis = nStreams;
378  }
379  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
380  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
381  nConcurrentRuns = nConcurrentLumis;
382  }
383  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
384  if (!loopers.empty()) {
385  //For now loopers make us run only 1 transition at a time
386  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
387  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
388  "of concurrent runs, and the number of concurrent lumis "
389  "are all being reset to 1. Loopers cannot currently support "
390  "values greater than 1.";
391  nStreams = 1;
392  nConcurrentLumis = 1;
393  nConcurrentRuns = 1;
394  }
395  }
396  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
397  if (dumpOptions) {
398  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
399  } else {
400  if (nThreads > 1 or nStreams > 1) {
401  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
402  }
403  }
404 
405  // The number of concurrent IOVs is configured individually for each record in
406  // the class NumberOfConcurrentIOVs to values less than or equal to this.
407  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
408  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
409  // concurrent run past the first in use cases where IOVs change within a run.
410  unsigned int maxConcurrentIOVs =
411  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
412 
413  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
414 
415  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
417  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
418  //for now, if have a subProcess, don't allow early delete
419  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
420  if (not hasSubProcesses) {
421  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
422  }
423  if (not branchesToDeleteEarly_.empty()) {
424  auto referencePSets =
425  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
426  for (auto const& pset : referencePSets) {
427  auto product = pset.getParameter<std::string>("product");
428  auto references = pset.getParameter<std::vector<std::string>>("references");
429  for (auto const& ref : references) {
430  referencesToBranches_.emplace(product, ref);
431  }
432  }
434  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
435  }
436 
437  // Now do general initialization
438  ScheduleItems items;
439 
440  //initialize the services
441  auto& serviceSets = processDesc->getServicesPSets();
442  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
443  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
444 
445  //make the services available
447 
448  CMS_SA_ALLOW try {
449  if (nThreads > 1) {
451  handler->willBeUsingThreads();
452  }
453 
454  // initialize miscellaneous items
455  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
456 
457  // initialize the event setup provider
458  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
459  esp_ = espController_->makeProvider(
460  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
461 
462  // initialize the looper, if any
463  if (!loopers.empty()) {
465  looper_->setActionTable(items.act_table_.get());
466  looper_->attachTo(*items.actReg_);
467 
468  // in presence of looper do not delete modules
470  }
471 
472  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
473 
474  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
475  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
476  streamQueues_.resize(nStreams);
477  streamRunStatus_.resize(nStreams);
478  streamLumiStatus_.resize(nStreams);
479 
480  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
481 
482  {
483  std::optional<ScheduleItems::MadeModules> madeModules;
484 
485  //setup input and modules concurrently
486  tbb::task_group group;
487 
488  // initialize the input source
489  auto tempReg = std::make_shared<ProductRegistry>();
490  auto sourceID = ModuleDescription::getUniqueID();
491 
492  group.run([&, this]() {
493  // initialize the Schedule
495  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
496  madeModules =
498  });
499 
500  group.run([&, this, tempReg]() {
502  input_ = makeInput(sourceID,
503  *parameterSet,
504  *common,
505  /*items.preg(),*/ tempReg,
506  items.branchIDListHelper(),
508  items.thinnedAssociationsHelper(),
509  items.actReg_,
510  items.processConfiguration(),
512  });
513 
514  group.wait();
515  items.preg()->addFromInput(*tempReg);
516  input_->switchTo(items.preg());
517 
518  {
519  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
520  schedule_ = items.finishSchedule(std::move(*madeModules),
521  *parameterSet,
522  tns,
523  hasSubProcesses,
527  }
528  }
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
535  branchIDListHelper_ = items.branchIDListHelper();
536  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
537  processConfiguration_ = items.processConfiguration();
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(),
549  historyAppender_.get(),
550  index,
551  true /*primary process*/,
554  }
555 
556  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
557  auto rp = std::make_unique<RunPrincipal>(
560  }
561 
562  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
563  auto lp =
564  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
566  }
567 
568  {
569  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
571 
572  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
574  }
575 
576  // fill the subprocesses, if there are any
577  subProcesses_.reserve(subProcessVParameterSet.size());
578  for (auto& subProcessPSet : subProcessVParameterSet) {
579  subProcesses_.emplace_back(subProcessPSet,
580  *parameterSet,
581  preg(),
585  SubProcessParentageHelper(),
587  *actReg_,
588  token,
593  }
594  } catch (...) {
595  //in case of an exception, make sure Services are available
596  // during the following destructors
597  espController_ = nullptr;
598  esp_ = nullptr;
599  schedule_ = nullptr;
600  input_ = nullptr;
601  looper_ = nullptr;
602  actReg_ = nullptr;
603  throw;
604  }
605  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription's constructor's modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
std::multimap< std::string, std::string > referencesToBranches_
fileMode
Definition: DMR_cfg.py:72
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1090 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1090  {
1091  input_->fillProcessBlockHelper();
1092  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1093  while (input_->nextProcessBlock(processBlockPrincipal)) {
1094  readProcessBlock(processBlockPrincipal);
1095 
1096  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1097  FinalWaitingTask globalWaitTask{taskGroup_};
1098 
1099  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1100  beginGlobalTransitionAsync<Traits>(
1101  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1102 
1103  globalWaitTask.wait();
1104 
1105  FinalWaitingTask writeWaitTask{taskGroup_};
1107  writeWaitTask.wait();
1108 
1109  processBlockPrincipal.clearPrincipal();
1110  for (auto& s : subProcesses_) {
1111  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1112  }
1113  }
1114  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inline private

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

291 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inline private

Definition at line 292 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

292 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 862 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

862  {
863  SendSourceTerminationSignalIfException sentry(actReg_.get());
864  InputSource::ItemType itemType;
865  //For now, do nothing with InputSource::IsSynchronize
866  do {
867  itemType = input_->nextItemType();
868  } while (itemType == InputSource::IsSynchronize);
869 
870  lastSourceTransition_ = itemType;
871  sentry.completedSuccessfully();
872 
874 
876  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
878  }
879 
880  return lastSourceTransition_;
881  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 984 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

984  {
985  if (fileBlockValid()) {
986  schedule_->openOutputFiles(*fb_);
987  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
988  }
989  FDEBUG(1) << "\topenOutputFiles\n";
990  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inline private

Definition at line 279 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

279 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inline private

Definition at line 280 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

280 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1048 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1048  {
1049  looper_->prepareForNextLoop(esp_.get());
1050  FDEBUG(1) << "\tprepareForNextLoop\n";
1051  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 143 of file EventProcessor.h.

References processConfiguration_.

143 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2280 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2280  {
2281  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2282  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2284 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, dqmdumpme::first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2284  {
2285  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2286 
2288  Service<RandomNumberGenerator> rng;
2289  if (rng.isAvailable()) {
2290  Event ev(*pep, ModuleDescription(), nullptr);
2291  rng->postEventRead(ev);
2292  }
2293 
2294  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2295  using namespace edm::waiting_task::chain;
2296  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2297  EventTransitionInfo info(*pep, es);
2298  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2299  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2300  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2301  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2302  }
2303  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2304  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2305  ServiceRegistry::Operate operateLooper(serviceToken_);
2306  processEventWithLooper(*pep, iStreamIndex);
2307  }) | then([pep](auto nextTask) {
2308  FDEBUG(1) << "\tprocessEvent\n";
2309  pep->clearEventPrincipal();
2310  }) | runLast(iHolder);
2311  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal &  iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2313 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2313  {
2314  bool randomAccess = input_->randomAccess();
2315  ProcessingController::ForwardState forwardState = input_->forwardState();
2316  ProcessingController::ReverseState reverseState = input_->reverseState();
2317  ProcessingController pc(forwardState, reverseState, randomAccess);
2318 
2320  do {
2321  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2322  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2323  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2324 
2325  bool succeeded = true;
2326  if (randomAccess) {
2327  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2328  input_->skipEvents(-2);
2329  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2330  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2331  }
2332  }
2333  pc.setLastOperationSucceeded(succeeded);
2334  } while (!pc.lastOperationSucceeded());
2336  shouldWeStop_ = true;
2337  }
2338  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1143 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1143  {
1144  FinalWaitingTask waitTask{taskGroup_};
1146  if (streamRunActive_ == 0) {
1147  assert(streamLumiActive_ == 0);
1148 
1149  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1150  WaitingTaskHolder{taskGroup_, &waitTask});
1151  } else {
1153 
1154  auto runStatus = streamRunStatus_[0];
1155 
1156  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1157  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1158  readAndMergeRun(*runStatus);
1160  }
1161 
1162  WaitingTaskHolder holder{taskGroup_, &waitTask};
1163  runStatus->setHolderOfTaskInProcessRuns(holder);
1164  if (streamLumiActive_ > 0) {
1166  continueLumiAsync(std::move(holder));
1167  } else {
1169  }
1170  }
1171  waitTask.wait();
1172  return lastTransitionType();
1173  }
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
PreallocationConfiguration preallocations_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus &  iStatus)

Definition at line 1992 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

1992  {
1993  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1994  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1995  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1996  input_->processHistoryRegistry().reducedProcessHistoryID(
1997  input_->luminosityBlockAuxiliary()->processHistoryID()));
1998  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1999  assert(lumiOK);
2000  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
2001  {
2002  SendSourceTerminationSignalIfException sentry(actReg_.get());
2003  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2004  sentry.completedSuccessfully();
2005  }
2006  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2100 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2101  {
2102  auto group = iHolder.group();
2104  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2105  CMS_SA_ALLOW try {
2107 
2108  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2109 
2111  while (lastTransitionType() == InputSource::IsLumi and
2112  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2113  readAndMergeLumi(*iLumiStatus);
2115  }
2116  } catch (...) {
2117  holder.doneWaiting(std::current_exception());
2118  }
2119  });
2120  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus & iStatus)

Definition at line 1966 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

1966  {
1967  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1968 
1969  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1970  assert(runOK);
1971  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1972  {
1973  SendSourceTerminationSignalIfException sentry(actReg_.get());
1974  input_->readAndMergeRun(runPrincipal);
1975  sentry.completedSuccessfully();
1976  }
1977  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus > iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2073 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2074  {
2075  auto group = iHolder.group();
2077  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2078  CMS_SA_ALLOW try {
2080 
2081  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2082 
2084  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2085  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2086  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2087  status->setStopBeforeProcessingRun(true);
2088  return;
2089  }
2092  }
2093  } catch (...) {
2094  status->setStopBeforeProcessingRun(true);
2095  holder.doneWaiting(std::current_exception());
2096  }
2097  });
2098  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2265 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2265  {
2266  //TODO this will have to become per stream
2267  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2268  StreamContext streamContext(event.streamID(), &processContext_);
2269 
2270  SendSourceTerminationSignalIfException sentry(actReg_.get());
2271  input_->readEvent(event, streamContext);
2272 
2273  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2274  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2275  sentry.completedSuccessfully();
2276 
2277  FDEBUG(1) << "\treadEvent\n";
2278  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 950 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, findQualityFiles::size, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

950  {
951  FDEBUG(1) << " \treadFile\n";
952  size_t size = preg_->size();
953  SendSourceTerminationSignalIfException sentry(actReg_.get());
954 
955  if (streamRunActive_ > 0) {
956  streamRunStatus_[0]->runPrincipal()->preReadFile();
957  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
958  }
959 
960  if (streamLumiActive_ > 0) {
961  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
962  }
963 
964  fb_ = input_->readFile();
965  if (size < preg_->size()) {
967  }
970  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
971  }
972  sentry.completedSuccessfully();
973  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal > rp)

Definition at line 1979 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

1979  {
1981  assert(lbp);
1982  lbp->setAux(*input_->luminosityBlockAuxiliary());
1983  {
1984  SendSourceTerminationSignalIfException sentry(actReg_.get());
1985  input_->readLuminosityBlock(*lbp, *historyAppender_);
1986  sentry.completedSuccessfully();
1987  }
1988  lbp->setRunPrincipal(std::move(rp));
1989  return lbp;
1990  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2145 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), firstItemAfterLumiMerge_, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2147  {
2148  // This function returns true if it successfully reads an event for the stream and that
2149  // requires both that an event is next and there are no problems or requests to stop.
2150 
2151  if (iTask.taskHasFailed()) {
2152  // We want all streams to stop or all streams to pause. If we are already in the
2153  // middle of pausing streams, then finish pausing all of them and the lumi will be
2154  // ended later. Otherwise, just end it now.
2155  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2157  }
2158  return false;
2159  }
2160 
2161  // Did another stream already stop or pause this lumi?
2162  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2163  return false;
2164  }
2165 
2166  // Are output modules or the looper requesting we stop?
2167  if (shouldWeStop()) {
2170  return false;
2171  }
2172 
2174 
2175  // need to use lock in addition to the serial task queue because
2176  // of delayed provenance reading and reading data in response to
2177  // edm::Refs etc
2178  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2179 
2180  // If we didn't already call nextTransitionType while merging lumis, call it here.
2181  // This asks the input source what is next and also checks for signals.
2182 
2184  firstItemAfterLumiMerge_ = false;
2185 
2186  if (InputSource::IsEvent != itemType) {
2187  // IsFile may continue processing the lumi and
2188  // looper_ can cause the input source to declare a new IsRun which is actually
2189  // just a continuation of the previous run
2190  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2191  (InputSource::IsRun == itemType and
2192  (iStatus.lumiPrincipal()->run() != input_->run() or
2193  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2195  } else {
2197  }
2198  return false;
2199  }
2200  readEvent(iStreamIndex);
2201  return true;
2202  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1947 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1947  {
1948  SendSourceTerminationSignalIfException sentry(actReg_.get());
1949  input_->readProcessBlock(processBlockPrincipal);
1950  sentry.completedSuccessfully();
1951  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 1953 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

1953  {
1955  assert(rp);
1956  rp->setAux(*input_->runAuxiliary());
1957  {
1958  SendSourceTerminationSignalIfException sentry(actReg_.get());
1959  input_->readRun(*rp, *historyAppender_);
1960  sentry.completedSuccessfully();
1961  }
1962  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1963  return rp;
1964  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1410 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1410  {
1411  auto& status = streamRunStatus_[iStream];
1412  if (status->streamFinishedBeginRun()) {
1413  status->resetBeginResources();
1415  }
1416  streamQueues_[iStream].resume();
1417  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1009 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1009  {
1010  if (fileBlockValid()) {
1011  schedule_->respondToCloseInputFile(*fb_);
1012  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1013  }
1014  FDEBUG(1) << "\trespondToCloseInputFile\n";
1015  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 999 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

999  {
1000  if (fileBlockValid()) {
1002  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1003  schedule_->respondToOpenInputFile(*fb_);
1004  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1005  }
1006  FDEBUG(1) << "\trespondToOpenInputFile\n";
1007  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1042 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1042  {
1043  input_->repeat();
1044  input_->rewind();
1045  FDEBUG(1) << "\trewind\n";
1046  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 377 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

377 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 883 of file EventProcessor.cc.

References actReg_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

883  {
884  beginJob(); //make sure this was called
885 
886  // make the services available
888  actReg_->beginProcessingSignal_();
889  auto endSignal = [](ActivityRegistry* iReg) { iReg->endProcessingSignal_(); };
890  std::unique_ptr<ActivityRegistry, decltype(endSignal)> guard(actReg_.get(), endSignal);
891  try {
892  FilesProcessor fp(fileModeNoMerge_);
893 
894  convertException::wrap([&]() {
895  bool firstTime = true;
896  do {
897  if (not firstTime) {
899  rewindInput();
900  } else {
901  firstTime = false;
902  }
903  startingNewLoop();
904 
905  auto trans = fp.processFiles(*this);
906 
907  fp.normalEnd();
908 
909  if (deferredExceptionPtrIsSet_.load()) {
910  std::rethrow_exception(deferredExceptionPtr_);
911  }
912  if (trans != InputSource::IsStop) {
913  //problem with the source
914  doErrorStuff();
915 
916  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
917  }
918  } while (not endOfLoop());
919  }); // convertException::wrap
920 
921  } // Try block
922  catch (cms::Exception& e) {
924  std::string message(
925  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
926  e.addAdditionalInfo(message);
927  if (e.alreadyPrinted()) {
928  LogAbsolute("Additional Exceptions") << message;
929  }
930  }
931  if (exceptionMessageRuns_) {
932  std::string message(
933  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
934  e.addAdditionalInfo(message);
935  if (e.alreadyPrinted()) {
936  LogAbsolute("Additional Exceptions") << message;
937  }
938  }
939  if (!exceptionMessageFiles_.empty()) {
940  e.addAdditionalInfo(exceptionMessageFiles_);
941  if (e.alreadyPrinted()) {
942  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
943  }
944  }
945  throw;
946  }
947  return epSuccess;
948  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2361 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2361  {
2362  bool expected = false;
2363  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2364  deferredExceptionPtr_ = iException;
2365  return true;
2366  }
2367  return false;
2368  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2355 of file EventProcessor.cc.

References exceptionMessageFiles_.

2355 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2359 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2359 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2357 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2357 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1053 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1053  {
1054  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1055  if (!subProcesses_.empty()) {
1056  for (auto const& subProcess : subProcesses_) {
1057  if (subProcess.shouldWeCloseOutput()) {
1058  return true;
1059  }
1060  }
1061  return false;
1062  }
1063  return schedule_->shouldWeCloseOutput();
1064  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2340 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2340  {
2341  FDEBUG(1) << "\tshouldWeStop\n";
2342  if (shouldWeStop_)
2343  return true;
2344  if (!subProcesses_.empty()) {
2345  for (auto const& subProcess : subProcesses_) {
2346  if (subProcess.terminate()) {
2347  return true;
2348  }
2349  }
2350  return false;
2351  }
2352  return schedule_->terminate();
2353  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1017 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1017  {
1018  shouldWeStop_ = false;
1019  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1020  // until after we've called beginOfJob
1021  if (looper_ && looperBeginJobRun_) {
1022  looper_->doStartingNewLoop();
1023  }
1024  FDEBUG(1) << "\tstartingNewLoop\n";
1025  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus > status,
bool  precedingTasksSucceeded,
WaitingTaskHolder  iHolder 
)

Definition at line 1378 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), dqmdumpme::first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1381  {
1382  // These shouldn't throw
1383  streamQueues_[iStream].pause();
1384  ++streamRunActive_;
1385  streamRunStatus_[iStream] = std::move(status);
1386 
1387  CMS_SA_ALLOW try {
1388  using namespace edm::waiting_task::chain;
1389  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1390  if (precedingTasksSucceeded) {
1391  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1392  RunTransitionInfo transitionInfo(
1393  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1394  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1395  beginStreamTransitionAsync<Traits>(
1396  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1397  }
1398  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1399  if (exceptionFromBeginStreamRun) {
1400  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1401  }
1402  releaseBeginRunResources(iStream);
1403  }) | runLast(iHolder);
1404  } catch (...) {
1405  releaseBeginRunResources(iStream);
1406  iHolder.doneWaiting(std::current_exception());
1407  }
1408  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1887 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1887  {
1888  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1889  auto status = streamLumiStatus_[iStreamIndex];
1890  if (iException) {
1891  handleEndLumiExceptions(*iException, iTask);
1892  }
1893 
1894  // reset status before releasing queue else get race condition
1895  streamLumiStatus_[iStreamIndex].reset();
1897  streamQueues_[iStreamIndex].resume();
1898 
1899  //are we the last one?
1900  if (status->streamFinishedLumi()) {
1902  }
1903  });
1904 
1905  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1906 
1907  // Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1908  // therefore we do not want to hold the shared_ptr
1909  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1910  lumiStatus->setEndTime();
1911 
1912  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1913  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1914  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1915 
1916  if (lumiStatus->didGlobalBeginSucceed()) {
1917  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1918  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1919  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1920  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1921  *schedule_,
1922  iStreamIndex,
1923  transitionInfo,
1924  serviceToken_,
1925  subProcesses_,
1926  cleaningUpAfterException);
1927  }
1928  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1558 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1558  {
1559  CMS_SA_ALLOW try {
1560  if (!streamRunStatus_[iStreamIndex]) {
1561  if (exceptionRunStatus_->streamFinishedRun()) {
1562  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1563  exceptionRunStatus_.reset();
1564  }
1565  return;
1566  }
1567 
1568  auto runDoneTask =
1569  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1570  if (iException) {
1571  handleEndRunExceptions(*iException, iTask);
1572  }
1573 
1574  auto runStatus = streamRunStatus_[iStreamIndex];
1575 
1576  //reset status before releasing queue else get race condition
1577  if (runStatus->streamFinishedRun()) {
1578  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1579  }
1580  streamRunStatus_[iStreamIndex].reset();
1581  --streamRunActive_;
1582  streamQueues_[iStreamIndex].resume();
1583  });
1584 
1585  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1586 
1587  auto runStatus = streamRunStatus_[iStreamIndex].get();
1588 
1589  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1590  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1591  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1592  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1593 
1594  auto& runPrincipal = *runStatus->runPrincipal();
1595  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1596  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1597  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1598  *schedule_,
1599  iStreamIndex,
1600  transitionInfo,
1601  serviceToken_,
1602  subProcesses_,
1603  cleaningUpAfterException);
1604  }
1605  } catch (...) {
1606  handleEndRunExceptions(std::current_exception(), iTask);
1607  }
1608  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 625 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

625  {
628  task.waitNoThrow();
629  assert(task.done());
630  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inline private

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

285  {
287  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inline private

Definition at line 288 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

288  {
290  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2370 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2370  {
2371  cms::Exception ex("ModulesSynchingOnLumis");
2372  ex << "The framework is configured to use at least two streams, but the following modules\n"
2373  << "require synchronizing on LuminosityBlock boundaries:";
2374  bool found = false;
2375  for (auto worker : schedule_->allWorkers()) {
2376  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2377  found = true;
2378  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2379  }
2380  }
2381  if (found) {
2382  ex << "\n\nThe situation can be fixed by either\n"
2383  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2384  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2385  throw ex;
2386  }
2387  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 836 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

836 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 840 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

840 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 838 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

838 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2402 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2402  {
2403  std::unique_ptr<LogSystem> s;
2404  for (auto worker : schedule_->allWorkers()) {
2405  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2406  if (not s) {
2407  s = std::make_unique<LogSystem>("LegacyModules");
2408  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2409  "is going to end soon. These modules need to be converted to have type\n"
2410  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2411  }
2412  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2413  }
2414  }
2415  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2389 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2389  {
2390  std::unique_ptr<LogSystem> s;
2391  for (auto worker : schedule_->allWorkers()) {
2392  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2393  if (not s) {
2394  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2395  (*s) << "The following modules require synchronizing on Run boundaries:";
2396  }
2397  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2398  }
2399  }
2400  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2047 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), findAndChange::op, processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2047  {
2048  using namespace edm::waiting_task;
2049  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2050  chain::first([&](auto nextTask) {
2052 
2053  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2054  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2055  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2057  for (auto& s : subProcesses_) {
2058  s.writeLumiAsync(nextTask, lumiPrincipal);
2059  }
2061  }
2062  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2008 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), eostools::move(), findAndChange::op, principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2008  {
2009  using namespace edm::waiting_task;
2010  chain::first([&](auto nextTask) {
2012  schedule_->writeProcessBlockAsync(
2013  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2014  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2016  for (auto& s : subProcesses_) {
2017  s.writeProcessBlockAsync(nextTask, processBlockType);
2018  }
2019  }) | chain::runLast(std::move(task));
2020  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2022 of file EventProcessor.cc.

References actReg_, dqmdumpme::first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), findAndChange::op, processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2024  {
2025  using namespace edm::waiting_task;
2026  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2027  chain::first([&](auto nextTask) {
2029  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2030  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2032  for (auto& s : subProcesses_) {
2033  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2034  }
2035  }) | chain::runLast(std::move(task));
2036  }
2037  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 319 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 352 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 309 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 347 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 346 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 371 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 368 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 357 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 356 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 354 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 364 of file EventProcessor.h.

◆ firstItemAfterLumiMerge_

bool edm::EventProcessor::firstItemAfterLumiMerge_ = true
private

Definition at line 372 of file EventProcessor.h.

Referenced by beginLumiAsync(), continueLumiAsync(), and readNextEventForStream().

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 360 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 358 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 340 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_ = InputSource::IsInvalid
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 359 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 337 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ moduleTypeResolverMaker_

edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const> > edm::EventProcessor::moduleTypeResolverMaker_
private

Definition at line 315 of file EventProcessor.h.

Referenced by init().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 308 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 310 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 320 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 336 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 353 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 311 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().