CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair< edm::ProcessHistoryID, edm::RunNumber_tnextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID, RunNumber_treadAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::pair< ProcessHistoryID, RunNumber_treadRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns ()
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::atomic< bool > exceptionMessageRuns_
 
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
std::unique_ptr< edm::LimitedTaskQueuelumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Detailed Description

Definition at line 66 of file EventProcessor.h.

Member Typedef Documentation

◆ ExcludedData

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 363 of file EventProcessor.h.

◆ ExcludedDataMap

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 364 of file EventProcessor.h.

◆ ProcessBlockType

Definition at line 243 of file EventProcessor.h.

Member Enumeration Documentation

◆ StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 76 of file EventProcessor.h.

Constructor & Destructor Documentation

◆ EventProcessor() [1/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 236 of file EventProcessor.cc.

References init(), eostools::move(), and edm::parameterSet().

241  : actReg_(),
242  preg_(),
244  serviceToken_(),
245  input_(),
246  espController_(new eventsetup::EventSetupsController),
247  esp_(),
248  act_table_(),
250  schedule_(),
251  subProcesses_(),
252  historyAppender_(new HistoryAppender),
253  fb_(),
254  looper_(),
256  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
257  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
258  principalCache_(),
259  beginJobCalled_(false),
260  shouldWeStop_(false),
261  fileModeNoMerge_(false),
263  exceptionMessageRuns_(false),
264  exceptionMessageLumis_(false),
265  forceLooperToEnd_(false),
266  looperBeginJobRun_(false),
269  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
270  processDesc->addServices(defaultServices, forcedServices);
271  init(processDesc, iToken, iLegacy);
272  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [2/4]

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 274 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, eostools::move(), and edm::parameterSet().

277  : actReg_(),
278  preg_(),
280  serviceToken_(),
281  input_(),
282  espController_(new eventsetup::EventSetupsController),
283  esp_(),
284  act_table_(),
286  schedule_(),
287  subProcesses_(),
288  historyAppender_(new HistoryAppender),
289  fb_(),
290  looper_(),
292  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
293  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
294  principalCache_(),
295  beginJobCalled_(false),
296  shouldWeStop_(false),
297  fileModeNoMerge_(false),
299  exceptionMessageRuns_(false),
300  exceptionMessageLumis_(false),
301  forceLooperToEnd_(false),
302  looperBeginJobRun_(false),
305  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
306  processDesc->addServices(defaultServices, forcedServices);
308  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ EventProcessor() [3/4]

edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 310 of file EventProcessor.cc.

References init(), and unpackBuffers-CaloStage2::token.

313  : actReg_(),
314  preg_(),
316  serviceToken_(),
317  input_(),
318  espController_(new eventsetup::EventSetupsController),
319  esp_(),
320  act_table_(),
322  schedule_(),
323  subProcesses_(),
324  historyAppender_(new HistoryAppender),
325  fb_(),
326  looper_(),
328  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
329  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
330  principalCache_(),
331  beginJobCalled_(false),
332  shouldWeStop_(false),
333  fileModeNoMerge_(false),
335  exceptionMessageRuns_(false),
336  exceptionMessageLumis_(false),
337  forceLooperToEnd_(false),
338  looperBeginJobRun_(false),
341  init(processDesc, token, legacy);
342  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_

◆ ~EventProcessor()

edm::EventProcessor::~EventProcessor ( )

Definition at line 612 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

612  {
613  // Make the services available while everything is being deleted.
616 
617  // manually destroy all these thing that may need the services around
618  // propagate_const<T> has no reset() function
619  espController_ = nullptr;
620  esp_ = nullptr;
621  schedule_ = nullptr;
622  input_ = nullptr;
623  looper_ = nullptr;
624  actReg_ = nullptr;
625 
628  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12

◆ EventProcessor() [4/4]

edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

◆ beginJob()

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 637 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, edmLumisInFiles::description, esp_, espController_, edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), and edm::convertException::wrap().

Referenced by runToCompletion().

637  {
638  if (beginJobCalled_)
639  return;
640  beginJobCalled_ = true;
641  bk::beginJob();
642 
643  // StateSentry toerror(this); // should we add this ?
644  //make the services available
646 
647  service::SystemBounds bounds(preallocations_.numberOfStreams(),
651  actReg_->preallocateSignal_(bounds);
652  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
654 
655  std::vector<ModuleProcessName> consumedBySubProcesses;
657  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
658  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
659  if (consumedBySubProcesses.empty()) {
660  consumedBySubProcesses = std::move(c);
661  } else if (not c.empty()) {
662  std::vector<ModuleProcessName> tmp;
663  tmp.reserve(consumedBySubProcesses.size() + c.size());
664  std::merge(consumedBySubProcesses.begin(),
665  consumedBySubProcesses.end(),
666  c.begin(),
667  c.end(),
668  std::back_inserter(tmp));
669  std::swap(consumedBySubProcesses, tmp);
670  }
671  });
672 
673  // Note: all these may throw
676  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
677  not unusedModules.empty()) {
679 
680  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
681  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
682  "and "
683  "therefore they are deleted before beginJob transition.";
684  for (auto const& description : unusedModules) {
685  l << "\n " << description->moduleLabel();
686  }
687  });
688  for (auto const& description : unusedModules) {
689  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
690  }
691  }
692  }
693  // Initialize after the deletion of non-consumed unscheduled
694  // modules to avoid non-consumed non-run modules to keep the
695  // products unnecessarily alive
696  if (not branchesToDeleteEarly_.empty()) {
697  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
699  }
700 
701  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
702 
705  }
707 
708  //NOTE: This implementation assumes 'Job' means one call
709  // the EventProcessor::run
710  // If it really means once per 'application' then this code will
711  // have to be changed.
712  // Also have to deal with case where have 'run' then new Module
713  // added and do 'run'
714  // again. In that case the newly added Module needs its 'beginJob'
715  // to be called.
716 
717  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
718  // For now we delay calling beginOfJob until first beginOfRun
719  //if(looper_) {
720  // looper_->beginOfJob(es);
721  //}
722  try {
723  convertException::wrap([&]() { input_->doBeginJob(); });
724  } catch (cms::Exception& ex) {
725  ex.addContext("Calling beginJob for the source");
726  throw;
727  }
728  espController_->finishConfiguration();
729  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
730  if (looper_) {
731  constexpr bool mustPrefetchMayGet = true;
732  auto const processBlockLookup = preg_->productLookup(InProcess);
733  auto const runLookup = preg_->productLookup(InRun);
734  auto const lumiLookup = preg_->productLookup(InLumi);
735  auto const eventLookup = preg_->productLookup(InEvent);
736  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
737  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
738  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
739  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
740  looper_->updateLookup(esp_->recordsToProxyIndices());
741  }
742  // toerror.succeeded(); // should we add this?
743  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
744  actReg_->postBeginJobSignal_();
745 
746  oneapi::tbb::task_group group;
748  using namespace edm::waiting_task::chain;
749  first([this](auto nextTask) {
750  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
751  first([i, this](auto nextTask) {
753  schedule_->beginStream(i);
754  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
756  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
757  }) | lastTask(nextTask);
758  }
760  last.wait();
761  }
ProcessContext processContext_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
void removeModules(std::vector< ModuleDescription const *> const &modules)
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
def move(src, dest)
Definition: eostools.py:511
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:205

◆ beginLumiAsync()

void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1325 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::signalslot::Signal< T >::emit(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, watchdog::group, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::ActivityRegistry::preESSyncIOVSignal_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), createBeamHaloJobs::queue, queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, TrackValidation_cff::task, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1327  {
1328  if (iHolder.taskHasFailed()) {
1329  return;
1330  }
1331 
1332  actReg_->esSyncIOVQueuingSignal_.emit(iSync);
1333  // We must be careful with the status object here and in code this function calls. IF we want
1334  // endRun to be called, then we must call resetResources before the things waiting on
1335  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1336  // endRun to be called much later than it should be, because status is holding iRunResource).
1337  // Note that this must be done explicitly. Relying on the destructor does not work well
1338  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1339  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1340  // destroyed inside this function and lumiWork.
1341  auto status =
1342  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1343  chain::first([&](auto nextTask) {
1344  auto asyncEventSetup = [](ActivityRegistry* actReg,
1345  auto* espController,
1346  auto& queue,
1348  auto& status,
1349  IOVSyncValue const& iSync) {
1350  queue.pause();
1351  CMS_SA_ALLOW try {
1352  SendSourceTerminationSignalIfException sentry(actReg);
1353  // Pass in iSync to let the EventSetup system know which run and lumi
1354  // need to be processed and prepare IOVs for it.
1355  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1356  // lumi is done and no longer needs its EventSetup IOVs.
1357  actReg->preESSyncIOVSignal_.emit(iSync);
1358  espController->eventSetupForInstanceAsync(
1359  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1360  sentry.completedSuccessfully();
1361  } catch (...) {
1362  task.doneWaiting(std::current_exception());
1363  }
1364  };
1365  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1366  // We only get here inside this block if there is an EventSetup
1367  // module not able to handle concurrent IOVs (usually an ESSource)
1368  // and the new sync value is outside the current IOV of that module.
1369  auto group = nextTask.group();
1371  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1372  asyncEventSetup(
1374  });
1375  } else {
1376  asyncEventSetup(
1377  actReg_.get(), espController_.get(), queueWhichWaitsForIOVsToFinish_, std::move(nextTask), status, iSync);
1378  }
1379  }) | chain::then([this, status, iSync](std::exception_ptr const* iPtr, auto nextTask) {
1380  actReg_->postESSyncIOVSignal_.emit(iSync);
1381  //the call to doneWaiting will cause the count to decrement
1382  auto copyTask = nextTask;
1383  if (iPtr) {
1384  nextTask.doneWaiting(*iPtr);
1385  }
1386  auto group = copyTask.group();
1387  lumiQueue_->pushAndPause(
1388  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1389  if (task.taskHasFailed()) {
1390  status->resetResources();
1391  return;
1392  }
1393 
1394  status->setResumer(std::move(iResumer));
1395 
1396  auto group = task.group();
1398  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1399  //make the services available
1401  // Caught exception is propagated via WaitingTaskHolder
1402  CMS_SA_ALLOW try {
1404 
1405  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1406  {
1407  SendSourceTerminationSignalIfException sentry(actReg_.get());
1408 
1409  input_->doBeginLumi(lumiPrincipal, &processContext_);
1410  sentry.completedSuccessfully();
1411  }
1412 
1413  Service<RandomNumberGenerator> rng;
1414  if (rng.isAvailable()) {
1415  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1416  rng->preBeginLumi(lb);
1417  }
1418 
1419  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1420 
1421  using namespace edm::waiting_task::chain;
1422  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1423  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1424  {
1425  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1426  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1427  beginGlobalTransitionAsync<Traits>(
1428  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1429  }
1430  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1431  looper_->prefetchAsync(
1432  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1433  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1434  status->globalBeginDidSucceed();
1435  //make the services available
1436  ServiceRegistry::Operate operateLooper(serviceToken_);
1437  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1438  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1439  if (iPtr) {
1440  status->resetResources();
1441  holder.doneWaiting(*iPtr);
1442  } else {
1443  if (not looper_) {
1444  status->globalBeginDidSucceed();
1445  }
1446  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1447  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1448 
1449  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1450  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1451  streamQueues_[i].pause();
1452 
1453  auto& event = principalCache_.eventPrincipal(i);
1454  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1455  // held by the container as this lambda may not finish executing before all the tasks it
1456  // spawns have already started to run.
1457  auto eventSetupImpls = &status->eventSetupImpls();
1458  auto lp = status->lumiPrincipal().get();
1461  event.setLuminosityBlockPrincipal(lp);
1462  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1463  using namespace edm::waiting_task::chain;
1464  chain::first([this, i, &transitionInfo](auto nextTask) {
1465  beginStreamTransitionAsync<Traits>(
1466  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1467  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1468  if (exceptionFromBeginStreamLumi) {
1469  WaitingTaskHolder tmp(nextTask);
1470  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1471  streamEndLumiAsync(nextTask, i);
1472  } else {
1474  }
1475  }) | runLast(holder);
1476  });
1477  }
1478  }
1479  }) | runLast(postQueueTask);
1480 
1481  } catch (...) {
1482  status->resetResources();
1483  postQueueTask.doneWaiting(std::current_exception());
1484  }
1485  }); // task in sourceResourcesAcquirer
1486  });
1487  }) | chain::runLast(std::move(iHolder));
1488  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
SerialTaskQueueChain & serialQueueChain() const
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
tmp
align.sh
Definition: createJobs.py:716
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ beginProcessBlock()

void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1074 of file EventProcessor.cc.

References edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1074  {
1075  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1076  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1077 
1078  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1079  FinalWaitingTask globalWaitTask{taskGroup_};
1080 
1081  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1082  beginGlobalTransitionAsync<Traits>(
1083  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1084 
1085  globalWaitTask.wait();
1086  beginProcessBlockSucceeded = true;
1087  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ beginRun()

void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1142 of file EventProcessor.cc.

References actReg_, edm::BeginRun, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, first, forceESCacheClearOnNewRun_, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), taskGroup_, and edm::waiting_task::chain::then().

1145  {
1146  globalBeginSucceeded = false;
1147  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1148  {
1149  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150 
1151  input_->doBeginRun(runPrincipal, &processContext_);
1152  sentry.completedSuccessfully();
1153  }
1154 
1155  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1157  espController_->forceCacheClear();
1158  }
1159  {
1160  SendSourceTerminationSignalIfException sentry(actReg_.get());
1161  actReg_->preESSyncIOVSignal_.emit(ts);
1163  actReg_->postESSyncIOVSignal_.emit(ts);
1164  eventSetupForInstanceSucceeded = true;
1165  sentry.completedSuccessfully();
1166  }
1167  auto const& es = esp_->eventSetupImpl();
1168  if (looper_ && looperBeginJobRun_ == false) {
1169  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1170 
1171  FinalWaitingTask waitTask{taskGroup_};
1172  using namespace edm::waiting_task::chain;
1173  chain::first([this, &es](auto nextTask) {
1174  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1175  }) | then([this, &es](auto nextTask) mutable {
1176  looper_->beginOfJob(es);
1177  looperBeginJobRun_ = true;
1178  looper_->doStartingNewLoop();
1179  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1180 
1181  waitTask.wait();
1182  }
1183  {
1184  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1185  FinalWaitingTask globalWaitTask{taskGroup_};
1186 
1187  using namespace edm::waiting_task::chain;
1188  chain::first([&runPrincipal, &es, this](auto waitTask) {
1189  RunTransitionInfo transitionInfo(runPrincipal, es);
1190  beginGlobalTransitionAsync<Traits>(
1191  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1192  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1193  globalBeginSucceeded = true;
1194  FDEBUG(1) << "\tbeginRun " << run << "\n";
1195  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1196  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1197  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1198  looper_->doBeginRun(runPrincipal, es, &processContext_);
1199  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1200 
1201  globalWaitTask.wait();
1202  }
1203  {
1204  //To wait, the ref count has to be 1+#streams
1205  FinalWaitingTask streamLoopWaitTask{taskGroup_};
1206 
1207  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1208 
1209  RunTransitionInfo transitionInfo(runPrincipal, es);
1210  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1211  *schedule_,
1213  transitionInfo,
1214  serviceToken_,
1215  subProcesses_);
1216  streamLoopWaitTask.wait();
1217  }
1218  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1219  if (looper_) {
1220  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1221  }
1222  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ branchIDListHelper() [1/2]

std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

287  {
289  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ branchIDListHelper() [2/2]

std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ checkForAsyncStopRequest()

bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 848 of file EventProcessor.cc.

References epSignal, runEdmFileComparison::returnCode, and edm::shutdown_flag.

Referenced by nextTransitionType().

848  {
849  bool returnValue = false;
850 
851  // Look for a shutdown signal
852  if (shutdown_flag.load(std::memory_order_acquire)) {
853  returnValue = true;
855  }
856  return returnValue;
857  }
volatile std::atomic< bool > shutdown_flag

◆ clearCounters()

void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 842 of file EventProcessor.cc.

References schedule_.

842 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ closeInputFile()

void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 974 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

974  {
975  if (fileBlockValid()) {
976  SendSourceTerminationSignalIfException sentry(actReg_.get());
977  input_->closeFile(fb_.get(), cleaningUpAfterException);
978  sentry.completedSuccessfully();
979  }
980  FDEBUG(1) << "\tcloseInputFile\n";
981  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_

◆ closeOutputFiles()

void edm::EventProcessor::closeOutputFiles ( )

Definition at line 991 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

991  {
992  schedule_->closeOutputFiles();
993  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
994  processBlockHelper_->clearAfterOutputFilesClose();
995  FDEBUG(1) << "\tcloseOutputFiles\n";
996  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_

◆ continueLumiAsync()

void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1490 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), h, handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1490  {
1491  {
1492  //all streams are sharing the same status at the moment
1493  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1494  status->needToContinueLumi();
1495  status->startProcessingEvents();
1496  }
1497 
1498  unsigned int streamIndex = 0;
1499  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1500  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1501  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1502  }
1503  iHolder.group()->run(
1504  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1505  }
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
oneapi::tbb::task_group * group() const noexcept
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
def move(src, dest)
Definition: eostools.py:511

◆ deleteLumiFromCache()

void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1783 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::kUninitialized, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1783  {
1784  for (auto& s : subProcesses_) {
1785  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1786  }
1787  iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
1788  iStatus.lumiPrincipal()->clearPrincipal();
1789  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1790  }
std::vector< SubProcess > subProcesses_

◆ deleteRunFromCache()

void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1760 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, run(), and subProcesses_.

Referenced by endUnfinishedRun().

1760  {
1761  principalCache_.deleteRun(phid, run);
1762  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1763  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1764  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_

◆ doErrorStuff()

void edm::EventProcessor::doErrorStuff ( )

Definition at line 1065 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1065  {
1066  FDEBUG(1) << "\tdoErrorStuff\n";
1067  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1068  << "and went to the error state\n"
1069  << "Will attempt to terminate processing normally\n"
1070  << "(IF using the looper the next loop will be attempted)\n"
1071  << "This likely indicates a bug in an input module or corrupted input or both\n";
1072  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ endJob()

void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 763 of file EventProcessor.cc.

References actReg_, c, edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, edm::first(), watchdog::group, mps_fire::i, input_, cmsLHEtoEOSManager::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

763  {
764  // Collects exceptions, so we don't throw before all operations are performed.
765  ExceptionCollector c(
766  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
767 
768  //make the services available
770 
771  using namespace edm::waiting_task::chain;
772 
773  oneapi::tbb::task_group group;
774  edm::FinalWaitingTask waitTask{group};
775 
776  {
777  //handle endStream transitions
778  edm::WaitingTaskHolder taskHolder(group, &waitTask);
779  std::mutex collectorMutex;
780  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
781  first([this, i, &c, &collectorMutex](auto nextTask) {
782  std::exception_ptr ep;
783  try {
785  this->schedule_->endStream(i);
786  } catch (...) {
787  ep = std::current_exception();
788  }
789  if (ep) {
790  std::lock_guard<std::mutex> l(collectorMutex);
791  c.call([&ep]() { std::rethrow_exception(ep); });
792  }
793  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
794  for (auto& subProcess : subProcesses_) {
795  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
796  std::exception_ptr ep;
797  try {
799  subProcess.doEndStream(i);
800  } catch (...) {
801  ep = std::current_exception();
802  }
803  if (ep) {
804  std::lock_guard<std::mutex> l(collectorMutex);
805  c.call([&ep]() { std::rethrow_exception(ep); });
806  }
807  }) | lastTask(nextTask);
808  }
809  }) | lastTask(taskHolder);
810  }
811  }
812  waitTask.waitNoThrow();
813 
814  auto actReg = actReg_.get();
815  c.call([actReg]() { actReg->preEndJobSignal_(); });
816  schedule_->endJob(c);
817  for (auto& subProcess : subProcesses_) {
818  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
819  }
820  c.call(std::bind(&InputSource::doEndJob, input_.get()));
821  if (looper_) {
822  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
823  }
824  c.call([actReg]() { actReg->postEndJobSignal_(); });
825  if (c.hasThrown()) {
826  c.rethrow();
827  }
828  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
std::shared_ptr< EDLooperBase const > looper() const
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)

◆ endOfLoop()

bool edm::EventProcessor::endOfLoop ( )

Definition at line 1026 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

1026  {
1027  if (looper_) {
1028  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
1029  looper_->setModuleChanger(&changer);
1030  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
1031  looper_->setModuleChanger(nullptr);
1033  return true;
1034  else
1035  return false;
1036  }
1037  FDEBUG(1) << "\tendOfLoop\n";
1038  return true;
1039  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ endProcessBlock()

void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1115 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1115  {
1116  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1117 
1118  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1119  FinalWaitingTask globalWaitTask{taskGroup_};
1120 
1121  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1122  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1123  *schedule_,
1124  transitionInfo,
1125  serviceToken_,
1126  subProcesses_,
1127  cleaningUpAfterException);
1128  globalWaitTask.wait();
1129 
1130  if (beginProcessBlockSucceeded) {
1131  FinalWaitingTask writeWaitTask{taskGroup_};
1133  writeWaitTask.wait();
1134  }
1135 
1136  processBlockPrincipal.clearPrincipal();
1137  for (auto& s : subProcesses_) {
1138  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1139  }
1140  }
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ endRun()

void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1251 of file EventProcessor.cc.

References actReg_, edm::EndRun, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, first, edm::waiting_task::chain::ifThen(), input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

1254  {
1255  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1256  runPrincipal.setEndTime(input_->timestamp());
1257 
1258  IOVSyncValue ts(
1259  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1260  runPrincipal.endTime());
1261  {
1262  SendSourceTerminationSignalIfException sentry(actReg_.get());
1263  actReg_->preESSyncIOVSignal_.emit(ts);
1265  actReg_->postESSyncIOVSignal_.emit(ts);
1266  sentry.completedSuccessfully();
1267  }
1268  auto const& es = esp_->eventSetupImpl();
1269  if (globalBeginSucceeded) {
1270  //To wait, the ref count has to be 1+#streams
1271  FinalWaitingTask streamLoopWaitTask{taskGroup_};
1272 
1273  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1274 
1275  RunTransitionInfo transitionInfo(runPrincipal, es);
1276  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1277  *schedule_,
1279  transitionInfo,
1280  serviceToken_,
1281  subProcesses_,
1282  cleaningUpAfterException);
1283  streamLoopWaitTask.wait();
1284  }
1285  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1286  if (looper_) {
1287  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1288  }
1289  {
1290  FinalWaitingTask globalWaitTask{taskGroup_};
1291 
1292  using namespace edm::waiting_task::chain;
1293  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1294  RunTransitionInfo transitionInfo(runPrincipal, es);
1295  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1296  endGlobalTransitionAsync<Traits>(
1297  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1298  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1299  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1300  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1301  looper_->doEndRun(runPrincipal, es, &processContext_);
1302  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1303 
1304  globalWaitTask.wait();
1305  }
1306  FDEBUG(1) << "\tendRun " << run << "\n";
1307  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ endUnfinishedLumi()

void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1634 of file EventProcessor.cc.

References mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

1634  {
1635  if (streamLumiActive_.load() > 0) {
1636  FinalWaitingTask globalWaitTask{taskGroup_};
1637  {
1638  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1639  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1640  if (streamLumiStatus_[i]) {
1641  streamEndLumiAsync(globalTaskHolder, i);
1642  }
1643  }
1644  }
1645  globalWaitTask.wait();
1646  }
1647  }
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::atomic< unsigned int > streamLumiActive_

◆ endUnfinishedRun()

void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1224 of file EventProcessor.cc.

References deleteRunFromCache(), endRun(), edm::RunPrincipal::kNo, edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), edm::RunPrincipal::shouldWriteRun(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

1228  {
1229  if (eventSetupForInstanceSucceeded) {
1230  //If we skip empty runs, this would be called conditionally
1231  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1232 
1233  if (globalBeginSucceeded) {
1234  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1235  if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
1237  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1238  mergeableRunProductMetadata->preWriteRun();
1239  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1240  auto exceptn = t.waitNoThrow();
1241  mergeableRunProductMetadata->postWriteRun();
1242  if (exceptn) {
1243  std::rethrow_exception(exceptn);
1244  }
1245  }
1246  }
1247  }
1248  deleteRunFromCache(phid, run);
1249  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_

◆ fileBlockValid()

bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 196 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

196 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 832 of file EventProcessor.cc.

References schedule_.

832  {
833  return schedule_->getAllModuleDescriptions();
834  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ getToken()

ServiceToken edm::EventProcessor::getToken ( )

Definition at line 830 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

830 { return serviceToken_; }
ServiceToken serviceToken_

◆ globalEndLumiAsync()

void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1516 of file EventProcessor.cc.

References CMS_SA_ALLOW, deleteLumiFromCache(), edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1517  {
1518  // Get some needed info out of the status object before moving
1519  // it into finalTaskForThisLumi.
1520  auto& lp = *(iLumiStatus->lumiPrincipal());
1521  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1522  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1523  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1524  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1525 
1526  using namespace edm::waiting_task::chain;
1527  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1528  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1529 
1530  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1531  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1532  endGlobalTransitionAsync<Traits>(
1533  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1534  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1535  //Only call writeLumi if beginLumi succeeded
1536  if (didGlobalBeginSucceed) {
1537  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1538  }
1539  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1540  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1541  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1542  //any thrown exception auto propagates to nextTask via the chain
1544  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1545  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1546  std::exception_ptr ptr;
1547  if (iPtr) {
1548  ptr = *iPtr;
1549  }
1551 
1552  // Try hard to clean up resources so the
1553  // process can terminate in a controlled
1554  // fashion even after exceptions have occurred.
1555  // Caught exception is passed to handleEndLumiExceptions()
1556  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1557  if (not ptr) {
1558  ptr = std::current_exception();
1559  }
1560  }
1561  // Caught exception is passed to handleEndLumiExceptions()
1562  CMS_SA_ALLOW try {
1563  status->resumeGlobalLumiQueue();
1565  } catch (...) {
1566  if (not ptr) {
1567  ptr = std::current_exception();
1568  }
1569  }
1570  // Caught exception is passed to handleEndLumiExceptions()
1571  CMS_SA_ALLOW try {
1572  // This call to status.resetResources() must occur before iTask is destroyed.
1573  // Otherwise there will be a data race which could result in endRun
1574  // being delayed until it is too late to successfully call it.
1575  status->resetResources();
1576  status.reset();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582 
1583  if (ptr) {
1584  handleEndLumiExceptions(&ptr, nextTask);
1585  }
1586  }) | runLast(std::move(iTask));
1587  }
ProcessContext processContext_
#define CMS_SA_ALLOW
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
def move(src, dest)
Definition: eostools.py:511

◆ handleEndLumiExceptions()

void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1507 of file EventProcessor.cc.

References setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1507  {
1508  if (setDeferredException(*iPtr)) {
1509  WaitingTaskHolder tmp(holder);
1510  tmp.doneWaiting(*iPtr);
1511  } else {
1513  }
1514  }
tmp
align.sh
Definition: createJobs.py:716
bool setDeferredException(std::exception_ptr)

◆ handleNextEventForStreamAsync()

void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1854 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1854  {
1855  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1857  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1858  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1859  auto status = streamLumiStatus_[iStreamIndex].get();
1860  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1861  CMS_SA_ALLOW try {
1862  if (readNextEventForStream(iStreamIndex, *status)) {
1863  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1864  if (iPtr) {
1865  // Try to end the stream properly even if an exception was
1866  // thrown on an event.
1867  bool expected = false;
1868  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1869  // This is the case where the exception in iPtr is the primary
1870  // exception and we want to see its message.
1871  deferredExceptionPtr_ = *iPtr;
1872  WaitingTaskHolder tempHolder(iTask);
1873  tempHolder.doneWaiting(*iPtr);
1874  }
1875  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1876  //the stream will stop now
1877  return;
1878  }
1879  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1880  });
1881 
1882  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1883  } else {
1884  //the stream will stop now
1885  if (status->isLumiEnding()) {
1886  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1887  status->startNextLumi();
1888  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1889  }
1890  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1891  } else {
1892  iTask.doneWaiting(std::exception_ptr{});
1893  }
1894  }
1895  } catch (...) {
1896  // It is unlikely we will ever get in here ...
1897  // But if we do try to clean up and propagate the exception
1898  if (streamLumiStatus_[iStreamIndex]) {
1899  streamEndLumiAsync(iTask, iStreamIndex);
1900  }
1901  bool expected = false;
1902  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1903  auto e = std::current_exception();
1905  iTask.doneWaiting(e);
1906  }
1907  }
1908  });
1909  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
InputSource::ItemType lastTransitionType() const
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue

◆ init()

void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 344 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchesToDeleteEarly_, branchIDListHelper(), branchIDListHelper_, CMS_SA_ALLOW, trackingPlots::common, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, testHGCalDigiSingleMuonPt100_cfg::dumpOptions, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), SiStripBadComponentsDQMServiceTemplate_cfg::ep, esp_, espController_, Exception, FDEBUG, processOptions_cff::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ServiceRegistry::get(), edm::get_underlying_safe(), edm::ParameterSet::getParameter(), edm::ModuleDescription::getUniqueID(), edm::ParameterSet::getUntrackedParameterSet(), watchdog::group, historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::ServiceRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

346  {
347  //std::cerr << processDesc->dump() << std::endl;
348 
349  // register the empty parentage vector , once and for all
351 
352  // register the empty parameter set, once and for all.
353  ParameterSet().registerIt();
354 
355  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
356 
357  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
358  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
359  bool const hasSubProcesses = !subProcessVParameterSet.empty();
360 
361  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
362  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
363  // set in here if the parameters were not explicitly set.
365 
366  // Now set some parameters specific to the main process.
367  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
368  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
369  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
370  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
371  << fileMode << ".\n"
372  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
373  } else {
374  fileModeNoMerge_ = (fileMode == "NOMERGE");
375  }
376  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
378 
379  //threading
380  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
381 
382  // Even if numberOfThreads was set to zero in the Python configuration, the code
383  // in cmsRun.cpp should have reset it to something else.
384  assert(nThreads != 0);
385 
386  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
387  if (nStreams == 0) {
388  nStreams = nThreads;
389  }
390  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
391  if (nConcurrentRuns != 1) {
392  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
393  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
394  }
395  unsigned int nConcurrentLumis =
396  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
397  if (nConcurrentLumis == 0) {
398  nConcurrentLumis = 2;
399  }
400  if (nConcurrentLumis > nStreams) {
401  nConcurrentLumis = nStreams;
402  }
403  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
404  if (!loopers.empty()) {
405  //For now loopers make us run only 1 transition at a time
406  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
407  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
408  "of concurrent runs, and the number of concurrent lumis "
409  "are all being reset to 1. Loopers cannot currently support "
410  "values greater than 1.";
411  nStreams = 1;
412  nConcurrentLumis = 1;
413  nConcurrentRuns = 1;
414  }
415  }
416  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
417  if (dumpOptions) {
418  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
419  } else {
420  if (nThreads > 1 or nStreams > 1) {
421  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
422  }
423  }
424  // The number of concurrent IOVs is configured individually for each record in
425  // the class NumberOfConcurrentIOVs to values less than or equal to this.
426  unsigned int maxConcurrentIOVs = nConcurrentLumis;
427 
428  //Check that relationships between threading parameters makes sense
429  /*
430  if(nThreads<nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nStreams) {
434  //bad
435  }
436  if(nConcurrentRuns>nConcurrentLumis) {
437  //bad
438  }
439  */
440  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
441 
442  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
444  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
445  //for now, if have a subProcess, don't allow early delete
446  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
447  if (not hasSubProcesses) {
448  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
449  }
450 
451  // Now do general initialization
452  ScheduleItems items;
453 
454  //initialize the services
455  auto& serviceSets = processDesc->getServicesPSets();
456  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
457  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
458 
459  //make the services available
461 
462  CMS_SA_ALLOW try {
463  if (nStreams > 1) {
465  handler->willBeUsingThreads();
466  }
467 
468  // intialize miscellaneous items
469  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
470 
471  // intialize the event setup provider
472  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
473  esp_ = espController_->makeProvider(
474  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
475 
476  // initialize the looper, if any
477  if (!loopers.empty()) {
479  looper_->setActionTable(items.act_table_.get());
480  looper_->attachTo(*items.actReg_);
481 
482  // in presence of looper do not delete modules
484  }
485 
486  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
487 
488  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
489  streamQueues_.resize(nStreams);
490  streamLumiStatus_.resize(nStreams);
491 
492  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
493 
494  {
495  std::optional<ScheduleItems::MadeModules> madeModules;
496 
497  //setup input and modules concurrently
498  tbb::task_group group;
499 
500  // initialize the input source
501  auto tempReg = std::make_shared<ProductRegistry>();
502  auto sourceID = ModuleDescription::getUniqueID();
503 
504  group.run([&, this]() {
505  // initialize the Schedule
507  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
508  madeModules = items.initModules(*parameterSet, tns, preallocations_, &processContext_);
509  });
510 
511  group.run([&, this, tempReg]() {
513  input_ = makeInput(sourceID,
514  *parameterSet,
515  *common,
516  /*items.preg(),*/ tempReg,
517  items.branchIDListHelper(),
519  items.thinnedAssociationsHelper(),
520  items.actReg_,
521  items.processConfiguration(),
523  });
524 
525  group.wait();
526  items.preg()->addFromInput(*tempReg);
527  input_->switchTo(items.preg());
528 
529  {
530  auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
531  schedule_ = items.finishSchedule(std::move(*madeModules),
532  *parameterSet,
533  tns,
534  hasSubProcesses,
538  }
539  }
540 
541  // set the data members
542  act_table_ = std::move(items.act_table_);
543  actReg_ = items.actReg_;
544  preg_ = items.preg();
546  branchIDListHelper_ = items.branchIDListHelper();
547  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
548  processConfiguration_ = items.processConfiguration();
550  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
551 
552  FDEBUG(2) << parameterSet << std::endl;
553 
555  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
556  // Reusable event principal
557  auto ep = std::make_shared<EventPrincipal>(preg(),
561  historyAppender_.get(),
562  index,
563  true /*primary process*/,
566  }
567 
568  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
569  auto lp =
570  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
572  }
573 
574  {
575  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
577 
578  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
580  }
581 
582  // fill the subprocesses, if there are any
583  subProcesses_.reserve(subProcessVParameterSet.size());
584  for (auto& subProcessPSet : subProcessVParameterSet) {
585  subProcesses_.emplace_back(subProcessPSet,
586  *parameterSet,
587  preg(),
591  SubProcessParentageHelper(),
593  *actReg_,
594  token,
597  &processContext_);
598  }
599  } catch (...) {
600  //in case of an exception, make sure Services are available
601  // during the following destructors
602  espController_ = nullptr;
603  esp_ = nullptr;
604  schedule_ = nullptr;
605  input_ = nullptr;
606  looper_ = nullptr;
607  actReg_ = nullptr;
608  throw;
609  }
610  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
#define CMS_SA_ALLOW
std::shared_ptr< ProductRegistry const > preg() const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
static unsigned int getUniqueID()
Returns a unique id each time called. Intended to be passed to ModuleDescription&#39;s constructor&#39;s modI...
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
static ServiceRegistry & instance()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< InputSource > makeInput(unsigned int moduleIndex, ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
Log< level::Info, false > LogInfo
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool insertMapped(value_type const &v)
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)

◆ inputProcessBlocks()

void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1089 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1089  {
1090  input_->fillProcessBlockHelper();
1091  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1092  while (input_->nextProcessBlock(processBlockPrincipal)) {
1093  readProcessBlock(processBlockPrincipal);
1094 
1095  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1096  FinalWaitingTask globalWaitTask{taskGroup_};
1097 
1098  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1099  beginGlobalTransitionAsync<Traits>(
1100  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1101 
1102  globalWaitTask.wait();
1103 
1104  FinalWaitingTask writeWaitTask{taskGroup_};
1106  writeWaitTask.wait();
1107 
1108  processBlockPrincipal.clearPrincipal();
1109  for (auto& s : subProcesses_) {
1110  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1111  }
1112  }
1113  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_

◆ lastTransitionType()

InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 186 of file EventProcessor.h.

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

186  {
188  return InputSource::IsStop;
189  }
190  return lastSourceTransition_;
191  }
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_

◆ looper() [1/2]

std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 297 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

297 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ looper() [2/2]

std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

298 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ nextLuminosityBlockID()

edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 889 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

889 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_

◆ nextRunID()

std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 885 of file EventProcessor.cc.

References input_.

885  {
886  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
887  }
edm::propagate_const< std::unique_ptr< InputSource > > input_

◆ nextTransitionType()

InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 859 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

859  {
860  if (deferredExceptionPtrIsSet_.load()) {
862  return InputSource::IsStop;
863  }
864 
865  SendSourceTerminationSignalIfException sentry(actReg_.get());
866  InputSource::ItemType itemType;
867  //For now, do nothing with InputSource::IsSynchronize
868  do {
869  itemType = input_->nextItemType();
870  } while (itemType == InputSource::IsSynchronize);
871 
872  lastSourceTransition_ = itemType;
873  sentry.completedSuccessfully();
874 
876 
878  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
880  }
881 
882  return lastSourceTransition_;
883  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_

◆ openOutputFiles()

void edm::EventProcessor::openOutputFiles ( )

Definition at line 983 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

983  {
984  if (fileBlockValid()) {
985  schedule_->openOutputFiles(*fb_);
986  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
987  }
988  FDEBUG(1) << "\topenOutputFiles\n";
989  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ operator=()

EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete

◆ preg() [1/2]

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

285 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ preg() [2/2]

std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

286 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)

◆ prepareForNextLoop()

void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1047 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

Referenced by runToCompletion().

1047  {
1048  looper_->prepareForNextLoop(esp_.get());
1049  FDEBUG(1) << "\tprepareForNextLoop\n";
1050  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processConfiguration()

ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 140 of file EventProcessor.h.

References processConfiguration_.

140 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_

◆ processEventAsync()

void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1925 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1925  {
1926  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1927  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)

◆ processEventAsyncImpl()

void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1929 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

1929  {
1930  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1931 
1933  Service<RandomNumberGenerator> rng;
1934  if (rng.isAvailable()) {
1935  Event ev(*pep, ModuleDescription(), nullptr);
1936  rng->postEventRead(ev);
1937  }
1938 
1939  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1940  using namespace edm::waiting_task::chain;
1941  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1942  EventTransitionInfo info(*pep, es);
1943  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1944  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1945  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1946  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1947  }
1948  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1949  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1950  ServiceRegistry::Operate operateLooper(serviceToken_);
1951  processEventWithLooper(*pep, iStreamIndex);
1952  }) | then([pep](auto nextTask) {
1953  FDEBUG(1) << "\tprocessEvent\n";
1954  pep->clearEventPrincipal();
1955  }) | runLast(iHolder);
1956  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ processEventWithLooper()

void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1958 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1958  {
1959  bool randomAccess = input_->randomAccess();
1960  ProcessingController::ForwardState forwardState = input_->forwardState();
1961  ProcessingController::ReverseState reverseState = input_->reverseState();
1962  ProcessingController pc(forwardState, reverseState, randomAccess);
1963 
1965  do {
1966  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1967  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1968  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1969 
1970  bool succeeded = true;
1971  if (randomAccess) {
1972  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1973  input_->skipEvents(-2);
1974  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1975  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1976  }
1977  }
1978  pc.setLastOperationSucceeded(succeeded);
1979  } while (!pc.lastOperationSucceeded());
1981  shouldWeStop_ = true;
1983  }
1984  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_

◆ processLumis()

InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1309 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

1309  {
1310  FinalWaitingTask waitTask{taskGroup_};
1311  if (streamLumiActive_ > 0) {
1313  // Continue after opening a new input file
1315  } else {
1316  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1317  input_->luminosityBlockAuxiliary()->beginTime()),
1318  iRunResource,
1319  WaitingTaskHolder{taskGroup_, &waitTask});
1320  }
1321  waitTask.wait();
1322  return lastTransitionType();
1323  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType lastTransitionType() const
assert(be >=bs)
PreallocationConfiguration preallocations_
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
oneapi::tbb::task_group taskGroup_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_

◆ readAndMergeLumi()

int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1709 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1709  {
1710  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1711  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1712  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1713  input_->processHistoryRegistry().reducedProcessHistoryID(
1714  input_->luminosityBlockAuxiliary()->processHistoryID()));
1715  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1716  assert(lumiOK);
1717  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1718  {
1719  SendSourceTerminationSignalIfException sentry(actReg_.get());
1720  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1721  sentry.completedSuccessfully();
1722  }
1723  return input_->luminosityBlock();
1724  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< ActivityRegistry > actReg_

◆ readAndMergeRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1678 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1678  {
1679  principalCache_.merge(input_->runAuxiliary(), preg());
1680  auto runPrincipal = principalCache_.runPrincipalPtr();
1681  {
1682  SendSourceTerminationSignalIfException sentry(actReg_.get());
1683  input_->readAndMergeRun(*runPrincipal);
1684  sentry.completedSuccessfully();
1685  }
1686  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1687  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1688  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ readEvent()

void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1911 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1911  {
1912  //TODO this will have to become per stream
1913  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1914  StreamContext streamContext(event.streamID(), &processContext_);
1915 
1916  SendSourceTerminationSignalIfException sentry(actReg_.get());
1917  input_->readEvent(event, streamContext);
1918 
1919  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1920  sentry.completedSuccessfully();
1921 
1922  FDEBUG(1) << "\treadEvent\n";
1923  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_

◆ readFile()

void edm::EventProcessor::readFile ( )

Definition at line 956 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

956  {
957  FDEBUG(1) << " \treadFile\n";
958  size_t size = preg_->size();
959  SendSourceTerminationSignalIfException sentry(actReg_.get());
960 
962 
963  fb_ = input_->readFile();
964  if (size < preg_->size()) {
966  }
969  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
970  }
971  sentry.completedSuccessfully();
972  }
size
Write out results.
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ readLuminosityBlock()

void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1690 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1690  {
1692  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1693  << "Illegal attempt to insert lumi into cache\n"
1694  << "Run is invalid\n"
1695  << "Contact a Framework Developer\n";
1696  }
1698  assert(lbp);
1699  lbp->setAux(*input_->luminosityBlockAuxiliary());
1700  {
1701  SendSourceTerminationSignalIfException sentry(actReg_.get());
1702  input_->readLuminosityBlock(*lbp, *historyAppender_);
1703  sentry.completedSuccessfully();
1704  }
1705  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1706  iStatus.lumiPrincipal() = std::move(lbp);
1707  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ readNextEventForStream()

bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1792 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1792  {
1793  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1794  iStatus.endLumi();
1795  return false;
1796  }
1797 
1798  if (iStatus.wasEventProcessingStopped()) {
1799  return false;
1800  }
1801 
1802  if (shouldWeStop()) {
1804  iStatus.stopProcessingEvents();
1805  iStatus.endLumi();
1806  return false;
1807  }
1808 
1810  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1811  CMS_SA_ALLOW try {
1812  //need to use lock in addition to the serial task queue because
1813  // of delayed provenance reading and reading data in response to
1814  // edm::Refs etc
1815  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1816 
1817  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1818  if (InputSource::IsLumi == itemType) {
1819  iStatus.haveContinuedLumi();
1820  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1821  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1822  readAndMergeLumi(iStatus);
1823  itemType = nextTransitionType();
1824  }
1825  if (InputSource::IsLumi == itemType) {
1826  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1827  input_->luminosityBlockAuxiliary()->beginTime()));
1828  }
1829  }
1830  if (InputSource::IsEvent != itemType) {
1831  iStatus.stopProcessingEvents();
1832 
1833  //IsFile may continue processing the lumi and
1834  // looper_ can cause the input source to declare a new IsRun which is actually
1835  // just a continuation of the previous run
1836  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1837  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1838  iStatus.endLumi();
1839  }
1840  return false;
1841  }
1842  readEvent(iStreamIndex);
1843  } catch (...) {
1844  bool expected = false;
1845  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1846  deferredExceptionPtr_ = std::current_exception();
1847  iStatus.endLumi();
1848  }
1849  return false;
1850  }
1851  return true;
1852  }
void readEvent(unsigned int iStreamIndex)
#define CMS_SA_ALLOW
InputSource::ItemType nextTransitionType()
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
bool shouldWeStop() const
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()

◆ readProcessBlock()

void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1649 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1649  {
1650  SendSourceTerminationSignalIfException sentry(actReg_.get());
1651  input_->readProcessBlock(processBlockPrincipal);
1652  sentry.completedSuccessfully();
1653  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_

◆ readRun()

std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1655 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1655  {
1657  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1658  << "Illegal attempt to insert run into cache\n"
1659  << "Contact a Framework Developer\n";
1660  }
1661  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1662  preg(),
1664  historyAppender_.get(),
1665  0,
1666  true,
1668  {
1669  SendSourceTerminationSignalIfException sentry(actReg_.get());
1670  input_->readRun(*rp, *historyAppender_);
1671  sentry.completedSuccessfully();
1672  }
1673  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1674  principalCache_.insert(rp);
1675  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1676  }
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_

◆ respondToCloseInputFile()

void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 1008 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

1008  {
1009  if (fileBlockValid()) {
1010  schedule_->respondToCloseInputFile(*fb_);
1011  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
1012  }
1013  FDEBUG(1) << "\trespondToCloseInputFile\n";
1014  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ respondToOpenInputFile()

void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 998 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

998  {
999  if (fileBlockValid()) {
1001  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
1002  schedule_->respondToOpenInputFile(*fb_);
1003  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1004  }
1005  FDEBUG(1) << "\trespondToOpenInputFile\n";
1006  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_

◆ rewindInput()

void edm::EventProcessor::rewindInput ( )

Definition at line 1041 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1041  {
1042  input_->repeat();
1043  input_->rewind();
1044  FDEBUG(1) << "\trewind\n";
1045  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ run()

EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

◆ runToCompletion()

EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 891 of file EventProcessor.cc.

References beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), MillePedeFileConverter_cfg::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

891  {
892  beginJob(); //make sure this was called
893 
894  // make the services available
896 
897  try {
898  FilesProcessor fp(fileModeNoMerge_);
899 
900  convertException::wrap([&]() {
901  bool firstTime = true;
902  do {
903  if (not firstTime) {
905  rewindInput();
906  } else {
907  firstTime = false;
908  }
909  startingNewLoop();
910 
911  auto trans = fp.processFiles(*this);
912 
913  fp.normalEnd();
914 
915  if (deferredExceptionPtrIsSet_.load()) {
916  std::rethrow_exception(deferredExceptionPtr_);
917  }
918  if (trans != InputSource::IsStop) {
919  //problem with the source
920  doErrorStuff();
921 
922  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
923  }
924  } while (not endOfLoop());
925  }); // convertException::wrap
926 
927  } // Try block
928  catch (cms::Exception& e) {
930  std::string message(
931  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
932  e.addAdditionalInfo(message);
933  if (e.alreadyPrinted()) {
934  LogAbsolute("Additional Exceptions") << message;
935  }
936  }
937  if (exceptionMessageRuns_) {
938  std::string message(
939  "Another exception was caught while trying to clean up runs after the primary fatal exception.");
940  e.addAdditionalInfo(message);
941  if (e.alreadyPrinted()) {
942  LogAbsolute("Additional Exceptions") << message;
943  }
944  }
945  if (!exceptionMessageFiles_.empty()) {
946  e.addAdditionalInfo(exceptionMessageFiles_);
947  if (e.alreadyPrinted()) {
948  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
949  }
950  }
951  throw;
952  }
953  return epSuccess;
954  }
std::atomic< bool > exceptionMessageLumis_
std::atomic< bool > exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageFiles_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())

◆ setDeferredException()

bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2007 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

2007  {
2008  bool expected = false;
2009  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2010  deferredExceptionPtr_ = iException;
2011  return true;
2012  }
2013  return false;
2014  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_

◆ setExceptionMessageFiles()

void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2001 of file EventProcessor.cc.

References exceptionMessageFiles_.

2001 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_

◆ setExceptionMessageLumis()

void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2005 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2005 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_

◆ setExceptionMessageRuns()

void edm::EventProcessor::setExceptionMessageRuns ( )

Definition at line 2003 of file EventProcessor.cc.

References exceptionMessageRuns_.

2003 { exceptionMessageRuns_ = true; }
std::atomic< bool > exceptionMessageRuns_

◆ shouldWeCloseOutput()

bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1052 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1052  {
1053  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1054  if (!subProcesses_.empty()) {
1055  for (auto const& subProcess : subProcesses_) {
1056  if (subProcess.shouldWeCloseOutput()) {
1057  return true;
1058  }
1059  }
1060  return false;
1061  }
1062  return schedule_->shouldWeCloseOutput();
1063  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ shouldWeStop()

bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1986 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1986  {
1987  FDEBUG(1) << "\tshouldWeStop\n";
1988  if (shouldWeStop_)
1989  return true;
1990  if (!subProcesses_.empty()) {
1991  for (auto const& subProcess : subProcesses_) {
1992  if (subProcess.terminate()) {
1993  return true;
1994  }
1995  }
1996  return false;
1997  }
1998  return schedule_->terminate();
1999  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ startingNewLoop()

void edm::EventProcessor::startingNewLoop ( )

Definition at line 1016 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

1016  {
1017  shouldWeStop_ = false;
1018  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1019  // until after we've called beginOfJob
1020  if (looper_ && looperBeginJobRun_) {
1021  looper_->doStartingNewLoop();
1022  }
1023  FDEBUG(1) << "\tstartingNewLoop\n";
1024  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_

◆ streamEndLumiAsync()

void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1589 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1589  {
1590  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1591  if (iPtr) {
1592  handleEndLumiExceptions(iPtr, iTask);
1593  }
1594  auto status = streamLumiStatus_[iStreamIndex];
1595  //reset status before releasing queue else get race condtion
1596  streamLumiStatus_[iStreamIndex].reset();
1598  streamQueues_[iStreamIndex].resume();
1599 
1600  //are we the last one?
1601  if (status->streamFinishedLumi()) {
1603  }
1604  });
1605 
1606  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1607 
1608  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1609  // therefore we do not want to hold the shared_ptr
1610  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1611  lumiStatus->setEndTime();
1612 
1613  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1614 
1615  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1616  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1617 
1618  if (lumiStatus->didGlobalBeginSucceed()) {
1619  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1620  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1621  lumiPrincipal.endTime());
1622  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1623  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1624  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1625  *schedule_,
1626  iStreamIndex,
1627  transitionInfo,
1628  serviceToken_,
1629  subProcesses_,
1630  cleaningUpAfterException);
1631  }
1632  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
def move(src, dest)
Definition: eostools.py:511

◆ taskCleanup()

void edm::EventProcessor::taskCleanup ( )

Definition at line 630 of file EventProcessor.cc.

References cms::cuda::assert(), espController_, TrackValidation_cff::task, and taskGroup_.

630  {
633  task.waitNoThrow();
634  assert(task.done());
635  }
assert(be >=bs)
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_

◆ thinnedAssociationsHelper() [1/2]

std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

291  {
293  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ thinnedAssociationsHelper() [2/2]

std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

294  {
296  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_

◆ throwAboutModulesRequiringLuminosityBlockSynchronization()

void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2016 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2016  {
2017  cms::Exception ex("ModulesSynchingOnLumis");
2018  ex << "The framework is configured to use at least two streams, but the following modules\n"
2019  << "require synchronizing on LuminosityBlock boundaries:";
2020  bool found = false;
2021  for (auto worker : schedule_->allWorkers()) {
2022  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2023  found = true;
2024  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2025  }
2026  }
2027  if (found) {
2028  ex << "\n\nThe situation can be fixed by either\n"
2029  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2030  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2031  throw ex;
2032  }
2033  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEvents()

int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 836 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

836 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsFailed()

int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 840 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

840 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ totalEventsPassed()

int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 838 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

838 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ warnAboutLegacyModules()

void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2035 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2035  {
2036  std::unique_ptr<LogSystem> s;
2037  for (auto worker : schedule_->allWorkers()) {
2038  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2039  if (not s) {
2040  s = std::make_unique<LogSystem>("LegacyModules");
2041  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2042  "is going to end soon. These modules need to be converted to have type\n"
2043  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2044  }
2045  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2046  }
2047  }
2048  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_

◆ writeLumiAsync()

void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1766 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::LuminosityBlockPrincipal::kNo, edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, edm::LuminosityBlockPrincipal::shouldWriteLumi(), subProcesses_, TrackValidation_cff::task, and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1766  {
1767  using namespace edm::waiting_task;
1768  if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
1769  chain::first([&](auto nextTask) {
1771 
1772  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1773  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1774  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1776  for (auto& s : subProcesses_) {
1777  s.writeLumiAsync(nextTask, lumiPrincipal);
1778  }
1780  }
1781  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511

◆ writeProcessBlockAsync()

void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1726 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endProcessBlock(), and inputProcessBlocks().

1726  {
1727  using namespace edm::waiting_task;
1728  chain::first([&](auto nextTask) {
1730  schedule_->writeProcessBlockAsync(
1731  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1732  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1734  for (auto& s : subProcesses_) {
1735  s.writeProcessBlockAsync(nextTask, processBlockType);
1736  }
1737  }) | chain::runLast(std::move(task));
1738  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ProcessBlockPrincipal & processBlockPrincipal() const
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

◆ writeRunAsync()

void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1740 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, processContext_, run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, and TrackValidation_cff::task.

Referenced by endUnfinishedRun().

1743  {
1744  using namespace edm::waiting_task;
1745  chain::first([&](auto nextTask) {
1747  schedule_->writeRunAsync(nextTask,
1749  &processContext_,
1750  actReg_.get(),
1751  mergeableRunProductMetadata);
1752  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1754  for (auto& s : subProcesses_) {
1755  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1756  }
1757  }) | chain::runLast(std::move(task));
1758  }
ProcessContext processContext_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:511
PrincipalCache principalCache_

Member Data Documentation

◆ act_table_

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

◆ beginJobCalled_

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 349 of file EventProcessor.h.

Referenced by beginJob().

◆ branchesToDeleteEarly_

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ branchIDListHelper_

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

◆ deferredExceptionPtr_

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

◆ deferredExceptionPtrIsSet_

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

◆ deleteNonConsumedUnscheduledModules_

bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 368 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ esp_

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

◆ espController_

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

◆ eventSetupDataToExcludeFromPrefetching_

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 365 of file EventProcessor.h.

◆ exceptionMessageFiles_

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 352 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

◆ exceptionMessageLumis_

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

◆ exceptionMessageRuns_

std::atomic<bool> edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 353 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

◆ fb_

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private

◆ fileModeNoMerge_

bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 351 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

◆ firstEventInBlock_

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 361 of file EventProcessor.h.

◆ forceESCacheClearOnNewRun_

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginRun(), and init().

◆ forceLooperToEnd_

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 355 of file EventProcessor.h.

Referenced by endOfLoop().

◆ historyAppender_

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 337 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

◆ input_

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

◆ lastSourceTransition_

InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private

◆ looper_

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private

◆ looperBeginJobRun_

bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 356 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

◆ lumiQueue_

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

◆ mergeableRunProductProcesses_

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 327 of file EventProcessor.h.

Referenced by init(), and readRun().

◆ pathsAndConsumesOfModules_

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginJob().

◆ preallocations_

PreallocationConfiguration edm::EventProcessor::preallocations_
private

◆ preg_

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

◆ principalCache_

PrincipalCache edm::EventProcessor::principalCache_
private

◆ printDependencies_

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 367 of file EventProcessor.h.

Referenced by beginJob(), and init().

◆ processBlockHelper_

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

◆ processConfiguration_

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), processConfiguration(), and readRun().

◆ processContext_

ProcessContext edm::EventProcessor::processContext_
private

◆ queueWhichWaitsForIOVsToFinish_

edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

◆ schedule_

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private

◆ serviceToken_

ServiceToken edm::EventProcessor::serviceToken_
private

◆ shouldWeStop_

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 350 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

◆ sourceMutex_

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 347 of file EventProcessor.h.

Referenced by readNextEventForStream().

◆ sourceResourcesAcquirer_

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 346 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

◆ streamLumiActive_

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private

◆ streamLumiStatus_

std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private

◆ streamQueues_

std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

◆ subProcesses_

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private

◆ taskGroup_

oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private

◆ thinnedAssociationsHelper_

edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().