CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRunAsync (IOVSyncValue const &, WaitingTaskHolder)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void clearLumiPrincipal (LuminosityBlockProcessingStatus &)
 
void clearRunPrincipal (RunProcessingStatus &)
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (WaitingTaskHolder)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRunAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void endUnfinishedLumi (bool cleaningUpAfterException)
 
void endUnfinishedRun (bool cleaningUpAfterException)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
 
void globalEndRunAsync (WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
 
void handleEndLumiExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void handleEndRunExceptions (std::exception_ptr, WaitingTaskHolder const &)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processRuns ()
 
void readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
void readAndMergeRun (RunProcessingStatus &)
 
void readFile ()
 
std::shared_ptr< LuminosityBlockPrincipalreadLuminosityBlock (std::shared_ptr< RunPrincipal > rp)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::shared_ptr< RunPrincipalreadRun ()
 
void releaseBeginRunResources (unsigned int iStream)
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamBeginRunAsync (unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
 
void streamEndLumiAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void streamEndRunAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder, unsigned int iStreamIndex)
 
void handleNextItemAfterMergingRunEntries (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readAndMergeLumiEntriesAsync (std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
 
void readAndMergeRunEntriesAsync (std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 
void warnAboutModulesRequiringRunSynchronization () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
std::shared_ptr< RunProcessingStatusexceptionRunStatus_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool firstItemAfterLumiMerge_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
std::vector< std::string > modulesToIgnoreForDeleteEarly_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
std::multimap< std::string, std::string > referencesToBranches_
 
std::unique_ptr< edm::LimitedTaskQueuerunQueue_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
SerialTaskQueue streamQueuesInserter_
 
std::atomic< unsigned int > streamRunActive_ {0}
 
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 68 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 364 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 365 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 234 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 78 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 213 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

218  : actReg_(),
219  preg_(),
221  serviceToken_(),
222  input_(),
223  espController_(new eventsetup::EventSetupsController),
224  esp_(),
225  act_table_(),
227  schedule_(),
228  subProcesses_(),
229  historyAppender_(new HistoryAppender),
230  fb_(),
231  looper_(),
233  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
234  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
235  principalCache_(),
236  beginJobCalled_(false),
237  shouldWeStop_(false),
238  fileModeNoMerge_(false),
240  exceptionMessageRuns_(false),
241  exceptionMessageLumis_(false),
242  forceLooperToEnd_(false),
243  looperBeginJobRun_(false),
246  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
247  processDesc->addServices(defaultServices, forcedServices);
248  init(processDesc, iToken, iLegacy);
249  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 251 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

254  : actReg_(),
255  preg_(),
257  serviceToken_(),
258  input_(),
259  espController_(new eventsetup::EventSetupsController),
260  esp_(),
261  act_table_(),
263  schedule_(),
264  subProcesses_(),
265  historyAppender_(new HistoryAppender),
266  fb_(),
267  looper_(),
269  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
270  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
271  principalCache_(),
272  beginJobCalled_(false),
273  shouldWeStop_(false),
274  fileModeNoMerge_(false),
276  exceptionMessageRuns_(false),
277  exceptionMessageLumis_(false),
278  forceLooperToEnd_(false),
279  looperBeginJobRun_(false),
282  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
283  processDesc->addServices(defaultServices, forcedServices);
285  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 287 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

290  : actReg_(),
291  preg_(),
293  serviceToken_(),
294  input_(),
295  espController_(new eventsetup::EventSetupsController),
296  esp_(),
297  act_table_(),
299  schedule_(),
300  subProcesses_(),
301  historyAppender_(new HistoryAppender),
302  fb_(),
303  looper_(),
305  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
306  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
307  principalCache_(),
308  beginJobCalled_(false),
309  shouldWeStop_(false),
310  fileModeNoMerge_(false),
312  exceptionMessageRuns_(false),
313  exceptionMessageLumis_(false),
314  forceLooperToEnd_(false),
315  looperBeginJobRun_(false),
318  init(processDesc, token, legacy);
319  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 601 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

601  {
602  // Make the services available while everything is being deleted.
605 
606  // manually destroy all these thing that may need the services around
607  // propagate_const<T> has no reset() function
608  espController_ = nullptr;
609  esp_ = nullptr;
610  schedule_ = nullptr;
611  input_ = nullptr;
612  looper_ = nullptr;
613  actReg_ = nullptr;
614 
617  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 626 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, makeListRunsInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, merge(), modulesToIgnoreForDeleteEarly_, eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, referencesToBranches_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), warnAboutModulesRequiringRunSynchronization(), and edm::convertException::wrap().

Referenced by runToCompletion().

626  {
627  if (beginJobCalled_)
628  return;
629  beginJobCalled_ = true;
630  bk::beginJob();
631 
632  // StateSentry toerror(this); // should we add this ?
633  //make the services available
635 
636  service::SystemBounds bounds(preallocations_.numberOfStreams(),
640  actReg_->preallocateSignal_(bounds);
641  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
643 
644  std::vector<ModuleProcessName> consumedBySubProcesses;
646  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
647  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
648  if (consumedBySubProcesses.empty()) {
649  consumedBySubProcesses = std::move(c);
650  } else if (not c.empty()) {
651  std::vector<ModuleProcessName> tmp;
652  tmp.reserve(consumedBySubProcesses.size() + c.size());
653  std::merge(consumedBySubProcesses.begin(),
654  consumedBySubProcesses.end(),
655  c.begin(),
656  c.end(),
657  std::back_inserter(tmp));
658  std::swap(consumedBySubProcesses, tmp);
659  }
660  });
661 
662  // Note: all these may throw
665  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
666  not unusedModules.empty()) {
668 
669  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
670  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
671  "and "
672  "therefore they are deleted before beginJob transition.";
673  for (auto const& description : unusedModules) {
674  l << "\n " << description->moduleLabel();
675  }
676  });
677  for (auto const& description : unusedModules) {
678  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
679  }
680  }
681  }
682  // Initialize after the deletion of non-consumed unscheduled
683  // modules to avoid non-consumed non-run modules to keep the
684  // products unnecessarily alive
685  if (not branchesToDeleteEarly_.empty()) {
686  auto modulesToSkip = std::move(modulesToIgnoreForDeleteEarly_);
687  auto branchesToDeleteEarly = std::move(branchesToDeleteEarly_);
688  auto referencesToBranches = std::move(referencesToBranches_);
689  schedule_->initializeEarlyDelete(branchesToDeleteEarly, referencesToBranches, modulesToSkip, *preg_);
690  }
691 
692  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
693 
696  }
697  if (preallocations_.numberOfRuns() > 1) {
699  }
701 
702  //NOTE: This implementation assumes 'Job' means one call
703  // the EventProcessor::run
704  // If it really means once per 'application' then this code will
705  // have to be changed.
706  // Also have to deal with case where have 'run' then new Module
707  // added and do 'run'
708  // again. In that case the newly added Module needs its 'beginJob'
709  // to be called.
710 
711  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
712  // For now we delay calling beginOfJob until first beginOfRun
713  //if(looper_) {
714  // looper_->beginOfJob(es);
715  //}
716  try {
717  convertException::wrap([&]() { input_->doBeginJob(); });
718  } catch (cms::Exception& ex) {
719  ex.addContext("Calling beginJob for the source");
720  throw;
721  }
722  espController_->finishConfiguration();
723  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
724  if (looper_) {
725  constexpr bool mustPrefetchMayGet = true;
726  auto const processBlockLookup = preg_->productLookup(InProcess);
727  auto const runLookup = preg_->productLookup(InRun);
728  auto const lumiLookup = preg_->productLookup(InLumi);
729  auto const eventLookup = preg_->productLookup(InEvent);
730  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
731  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
732  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
733  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
734  looper_->updateLookup(esp_->recordsToProxyIndices());
735  }
736  // toerror.succeeded(); // should we add this?
737  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
738  actReg_->postBeginJobSignal_();
739 
740  oneapi::tbb::task_group group;
742  using namespace edm::waiting_task::chain;
743  first([this](auto nextTask) {
744  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
745  first([i, this](auto nextTask) {
747  schedule_->beginStream(i);
748  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
750  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
751  }) | lastTask(nextTask);
752  }
754  last.wait();
755  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
std::multimap< std::string, std::string > referencesToBranches_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::string > modulesToIgnoreForDeleteEarly_
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void warnAboutModulesRequiringRunSynchronization() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( IOVSyncValue const &  iSync,
std::shared_ptr< RunProcessingStatus iRunStatus,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1619 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, firstItemAfterLumiMerge_, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeLumiEntriesAsync(), readLuminosityBlock(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, streamQueuesInserter_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1621  {
1622  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1623 
1624  auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
1625  chain::first([this, &iSync, &status](auto nextTask) {
1626  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1627  nextTask,
1628  status->endIOVWaitingTasks(),
1629  status->eventSetupImpls(),
1631  actReg_.get(),
1632  serviceToken_);
1633  }) | chain::then([this, status, iRunStatus, iSync](std::exception_ptr const* iException, auto nextTask) {
1634  CMS_SA_ALLOW try {
1635  //the call to doneWaiting will cause the count to decrement
1636  if (iException) {
1637  WaitingTaskHolder copyHolder(nextTask);
1638  copyHolder.doneWaiting(*iException);
1639  }
1640 
1642  actReg_->postESSyncIOVSignal_.emit(iSync);
1643 
1644  lumiQueue_->pushAndPause(
1645  *nextTask.group(),
1646  [this, postLumiQueueTask = nextTask, status, iRunStatus](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1647  CMS_SA_ALLOW try {
1648  if (postLumiQueueTask.taskHasFailed()) {
1649  status->resetResources();
1651  endRunAsync(iRunStatus, postLumiQueueTask);
1652  return;
1653  }
1654 
1655  status->setResumer(std::move(iResumer));
1656 
1658  *postLumiQueueTask.group(),
1659  [this, postSourceTask = postLumiQueueTask, status, iRunStatus]() mutable {
1660  CMS_SA_ALLOW try {
1662 
1663  if (postSourceTask.taskHasFailed()) {
1664  status->resetResources();
1666  endRunAsync(iRunStatus, postSourceTask);
1667  return;
1668  }
1669 
1670  status->setLumiPrincipal(readLuminosityBlock(iRunStatus->runPrincipal()));
1671 
1672  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1673  {
1674  SendSourceTerminationSignalIfException sentry(actReg_.get());
1675  input_->doBeginLumi(lumiPrincipal, &processContext_);
1676  sentry.completedSuccessfully();
1677  }
1678 
1679  Service<RandomNumberGenerator> rng;
1680  if (rng.isAvailable()) {
1681  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1682  rng->preBeginLumi(lb);
1683  }
1684 
1685  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1686 
1687  using namespace edm::waiting_task::chain;
1688  chain::first([this, status](auto nextTask) mutable {
1690  firstItemAfterLumiMerge_ = true;
1691  }) | then([this, status, &es, &lumiPrincipal](auto nextTask) {
1692  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1693  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1694  beginGlobalTransitionAsync<Traits>(
1695  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1696  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1697  looper_->prefetchAsync(
1698  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1699  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1700  status->globalBeginDidSucceed();
1701  ServiceRegistry::Operate operateLooper(serviceToken_);
1702  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1703  }) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
1704  if (iException) {
1705  status->resetResources();
1707  WaitingTaskHolder copyHolder(holder);
1708  copyHolder.doneWaiting(*iException);
1709  endRunAsync(iRunStatus, holder);
1710  } else {
1711  if (not looper_) {
1712  status->globalBeginDidSucceed();
1713  }
1714 
1715  status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
1716 
1717  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1718  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1719 
1720  streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
1721  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1722  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1723  streamQueues_[i].pause();
1724 
1725  auto& event = principalCache_.eventPrincipal(i);
1726  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1727  // held by the container as this lambda may not finish executing before all the tasks it
1728  // spawns have already started to run.
1729  auto eventSetupImpls = &status->eventSetupImpls();
1730  auto lp = status->lumiPrincipal().get();
1733  event.setLuminosityBlockPrincipal(lp);
1734  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1735  using namespace edm::waiting_task::chain;
1736  chain::first([this, i, &transitionInfo](auto nextTask) {
1737  beginStreamTransitionAsync<Traits>(std::move(nextTask),
1738  *schedule_,
1739  i,
1740  transitionInfo,
1741  serviceToken_,
1742  subProcesses_);
1743  }) |
1744  then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi,
1745  auto nextTask) {
1746  if (exceptionFromBeginStreamLumi) {
1747  WaitingTaskHolder copyHolder(nextTask);
1748  copyHolder.doneWaiting(*exceptionFromBeginStreamLumi);
1749  }
1751  }) |
1752  runLast(std::move(holder));
1753  });
1754  } // end for loop over streams
1755  });
1756  }
1757  }) | runLast(postSourceTask);
1758  } catch (...) {
1759  status->resetResources();
1761  WaitingTaskHolder copyHolder(postSourceTask);
1762  copyHolder.doneWaiting(std::current_exception());
1763  endRunAsync(iRunStatus, postSourceTask);
1764  }
1765  }); // task in sourceResourcesAcquirer
1766  } catch (...) {
1767  status->resetResources();
1769  WaitingTaskHolder copyHolder(postLumiQueueTask);
1770  copyHolder.doneWaiting(std::current_exception());
1771  endRunAsync(iRunStatus, postLumiQueueTask);
1772  }
1773  }); // task in lumiQueue
1774  } catch (...) {
1775  status->resetResources();
1777  WaitingTaskHolder copyHolder(nextTask);
1778  copyHolder.doneWaiting(std::current_exception());
1779  endRunAsync(iRunStatus, nextTask);
1780  }
1781  }) | chain::runLast(std::move(iHolder));
1782  }
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1064 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1064  {
1065  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1066  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1067 
1068  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1069  FinalWaitingTask globalWaitTask{taskGroup_};
1070 
1071  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1072  beginGlobalTransitionAsync<Traits>(
1073  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1074 
1075  globalWaitTask.wait();
1076  beginProcessBlockSucceeded = true;
1077  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRunAsync()

void edm::EventProcessor::beginRunAsync ( IOVSyncValue const &  iSync,
WaitingTaskHolder  iHolder 
)

Definition at line 1164 of file EventProcessor.cc.

References actReg_, edm::BeginRun, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, exceptionRunStatus_, first, forceESCacheClearOnNewRun_, globalEndRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, handleNextItemAfterMergingRunEntries(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, edm::make_waiting_task(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readAndMergeRunEntriesAsync(), readRun(), edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), runQueue_, schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamBeginRunAsync(), streamQueues_, streamQueuesInserter_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), and edm::waiting_task::chain::then().

Referenced by endRunAsync(), and processRuns().

1164  {
1165  if (iHolder.taskHasFailed()) {
1166  return;
1167  }
1168 
1169  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1170 
1171  auto status = std::make_shared<RunProcessingStatus>(preallocations_.numberOfStreams(), iHolder);
1172 
1173  chain::first([this, &status, &iSync](auto nextTask) {
1174  espController_->runOrQueueEventSetupForInstanceAsync(iSync,
1175  nextTask,
1176  status->endIOVWaitingTasks(),
1177  status->eventSetupImpls(),
1179  actReg_.get(),
1180  serviceToken_,
1182  }) | chain::then([this, status, iSync](std::exception_ptr const* iException, auto nextTask) {
1183  CMS_SA_ALLOW try {
1184  if (iException) {
1185  WaitingTaskHolder copyHolder(nextTask);
1186  copyHolder.doneWaiting(*iException);
1187  // Finish handling the exception in the task pushed to runQueue_
1188  }
1190  actReg_->postESSyncIOVSignal_.emit(iSync);
1191 
1192  runQueue_->pushAndPause(
1193  *nextTask.group(),
1194  [this, postRunQueueTask = nextTask, status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1195  CMS_SA_ALLOW try {
1196  if (postRunQueueTask.taskHasFailed()) {
1197  status->resetBeginResources();
1199  return;
1200  }
1201 
1202  status->setResumer(std::move(iResumer));
1203 
1205  *postRunQueueTask.group(), [this, postSourceTask = postRunQueueTask, status]() mutable {
1206  CMS_SA_ALLOW try {
1208 
1209  if (postSourceTask.taskHasFailed()) {
1210  status->resetBeginResources();
1212  status->resumeGlobalRunQueue();
1213  return;
1214  }
1215 
1216  status->setRunPrincipal(readRun());
1217 
1218  RunPrincipal& runPrincipal = *status->runPrincipal();
1219  {
1220  SendSourceTerminationSignalIfException sentry(actReg_.get());
1221  input_->doBeginRun(runPrincipal, &processContext_);
1222  sentry.completedSuccessfully();
1223  }
1224 
1225  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1226  if (looper_ && looperBeginJobRun_ == false) {
1227  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1228 
1229  oneapi::tbb::task_group group;
1230  FinalWaitingTask waitTask{group};
1231  using namespace edm::waiting_task::chain;
1232  chain::first([this, &es](auto nextTask) {
1233  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1234  }) | then([this, &es](auto nextTask) mutable {
1235  looper_->beginOfJob(es);
1236  looperBeginJobRun_ = true;
1237  looper_->doStartingNewLoop();
1238  }) | runLast(WaitingTaskHolder(group, &waitTask));
1239  waitTask.wait();
1240  }
1241 
1242  using namespace edm::waiting_task::chain;
1243  chain::first([this, status](auto nextTask) mutable {
1244  CMS_SA_ALLOW try { readAndMergeRunEntriesAsync(std::move(status), nextTask); } catch (...) {
1245  status->setStopBeforeProcessingRun(true);
1246  nextTask.doneWaiting(std::current_exception());
1247  }
1248  }) | then([this, status, &es](auto nextTask) {
1249  if (status->stopBeforeProcessingRun()) {
1250  return;
1251  }
1252  RunTransitionInfo transitionInfo(*status->runPrincipal(), es, &status->eventSetupImpls());
1253  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1254  beginGlobalTransitionAsync<Traits>(
1255  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1256  }) | then([status](auto nextTask) mutable {
1257  if (status->stopBeforeProcessingRun()) {
1258  return;
1259  }
1260  status->globalBeginDidSucceed();
1261  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1262  if (status->stopBeforeProcessingRun()) {
1263  return;
1264  }
1265  looper_->prefetchAsync(
1266  nextTask, serviceToken_, Transition::BeginRun, *status->runPrincipal(), es);
1267  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1268  if (status->stopBeforeProcessingRun()) {
1269  return;
1270  }
1271  ServiceRegistry::Operate operateLooper(serviceToken_);
1272  looper_->doBeginRun(*status->runPrincipal(), es, &processContext_);
1273  }) | then([this, status](std::exception_ptr const* iException, auto holder) mutable {
1274  bool precedingTasksSucceeded = true;
1275  if (iException) {
1276  precedingTasksSucceeded = false;
1277  WaitingTaskHolder copyHolder(holder);
1278  copyHolder.doneWaiting(*iException);
1279  }
1280 
1281  if (status->stopBeforeProcessingRun()) {
1282  // We just quit now if there was a failure when merging runs
1283  status->resetBeginResources();
1285  status->resumeGlobalRunQueue();
1286  return;
1287  }
1288  CMS_SA_ALLOW try {
1289  // Under normal circumstances, this task runs after endRun has completed for all streams
1290  // and global endLumi has completed for all lumis contained in this run
1291  auto globalEndRunTask =
1292  edm::make_waiting_task([this, status](std::exception_ptr const*) mutable {
1293  WaitingTaskHolder taskHolder = status->holderOfTaskInProcessRuns();
1294  status->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
1296  });
1297  status->setGlobalEndRunHolder(WaitingTaskHolder{*holder.group(), globalEndRunTask});
1298  } catch (...) {
1299  status->resetBeginResources();
1301  status->resumeGlobalRunQueue();
1302  holder.doneWaiting(std::current_exception());
1303  return;
1304  }
1305 
1306  // After this point we are committed to end the run via endRunAsync
1307 
1309 
1310  // The only purpose of the pause is to cause stream begin run to execute before
1311  // global begin lumi in the single threaded case (maintains consistency with
1312  // the order that existed before concurrent runs were implemented).
1313  PauseQueueSentry pauseQueueSentry(streamQueuesInserter_);
1314 
1315  CMS_SA_ALLOW try {
1317  *holder.group(), [this, status, precedingTasksSucceeded, holder]() mutable {
1318  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1319  CMS_SA_ALLOW try {
1320  streamQueues_[i].push(
1321  *holder.group(),
1322  [this, i, status, precedingTasksSucceeded, holder]() mutable {
1324  i, std::move(status), precedingTasksSucceeded, std::move(holder));
1325  });
1326  } catch (...) {
1327  if (status->streamFinishedBeginRun()) {
1328  WaitingTaskHolder copyHolder(holder);
1329  copyHolder.doneWaiting(std::current_exception());
1330  status->resetBeginResources();
1333  }
1334  }
1335  }
1336  });
1337  } catch (...) {
1338  WaitingTaskHolder copyHolder(holder);
1339  copyHolder.doneWaiting(std::current_exception());
1340  status->resetBeginResources();
1343  }
1345  }) | runLast(postSourceTask);
1346  } catch (...) {
1347  status->resetBeginResources();
1349  status->resumeGlobalRunQueue();
1350  postSourceTask.doneWaiting(std::current_exception());
1351  }
1352  }); // task in sourceResourcesAcquirer
1353  } catch (...) {
1354  status->resetBeginResources();
1356  status->resumeGlobalRunQueue();
1357  postRunQueueTask.doneWaiting(std::current_exception());
1358  }
1359  }); // task in runQueue
1360  } catch (...) {
1361  status->resetBeginResources();
1363  nextTask.doneWaiting(std::current_exception());
1364  }
1365  }) | chain::runLast(std::move(iHolder));
1366  }
ProcessContext processContext_
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
std::shared_ptr< RunPrincipal > readRun()
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, bool precedingTasksSucceeded, WaitingTaskHolder)
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 280 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

280  {
282  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 283 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 842 of file EventProcessor.cc.

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

842  {
843  bool returnValue = false;
844 
845  // Look for a shutdown signal
846  if (shutdown_flag.load(std::memory_order_acquire)) {
847  returnValue = true;
849  }
850  return returnValue;
851  }
volatile std::atomic< bool > shutdown_flag

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 836 of file EventProcessor.cc.

References schedule_.

836 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ clearLumiPrincipal()

void edm::EventProcessor::clearLumiPrincipal ( LuminosityBlockProcessingStatus iStatus)

Definition at line 2062 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

2062  {
2063  for (auto& s : subProcesses_) {
2064  s.clearLumiPrincipal(*iStatus.lumiPrincipal());
2065  }
2066  iStatus.lumiPrincipal()->setRunPrincipal(std::shared_ptr<RunPrincipal>());
2067  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
2068  iStatus.lumiPrincipal()->clearPrincipal();
2069  }
std::vector< SubProcess > subProcesses_

◆ clearRunPrincipal()

void edm::EventProcessor::clearRunPrincipal ( RunProcessingStatus iStatus)

Definition at line 2037 of file EventProcessor.cc.

References edm::RunPrincipal::kUninitialized, edm::RunProcessingStatus::runPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndRunAsync().

2037  {
2038  for (auto& s : subProcesses_) {
2039  s.clearRunPrincipal(*iStatus.runPrincipal());
2040  }
2041  iStatus.runPrincipal()->setShouldWriteRun(RunPrincipal::kUninitialized);
2042  iStatus.runPrincipal()->clearPrincipal();
2043  }
std::vector< SubProcess > subProcesses_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 964 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

964  {
965  if (fileBlockValid()) {
966  SendSourceTerminationSignalIfException sentry(actReg_.get());
967  input_->closeFile(fb_.get(), cleaningUpAfterException);
968  sentry.completedSuccessfully();
969  }
970  FDEBUG(1) << "\tcloseInputFile\n";
971  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 981 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

981  {
982  schedule_->closeOutputFiles();
983  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
984  processBlockHelper_->clearAfterOutputFilesClose();
985  FDEBUG(1) << "\tcloseOutputFiles\n";
986  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1784 of file EventProcessor.cc.

References first, firstItemAfterLumiMerge_, h, handleNextEventForStreamAsync(), input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kProcessing, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeLumi(), edm::waiting_task::chain::runLast(), mps_update::status, streamLumiStatus_, and edm::waiting_task::chain::then().

Referenced by processRuns().

1784  {
1785  chain::first([this](auto nextTask) {
1786  //all streams are sharing the same status at the moment
1787  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1789 
1790  while (lastTransitionType() == InputSource::IsLumi and
1791  status->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
1794  }
1795  firstItemAfterLumiMerge_ = true;
1796  }) | chain::then([this](auto nextTask) mutable {
1797  unsigned int streamIndex = 0;
1798  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1799  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1800  arena.enqueue([this, streamIndex, h = nextTask]() { handleNextEventForStreamAsync(h, streamIndex); });
1801  }
1802  nextTask.group()->run(
1803  [this, streamIndex, h = std::move(nextTask)]() { handleNextEventForStreamAsync(h, streamIndex); });
1804  }) | chain::runLast(std::move(iHolder));
1805  }
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1055 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1055  {
1056  FDEBUG(1) << "\tdoErrorStuff\n";
1057  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1058  << "and went to the error state\n"
1059  << "Will attempt to terminate processing normally\n"
1060  << "(IF using the looper the next loop will be attempted)\n"
1061  << "This likely indicates a bug in an input module or corrupted input or both\n";
1062  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 757 of file EventProcessor.cc.

References actReg_, c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, cmsLHEtoEOSManager::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

757  {
758  // Collects exceptions, so we don't throw before all operations are performed.
759  ExceptionCollector c(
760  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
761 
762  //make the services available
764 
765  using namespace edm::waiting_task::chain;
766 
767  oneapi::tbb::task_group group;
768  edm::FinalWaitingTask waitTask{group};
769 
770  {
771  //handle endStream transitions
772  edm::WaitingTaskHolder taskHolder(group, &waitTask);
773  std::mutex collectorMutex;
774  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
775  first([this, i, &c, &collectorMutex](auto nextTask) {
776  std::exception_ptr ep;
777  try {
779  this->schedule_->endStream(i);
780  } catch (...) {
781  ep = std::current_exception();
782  }
783  if (ep) {
784  std::lock_guard<std::mutex> l(collectorMutex);
785  c.call([&ep]() { std::rethrow_exception(ep); });
786  }
787  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
788  for (auto& subProcess : subProcesses_) {
789  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
790  std::exception_ptr ep;
791  try {
793  subProcess.doEndStream(i);
794  } catch (...) {
795  ep = std::current_exception();
796  }
797  if (ep) {
798  std::lock_guard<std::mutex> l(collectorMutex);
799  c.call([&ep]() { std::rethrow_exception(ep); });
800  }
801  }) | lastTask(nextTask);
802  }
803  }) | lastTask(taskHolder);
804  }
805  }
806  waitTask.waitNoThrow();
807 
808  auto actReg = actReg_.get();
809  c.call([actReg]() { actReg->preEndJobSignal_(); });
810  schedule_->endJob(c);
811  for (auto& subProcess : subProcesses_) {
812  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
813  }
814  c.call(std::bind(&InputSource::doEndJob, input_.get()));
815  if (looper_) {
816  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
817  }
818  c.call([actReg]() { actReg->postEndJobSignal_(); });
819  if (c.hasThrown()) {
820  c.rethrow();
821  }
822  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1016 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1016  {
1017  if (looper_) {
1018  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1019  looper_->setModuleChanger(&changer);
1020  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1021  looper_->setModuleChanger(nullptr);
1023  return true;
1024  else
1025  return false;
1026  }
1027  FDEBUG(1) << "\tendOfLoop\n";
1028  return true;
1029  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1105 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1105  {
1106  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1107 
1108  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1109  FinalWaitingTask globalWaitTask{taskGroup_};
1110 
1111  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1112  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1113  *schedule_,
1114  transitionInfo,
1115  serviceToken_,
1116  subProcesses_,
1117  cleaningUpAfterException);
1118  globalWaitTask.wait();
1119 
1120  if (beginProcessBlockSucceeded) {
1121  FinalWaitingTask writeWaitTask{taskGroup_};
1123  writeWaitTask.wait();
1124  }
1125 
1126  processBlockPrincipal.clearPrincipal();
1127  for (auto& s : subProcesses_) {
1128  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1129  }
1130  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRunAsync()

void edm::EventProcessor::endRunAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)

Definition at line 1409 of file EventProcessor.cc.

References actReg_, beginRunAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::RunPrincipal::endTime(), espController_, first, handleEndRunExceptions(), mps_fire::i, input_, edm::InputSource::IsRun, lastTransitionType(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), serviceToken_, edm::RunPrincipal::setEndTime(), streamEndRunAsync(), streamQueues_, streamQueuesInserter_, and edm::waiting_task::chain::then().

Referenced by beginLumiAsync(), endUnfinishedRun(), handleNextEventForStreamAsync(), and handleNextItemAfterMergingRunEntries().

1409  {
1410  RunPrincipal& runPrincipal = *iRunStatus->runPrincipal();
1411  iRunStatus->setEndTime();
1412  IOVSyncValue ts(
1413  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1414  runPrincipal.endTime());
1415  CMS_SA_ALLOW try { actReg_->esSyncIOVQueuingSignal_.emit(ts); } catch (...) {
1416  WaitingTaskHolder copyHolder(iHolder);
1417  copyHolder.doneWaiting(std::current_exception());
1418  }
1419 
1420  chain::first([this, &iRunStatus, &ts](auto nextTask) {
1421  espController_->runOrQueueEventSetupForInstanceAsync(ts,
1422  nextTask,
1423  iRunStatus->endIOVWaitingTasksEndRun(),
1424  iRunStatus->eventSetupImplsEndRun(),
1426  actReg_.get(),
1427  serviceToken_);
1428  }) | chain::then([this, iRunStatus, ts](std::exception_ptr const* iException, auto nextTask) {
1429  if (iException) {
1430  iRunStatus->setEndingEventSetupSucceeded(false);
1431  handleEndRunExceptions(*iException, nextTask);
1432  }
1434  CMS_SA_ALLOW try { actReg_->postESSyncIOVSignal_.emit(ts); } catch (...) {
1435  WaitingTaskHolder copyHolder(nextTask);
1436  copyHolder.doneWaiting(std::current_exception());
1437  }
1438 
1439  streamQueuesInserter_.push(*nextTask.group(), [this, nextTask]() mutable {
1440  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1441  CMS_SA_ALLOW try {
1442  streamQueues_[i].push(*nextTask.group(), [this, i, nextTask]() mutable {
1443  streamQueues_[i].pause();
1444  streamEndRunAsync(std::move(nextTask), i);
1445  });
1446  } catch (...) {
1447  WaitingTaskHolder copyHolder(nextTask);
1448  copyHolder.doneWaiting(std::current_exception());
1449  }
1450  }
1451  });
1452 
1454  CMS_SA_ALLOW try {
1455  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()), nextTask);
1456  } catch (...) {
1457  WaitingTaskHolder copyHolder(nextTask);
1458  copyHolder.doneWaiting(std::current_exception());
1459  }
1460  }
1461  }) | chain::runLast(std::move(iHolder));
1462  }
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
SerialTaskQueue streamQueuesInserter_
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
InputSource::ItemType lastTransitionType() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr< ActivityRegistry > actReg_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( bool  cleaningUpAfterException)

Definition at line 1928 of file EventProcessor.cc.

References cms::cuda::assert(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamRunActive_, and taskGroup_.

1928  {
1929  if (streamRunActive_ == 0) {
1930  assert(streamLumiActive_ == 0);
1931  } else {
1933  if (streamLumiActive_ > 0) {
1934  FinalWaitingTask globalWaitTask{taskGroup_};
1936  streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
1937  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1938  streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
1939  }
1940  globalWaitTask.wait();
1941  }
1942  }
1943  }
assert(be >=bs)
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( bool  cleaningUpAfterException)

Definition at line 1605 of file EventProcessor.cc.

References endRunAsync(), edm::InputSource::IsStop, lastSourceTransition_, eostools::move(), streamRunActive_, streamRunStatus_, and taskGroup_.

1605  {
1606  if (streamRunActive_ > 0) {
1607  FinalWaitingTask waitTask{taskGroup_};
1608 
1609  auto runStatus = streamRunStatus_[0].get();
1610  runStatus->setCleaningUpAfterException(cleaningUpAfterException);
1611  WaitingTaskHolder holder{taskGroup_, &waitTask};
1612  runStatus->setHolderOfTaskInProcessRuns(holder);
1614  endRunAsync(streamRunStatus_[0], std::move(holder));
1615  waitTask.wait();
1616  }
1617  }
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
InputSource::ItemType lastSourceTransition_
oneapi::tbb::task_group taskGroup_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::atomic< unsigned int > streamRunActive_
def move(src, dest)
Definition: eostools.py:511

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 191 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

191 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 826 of file EventProcessor.cc.

References schedule_.

826  {
827  return schedule_->getAllModuleDescriptions();
828  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 824 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

824 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1816 of file EventProcessor.cc.

References clearLumiPrincipal(), CMS_SA_ALLOW, edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1817  {
1818  // Get some needed info out of the status object before moving
1819  // it into finalTaskForThisLumi.
1820  auto& lp = *(iLumiStatus->lumiPrincipal());
1821  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1822  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1823  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1824  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1825 
1826  using namespace edm::waiting_task::chain;
1827  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1828  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1829 
1830  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1831  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1832  endGlobalTransitionAsync<Traits>(
1833  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1834  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1835  //Only call writeLumi if beginLumi succeeded
1836  if (didGlobalBeginSucceed) {
1837  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1838  }
1839  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1840  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1841  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1842  //any thrown exception auto propagates to nextTask via the chain
1844  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1845  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iException, auto nextTask) mutable {
1846  if (iException) {
1847  handleEndLumiExceptions(*iException, nextTask);
1848  }
1850 
1851  std::exception_ptr ptr;
1852 
1853  // Try hard to clean up resources so the
1854  // process can terminate in a controlled
1855  // fashion even after exceptions have occurred.
1856  // Caught exception is passed to handleEndLumiExceptions()
1857  CMS_SA_ALLOW try { clearLumiPrincipal(*status); } catch (...) {
1858  if (not ptr) {
1859  ptr = std::current_exception();
1860  }
1861  }
1862  // Caught exception is passed to handleEndLumiExceptions()
1863  CMS_SA_ALLOW try { queueWhichWaitsForIOVsToFinish_.resume(); } catch (...) {
1864  if (not ptr) {
1865  ptr = std::current_exception();
1866  }
1867  }
1868  // Caught exception is passed to handleEndLumiExceptions()
1869  CMS_SA_ALLOW try {
1870  status->resetResources();
1871  status->globalEndRunHolderDoneWaiting();
1872  status.reset();
1873  } catch (...) {
1874  if (not ptr) {
1875  ptr = std::current_exception();
1876  }
1877  }
1878 
1879  if (ptr && !iException) {
1880  handleEndLumiExceptions(ptr, nextTask);
1881  }
1882  }) | runLast(std::move(iTask));
1883  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ globalEndRunAsync()

void edm::EventProcessor::globalEndRunAsync ( WaitingTaskHolder  iTask,
std::shared_ptr< RunProcessingStatus iRunStatus 
)

Definition at line 1473 of file EventProcessor.cc.

References clearRunPrincipal(), CMS_SA_ALLOW, edm::EndRun, esp_, first, handleEndRunExceptions(), edm::waiting_task::chain::ifThen(), looper_, eostools::move(), edm::MergeableRunProductMetadata::preWriteRun(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and writeRunAsync().

Referenced by beginRunAsync().

1473  {
1474  auto& runPrincipal = *(iRunStatus->runPrincipal());
1475  bool didGlobalBeginSucceed = iRunStatus->didGlobalBeginSucceed();
1476  bool cleaningUpAfterException = iRunStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1477  EventSetupImpl const& es = iRunStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1478  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iRunStatus->eventSetupImplsEndRun();
1479  bool endingEventSetupSucceeded = iRunStatus->endingEventSetupSucceeded();
1480 
1481  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1482  using namespace edm::waiting_task::chain;
1483  chain::first([this, &runPrincipal, &es, &eventSetupImpls, cleaningUpAfterException, endingEventSetupSucceeded](
1484  auto nextTask) {
1485  if (endingEventSetupSucceeded) {
1486  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1487  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1488  endGlobalTransitionAsync<Traits>(
1489  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1490  }
1491  }) |
1492  ifThen(looper_ && endingEventSetupSucceeded,
1493  [this, &runPrincipal, &es](auto nextTask) {
1494  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1495  }) |
1496  ifThen(looper_ && endingEventSetupSucceeded,
1497  [this, &runPrincipal, &es](auto nextTask) {
1499  looper_->doEndRun(runPrincipal, es, &processContext_);
1500  }) |
1501  ifThen(didGlobalBeginSucceed && endingEventSetupSucceeded,
1502  [this, mergeableRunProductMetadata, &runPrincipal = runPrincipal](auto nextTask) {
1503  mergeableRunProductMetadata->preWriteRun();
1504  writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
1505  }) |
1506  then([status = std::move(iRunStatus),
1507  this,
1508  didGlobalBeginSucceed,
1509  mergeableRunProductMetadata,
1510  endingEventSetupSucceeded](std::exception_ptr const* iException, auto nextTask) mutable {
1511  if (didGlobalBeginSucceed && endingEventSetupSucceeded) {
1512  mergeableRunProductMetadata->postWriteRun();
1513  }
1514  if (iException) {
1515  handleEndRunExceptions(*iException, nextTask);
1516  }
1518 
1519  std::exception_ptr ptr;
1520 
1521  // Try hard to clean up resources so the
1522  // process can terminate in a controlled
1523  // fashion even after exceptions have occurred.
1524  CMS_SA_ALLOW try { clearRunPrincipal(*status); } catch (...) {
1525  if (not ptr) {
1526  ptr = std::current_exception();
1527  }
1528  }
1529  CMS_SA_ALLOW try {
1530  status->resumeGlobalRunQueue();
1532  } catch (...) {
1533  if (not ptr) {
1534  ptr = std::current_exception();
1535  }
1536  }
1537  CMS_SA_ALLOW try {
1538  status->resetEndResources();
1539  status.reset();
1540  } catch (...) {
1541  if (not ptr) {
1542  ptr = std::current_exception();
1543  }
1544  }
1545 
1546  if (ptr && !iException) {
1547  handleEndRunExceptions(ptr, nextTask);
1548  }
1549  }) |
1550  runLast(std::move(iTask));
1551  }
ProcessContext processContext_
void clearRunPrincipal(RunProcessingStatus &)
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1807 of file EventProcessor.cc.

References setExceptionMessageLumis(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1807  {
1808  if (holder.taskHasFailed()) {
1810  } else {
1811  WaitingTaskHolder tmp(holder);
1812  tmp.doneWaiting(iException);
1813  }
1814  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleEndRunExceptions()

void edm::EventProcessor::handleEndRunExceptions ( std::exception_ptr  iException,
WaitingTaskHolder const &  holder 
)

Definition at line 1464 of file EventProcessor.cc.

References setExceptionMessageRuns(), edm::WaitingTaskHolder::taskHasFailed(), and createJobs::tmp.

Referenced by endRunAsync(), globalEndRunAsync(), and streamEndRunAsync().

1464  {
1465  if (holder.taskHasFailed()) {
1467  } else {
1468  WaitingTaskHolder tmp(holder);
1469  tmp.doneWaiting(iException);
1470  }
1471  }
tmp
align.sh
Definition: createJobs.py:716

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 2202 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kStopLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiStatus_, streamRunStatus_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginLumiAsync(), and continueLumiAsync().

2202  {
2203  auto group = iTask.group();
2204  sourceResourcesAcquirer_.serialQueueChain().push(*group, [this, iTask = std::move(iTask), iStreamIndex]() mutable {
2205  CMS_SA_ALLOW try {
2206  auto status = streamLumiStatus_[iStreamIndex].get();
2208 
2209  if (readNextEventForStream(iTask, iStreamIndex, *status)) {
2210  auto recursionTask =
2211  make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iEventException) mutable {
2212  if (iEventException) {
2213  WaitingTaskHolder copyHolder(iTask);
2214  copyHolder.doneWaiting(*iEventException);
2215  // Intentionally, we don't return here. The recursive call to
2216  // handleNextEvent takes care of immediately ending the run properly
2217  // using the same code it uses to end the run in other situations.
2218  }
2219  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2220  });
2221 
2222  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
2223  } else {
2224  // the stream will stop processing this lumi now
2226  if (not status->haveStartedNextLumiOrEndedRun()) {
2227  status->startNextLumiOrEndRun();
2228  if (lastTransitionType() == InputSource::IsLumi && !iTask.taskHasFailed()) {
2229  CMS_SA_ALLOW try {
2230  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2231  input_->luminosityBlockAuxiliary()->beginTime()),
2232  streamRunStatus_[iStreamIndex],
2233  iTask);
2234  } catch (...) {
2235  WaitingTaskHolder copyHolder(iTask);
2236  copyHolder.doneWaiting(std::current_exception());
2237  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2238  }
2239  } else {
2240  // If appropriate, this will also start the next run.
2241  endRunAsync(streamRunStatus_[iStreamIndex], iTask);
2242  }
2243  }
2244  streamEndLumiAsync(iTask, iStreamIndex);
2245  } else {
2246  assert(status->eventProcessingState() ==
2248  auto runStatus = streamRunStatus_[iStreamIndex].get();
2249 
2250  if (runStatus->holderOfTaskInProcessRuns().hasTask()) {
2251  runStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2252  }
2253  }
2254  }
2255  } catch (...) {
2256  WaitingTaskHolder copyHolder(iTask);
2257  copyHolder.doneWaiting(std::current_exception());
2258  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
2259  }
2260  });
2261  }
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ handleNextItemAfterMergingRunEntries()

void edm::EventProcessor::handleNextItemAfterMergingRunEntries ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2120 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), endRunAsync(), input_, edm::InputSource::IsFile, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), and edm::WaitingTaskHolder::taskHasFailed().

Referenced by beginRunAsync(), and processRuns().

2121  {
2123  iRunStatus->holderOfTaskInProcessRuns().doneWaiting(std::exception_ptr{});
2124  iHolder.doneWaiting(std::exception_ptr{});
2125  } else if (lastTransitionType() == InputSource::IsLumi && !iHolder.taskHasFailed()) {
2126  CMS_SA_ALLOW try {
2127  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
2128  input_->luminosityBlockAuxiliary()->beginTime()),
2129  iRunStatus,
2130  iHolder);
2131  } catch (...) {
2132  WaitingTaskHolder copyHolder(iHolder);
2133  iHolder.doneWaiting(std::current_exception());
2134  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2135  }
2136  } else {
2137  // Note that endRunAsync will call beginRunAsync for the following run
2138  // if appropriate.
2139  endRunAsync(std::move(iRunStatus), std::move(iHolder));
2140  }
2141  }
#define CMS_SA_ALLOW
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 321 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigi_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, DMR_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, modulesToIgnoreForDeleteEarly_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, muonDTDigis_cfi::pset, referencesToBranches_, edm::ParameterSet::registerIt(), runQueue_, schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, streamRunStatus_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

323  {
324  //std::cerr << processDesc->dump() << std::endl;
325 
326  // register the empty parentage vector , once and for all
328 
329  // register the empty parameter set, once and for all.
330  ParameterSet().registerIt();
331 
332  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
333 
334  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
335  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
336  bool const hasSubProcesses = !subProcessVParameterSet.empty();
337 
338  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
339  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
340  // set in here if the parameters were not explicitly set.
342 
343  // Now set some parameters specific to the main process.
344  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
345  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
346  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
347  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
348  << fileMode << ".\n"
349  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
350  } else {
351  fileModeNoMerge_ = (fileMode == "NOMERGE");
352  }
353  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
355 
356  //threading
357  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
358 
359  // Even if numberOfThreads was set to zero in the Python configuration, the code
360  // in cmsRun.cpp should have reset it to something else.
361  assert(nThreads != 0);
362 
363  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
364  if (nStreams == 0) {
365  nStreams = nThreads;
366  }
367  unsigned int nConcurrentLumis =
368  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
369  if (nConcurrentLumis == 0) {
370  nConcurrentLumis = 2;
371  }
372  if (nConcurrentLumis > nStreams) {
373  nConcurrentLumis = nStreams;
374  }
375  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
376  if (nConcurrentRuns == 0 || nConcurrentRuns > nConcurrentLumis) {
377  nConcurrentRuns = nConcurrentLumis;
378  }
379  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
380  if (!loopers.empty()) {
381  //For now loopers make us run only 1 transition at a time
382  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
383  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
384  "of concurrent runs, and the number of concurrent lumis "
385  "are all being reset to 1. Loopers cannot currently support "
386  "values greater than 1.";
387  nStreams = 1;
388  nConcurrentLumis = 1;
389  nConcurrentRuns = 1;
390  }
391  }
392  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
393  if (dumpOptions) {
394  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
395  } else {
396  if (nThreads > 1 or nStreams > 1) {
397  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
398  }
399  }
400 
401  // The number of concurrent IOVs is configured individually for each record in
402  // the class NumberOfConcurrentIOVs to values less than or equal to this.
403  // This maximum simplifies to being equal nConcurrentLumis if nConcurrentRuns is 1.
404  // Considering endRun, beginRun, and beginLumi we might need 3 concurrent IOVs per
405  // concurrent run past the first in use cases where IOVs change within a run.
406  unsigned int maxConcurrentIOVs =
407  3 * nConcurrentRuns - 2 + ((nConcurrentLumis > nConcurrentRuns) ? (nConcurrentLumis - nConcurrentRuns) : 0);
408 
409  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
410 
411  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
413  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
414  //for now, if have a subProcess, don't allow early delete
415  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
416  if (not hasSubProcesses) {
417  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
418  }
419  if (not branchesToDeleteEarly_.empty()) {
420  auto referencePSets =
421  optionsPset.getUntrackedParameter<std::vector<edm::ParameterSet>>("holdsReferencesToDeleteEarly");
422  for (auto const& pset : referencePSets) {
423  auto product = pset.getParameter<std::string>("product");
424  auto references = pset.getParameter<std::vector<std::string>>("references");
425  for (auto const& ref : references) {
426  referencesToBranches_.emplace(product, ref);
427  }
428  }
430  optionsPset.getUntrackedParameter<std::vector<std::string>>("modulesToIgnoreForDeleteEarly");
431  }
432 
433  // Now do general initialization
434  ScheduleItems items;
435 
436  //initialize the services
437  auto& serviceSets = processDesc->getServicesPSets();
438  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
439  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
440 
441  //make the services available
443 
444  CMS_SA_ALLOW try {
445  if (nThreads > 1) {
447  handler->willBeUsingThreads();
448  }
449 
450  // intialize miscellaneous items
451  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
452 
453  // intialize the event setup provider
454  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
455  esp_ = espController_->makeProvider(
456  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
457 
458  // initialize the looper, if any
459  if (!loopers.empty()) {
461  looper_->setActionTable(items.act_table_.get());
462  looper_->attachTo(*items.actReg_);
463 
464  // in presence of looper do not delete modules
466  }
467 
468  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
469 
470  runQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentRuns);
471  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
472  streamQueues_.resize(nStreams);
473  streamRunStatus_.resize(nStreams);
474  streamLumiStatus_.resize(nStreams);
475 
476  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
477 
478  {
479  std::optional<ScheduleItems::MadeModules> madeModules;
480 
481  //setup input and modules concurrently
482  tbb::task_group group;
483 
484  // initialize the input source
485  auto tempReg = std::make_shared<ProductRegistry>();
486  auto sourceID = ModuleDescription::getUniqueID();
487 
488  group.run([&, this]() {
489  // initialize the Schedule
491  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
492  madeModules = items.initModules(*parameterSet, tns, preallocations_, &processContext_);
493  });
494 
495  group.run([&, this, tempReg]() {
497  input_ = makeInput(sourceID,
498  *parameterSet,
499  *common,
500  /*items.preg(),*/ tempReg,
501  items.branchIDListHelper(),
503  items.thinnedAssociationsHelper(),
504  items.actReg_,
505  items.processConfiguration(),
507  });
508 
509  group.wait();
510  items.preg()->addFromInput(*tempReg);
511  input_->switchTo(items.preg());
512 
513  {
514  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
515  schedule_ = items.finishSchedule(std::move(*madeModules),
516  *parameterSet,
517  tns,
518  hasSubProcesses,
522  }
523  }
524 
525  // set the data members
526  act_table_ = std::move(items.act_table_);
527  actReg_ = items.actReg_;
528  preg_ = items.preg();
530  branchIDListHelper_ = items.branchIDListHelper();
531  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
532  processConfiguration_ = items.processConfiguration();
534 
535  FDEBUG(2) << parameterSet << std::endl;
536 
538  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
539  // Reusable event principal
540  auto ep = std::make_shared<EventPrincipal>(preg(),
544  historyAppender_.get(),
545  index,
546  true /*primary process*/,
549  }
550 
551  for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
552  auto rp = std::make_unique<RunPrincipal>(
555  }
556 
557  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
558  auto lp =
559  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
561  }
562 
563  {
564  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
566 
567  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
569  }
570 
571  // fill the subprocesses, if there are any
572  subProcesses_.reserve(subProcessVParameterSet.size());
573  for (auto& subProcessPSet : subProcessVParameterSet) {
574  subProcesses_.emplace_back(subProcessPSet,
575  *parameterSet,
576  preg(),
580  SubProcessParentageHelper(),
582  *actReg_,
583  token,
586  &processContext_);
587  }
588  } catch (...) {
589  //in case of an exception, make sure Services are available
590  // during the following destructors
591  espController_ = nullptr;
592  esp_ = nullptr;
593  schedule_ = nullptr;
594  input_ = nullptr;
595  looper_ = nullptr;
596  actReg_ = nullptr;
597  throw;
598  }
599  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::multimap< std::string, std::string > referencesToBranches_
fileMode
Definition: DMR_cfg.py:72
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::string > modulesToIgnoreForDeleteEarly_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:798
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1079 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1079  {
1080  input_->fillProcessBlockHelper();
1081  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1082  while (input_->nextProcessBlock(processBlockPrincipal)) {
1083  readProcessBlock(processBlockPrincipal);
1084 
1085  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1086  FinalWaitingTask globalWaitTask{taskGroup_};
1087 
1088  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1089  beginGlobalTransitionAsync<Traits>(
1090  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1091 
1092  globalWaitTask.wait();
1093 
1094  FinalWaitingTask writeWaitTask{taskGroup_};
1096  writeWaitTask.wait();
1097 
1098  processBlockPrincipal.clearPrincipal();
1099  for (auto& s : subProcesses_) {
1100  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1101  }
1102  }
1103  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 290 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

290 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

291 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 853 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by continueLumiAsync(), processRuns(), readAndMergeLumiEntriesAsync(), readAndMergeRunEntriesAsync(), and readNextEventForStream().

853  {
854  SendSourceTerminationSignalIfException sentry(actReg_.get());
855  InputSource::ItemType itemType;
856  //For now, do nothing with InputSource::IsSynchronize
857  do {
858  itemType = input_->nextItemType();
859  } while (itemType == InputSource::IsSynchronize);
860 
861  lastSourceTransition_ = itemType;
862  sentry.completedSuccessfully();
863 
865 
867  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
869  }
870 
871  return lastSourceTransition_;
872  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 973 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

973  {
974  if (fileBlockValid()) {
975  schedule_->openOutputFiles(*fb_);
976  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
977  }
978  FDEBUG(1) << "\topenOutputFiles\n";
979  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 278 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), and readFile().

278 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 279 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

279 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1037 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1037  {
1038  looper_->prepareForNextLoop(esp_.get());
1039  FDEBUG(1) << "\tprepareForNextLoop\n";
1040  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 142 of file EventProcessor.h.

References processConfiguration_.

142 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2278 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

2278  {
2279  iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
2280  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2282 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

2282  {
2283  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2284 
2286  Service<RandomNumberGenerator> rng;
2287  if (rng.isAvailable()) {
2288  Event ev(*pep, ModuleDescription(), nullptr);
2289  rng->postEventRead(ev);
2290  }
2291 
2292  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2293  using namespace edm::waiting_task::chain;
2294  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
2295  EventTransitionInfo info(*pep, es);
2296  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
2297  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
2298  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
2299  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
2300  }
2301  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
2302  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
2303  ServiceRegistry::Operate operateLooper(serviceToken_);
2304  processEventWithLooper(*pep, iStreamIndex);
2305  }) | then([pep](auto nextTask) {
2306  FDEBUG(1) << "\tprocessEvent\n";
2307  pep->clearEventPrincipal();
2308  }) | runLast(iHolder);
2309  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 2311 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

2311  {
2312  bool randomAccess = input_->randomAccess();
2313  ProcessingController::ForwardState forwardState = input_->forwardState();
2314  ProcessingController::ReverseState reverseState = input_->reverseState();
2315  ProcessingController pc(forwardState, reverseState, randomAccess);
2316 
2318  do {
2319  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2320  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
2321  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
2322 
2323  bool succeeded = true;
2324  if (randomAccess) {
2325  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2326  input_->skipEvents(-2);
2327  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2328  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2329  }
2330  }
2331  pc.setLastOperationSucceeded(succeeded);
2332  } while (!pc.lastOperationSucceeded());
2334  shouldWeStop_ = true;
2335  }
2336  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processRuns()

InputSource::ItemType edm::EventProcessor::processRuns ( )

Definition at line 1132 of file EventProcessor.cc.

References cms::cuda::assert(), beginRunAsync(), continueLumiAsync(), handleNextItemAfterMergingRunEntries(), input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, readAndMergeRun(), streamLumiActive_, streamRunActive_, streamRunStatus_, and taskGroup_.

1132  {
1133  FinalWaitingTask waitTask{taskGroup_};
1135  if (streamRunActive_ == 0) {
1136  assert(streamLumiActive_ == 0);
1137 
1138  beginRunAsync(IOVSyncValue(EventID(input_->run(), 0, 0), input_->runAuxiliary()->beginTime()),
1139  WaitingTaskHolder{taskGroup_, &waitTask});
1140  } else {
1142 
1143  auto runStatus = streamRunStatus_[0];
1144 
1145  while (lastTransitionType() == InputSource::IsRun and runStatus->runPrincipal()->run() == input_->run() and
1146  runStatus->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
1147  readAndMergeRun(*runStatus);
1149  }
1150 
1151  WaitingTaskHolder holder{taskGroup_, &waitTask};
1152  runStatus->setHolderOfTaskInProcessRuns(holder);
1153  if (streamLumiActive_ > 0) {
1155  continueLumiAsync(std::move(holder));
1156  } else {
1158  }
1159  }
1160  waitTask.wait();
1161  return lastTransitionType();
1162  }
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
PreallocationConfiguration preallocations_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void readAndMergeRun(RunProcessingStatus &)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
oneapi::tbb::task_group taskGroup_
std::atomic< unsigned int > streamRunActive_
void continueLumiAsync(WaitingTaskHolder)
std::atomic< unsigned int > streamLumiActive_
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
def move(src, dest)
Definition: eostools.py:511

◆ readAndMergeLumi()

void edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1990 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by continueLumiAsync(), and readAndMergeLumiEntriesAsync().

1990  {
1991  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1992  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1993  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1994  input_->processHistoryRegistry().reducedProcessHistoryID(
1995  input_->luminosityBlockAuxiliary()->processHistoryID()));
1996  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1997  assert(lumiOK);
1998  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1999  {
2000  SendSourceTerminationSignalIfException sentry(actReg_.get());
2001  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
2002  sentry.completedSuccessfully();
2003  }
2004  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeLumiEntriesAsync()

void edm::EventProcessor::readAndMergeLumiEntriesAsync ( std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2098 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsLumi, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeLumi(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, and sourceResourcesAcquirer_.

Referenced by beginLumiAsync().

2099  {
2100  auto group = iHolder.group();
2102  *group, [this, iLumiStatus = std::move(iLumiStatus), holder = std::move(iHolder)]() mutable {
2103  CMS_SA_ALLOW try {
2105 
2106  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2107 
2109  while (lastTransitionType() == InputSource::IsLumi and
2110  iLumiStatus->lumiPrincipal()->luminosityBlock() == input_->luminosityBlock()) {
2111  readAndMergeLumi(*iLumiStatus);
2113  }
2114  } catch (...) {
2115  holder.doneWaiting(std::current_exception());
2116  }
2117  });
2118  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readAndMergeRun()

void edm::EventProcessor::readAndMergeRun ( RunProcessingStatus iStatus)

Definition at line 1964 of file EventProcessor.cc.

References actReg_, edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), input_, edm::RunPrincipal::mergeAuxiliary(), preg_, and edm::RunProcessingStatus::runPrincipal().

Referenced by processRuns(), and readAndMergeRunEntriesAsync().

1964  {
1965  RunPrincipal& runPrincipal = *iStatus.runPrincipal();
1966 
1967  bool runOK = runPrincipal.adjustToNewProductRegistry(*preg_);
1968  assert(runOK);
1969  runPrincipal.mergeAuxiliary(*input_->runAuxiliary());
1970  {
1971  SendSourceTerminationSignalIfException sentry(actReg_.get());
1972  input_->readAndMergeRun(runPrincipal);
1973  sentry.completedSuccessfully();
1974  }
1975  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRunEntriesAsync()

void edm::EventProcessor::readAndMergeRunEntriesAsync ( std::shared_ptr< RunProcessingStatus iRunStatus,
WaitingTaskHolder  iHolder 
)
private

Definition at line 2071 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::group(), watchdog::group, input_, edm::InputSource::IsRun, lastTransitionType(), eostools::move(), nextTransitionType(), edm::SerialTaskQueueChain::push(), readAndMergeRun(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceMutex_, sourceResourcesAcquirer_, and mps_update::status.

Referenced by beginRunAsync().

2072  {
2073  auto group = iHolder.group();
2075  *group, [this, status = std::move(iRunStatus), holder = std::move(iHolder)]() mutable {
2076  CMS_SA_ALLOW try {
2078 
2079  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2080 
2082  while (lastTransitionType() == InputSource::IsRun and status->runPrincipal()->run() == input_->run() and
2083  status->runPrincipal()->reducedProcessHistoryID() == input_->reducedProcessHistoryID()) {
2084  if (status->holderOfTaskInProcessRuns().taskHasFailed()) {
2085  status->setStopBeforeProcessingRun(true);
2086  return;
2087  }
2090  }
2091  } catch (...) {
2092  status->setStopBeforeProcessingRun(true);
2093  holder.doneWaiting(std::current_exception());
2094  }
2095  });
2096  }
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
SerialTaskQueueChain & serialQueueChain() const
std::shared_ptr< std::recursive_mutex > sourceMutex_
void readAndMergeRun(RunProcessingStatus &)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2263 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, streamLumiStatus_, and streamRunStatus_.

Referenced by readNextEventForStream().

2263  {
2264  //TODO this will have to become per stream
2265  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2266  StreamContext streamContext(event.streamID(), &processContext_);
2267 
2268  SendSourceTerminationSignalIfException sentry(actReg_.get());
2269  input_->readEvent(event, streamContext);
2270 
2271  streamRunStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2272  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
2273  sentry.completedSuccessfully();
2274 
2275  FDEBUG(1) << "\treadEvent\n";
2276  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 939 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, findQualityFiles::size, streamLumiActive_, streamLumiStatus_, streamRunActive_, and streamRunStatus_.

939  {
940  FDEBUG(1) << " \treadFile\n";
941  size_t size = preg_->size();
942  SendSourceTerminationSignalIfException sentry(actReg_.get());
943 
944  if (streamRunActive_ > 0) {
945  streamRunStatus_[0]->runPrincipal()->preReadFile();
946  streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
947  }
948 
949  if (streamLumiActive_ > 0) {
950  streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
951  }
952 
953  fb_ = input_->readFile();
954  if (size < preg_->size()) {
956  }
959  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
960  }
961  sentry.completedSuccessfully();
962  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const >)
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::atomic< unsigned int > streamRunActive_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
PrincipalCache principalCache_

◆ readLuminosityBlock()

std::shared_ptr< LuminosityBlockPrincipal > edm::EventProcessor::readLuminosityBlock ( std::shared_ptr< RunPrincipal rp)

Definition at line 1977 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableLumiPrincipalPtr(), historyAppender_, input_, eostools::move(), and principalCache_.

Referenced by beginLumiAsync().

1977  {
1979  assert(lbp);
1980  lbp->setAux(*input_->luminosityBlockAuxiliary());
1981  {
1982  SendSourceTerminationSignalIfException sentry(actReg_.get());
1983  input_->readLuminosityBlock(*lbp, *historyAppender_);
1984  sentry.completedSuccessfully();
1985  }
1986  lbp->setRunPrincipal(std::move(rp));
1987  return lbp;
1988  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(be >=bs)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( WaitingTaskHolder const &  iTask,
unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iStatus 
)
private

Definition at line 2143 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::eventProcessingState(), firstItemAfterLumiMerge_, input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::LuminosityBlockProcessingStatus::kPauseForFileTransition, edm::LuminosityBlockProcessingStatus::kProcessing, edm::LuminosityBlockProcessingStatus::kStopLumi, lastSourceTransition_, lastTransitionType(), edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextTransitionType(), or, readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setEventProcessingState(), shouldWeStop(), sourceMutex_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by handleNextEventForStreamAsync().

2145  {
2146  // This function returns true if it successfully reads an event for the stream and that
2147  // requires both that an event is next and there are no problems or requests to stop.
2148 
2149  if (iTask.taskHasFailed()) {
2150  // We want all streams to stop or all streams to pause. If we are already in the
2151  // middle of pausing streams, then finish pausing all of them and the lumi will be
2152  // ended later. Otherwise, just end it now.
2153  if (iStatus.eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2155  }
2156  return false;
2157  }
2158 
2159  // Did another stream already stop or pause this lumi?
2160  if (iStatus.eventProcessingState() != LuminosityBlockProcessingStatus::EventProcessingState::kProcessing) {
2161  return false;
2162  }
2163 
2164  // Are output modules or the looper requesting we stop?
2165  if (shouldWeStop()) {
2168  return false;
2169  }
2170 
2172 
2173  // need to use lock in addition to the serial task queue because
2174  // of delayed provenance reading and reading data in response to
2175  // edm::Refs etc
2176  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
2177 
2178  // If we didn't already call nextTransitionType while merging lumis, call it here.
2179  // This asks the input source what is next and also checks for signals.
2180 
2182  firstItemAfterLumiMerge_ = false;
2183 
2184  if (InputSource::IsEvent != itemType) {
2185  // IsFile may continue processing the lumi and
2186  // looper_ can cause the input source to declare a new IsRun which is actually
2187  // just a continuation of the previous run
2188  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
2189  (InputSource::IsRun == itemType and
2190  (iStatus.lumiPrincipal()->run() != input_->run() or
2191  iStatus.lumiPrincipal()->runPrincipal().reducedProcessHistoryID() != input_->reducedProcessHistoryID()))) {
2193  } else {
2195  }
2196  return false;
2197  }
2198  readEvent(iStreamIndex);
2199  return true;
2200  }
void readEvent(unsigned int iStreamIndex)
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
ServiceToken serviceToken_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
bool shouldWeStop() const

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1945 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1945  {
1946  SendSourceTerminationSignalIfException sentry(actReg_.get());
1947  input_->readProcessBlock(processBlockPrincipal);
1948  sentry.completedSuccessfully();
1949  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::shared_ptr< RunPrincipal > edm::EventProcessor::readRun ( )

Definition at line 1951 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), edm::PrincipalCache::getAvailableRunPrincipalPtr(), historyAppender_, input_, and principalCache_.

Referenced by beginRunAsync().

1951  {
1953  assert(rp);
1954  rp->setAux(*input_->runAuxiliary());
1955  {
1956  SendSourceTerminationSignalIfException sentry(actReg_.get());
1957  input_->readRun(*rp, *historyAppender_);
1958  sentry.completedSuccessfully();
1959  }
1960  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1961  return rp;
1962  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ releaseBeginRunResources()

void edm::EventProcessor::releaseBeginRunResources ( unsigned int  iStream)

Definition at line 1400 of file EventProcessor.cc.

References queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), mps_update::status, streamQueues_, and streamRunStatus_.

Referenced by streamBeginRunAsync().

1400  {
1401  auto& status = streamRunStatus_[iStream];
1402  if (status->streamFinishedBeginRun()) {
1403  status->resetBeginResources();
1405  }
1406  streamQueues_[iStream].resume();
1407  }
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
bool resume()
Resumes processing if the queue was paused.
std::vector< edm::SerialTaskQueue > streamQueues_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 998 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

998  {
999  if (fileBlockValid()) {
1000  schedule_->respondToCloseInputFile(*fb_);
1001  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1002  }
1003  FDEBUG(1) << "\trespondToCloseInputFile\n";
1004  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 988 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

988  {
989  if (fileBlockValid()) {
991  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
992  schedule_->respondToOpenInputFile(*fb_);
993  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
994  }
995  FDEBUG(1) << "\trespondToOpenInputFile\n";
996  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1031 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1031  {
1032  input_->repeat();
1033  input_->rewind();
1034  FDEBUG(1) << "\trewind\n";
1035  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 375 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

375 { return runToCompletion(); }
StatusCode runToCompletion()

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 874 of file EventProcessor.cc.

References beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

874  {
875  beginJob(); //make sure this was called
876 
877  // make the services available
879 
880  try {
881  FilesProcessor fp(fileModeNoMerge_);
882 
883  convertException::wrap([&]() {
884  bool firstTime = true;
885  do {
886  if (not firstTime) {
888  rewindInput();
889  } else {
890  firstTime = false;
891  }
892  startingNewLoop();
893 
894  auto trans = fp.processFiles(*this);
895 
896  fp.normalEnd();
897 
898  if (deferredExceptionPtrIsSet_.load()) {
899  std::rethrow_exception(deferredExceptionPtr_);
900  }
901  if (trans != InputSource::IsStop) {
902  //problem with the source
903  doErrorStuff();
904 
905  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
906  }
907  } while (not endOfLoop());
908  }); // convertException::wrap
909 
910  } // Try block
911  catch (cms::Exception& e) {
913  std::string message(
914  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
915  e.addAdditionalInfo(message);
916  if (e.alreadyPrinted()) {
917  LogAbsolute("Additional Exceptions") << message;
918  }
919  }
920  if (exceptionMessageRuns_) {
921  std::string message(
922  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
923  e.addAdditionalInfo(message);
924  if (e.alreadyPrinted()) {
925  LogAbsolute("Additional Exceptions") << message;
926  }
927  }
928  if (!exceptionMessageFiles_.empty()) {
929  e.addAdditionalInfo(exceptionMessageFiles_);
930  if (e.alreadyPrinted()) {
931  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
932  }
933  }
934  throw;
935  }
936  return epSuccess;
937  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2359 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

2359  {
2360  bool expected = false;
2361  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2362  deferredExceptionPtr_ = iException;
2363  return true;
2364  }
2365  return false;
2366  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2353 of file EventProcessor.cc.

References exceptionMessageFiles_.

2353 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2357 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2357 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2355 of file EventProcessor.cc.

References exceptionMessageRuns_.

Referenced by handleEndRunExceptions().

2355 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1042 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1042  {
1043  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1044  if (!subProcesses_.empty()) {
1045  for (auto const& subProcess : subProcesses_) {
1046  if (subProcess.shouldWeCloseOutput()) {
1047  return true;
1048  }
1049  }
1050  return false;
1051  }
1052  return schedule_->shouldWeCloseOutput();
1053  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2338 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2338  {
2339  FDEBUG(1) << "\tshouldWeStop\n";
2340  if (shouldWeStop_)
2341  return true;
2342  if (!subProcesses_.empty()) {
2343  for (auto const& subProcess : subProcesses_) {
2344  if (subProcess.terminate()) {
2345  return true;
2346  }
2347  }
2348  return false;
2349  }
2350  return schedule_->terminate();
2351  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1006 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1006  {
1007  shouldWeStop_ = false;
1008  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1009  // until after we've called beginOfJob
1010  if (looper_ && looperBeginJobRun_) {
1011  looper_->doStartingNewLoop();
1012  }
1013  FDEBUG(1) << "\tstartingNewLoop\n";
1014  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamBeginRunAsync()

void edm::EventProcessor::streamBeginRunAsync ( unsigned int  iStream,
std::shared_ptr< RunProcessingStatus status,
bool  precedingTasksSucceeded,
WaitingTaskHolder  iHolder 
)

Definition at line 1368 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, edm::RunProcessingStatus::eventSetupImpl(), edm::RunProcessingStatus::eventSetupImpls(), first, eostools::move(), releaseBeginRunResources(), edm::waiting_task::chain::runLast(), edm::RunProcessingStatus::runPrincipal(), schedule_, serviceToken_, mps_update::status, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by beginRunAsync().

1371  {
1372  // These shouldn't throw
1373  streamQueues_[iStream].pause();
1374  ++streamRunActive_;
1375  streamRunStatus_[iStream] = std::move(status);
1376 
1377  CMS_SA_ALLOW try {
1378  using namespace edm::waiting_task::chain;
1379  chain::first([this, iStream, precedingTasksSucceeded](auto nextTask) {
1380  if (precedingTasksSucceeded) {
1381  RunProcessingStatus& rs = *streamRunStatus_[iStream];
1382  RunTransitionInfo transitionInfo(
1383  *rs.runPrincipal(), rs.eventSetupImpl(esp_->subProcessIndex()), &rs.eventSetupImpls());
1384  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1385  beginStreamTransitionAsync<Traits>(
1386  std::move(nextTask), *schedule_, iStream, transitionInfo, serviceToken_, subProcesses_);
1387  }
1388  }) | then([this, iStream](std::exception_ptr const* exceptionFromBeginStreamRun, auto nextTask) {
1389  if (exceptionFromBeginStreamRun) {
1390  nextTask.doneWaiting(*exceptionFromBeginStreamRun);
1391  }
1392  releaseBeginRunResources(iStream);
1393  }) | runLast(iHolder);
1394  } catch (...) {
1395  releaseBeginRunResources(iStream);
1396  iHolder.doneWaiting(std::current_exception());
1397  }
1398  }
#define CMS_SA_ALLOW
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
void releaseBeginRunResources(unsigned int iStream)
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1885 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, submitPVValidationJobs::t, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endUnfinishedLumi(), and handleNextEventForStreamAsync().

1885  {
1886  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iException) mutable {
1887  auto status = streamLumiStatus_[iStreamIndex];
1888  if (iException) {
1889  handleEndLumiExceptions(*iException, iTask);
1890  }
1891 
1892  // reset status before releasing queue else get race condition
1893  streamLumiStatus_[iStreamIndex].reset();
1895  streamQueues_[iStreamIndex].resume();
1896 
1897  //are we the last one?
1898  if (status->streamFinishedLumi()) {
1900  }
1901  });
1902 
1903  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1904 
1905  // Need to be sure the lumi status is released before lumiDoneTask can every be called.
1906  // therefore we do not want to hold the shared_ptr
1907  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1908  lumiStatus->setEndTime();
1909 
1910  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1911  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1912  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1913 
1914  if (lumiStatus->didGlobalBeginSucceed()) {
1915  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1916  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1917  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1918  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1919  *schedule_,
1920  iStreamIndex,
1921  transitionInfo,
1922  serviceToken_,
1923  subProcesses_,
1924  cleaningUpAfterException);
1925  }
1926  }
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
bool taskHasFailed() const noexcept
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ streamEndRunAsync()

void edm::EventProcessor::streamEndRunAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1553 of file EventProcessor.cc.

References CMS_SA_ALLOW, esp_, exceptionRunStatus_, edm::WaitingTaskHolder::group(), handleEndRunExceptions(), edm::make_waiting_task(), eostools::move(), schedule_, serviceToken_, streamQueues_, streamRunActive_, streamRunStatus_, subProcesses_, and edm::WaitingTaskHolder::taskHasFailed().

Referenced by endRunAsync().

1553  {
1554  CMS_SA_ALLOW try {
1555  if (!streamRunStatus_[iStreamIndex]) {
1556  if (exceptionRunStatus_->streamFinishedRun()) {
1557  exceptionRunStatus_->globalEndRunHolder().doneWaiting(std::exception_ptr());
1558  exceptionRunStatus_.reset();
1559  }
1560  return;
1561  }
1562 
1563  auto runDoneTask =
1564  edm::make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iException) mutable {
1565  if (iException) {
1566  handleEndRunExceptions(*iException, iTask);
1567  }
1568 
1569  auto runStatus = streamRunStatus_[iStreamIndex];
1570 
1571  //reset status before releasing queue else get race condition
1572  if (runStatus->streamFinishedRun()) {
1573  runStatus->globalEndRunHolder().doneWaiting(std::exception_ptr());
1574  }
1575  streamRunStatus_[iStreamIndex].reset();
1576  --streamRunActive_;
1577  streamQueues_[iStreamIndex].resume();
1578  });
1579 
1580  WaitingTaskHolder runDoneTaskHolder{*iTask.group(), runDoneTask};
1581 
1582  auto runStatus = streamRunStatus_[iStreamIndex].get();
1583 
1584  if (runStatus->didGlobalBeginSucceed() && runStatus->endingEventSetupSucceeded()) {
1585  EventSetupImpl const& es = runStatus->eventSetupImplEndRun(esp_->subProcessIndex());
1586  auto eventSetupImpls = &runStatus->eventSetupImplsEndRun();
1587  bool cleaningUpAfterException = runStatus->cleaningUpAfterException() || iTask.taskHasFailed();
1588 
1589  auto& runPrincipal = *runStatus->runPrincipal();
1590  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1591  RunTransitionInfo transitionInfo(runPrincipal, es, eventSetupImpls);
1592  endStreamTransitionAsync<Traits>(std::move(runDoneTaskHolder),
1593  *schedule_,
1594  iStreamIndex,
1595  transitionInfo,
1596  serviceToken_,
1597  subProcesses_,
1598  cleaningUpAfterException);
1599  }
1600  } catch (...) {
1601  handleEndRunExceptions(std::current_exception(), iTask);
1602  }
1603  }
#define CMS_SA_ALLOW
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::atomic< unsigned int > streamRunActive_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 619 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

619  {
622  task.waitNoThrow();
623  assert(task.done());
624  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 284 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

284  {
286  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 287 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

287  {
289  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2368 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2368  {
2369  cms::Exception ex("ModulesSynchingOnLumis");
2370  ex << "The framework is configured to use at least two streams, but the following modules\n"
2371  << "require synchronizing on LuminosityBlock boundaries:";
2372  bool found = false;
2373  for (auto worker : schedule_->allWorkers()) {
2374  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2375  found = true;
2376  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2377  }
2378  }
2379  if (found) {
2380  ex << "\n\nThe situation can be fixed by either\n"
2381  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2382  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2383  throw ex;
2384  }
2385  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 830 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

830 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 834 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

834 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 832 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

832 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2400 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2400  {
2401  std::unique_ptr<LogSystem> s;
2402  for (auto worker : schedule_->allWorkers()) {
2403  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2404  if (not s) {
2405  s = std::make_unique<LogSystem>("LegacyModules");
2406  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2407  "is going to end soon. These modules need to be converted to have type\n"
2408  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2409  }
2410  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2411  }
2412  }
2413  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutModulesRequiringRunSynchronization()

void edm::EventProcessor::warnAboutModulesRequiringRunSynchronization ( ) const
private

Definition at line 2387 of file EventProcessor.cc.

References alignCSCRings::s, and schedule_.

Referenced by beginJob().

2387  {
2388  std::unique_ptr<LogSystem> s;
2389  for (auto worker : schedule_->allWorkers()) {
2390  if (worker->wantsGlobalRuns() and worker->globalRunsQueue()) {
2391  if (not s) {
2392  s = std::make_unique<LogSystem>("ModulesSynchingOnRuns");
2393  (*s) << "The following modules require synchronizing on Run boundaries:";
2394  }
2395  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2396  }
2397  }
2398  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 2045 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

2045  {
2046  using namespace edm::waiting_task;
2047  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
2048  chain::first([&](auto nextTask) {
2050 
2051  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
2052  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
2053  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
2055  for (auto& s : subProcesses_) {
2056  s.writeLumiAsync(nextTask, lumiPrincipal);
2057  }
2059  }
2060  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 2006 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

2006  {
2007  using namespace edm::waiting_task;
2008  chain::first([&](auto nextTask) {
2010  schedule_->writeProcessBlockAsync(
2011  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
2012  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
2014  for (auto& s : subProcesses_) {
2015  s.writeProcessBlockAsync(nextTask, processBlockType);
2016  }
2017  }) | chain::runLast(std::move(task));
2018  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
RunPrincipal const &  runPrincipal,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 2020 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::RunPrincipal::kNo, eostools::move(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, edm::RunPrincipal::shouldWriteRun(), subProcesses_, and TrackValidation_cff::task.

Referenced by globalEndRunAsync().

2022  {
2023  using namespace edm::waiting_task;
2024  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
2025  chain::first([&](auto nextTask) {
2027  schedule_->writeRunAsync(nextTask, runPrincipal, &processContext_, actReg_.get(), mergeableRunProductMetadata);
2028  }) | chain::ifThen(not subProcesses_.empty(), [this, &runPrincipal, mergeableRunProductMetadata](auto nextTask) {
2030  for (auto& s : subProcesses_) {
2031  s.writeRunAsync(nextTask, runPrincipal, mergeableRunProductMetadata);
2032  }
2033  }) | chain::runLast(std::move(task));
2034  }
2035  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 317 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 350 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 333 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 308 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 345 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 344 of file EventProcessor.h.

Referenced by runToCompletion(), and setDeferredException().

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 369 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 366 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 353 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 355 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ exceptionRunStatus_

std::shared_ptr<RunProcessingStatus> edm::EventProcessor::exceptionRunStatus_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginRunAsync(), and streamEndRunAsync().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 352 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 362 of file EventProcessor.h.

◆ firstItemAfterLumiMerge_

bool edm::EventProcessor::firstItemAfterLumiMerge_ = true
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginLumiAsync(), continueLumiAsync(), and readNextEventForStream().

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 358 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 356 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 338 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_ = InputSource::IsInvalid
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginRunAsync(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 321 of file EventProcessor.h.

Referenced by init().

◆ modulesToIgnoreForDeleteEarly_

std::vector<std::string> edm::EventProcessor::modulesToIgnoreForDeleteEarly_
private

Definition at line 335 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 320 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 307 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), readAndMergeRun(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 368 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 309 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 318 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), and processConfiguration().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

◆ referencesToBranches_

std::multimap<std::string, std::string> edm::EventProcessor::referencesToBranches_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ runQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::runQueue_
private

Definition at line 325 of file EventProcessor.h.

Referenced by beginRunAsync(), and init().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 351 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

◆ streamQueuesInserter_

SerialTaskQueue edm::EventProcessor::streamQueuesInserter_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginLumiAsync(), beginRunAsync(), and endRunAsync().

◆ streamRunActive_

std::atomic<unsigned int> edm::EventProcessor::streamRunActive_ {0}
private

◆ streamRunStatus_

std::vector<std::shared_ptr<RunProcessingStatus> > edm::EventProcessor::streamRunStatus_
private

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 310 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().