CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair
< edm::ProcessHistoryID,
edm::RunNumber_t
nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID,
RunNumber_t
readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::pair< ProcessHistoryID,
RunNumber_t
readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase
const > 
looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr
< ProductRegistry const > 
preg () const
 
std::shared_ptr
< ProductRegistry > & 
preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
std::vector< std::string > branchesToDeleteEarly_
 
edm::propagate_const
< std::shared_ptr
< BranchIDListHelper > > 
branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const
< std::shared_ptr
< eventsetup::EventSetupProvider > > 
esp_
 
edm::propagate_const
< std::unique_ptr
< eventsetup::EventSetupsController > > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const
< std::shared_ptr< FileBlock > > 
fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const
< std::unique_ptr
< HistoryAppender > > 
historyAppender_
 
edm::propagate_const
< std::unique_ptr< InputSource > > 
input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const
< std::shared_ptr
< EDLooperBase > > 
looper_
 
bool looperBeginJobRun_
 
std::unique_ptr
< edm::LimitedTaskQueue
lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const
< std::shared_ptr
< ProductRegistry > > 
preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const
< std::shared_ptr
< ProcessBlockHelper > > 
processBlockHelper_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const
< std::unique_ptr< Schedule > > 
schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr
< std::recursive_mutex > 
sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr
< LuminosityBlockProcessingStatus > > 
streamLumiStatus_
 
std::vector< edm::SerialTaskQueuestreamQueues_
 
std::vector< SubProcesssubProcesses_
 
oneapi::tbb::task_group taskGroup_
 
edm::propagate_const
< std::shared_ptr
< ThinnedAssociationsHelper > > 
thinnedAssociationsHelper_
 

Detailed Description

Definition at line 66 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 365 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 366 of file EventProcessor.h.

Definition at line 243 of file EventProcessor.h.

Member Enumeration Documentation

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 76 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 233 of file EventProcessor.cc.

References init(), and eostools::move().

238  : actReg_(),
239  preg_(),
241  serviceToken_(),
242  input_(),
243  espController_(new eventsetup::EventSetupsController),
244  esp_(),
245  act_table_(),
247  schedule_(),
248  subProcesses_(),
249  historyAppender_(new HistoryAppender),
250  fb_(),
251  looper_(),
253  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
254  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
255  principalCache_(),
256  beginJobCalled_(false),
257  shouldWeStop_(false),
258  fileModeNoMerge_(false),
261  exceptionMessageLumis_(false),
262  forceLooperToEnd_(false),
263  looperBeginJobRun_(false),
266  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 271 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and eostools::move().

274  : actReg_(),
275  preg_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
283  schedule_(),
284  subProcesses_(),
285  historyAppender_(new HistoryAppender),
286  fb_(),
287  looper_(),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
292  beginJobCalled_(false),
293  shouldWeStop_(false),
294  fileModeNoMerge_(false),
297  exceptionMessageLumis_(false),
298  forceLooperToEnd_(false),
299  looperBeginJobRun_(false),
303  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
304  processDesc->addServices(defaultServices, forcedServices);
306  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 308 of file EventProcessor.cc.

References init().

311  : actReg_(),
312  preg_(),
314  serviceToken_(),
315  input_(),
316  espController_(new eventsetup::EventSetupsController),
317  esp_(),
318  act_table_(),
320  schedule_(),
321  subProcesses_(),
322  historyAppender_(new HistoryAppender),
323  fb_(),
324  looper_(),
326  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
327  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
328  principalCache_(),
329  beginJobCalled_(false),
330  shouldWeStop_(false),
331  fileModeNoMerge_(false),
334  exceptionMessageLumis_(false),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
340  init(processDesc, token, legacy);
341  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 567 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

567  {
568  // Make the services available while everything is being deleted.
570  ServiceRegistry::Operate op(token);
571 
572  // manually destroy all these thing that may need the services around
573  // propagate_const<T> has no reset() function
574  espController_ = nullptr;
575  esp_ = nullptr;
576  schedule_ = nullptr;
577  input_ = nullptr;
578  looper_ = nullptr;
579  actReg_ = nullptr;
580 
583  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 592 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, branchesToDeleteEarly_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, getPayloadData::description, esp_, espController_, edm::WaitingTask::exceptionPtr(), edm::first(), edm::for_all(), watchdog::group, mps_fire::i, edm::waiting_task::chain::ifThen(), edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, dqmdumpme::last, edm::waiting_task::chain::lastTask(), looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, subProcesses_, edm::swap(), std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), and edm::convertException::wrap().

Referenced by runToCompletion().

592  {
593  if (beginJobCalled_)
594  return;
595  beginJobCalled_ = true;
596  bk::beginJob();
597 
598  // StateSentry toerror(this); // should we add this ?
599  //make the services available
601 
602  service::SystemBounds bounds(preallocations_.numberOfStreams(),
606  actReg_->preallocateSignal_(bounds);
607  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
609 
610  std::vector<ModuleProcessName> consumedBySubProcesses;
612  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
613  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
614  if (consumedBySubProcesses.empty()) {
615  consumedBySubProcesses = std::move(c);
616  } else if (not c.empty()) {
617  std::vector<ModuleProcessName> tmp;
618  tmp.reserve(consumedBySubProcesses.size() + c.size());
619  std::merge(consumedBySubProcesses.begin(),
620  consumedBySubProcesses.end(),
621  c.begin(),
622  c.end(),
623  std::back_inserter(tmp));
624  std::swap(consumedBySubProcesses, tmp);
625  }
626  });
627 
628  // Note: all these may throw
631  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
632  not unusedModules.empty()) {
634 
635  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
636  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
637  "and "
638  "therefore they are deleted before beginJob transition.";
639  for (auto const& description : unusedModules) {
640  l << "\n " << description->moduleLabel();
641  }
642  });
643  for (auto const& description : unusedModules) {
644  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
645  }
646  }
647  }
648  // Initialize after the deletion of non-consumed unscheduled
649  // modules to avoid non-consumed non-run modules to keep the
650  // products unnecessarily alive
651  if (not branchesToDeleteEarly_.empty()) {
652  schedule_->initializeEarlyDelete(branchesToDeleteEarly_, *preg_);
654  }
655 
656  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
657 
660  }
662 
663  //NOTE: This implementation assumes 'Job' means one call
664  // the EventProcessor::run
665  // If it really means once per 'application' then this code will
666  // have to be changed.
667  // Also have to deal with case where have 'run' then new Module
668  // added and do 'run'
669  // again. In that case the newly added Module needs its 'beginJob'
670  // to be called.
671 
672  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
673  // For now we delay calling beginOfJob until first beginOfRun
674  //if(looper_) {
675  // looper_->beginOfJob(es);
676  //}
677  try {
678  convertException::wrap([&]() { input_->doBeginJob(); });
679  } catch (cms::Exception& ex) {
680  ex.addContext("Calling beginJob for the source");
681  throw;
682  }
683  espController_->finishConfiguration();
684  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
685  if (looper_) {
686  constexpr bool mustPrefetchMayGet = true;
687  auto const processBlockLookup = preg_->productLookup(InProcess);
688  auto const runLookup = preg_->productLookup(InRun);
689  auto const lumiLookup = preg_->productLookup(InLumi);
690  auto const eventLookup = preg_->productLookup(InEvent);
691  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
692  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
693  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
694  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
695  looper_->updateLookup(esp_->recordsToProxyIndices());
696  }
697  // toerror.succeeded(); // should we add this?
698  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
699  actReg_->postBeginJobSignal_();
700 
701  FinalWaitingTask last;
702  oneapi::tbb::task_group group;
703  using namespace edm::waiting_task::chain;
704  first([this](auto nextTask) {
705  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
706  first([i, this](auto nextTask) {
708  schedule_->beginStream(i);
709  }) | ifThen(not subProcesses_.empty(), [this, i](auto nextTask) {
711  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
712  }) | lastTask(nextTask);
713  }
714  }) | runLast(WaitingTaskHolder(group, &last));
715  group.wait();
716  if (last.exceptionPtr()) {
717  std::rethrow_exception(*last.exceptionPtr());
718  }
719  }
ProcessContext processContext_
const edm::EventSetup & c
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:117
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
tuple group
Definition: watchdog.py:82
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void removeModules(std::vector< ModuleDescription const * > const &modules)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
tuple last
Definition: dqmdumpme.py:56
T first(std::pair< T, U > const &p)
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1340 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, edm::propagate_const< T >::get(), watchdog::group, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1342  {
1343  if (iHolder.taskHasFailed()) {
1344  return;
1345  }
1346 
1347  // We must be careful with the status object here and in code this function calls. IF we want
1348  // endRun to be called, then we must call resetResources before the things waiting on
1349  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1350  // endRun to be called much later than it should be, because status is holding iRunResource).
1351  // Note that this must be done explicitly. Relying on the destructor does not work well
1352  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1353  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1354  // destroyed inside this function and lumiWork.
1355  auto status =
1356  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1357  chain::first([&](auto nextTask) {
1358  auto asyncEventSetup = [](ActivityRegistry* actReg,
1359  auto* espController,
1360  auto& queue,
1361  WaitingTaskHolder task,
1362  auto& status,
1363  IOVSyncValue const& iSync) {
1364  queue.pause();
1365  CMS_SA_ALLOW try {
1366  SendSourceTerminationSignalIfException sentry(actReg);
1367  // Pass in iSync to let the EventSetup system know which run and lumi
1368  // need to be processed and prepare IOVs for it.
1369  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1370  // lumi is done and no longer needs its EventSetup IOVs.
1371  espController->eventSetupForInstanceAsync(
1372  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1373  sentry.completedSuccessfully();
1374  } catch (...) {
1375  task.doneWaiting(std::current_exception());
1376  }
1377  };
1378  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1379  // We only get here inside this block if there is an EventSetup
1380  // module not able to handle concurrent IOVs (usually an ESSource)
1381  // and the new sync value is outside the current IOV of that module.
1382  auto group = nextTask.group();
1384  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1385  asyncEventSetup(
1387  });
1388  } else {
1389  asyncEventSetup(
1391  }
1392  }) | chain::then([this, status](std::exception_ptr const* iPtr, auto nextTask) {
1393  //the call to doneWaiting will cause the count to decrement
1394  auto copyTask = nextTask;
1395  if (iPtr) {
1396  nextTask.doneWaiting(*iPtr);
1397  }
1398  auto group = copyTask.group();
1399  lumiQueue_->pushAndPause(
1400  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1401  if (task.taskHasFailed()) {
1402  status->resetResources();
1403  return;
1404  }
1405 
1406  status->setResumer(std::move(iResumer));
1407 
1408  auto group = task.group();
1410  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1411  //make the services available
1413  // Caught exception is propagated via WaitingTaskHolder
1414  CMS_SA_ALLOW try {
1415  readLuminosityBlock(*status);
1416 
1417  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1418  {
1419  SendSourceTerminationSignalIfException sentry(actReg_.get());
1420 
1421  input_->doBeginLumi(lumiPrincipal, &processContext_);
1422  sentry.completedSuccessfully();
1423  }
1424 
1425  Service<RandomNumberGenerator> rng;
1426  if (rng.isAvailable()) {
1427  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1428  rng->preBeginLumi(lb);
1429  }
1430 
1431  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1432 
1433  using namespace edm::waiting_task::chain;
1434  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1435  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1436  {
1437  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1438  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1439  beginGlobalTransitionAsync<Traits>(
1440  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1441  }
1442  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1443  looper_->prefetchAsync(
1444  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1445  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1446  status->globalBeginDidSucceed();
1447  //make the services available
1448  ServiceRegistry::Operate operateLooper(serviceToken_);
1449  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1450  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1451  if (iPtr) {
1452  status->resetResources();
1453  holder.doneWaiting(*iPtr);
1454  } else {
1455  if (not looper_) {
1456  status->globalBeginDidSucceed();
1457  }
1458  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1459  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1460 
1461  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1462  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1463  streamQueues_[i].pause();
1464 
1465  auto& event = principalCache_.eventPrincipal(i);
1466  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1467  // held by the container as this lambda may not finish executing before all the tasks it
1468  // spawns have already started to run.
1469  auto eventSetupImpls = &status->eventSetupImpls();
1470  auto lp = status->lumiPrincipal().get();
1471  streamLumiStatus_[i] = std::move(status);
1473  event.setLuminosityBlockPrincipal(lp);
1474  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1475  using namespace edm::waiting_task::chain;
1476  chain::first([this, i, &transitionInfo](auto nextTask) {
1477  beginStreamTransitionAsync<Traits>(
1478  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1479  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1480  if (exceptionFromBeginStreamLumi) {
1481  WaitingTaskHolder tmp(nextTask);
1482  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1483  streamEndLumiAsync(nextTask, i);
1484  } else {
1486  }
1487  }) | runLast(holder);
1488  });
1489  }
1490  }
1491  }) | runLast(postQueueTask);
1492 
1493  } catch (...) {
1494  status->resetResources();
1495  postQueueTask.doneWaiting(std::current_exception());
1496  }
1497  }); // task in sourceResourcesAcquirer
1498  });
1499  }) | chain::runLast(std::move(iHolder));
1500  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
list status
Definition: mps_update.py:107
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
tuple group
Definition: watchdog.py:82
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
constexpr element_type const * get() const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
tmp
align.sh
Definition: createJobs.py:716
PrincipalCache principalCache_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1036 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1036  {
1037  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1038  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1039 
1040  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1041  FinalWaitingTask globalWaitTask;
1042 
1043  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1044  beginGlobalTransitionAsync<Traits>(
1045  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1046 
1047  do {
1048  taskGroup_.wait();
1049  } while (not globalWaitTask.done());
1050 
1051  if (globalWaitTask.exceptionPtr() != nullptr) {
1052  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1053  }
1054  beginProcessBlockSucceeded = true;
1055  }
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1130 of file EventProcessor.cc.

References actReg_, edm::BeginRun, edm::RunPrincipal::beginTime(), edm::FinalWaitingTask::done(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, forceESCacheClearOnNewRun_, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), taskGroup_, and edm::waiting_task::chain::then().

1133  {
1134  globalBeginSucceeded = false;
1135  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1136  {
1137  SendSourceTerminationSignalIfException sentry(actReg_.get());
1138 
1139  input_->doBeginRun(runPrincipal, &processContext_);
1140  sentry.completedSuccessfully();
1141  }
1142 
1143  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1145  espController_->forceCacheClear();
1146  }
1147  {
1148  SendSourceTerminationSignalIfException sentry(actReg_.get());
1150  eventSetupForInstanceSucceeded = true;
1151  sentry.completedSuccessfully();
1152  }
1153  auto const& es = esp_->eventSetupImpl();
1154  if (looper_ && looperBeginJobRun_ == false) {
1155  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1156 
1157  FinalWaitingTask waitTask;
1158  using namespace edm::waiting_task::chain;
1159  chain::first([this, &es](auto nextTask) {
1160  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1161  }) | then([this, &es](auto nextTask) mutable {
1162  looper_->beginOfJob(es);
1163  looperBeginJobRun_ = true;
1164  looper_->doStartingNewLoop();
1165  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1166 
1167  do {
1168  taskGroup_.wait();
1169  } while (not waitTask.done());
1170  if (waitTask.exceptionPtr() != nullptr) {
1171  std::rethrow_exception(*(waitTask.exceptionPtr()));
1172  }
1173  }
1174  {
1175  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1176  FinalWaitingTask globalWaitTask;
1177 
1178  using namespace edm::waiting_task::chain;
1179  chain::first([&runPrincipal, &es, this](auto waitTask) {
1180  RunTransitionInfo transitionInfo(runPrincipal, es);
1181  beginGlobalTransitionAsync<Traits>(
1182  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1183  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1184  globalBeginSucceeded = true;
1185  FDEBUG(1) << "\tbeginRun " << run << "\n";
1186  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1187  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1188  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1189  looper_->doBeginRun(runPrincipal, es, &processContext_);
1190  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1191 
1192  do {
1193  taskGroup_.wait();
1194  } while (not globalWaitTask.done());
1195  if (globalWaitTask.exceptionPtr() != nullptr) {
1196  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1197  }
1198  }
1199  {
1200  //To wait, the ref count has to be 1+#streams
1201  FinalWaitingTask streamLoopWaitTask;
1202 
1203  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1204 
1205  RunTransitionInfo transitionInfo(runPrincipal, es);
1206  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1207  *schedule_,
1209  transitionInfo,
1210  serviceToken_,
1211  subProcesses_);
1212  do {
1213  taskGroup_.wait();
1214  } while (not streamLoopWaitTask.done());
1215  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1216  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1217  }
1218  }
1219  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1220  if (looper_) {
1221  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1222  }
1223  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

287  {
289  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 806 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

806  {
807  bool returnValue = false;
808 
809  // Look for a shutdown signal
810  if (shutdown_flag.load(std::memory_order_acquire)) {
811  returnValue = true;
813  }
814  return returnValue;
815  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 800 of file EventProcessor.cc.

References schedule_.

800 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 936 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

936  {
937  if (fileBlockValid()) {
938  SendSourceTerminationSignalIfException sentry(actReg_.get());
939  input_->closeFile(fb_.get(), cleaningUpAfterException);
940  sentry.completedSuccessfully();
941  }
942  FDEBUG(1) << "\tcloseInputFile\n";
943  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 953 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

953  {
954  schedule_->closeOutputFiles();
955  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
956  processBlockHelper_->clearAfterOutputFilesClose();
957  FDEBUG(1) << "\tcloseOutputFiles\n";
958  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1502 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), h, handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1502  {
1503  {
1504  //all streams are sharing the same status at the moment
1505  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1506  status->needToContinueLumi();
1507  status->startProcessingEvents();
1508  }
1509 
1510  unsigned int streamIndex = 0;
1511  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
1512  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1513  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1514  }
1515  iHolder.group()->run(
1516  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1517  }
list status
Definition: mps_update.py:107
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
oneapi::tbb::task_group * group() const noexcept
def move
Definition: eostools.py:511
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1800 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1800  {
1801  for (auto& s : subProcesses_) {
1802  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1803  }
1804  iStatus.lumiPrincipal()->clearPrincipal();
1805  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1806  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1777 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1777  {
1778  principalCache_.deleteRun(phid, run);
1779  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1780  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1781  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 1027 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

1027  {
1028  FDEBUG(1) << "\tdoErrorStuff\n";
1029  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1030  << "and went to the error state\n"
1031  << "Will attempt to terminate processing normally\n"
1032  << "(IF using the looper the next loop will be attempted)\n"
1033  << "This likely indicates a bug in an input module or corrupted input or both\n";
1034  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 721 of file EventProcessor.cc.

References actReg_, c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::first(), watchdog::group, edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, cmsLHEtoEOSManager::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

721  {
722  // Collects exceptions, so we don't throw before all operations are performed.
723  ExceptionCollector c(
724  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
725 
726  //make the services available
728 
729  using namespace edm::waiting_task::chain;
730 
731  edm::FinalWaitingTask waitTask;
732  oneapi::tbb::task_group group;
733 
734  {
735  //handle endStream transitions
736  edm::WaitingTaskHolder taskHolder(group, &waitTask);
737  std::mutex collectorMutex;
738  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
739  first([this, i, &c, &collectorMutex](auto nextTask) {
740  std::exception_ptr ep;
741  try {
743  this->schedule_->endStream(i);
744  } catch (...) {
745  ep = std::current_exception();
746  }
747  if (ep) {
748  std::lock_guard<std::mutex> l(collectorMutex);
749  c.call([&ep]() { std::rethrow_exception(ep); });
750  }
751  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
752  for (auto& subProcess : subProcesses_) {
753  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
754  std::exception_ptr ep;
755  try {
757  subProcess.doEndStream(i);
758  } catch (...) {
759  ep = std::current_exception();
760  }
761  if (ep) {
762  std::lock_guard<std::mutex> l(collectorMutex);
763  c.call([&ep]() { std::rethrow_exception(ep); });
764  }
765  }) | lastTask(nextTask);
766  }
767  }) | lastTask(taskHolder);
768  }
769  }
770  group.wait();
771 
772  auto actReg = actReg_.get();
773  c.call([actReg]() { actReg->preEndJobSignal_(); });
774  schedule_->endJob(c);
775  for (auto& subProcess : subProcesses_) {
776  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
777  }
778  c.call(std::bind(&InputSource::doEndJob, input_.get()));
779  if (looper_) {
780  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
781  }
782  c.call([actReg]() { actReg->postEndJobSignal_(); });
783  if (c.hasThrown()) {
784  c.rethrow();
785  }
786  }
const edm::EventSetup & c
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
tuple group
Definition: watchdog.py:82
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
std::shared_ptr< EDLooperBase const > looper() const
bool edm::EventProcessor::endOfLoop ( )

Definition at line 988 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

988  {
989  if (looper_) {
990  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
991  looper_->setModuleChanger(&changer);
992  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
993  looper_->setModuleChanger(nullptr);
995  return true;
996  else
997  return false;
998  }
999  FDEBUG(1) << "\tendOfLoop\n";
1000  return true;
1001  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
list status
Definition: mps_update.py:107
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1093 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1093  {
1094  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1095 
1096  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1097  FinalWaitingTask globalWaitTask;
1098 
1099  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1100  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1101  *schedule_,
1102  transitionInfo,
1103  serviceToken_,
1104  subProcesses_,
1105  cleaningUpAfterException);
1106  do {
1107  taskGroup_.wait();
1108  } while (not globalWaitTask.done());
1109  if (globalWaitTask.exceptionPtr() != nullptr) {
1110  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1111  }
1112 
1113  if (beginProcessBlockSucceeded) {
1114  FinalWaitingTask writeWaitTask;
1116  do {
1117  taskGroup_.wait();
1118  } while (not writeWaitTask.done());
1119  if (writeWaitTask.exceptionPtr()) {
1120  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1121  }
1122  }
1123 
1124  processBlockPrincipal.clearPrincipal();
1125  for (auto& s : subProcesses_) {
1126  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1127  }
1128  }
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1252 of file EventProcessor.cc.

References actReg_, edm::FinalWaitingTask::done(), edm::EndRun, edm::RunPrincipal::endTime(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, edm::waiting_task::chain::ifThen(), input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

1255  {
1256  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1257  runPrincipal.setEndTime(input_->timestamp());
1258 
1259  IOVSyncValue ts(
1260  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1261  runPrincipal.endTime());
1262  {
1263  SendSourceTerminationSignalIfException sentry(actReg_.get());
1265  sentry.completedSuccessfully();
1266  }
1267  auto const& es = esp_->eventSetupImpl();
1268  if (globalBeginSucceeded) {
1269  //To wait, the ref count has to be 1+#streams
1270  FinalWaitingTask streamLoopWaitTask;
1271 
1272  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1273 
1274  RunTransitionInfo transitionInfo(runPrincipal, es);
1275  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1276  *schedule_,
1278  transitionInfo,
1279  serviceToken_,
1280  subProcesses_,
1281  cleaningUpAfterException);
1282  do {
1283  taskGroup_.wait();
1284  } while (not streamLoopWaitTask.done());
1285  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1286  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1287  }
1288  }
1289  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1290  if (looper_) {
1291  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1292  }
1293  {
1294  FinalWaitingTask globalWaitTask;
1295 
1296  using namespace edm::waiting_task::chain;
1297  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1298  RunTransitionInfo transitionInfo(runPrincipal, es);
1299  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1300  endGlobalTransitionAsync<Traits>(
1301  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1302  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1303  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1304  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1305  looper_->doEndRun(runPrincipal, es, &processContext_);
1306  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1307 
1308  do {
1309  taskGroup_.wait();
1310  } while (not globalWaitTask.done());
1311  if (globalWaitTask.exceptionPtr() != nullptr) {
1312  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1313  }
1314  }
1315  FDEBUG(1) << "\tendRun " << run << "\n";
1316  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, oneapi::tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1646 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

1646  {
1647  if (streamLumiActive_.load() > 0) {
1648  FinalWaitingTask globalWaitTask;
1649  {
1650  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1651  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1652  if (streamLumiStatus_[i]) {
1653  streamEndLumiAsync(globalTaskHolder, i);
1654  }
1655  }
1656  }
1657  do {
1658  taskGroup_.wait();
1659  } while (not globalWaitTask.done());
1660  if (globalWaitTask.exceptionPtr() != nullptr) {
1661  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1662  }
1663  }
1664  }
PreallocationConfiguration preallocations_
oneapi::tbb::task_group taskGroup_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1225 of file EventProcessor.cc.

References deleteRunFromCache(), edm::FinalWaitingTask::done(), endRun(), edm::WaitingTask::exceptionPtr(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

1229  {
1230  if (eventSetupForInstanceSucceeded) {
1231  //If we skip empty runs, this would be called conditionally
1232  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1233 
1234  if (globalBeginSucceeded) {
1235  FinalWaitingTask t;
1236  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1237  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1238  mergeableRunProductMetadata->preWriteRun();
1239  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1240  do {
1241  taskGroup_.wait();
1242  } while (not t.done());
1243  mergeableRunProductMetadata->postWriteRun();
1244  if (t.exceptionPtr()) {
1245  std::rethrow_exception(*t.exceptionPtr());
1246  }
1247  }
1248  }
1249  deleteRunFromCache(phid, run);
1250  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
oneapi::tbb::task_group taskGroup_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 196 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

196 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 790 of file EventProcessor.cc.

References schedule_.

790  {
791  return schedule_->getAllModuleDescriptions();
792  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 788 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

788 { return serviceToken_; }
ServiceToken serviceToken_
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1528 of file EventProcessor.cc.

References CMS_SA_ALLOW, deleteLumiFromCache(), edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1529  {
1530  // Get some needed info out of the status object before moving
1531  // it into finalTaskForThisLumi.
1532  auto& lp = *(iLumiStatus->lumiPrincipal());
1533  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1534  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1535  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1536  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1537 
1538  using namespace edm::waiting_task::chain;
1539  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1540  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1541 
1542  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1543  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1544  endGlobalTransitionAsync<Traits>(
1545  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1546  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1547  //Only call writeLumi if beginLumi succeeded
1548  if (didGlobalBeginSucceed) {
1549  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1550  }
1551  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1552  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1553  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1554  //any thrown exception auto propagates to nextTask via the chain
1556  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1557  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1558  std::exception_ptr ptr;
1559  if (iPtr) {
1560  ptr = *iPtr;
1561  }
1563 
1564  // Try hard to clean up resources so the
1565  // process can terminate in a controlled
1566  // fashion even after exceptions have occurred.
1567  // Caught exception is passed to handleEndLumiExceptions()
1568  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1569  if (not ptr) {
1570  ptr = std::current_exception();
1571  }
1572  }
1573  // Caught exception is passed to handleEndLumiExceptions()
1574  CMS_SA_ALLOW try {
1575  status->resumeGlobalLumiQueue();
1577  } catch (...) {
1578  if (not ptr) {
1579  ptr = std::current_exception();
1580  }
1581  }
1582  // Caught exception is passed to handleEndLumiExceptions()
1583  CMS_SA_ALLOW try {
1584  // This call to status.resetResources() must occur before iTask is destroyed.
1585  // Otherwise there will be a data race which could result in endRun
1586  // being delayed until it is too late to successfully call it.
1587  status->resetResources();
1588  status.reset();
1589  } catch (...) {
1590  if (not ptr) {
1591  ptr = std::current_exception();
1592  }
1593  }
1594 
1595  if (ptr) {
1596  handleEndLumiExceptions(&ptr, nextTask);
1597  }
1598  }) | runLast(std::move(iTask));
1599  }
ProcessContext processContext_
#define CMS_SA_ALLOW
list status
Definition: mps_update.py:107
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1519 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1519  {
1520  if (setDeferredException(*iPtr)) {
1521  WaitingTaskHolder tmp(holder);
1522  tmp.doneWaiting(*iPtr);
1523  } else {
1525  }
1526  }
tmp
align.sh
Definition: createJobs.py:716
bool setDeferredException(std::exception_ptr)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1870 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), alignCSCRings::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1870  {
1871  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1873  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1874  // as steramEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1875  auto status = streamLumiStatus_[iStreamIndex].get();
1876  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1877  CMS_SA_ALLOW try {
1878  if (readNextEventForStream(iStreamIndex, *status)) {
1879  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1880  if (iPtr) {
1881  // Try to end the stream properly even if an exception was
1882  // thrown on an event.
1883  bool expected = false;
1884  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1885  // This is the case where the exception in iPtr is the primary
1886  // exception and we want to see its message.
1887  deferredExceptionPtr_ = *iPtr;
1888  WaitingTaskHolder tempHolder(iTask);
1889  tempHolder.doneWaiting(*iPtr);
1890  }
1891  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1892  //the stream will stop now
1893  return;
1894  }
1895  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1896  });
1897 
1898  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1899  } else {
1900  //the stream will stop now
1901  if (status->isLumiEnding()) {
1902  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1903  status->startNextLumi();
1904  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1905  }
1906  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1907  } else {
1908  iTask.doneWaiting(std::exception_ptr{});
1909  }
1910  }
1911  } catch (...) {
1912  // It is unlikely we will ever get in here ...
1913  // But if we do try to clean up and propagate the exception
1914  if (streamLumiStatus_[iStreamIndex]) {
1915  streamEndLumiAsync(iTask, iStreamIndex);
1916  }
1917  bool expected = false;
1918  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1919  auto e = std::current_exception();
1921  iTask.doneWaiting(e);
1922  }
1923  }
1924  });
1925  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
list status
Definition: mps_update.py:107
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
SerialTaskQueueChain & serialQueueChain() const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 343 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), cms::cuda::assert(), branchesToDeleteEarly_, edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, edm::dumpOptionsToLogFile(), edm::ensureAvailableAccelerators(), esp_, espController_, Exception, FDEBUG, dtDQMClient_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::get_underlying_safe(), historyAppender_, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

345  {
346  //std::cerr << processDesc->dump() << std::endl;
347 
348  // register the empty parentage vector , once and for all
350 
351  // register the empty parameter set, once and for all.
352  ParameterSet().registerIt();
353 
354  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
355 
356  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
357  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
358  bool const hasSubProcesses = !subProcessVParameterSet.empty();
359 
360  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
361  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
362  // set in here if the parameters were not explicitly set.
363  validateTopLevelParameterSets(parameterSet.get());
364 
365  // Now set some parameters specific to the main process.
366  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
367  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
368  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
369  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
370  << fileMode << ".\n"
371  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
372  } else {
373  fileModeNoMerge_ = (fileMode == "NOMERGE");
374  }
375  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
376  ensureAvailableAccelerators(*parameterSet);
377 
378  //threading
379  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
380 
381  // Even if numberOfThreads was set to zero in the Python configuration, the code
382  // in cmsRun.cpp should have reset it to something else.
383  assert(nThreads != 0);
384 
385  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
386  if (nStreams == 0) {
387  nStreams = nThreads;
388  }
389  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
390  if (nConcurrentRuns != 1) {
391  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
392  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
393  }
394  unsigned int nConcurrentLumis =
395  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
396  if (nConcurrentLumis == 0) {
397  nConcurrentLumis = 2;
398  }
399  if (nConcurrentLumis > nStreams) {
400  nConcurrentLumis = nStreams;
401  }
402  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
403  if (!loopers.empty()) {
404  //For now loopers make us run only 1 transition at a time
405  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
406  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
407  "of concurrent runs, and the number of concurrent lumis "
408  "are all being reset to 1. Loopers cannot currently support "
409  "values greater than 1.";
410  nStreams = 1;
411  nConcurrentLumis = 1;
412  nConcurrentRuns = 1;
413  }
414  }
415  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
416  if (dumpOptions) {
417  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
418  } else {
419  if (nThreads > 1 or nStreams > 1) {
420  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
421  }
422  }
423  // The number of concurrent IOVs is configured individually for each record in
424  // the class NumberOfConcurrentIOVs to values less than or equal to this.
425  unsigned int maxConcurrentIOVs = nConcurrentLumis;
426 
427  //Check that relationships between threading parameters makes sense
428  /*
429  if(nThreads<nStreams) {
430  //bad
431  }
432  if(nConcurrentRuns>nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nConcurrentLumis) {
436  //bad
437  }
438  */
439  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
440 
441  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
443  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
444  //for now, if have a subProcess, don't allow early delete
445  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
446  if (not hasSubProcesses) {
447  branchesToDeleteEarly_ = optionsPset.getUntrackedParameter<std::vector<std::string>>("canDeleteEarly");
448  }
449 
450  // Now do general initialization
451  ScheduleItems items;
452 
453  //initialize the services
454  auto& serviceSets = processDesc->getServicesPSets();
455  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
456  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
457 
458  //make the services available
460 
461  if (nStreams > 1) {
463  handler->willBeUsingThreads();
464  }
465 
466  // intialize miscellaneous items
467  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
468 
469  // intialize the event setup provider
470  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
471  esp_ = espController_->makeProvider(
472  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
473 
474  // initialize the looper, if any
475  if (!loopers.empty()) {
476  looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
477  looper_->setActionTable(items.act_table_.get());
478  looper_->attachTo(*items.actReg_);
479 
480  // in presence of looper do not delete modules
481  deleteNonConsumedUnscheduledModules_ = false;
482  }
483 
484  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
485 
486  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
487  streamQueues_.resize(nStreams);
488  streamLumiStatus_.resize(nStreams);
489 
490  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
491 
492  // initialize the input source
493  input_ = makeInput(*parameterSet,
494  *common,
495  items.preg(),
496  items.branchIDListHelper(),
498  items.thinnedAssociationsHelper(),
499  items.actReg_,
500  items.processConfiguration(),
502 
503  // initialize the Schedule
504  schedule_ =
505  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
506 
507  // set the data members
508  act_table_ = std::move(items.act_table_);
509  actReg_ = items.actReg_;
510  preg_ = items.preg();
512  branchIDListHelper_ = items.branchIDListHelper();
513  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
514  processConfiguration_ = items.processConfiguration();
516  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
517 
518  FDEBUG(2) << parameterSet << std::endl;
519 
521  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
522  // Reusable event principal
523  auto ep = std::make_shared<EventPrincipal>(preg(),
527  historyAppender_.get(),
528  index,
529  true /*primary process*/,
532  }
533 
534  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
535  auto lp =
536  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
538  }
539 
540  {
541  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
543 
544  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
546  }
547 
548  // fill the subprocesses, if there are any
549  subProcesses_.reserve(subProcessVParameterSet.size());
550  for (auto& subProcessPSet : subProcessVParameterSet) {
551  subProcesses_.emplace_back(subProcessPSet,
552  *parameterSet,
553  preg(),
557  SubProcessParentageHelper(),
559  *actReg_,
560  token,
563  &processContext_);
564  }
565  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void ensureAvailableAccelerators(edm::ParameterSet const &parameterSet)
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Log< level::Info, false > LogInfo
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
bool insertMapped(value_type const &v)
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1057 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1057  {
1058  input_->fillProcessBlockHelper();
1059  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1060  while (input_->nextProcessBlock(processBlockPrincipal)) {
1061  readProcessBlock(processBlockPrincipal);
1062 
1063  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1064  FinalWaitingTask globalWaitTask;
1065 
1066  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1067  beginGlobalTransitionAsync<Traits>(
1068  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1069 
1070  do {
1071  taskGroup_.wait();
1072  } while (not globalWaitTask.done());
1073  if (globalWaitTask.exceptionPtr() != nullptr) {
1074  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1075  }
1076 
1077  FinalWaitingTask writeWaitTask;
1079  do {
1080  taskGroup_.wait();
1081  } while (not writeWaitTask.done());
1082  if (writeWaitTask.exceptionPtr()) {
1083  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1084  }
1085 
1086  processBlockPrincipal.clearPrincipal();
1087  for (auto& s : subProcesses_) {
1088  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1089  }
1090  }
1091  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
oneapi::tbb::task_group taskGroup_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 186 of file EventProcessor.h.

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

186  {
188  return InputSource::IsStop;
189  }
190  return lastSourceTransition_;
191  }
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 297 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

297 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

298 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 847 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

847 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 843 of file EventProcessor.cc.

References input_.

843  {
844  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
845  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 817 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

817  {
818  if (deferredExceptionPtrIsSet_.load()) {
820  return InputSource::IsStop;
821  }
822 
823  SendSourceTerminationSignalIfException sentry(actReg_.get());
824  InputSource::ItemType itemType;
825  //For now, do nothing with InputSource::IsSynchronize
826  do {
827  itemType = input_->nextItemType();
828  } while (itemType == InputSource::IsSynchronize);
829 
830  lastSourceTransition_ = itemType;
831  sentry.completedSuccessfully();
832 
834 
835  if (checkForAsyncStopRequest(returnCode)) {
836  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
838  }
839 
840  return lastSourceTransition_;
841  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 945 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

945  {
946  if (fileBlockValid()) {
947  schedule_->openOutputFiles(*fb_);
948  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
949  }
950  FDEBUG(1) << "\topenOutputFiles\n";
951  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

285 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

286 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 1009 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

1009  {
1010  looper_->prepareForNextLoop(esp_.get());
1011  FDEBUG(1) << "\tprepareForNextLoop\n";
1012  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 140 of file EventProcessor.h.

References processConfiguration_.

140 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1941 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1941  {
1942  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1943  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1945 of file EventProcessor.cc.

References esp_, makeMEIFBenchmarkPlots::ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

1945  {
1946  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1947 
1949  Service<RandomNumberGenerator> rng;
1950  if (rng.isAvailable()) {
1951  Event ev(*pep, ModuleDescription(), nullptr);
1952  rng->postEventRead(ev);
1953  }
1954 
1955  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1956  using namespace edm::waiting_task::chain;
1957  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1958  EventTransitionInfo info(*pep, es);
1959  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1960  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1961  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1962  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1963  }
1964  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1965  //NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
1966  ServiceRegistry::Operate operateLooper(serviceToken_);
1967  processEventWithLooper(*pep, iStreamIndex);
1968  }) | then([pep](auto nextTask) {
1969  FDEBUG(1) << "\tprocessEvent\n";
1970  pep->clearEventPrincipal();
1971  }) | runLast(iHolder);
1972  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
PrincipalCache principalCache_
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1974 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1974  {
1975  bool randomAccess = input_->randomAccess();
1976  ProcessingController::ForwardState forwardState = input_->forwardState();
1977  ProcessingController::ReverseState reverseState = input_->reverseState();
1978  ProcessingController pc(forwardState, reverseState, randomAccess);
1979 
1981  do {
1982  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1983  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1984  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1985 
1986  bool succeeded = true;
1987  if (randomAccess) {
1988  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1989  input_->skipEvents(-2);
1990  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1991  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1992  }
1993  }
1994  pc.setLastOperationSucceeded(succeeded);
1995  } while (!pc.lastOperationSucceeded());
1996  if (status != EDLooperBase::kContinue) {
1997  shouldWeStop_ = true;
1999  }
2000  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
list status
Definition: mps_update.py:107
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1318 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

1318  {
1319  FinalWaitingTask waitTask;
1320  if (streamLumiActive_ > 0) {
1322  // Continue after opening a new input file
1324  } else {
1325  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1326  input_->luminosityBlockAuxiliary()->beginTime()),
1327  iRunResource,
1328  WaitingTaskHolder{taskGroup_, &waitTask});
1329  }
1330  do {
1331  taskGroup_.wait();
1332  } while (not waitTask.done());
1333 
1334  if (waitTask.exceptionPtr() != nullptr) {
1335  std::rethrow_exception(*(waitTask.exceptionPtr()));
1336  }
1337  return lastTransitionType();
1338  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
oneapi::tbb::task_group taskGroup_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1726 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1726  {
1727  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1728  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1729  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1730  input_->processHistoryRegistry().reducedProcessHistoryID(
1731  input_->luminosityBlockAuxiliary()->processHistoryID()));
1732  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1733  assert(lumiOK);
1734  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1735  {
1736  SendSourceTerminationSignalIfException sentry(actReg_.get());
1737  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1738  sentry.completedSuccessfully();
1739  }
1740  return input_->luminosityBlock();
1741  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1695 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1695  {
1696  principalCache_.merge(input_->runAuxiliary(), preg());
1697  auto runPrincipal = principalCache_.runPrincipalPtr();
1698  {
1699  SendSourceTerminationSignalIfException sentry(actReg_.get());
1700  input_->readAndMergeRun(*runPrincipal);
1701  sentry.completedSuccessfully();
1702  }
1703  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1704  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1705  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1927 of file EventProcessor.cc.

References actReg_, edmPickEvents::event, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1927  {
1928  //TODO this will have to become per stream
1929  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1930  StreamContext streamContext(event.streamID(), &processContext_);
1931 
1932  SendSourceTerminationSignalIfException sentry(actReg_.get());
1933  input_->readEvent(event, streamContext);
1934 
1935  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1936  sentry.completedSuccessfully();
1937 
1938  FDEBUG(1) << "\treadEvent\n";
1939  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 918 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

918  {
919  FDEBUG(1) << " \treadFile\n";
920  size_t size = preg_->size();
921  SendSourceTerminationSignalIfException sentry(actReg_.get());
922 
924 
925  fb_ = input_->readFile();
926  if (size < preg_->size()) {
928  }
931  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
932  }
933  sentry.completedSuccessfully();
934  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1707 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1707  {
1709  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1710  << "Illegal attempt to insert lumi into cache\n"
1711  << "Run is invalid\n"
1712  << "Contact a Framework Developer\n";
1713  }
1715  assert(lbp);
1716  lbp->setAux(*input_->luminosityBlockAuxiliary());
1717  {
1718  SendSourceTerminationSignalIfException sentry(actReg_.get());
1719  input_->readLuminosityBlock(*lbp, *historyAppender_);
1720  sentry.completedSuccessfully();
1721  }
1722  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1723  iStatus.lumiPrincipal() = std::move(lbp);
1724  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
def move
Definition: eostools.py:511
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1808 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1808  {
1809  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1810  iStatus.endLumi();
1811  return false;
1812  }
1813 
1814  if (iStatus.wasEventProcessingStopped()) {
1815  return false;
1816  }
1817 
1818  if (shouldWeStop()) {
1820  iStatus.stopProcessingEvents();
1821  iStatus.endLumi();
1822  return false;
1823  }
1824 
1826  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1827  CMS_SA_ALLOW try {
1828  //need to use lock in addition to the serial task queue because
1829  // of delayed provenance reading and reading data in response to
1830  // edm::Refs etc
1831  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1832 
1833  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1834  if (InputSource::IsLumi == itemType) {
1835  iStatus.haveContinuedLumi();
1836  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1837  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1838  readAndMergeLumi(iStatus);
1839  itemType = nextTransitionType();
1840  }
1841  if (InputSource::IsLumi == itemType) {
1842  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1843  input_->luminosityBlockAuxiliary()->beginTime()));
1844  }
1845  }
1846  if (InputSource::IsEvent != itemType) {
1847  iStatus.stopProcessingEvents();
1848 
1849  //IsFile may continue processing the lumi and
1850  // looper_ can cause the input source to declare a new IsRun which is actually
1851  // just a continuation of the previous run
1852  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1853  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1854  iStatus.endLumi();
1855  }
1856  return false;
1857  }
1858  readEvent(iStreamIndex);
1859  } catch (...) {
1860  bool expected = false;
1861  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1862  deferredExceptionPtr_ = std::current_exception();
1863  iStatus.endLumi();
1864  }
1865  return false;
1866  }
1867  return true;
1868  }
void readEvent(unsigned int iStreamIndex)
#define CMS_SA_ALLOW
InputSource::ItemType nextTransitionType()
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1666 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1666  {
1667  SendSourceTerminationSignalIfException sentry(actReg_.get());
1668  input_->readProcessBlock(processBlockPrincipal);
1669  sentry.completedSuccessfully();
1670  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1672 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1672  {
1674  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1675  << "Illegal attempt to insert run into cache\n"
1676  << "Contact a Framework Developer\n";
1677  }
1678  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1679  preg(),
1681  historyAppender_.get(),
1682  0,
1683  true,
1685  {
1686  SendSourceTerminationSignalIfException sentry(actReg_.get());
1687  input_->readRun(*rp, *historyAppender_);
1688  sentry.completedSuccessfully();
1689  }
1690  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1691  principalCache_.insert(rp);
1692  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1693  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 970 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

970  {
971  if (fileBlockValid()) {
972  schedule_->respondToCloseInputFile(*fb_);
973  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
974  }
975  FDEBUG(1) << "\trespondToCloseInputFile\n";
976  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 960 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

960  {
961  if (fileBlockValid()) {
963  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
964  schedule_->respondToOpenInputFile(*fb_);
965  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
966  }
967  FDEBUG(1) << "\trespondToOpenInputFile\n";
968  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 1003 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

1003  {
1004  input_->repeat();
1005  input_->rewind();
1006  FDEBUG(1) << "\trewind\n";
1007  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 375 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), endUnfinishedRun(), and writeRunAsync().

375 { return runToCompletion(); }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 849 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), alignCSCRings::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

849  {
852  {
853  beginJob(); //make sure this was called
854 
855  // make the services available
857 
859  try {
860  FilesProcessor fp(fileModeNoMerge_);
861 
862  convertException::wrap([&]() {
863  bool firstTime = true;
864  do {
865  if (not firstTime) {
867  rewindInput();
868  } else {
869  firstTime = false;
870  }
871  startingNewLoop();
872 
873  auto trans = fp.processFiles(*this);
874 
875  fp.normalEnd();
876 
877  if (deferredExceptionPtrIsSet_.load()) {
878  std::rethrow_exception(deferredExceptionPtr_);
879  }
880  if (trans != InputSource::IsStop) {
881  //problem with the source
882  doErrorStuff();
883 
884  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
885  }
886  } while (not endOfLoop());
887  }); // convertException::wrap
888 
889  } // Try block
890  catch (cms::Exception& e) {
892  std::string message(
893  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
894  e.addAdditionalInfo(message);
895  if (e.alreadyPrinted()) {
896  LogAbsolute("Additional Exceptions") << message;
897  }
898  }
899  if (!exceptionMessageRuns_.empty()) {
901  if (e.alreadyPrinted()) {
902  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
903  }
904  }
905  if (!exceptionMessageFiles_.empty()) {
907  if (e.alreadyPrinted()) {
908  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
909  }
910  }
911  throw;
912  }
913  }
914 
915  return returnCode;
916  }
std::atomic< bool > exceptionMessageLumis_
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:177
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 2023 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

2023  {
2024  bool expected = false;
2025  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
2026  deferredExceptionPtr_ = iException;
2027  return true;
2028  }
2029  return false;
2030  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 2017 of file EventProcessor.cc.

References exceptionMessageFiles_.

2017 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 2021 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

2021 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 2019 of file EventProcessor.cc.

References exceptionMessageRuns_.

2019 { exceptionMessageRuns_ = message; }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 1014 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1014  {
1015  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1016  if (!subProcesses_.empty()) {
1017  for (auto const& subProcess : subProcesses_) {
1018  if (subProcess.shouldWeCloseOutput()) {
1019  return true;
1020  }
1021  }
1022  return false;
1023  }
1024  return schedule_->shouldWeCloseOutput();
1025  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 2002 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2002  {
2003  FDEBUG(1) << "\tshouldWeStop\n";
2004  if (shouldWeStop_)
2005  return true;
2006  if (!subProcesses_.empty()) {
2007  for (auto const& subProcess : subProcesses_) {
2008  if (subProcess.terminate()) {
2009  return true;
2010  }
2011  }
2012  return false;
2013  }
2014  return schedule_->terminate();
2015  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 978 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

978  {
979  shouldWeStop_ = false;
980  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
981  // until after we've called beginOfJob
982  if (looper_ && looperBeginJobRun_) {
983  looper_->doStartingNewLoop();
984  }
985  FDEBUG(1) << "\tstartingNewLoop\n";
986  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1601 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1601  {
1602  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1603  if (iPtr) {
1604  handleEndLumiExceptions(iPtr, iTask);
1605  }
1606  auto status = streamLumiStatus_[iStreamIndex];
1607  //reset status before releasing queue else get race condtion
1608  streamLumiStatus_[iStreamIndex].reset();
1610  streamQueues_[iStreamIndex].resume();
1611 
1612  //are we the last one?
1613  if (status->streamFinishedLumi()) {
1615  }
1616  });
1617 
1618  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1619 
1620  //Need to be sure the lumi status is released before lumiDoneTask can every be called.
1621  // therefore we do not want to hold the shared_ptr
1622  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1623  lumiStatus->setEndTime();
1624 
1625  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1626 
1627  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1628  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1629 
1630  if (lumiStatus->didGlobalBeginSucceed()) {
1631  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1632  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1633  lumiPrincipal.endTime());
1634  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1635  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1636  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1637  *schedule_,
1638  iStreamIndex,
1639  transitionInfo,
1640  serviceToken_,
1641  subProcesses_,
1642  cleaningUpAfterException);
1643  }
1644  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
list status
Definition: mps_update.py:107
std::vector< SubProcess > subProcesses_
oneapi::tbb::task_group * group() const noexcept
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::taskCleanup ( )

Definition at line 585 of file EventProcessor.cc.

References cms::cuda::assert(), edm::FinalWaitingTask::done(), espController_, and taskGroup_.

585  {
587  espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
588  taskGroup_.wait();
589  assert(task.done());
590  }
assert(be >=bs)
bool done() const
Definition: WaitingTask.h:82
oneapi::tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

291  {
293  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

294  {
296  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2032 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2032  {
2033  cms::Exception ex("ModulesSynchingOnLumis");
2034  ex << "The framework is configured to use at least two streams, but the following modules\n"
2035  << "require synchronizing on LuminosityBlock boundaries:";
2036  bool found = false;
2037  for (auto worker : schedule_->allWorkers()) {
2038  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2039  found = true;
2040  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2041  }
2042  }
2043  if (found) {
2044  ex << "\n\nThe situation can be fixed by either\n"
2045  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2046  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2047  throw ex;
2048  }
2049  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 794 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

794 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 798 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

798 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 796 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

796 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2051 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2051  {
2052  std::unique_ptr<LogSystem> s;
2053  for (auto worker : schedule_->allWorkers()) {
2054  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2055  if (not s) {
2056  s = std::make_unique<LogSystem>("LegacyModules");
2057  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2058  "is going to end soon. These modules need to be converted to have type\n"
2059  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2060  }
2061  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2062  }
2063  }
2064  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1783 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, edm::LuminosityBlockPrincipal::willBeContinued(), and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1783  {
1784  using namespace edm::waiting_task;
1785  if (not lumiPrincipal.willBeContinued()) {
1786  chain::first([&](auto nextTask) {
1788 
1789  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1790  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1791  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1793  for (auto& s : subProcesses_) {
1794  s.writeLumiAsync(nextTask, lumiPrincipal);
1795  }
1796  }) | chain::lastTask(std::move(task));
1797  }
1798  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1743 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endProcessBlock(), and inputProcessBlocks().

1743  {
1744  using namespace edm::waiting_task;
1745  chain::first([&](auto nextTask) {
1747  schedule_->writeProcessBlockAsync(
1748  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1749  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1751  for (auto& s : subProcesses_) {
1752  s.writeProcessBlockAsync(nextTask, processBlockType);
1753  }
1754  }) | chain::runLast(std::move(task));
1755  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1757 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, processContext_, run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1760  {
1761  using namespace edm::waiting_task;
1762  chain::first([&](auto nextTask) {
1764  schedule_->writeRunAsync(nextTask,
1766  &processContext_,
1767  actReg_.get(),
1768  mergeableRunProductMetadata);
1769  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1771  for (auto& s : subProcesses_) {
1772  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1773  }
1774  }) | chain::runLast(std::move(task));
1775  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 361 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 362 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 349 of file EventProcessor.h.

Referenced by beginJob().

std::vector<std::string> edm::EventProcessor::branchesToDeleteEarly_
private

Definition at line 334 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 370 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private
ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 367 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 352 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 354 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 353 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 351 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 363 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 357 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 355 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 337 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 356 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 327 of file EventProcessor.h.

Referenced by init(), and readRun().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 369 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), processConfiguration(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 350 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 347 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 346 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
oneapi::tbb::task_group edm::EventProcessor::taskGroup_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().