CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Public Types

using ProcessBlockType = PrincipalCache::ProcessBlockType
 
enum  StatusCode {
  epSuccess = 0, epException = 1, epOther = 2, epSignal = 3,
  epInputComplete = 4, epTimedOut = 5, epCountComplete = 6
}
 

Public Member Functions

void beginJob ()
 
void beginLumiAsync (edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
 
void beginProcessBlock (bool &beginProcessBlockSucceeded)
 
void beginRun (ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
void closeInputFile (bool cleaningUpAfterException)
 
void closeOutputFiles ()
 
void continueLumiAsync (edm::WaitingTaskHolder iHolder)
 
void deleteLumiFromCache (LuminosityBlockProcessingStatus &)
 
void deleteRunFromCache (ProcessHistoryID const &phid, RunNumber_t run)
 
void doErrorStuff ()
 
void endJob ()
 
bool endOfLoop ()
 
void endProcessBlock (bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
 
void endRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
 
void endUnfinishedLumi ()
 
void endUnfinishedRun (ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::unique_ptr< ParameterSet > parameterSet, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (EventProcessor const &)=delete
 
bool fileBlockValid ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void globalEndLumiAsync (edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
 
void handleEndLumiExceptions (std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
 
void inputProcessBlocks ()
 
InputSource::ItemType lastTransitionType () const
 
edm::LuminosityBlockNumber_t nextLuminosityBlockID ()
 
std::pair
< edm::ProcessHistoryID,
edm::RunNumber_t > 
nextRunID ()
 
InputSource::ItemType nextTransitionType ()
 
void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
InputSource::ItemType processLumis (std::shared_ptr< void > const &iRunResource)
 
int readAndMergeLumi (LuminosityBlockProcessingStatus &)
 
std::pair< ProcessHistoryID,
RunNumber_t > 
readAndMergeRun ()
 
void readFile ()
 
void readLuminosityBlock (LuminosityBlockProcessingStatus &)
 
void readProcessBlock (ProcessBlockPrincipal &)
 
std::pair< ProcessHistoryID,
RunNumber_t > 
readRun ()
 
void respondToCloseInputFile ()
 
void respondToOpenInputFile ()
 
void rewindInput ()
 
StatusCode run ()
 
StatusCode runToCompletion ()
 
bool setDeferredException (std::exception_ptr)
 
void setExceptionMessageFiles (std::string &message)
 
void setExceptionMessageLumis ()
 
void setExceptionMessageRuns (std::string &message)
 
bool shouldWeCloseOutput () const
 
bool shouldWeStop () const
 
void startingNewLoop ()
 
void streamEndLumiAsync (edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void taskCleanup ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void writeLumiAsync (WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
 
void writeProcessBlockAsync (WaitingTaskHolder, ProcessBlockType)
 
void writeRunAsync (WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
 
 ~EventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap
 

Private Member Functions

std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
void handleNextEventForStreamAsync (WaitingTaskHolder iTask, unsigned int iStreamIndex)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase
const > 
looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
std::shared_ptr
< ProductRegistry const > 
preg () const
 
std::shared_ptr
< ProductRegistry > & 
preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventAsyncImpl (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &, unsigned int iStreamIndex)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
void throwAboutModulesRequiringLuminosityBlockSynchronization () const
 
void warnAboutLegacyModules () const
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const
< std::shared_ptr
< BranchIDListHelper > > 
branchIDListHelper_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
bool deleteNonConsumedUnscheduledModules_ = true
 
edm::propagate_const
< std::shared_ptr
< eventsetup::EventSetupProvider > > 
esp_
 
edm::propagate_const
< std::unique_ptr
< eventsetup::EventSetupsController > > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::atomic< bool > exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const
< std::shared_ptr< FileBlock > > 
fb_
 
bool fileModeNoMerge_
 
bool firstEventInBlock_ = true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const
< std::unique_ptr
< HistoryAppender > > 
historyAppender_
 
edm::propagate_const
< std::unique_ptr< InputSource > > 
input_
 
InputSource::ItemType lastSourceTransition_
 
edm::propagate_const
< std::shared_ptr
< EDLooperBase > > 
looper_
 
bool looperBeginJobRun_
 
std::unique_ptr
< edm::LimitedTaskQueue > 
lumiQueue_
 
MergeableRunProductProcesses mergeableRunProductProcesses_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const
< std::shared_ptr
< ProductRegistry > > 
preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
edm::propagate_const
< std::shared_ptr
< ProcessBlockHelper > > 
processBlockHelper_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
 
edm::propagate_const
< std::unique_ptr< Schedule > > 
schedule_
 
ServiceToken serviceToken_
 
bool shouldWeStop_
 
std::shared_ptr
< std::recursive_mutex > 
sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
std::atomic< unsigned int > streamLumiActive_ {0}
 
std::vector< std::shared_ptr
< LuminosityBlockProcessingStatus > > 
streamLumiStatus_
 
std::vector< edm::SerialTaskQueue > streamQueues_
 
std::vector< SubProcess > subProcesses_
 
tbb::task_group taskGroup_
 
edm::propagate_const
< std::shared_ptr
< ThinnedAssociationsHelper > > 
thinnedAssociationsHelper_
 

Detailed Description

Definition at line 66 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 363 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 364 of file EventProcessor.h.

using edm::EventProcessor::ProcessBlockType = PrincipalCache::ProcessBlockType

Definition at line 243 of file EventProcessor.h.

Member Enumeration Documentation

enum edm::EventProcessor::StatusCode

Enumerator
epSuccess 
epException 
epOther 
epSignal 
epInputComplete 
epTimedOut 
epCountComplete 

Definition at line 76 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 232 of file EventProcessor.cc.

References init(), and eostools::move().

237  : actReg_(),
238  preg_(),
240  serviceToken_(),
241  input_(),
242  espController_(new eventsetup::EventSetupsController),
243  esp_(),
244  act_table_(),
246  schedule_(),
247  subProcesses_(),
248  historyAppender_(new HistoryAppender),
249  fb_(),
250  looper_(),
252  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
253  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
257  fileModeNoMerge_(false),
260  exceptionMessageLumis_(false),
261  forceLooperToEnd_(false),
262  looperBeginJobRun_(false),
265  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
266  processDesc->addServices(defaultServices, forcedServices);
267  init(processDesc, iToken, iLegacy);
268  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::unique_ptr< ParameterSet >  parameterSet,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 270 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and eostools::move().

273  : actReg_(),
274  preg_(),
276  serviceToken_(),
277  input_(),
278  espController_(new eventsetup::EventSetupsController),
279  esp_(),
280  act_table_(),
282  schedule_(),
283  subProcesses_(),
284  historyAppender_(new HistoryAppender),
285  fb_(),
286  looper_(),
288  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
289  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
290  principalCache_(),
291  beginJobCalled_(false),
292  shouldWeStop_(false),
293  fileModeNoMerge_(false),
296  exceptionMessageLumis_(false),
297  forceLooperToEnd_(false),
298  looperBeginJobRun_(false),
302  auto processDesc = std::make_shared<ProcessDesc>(std::move(parameterSet));
303  processDesc->addServices(defaultServices, forcedServices);
305  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 307 of file EventProcessor.cc.

References init().

310  : actReg_(),
311  preg_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
319  schedule_(),
320  subProcesses_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
325  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
326  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
327  principalCache_(),
328  beginJobCalled_(false),
329  shouldWeStop_(false),
330  fileModeNoMerge_(false),
333  exceptionMessageLumis_(false),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
339  init(processDesc, token, legacy);
340  }
std::atomic< bool > exceptionMessageLumis_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 560 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

560  {
561  // Make the services available while everything is being deleted.
563  ServiceRegistry::Operate op(token);
564 
565  // manually destroy all these thing that may need the services around
566  // propagate_const<T> has no reset() function
567  espController_ = nullptr;
568  esp_ = nullptr;
569  schedule_ = nullptr;
570  input_ = nullptr;
571  looper_ = nullptr;
572  actReg_ = nullptr;
573 
576  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 585 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, c, edm::checkForModuleDependencyCorrectness(), deleteNonConsumedUnscheduledModules_, getPayloadData::description, esp_, espController_, edm::for_all(), mps_fire::i, edm::InEvent, edm::PathsAndConsumesOfModules::initialize(), edm::InLumi, edm::InProcess, input_, edm::InRun, cmsLHEtoEOSManager::l, looper_, MatrixUtil::merge(), eostools::move(), edm::nonConsumedUnscheduledModules(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::PathsAndConsumesOfModules::removeModules(), schedule_, serviceToken_, subProcesses_, std::swap(), throwAboutModulesRequiringLuminosityBlockSynchronization(), createJobs::tmp, warnAboutLegacyModules(), and edm::convertException::wrap().

Referenced by runToCompletion().

585  {
586  if (beginJobCalled_)
587  return;
588  beginJobCalled_ = true;
589  bk::beginJob();
590 
591  // StateSentry toerror(this); // should we add this ?
592  //make the services available
594 
595  service::SystemBounds bounds(preallocations_.numberOfStreams(),
599  actReg_->preallocateSignal_(bounds);
600  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
602 
603  std::vector<ModuleProcessName> consumedBySubProcesses;
605  [&consumedBySubProcesses, deleteModules = deleteNonConsumedUnscheduledModules_](auto& subProcess) {
606  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
607  if (consumedBySubProcesses.empty()) {
608  consumedBySubProcesses = std::move(c);
609  } else if (not c.empty()) {
610  std::vector<ModuleProcessName> tmp;
611  tmp.reserve(consumedBySubProcesses.size() + c.size());
612  std::merge(consumedBySubProcesses.begin(),
613  consumedBySubProcesses.end(),
614  c.begin(),
615  c.end(),
616  std::back_inserter(tmp));
617  std::swap(consumedBySubProcesses, tmp);
618  }
619  });
620 
621  // Note: all these may throw
624  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedBySubProcesses);
625  not unusedModules.empty()) {
627 
628  edm::LogInfo("DeleteModules").log([&unusedModules](auto& l) {
629  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
630  "and "
631  "therefore they are deleted before beginJob transition.";
632  for (auto const& description : unusedModules) {
633  l << "\n " << description->moduleLabel();
634  }
635  });
636  for (auto const& description : unusedModules) {
637  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
638  }
639  }
640  }
641 
642  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
643 
646  }
648 
649  //NOTE: This implementation assumes 'Job' means one call
650  // the EventProcessor::run
651  // If it really means once per 'application' then this code will
652  // have to be changed.
653  // Also have to deal with case where have 'run' then new Module
654  // added and do 'run'
655  // again. In that case the newly added Module needs its 'beginJob'
656  // to be called.
657 
658  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
659  // For now we delay calling beginOfJob until first beginOfRun
660  //if(looper_) {
661  // looper_->beginOfJob(es);
662  //}
663  try {
664  convertException::wrap([&]() { input_->doBeginJob(); });
665  } catch (cms::Exception& ex) {
666  ex.addContext("Calling beginJob for the source");
667  throw;
668  }
669  espController_->finishConfiguration();
670  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
671  if (looper_) {
672  constexpr bool mustPrefetchMayGet = true;
673  auto const processBlockLookup = preg_->productLookup(InProcess);
674  auto const runLookup = preg_->productLookup(InRun);
675  auto const lumiLookup = preg_->productLookup(InLumi);
676  auto const eventLookup = preg_->productLookup(InEvent);
677  looper_->updateLookup(InProcess, *processBlockLookup, mustPrefetchMayGet);
678  looper_->updateLookup(InRun, *runLookup, mustPrefetchMayGet);
679  looper_->updateLookup(InLumi, *lumiLookup, mustPrefetchMayGet);
680  looper_->updateLookup(InEvent, *eventLookup, mustPrefetchMayGet);
681  looper_->updateLookup(esp_->recordsToProxyIndices());
682  }
683  // toerror.succeeded(); // should we add this?
684  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
685  actReg_->postBeginJobSignal_();
686 
687  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
688  schedule_->beginStream(i);
689  for_all(subProcesses_, [i](auto& subProcess) { subProcess.doBeginStream(i); });
690  }
691  }
ProcessContext processContext_
const edm::EventSetup & c
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Log< level::Info, false > LogInfo
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void removeModules(std::vector< ModuleDescription const * > const &modules)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
tmp
align.sh
Definition: createJobs.py:716
bool deleteNonConsumedUnscheduledModules_
void edm::EventProcessor::beginLumiAsync ( edm::IOVSyncValue const &  iSyncValue,
std::shared_ptr< void > const &  iRunResource,
edm::WaitingTaskHolder  iHolder 
)

Definition at line 1312 of file EventProcessor.cc.

References actReg_, edm::BeginLuminosityBlock, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), esp_, espController_, edm::PrincipalCache::eventPrincipal(), first, edm::propagate_const< T >::get(), watchdog::group, handleNextEventForStreamAsync(), mps_fire::i, edm::waiting_task::chain::ifThen(), input_, edm::Service< T >::isAvailable(), looper_, lumiQueue_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::SerialTaskQueueChain::push(), edm::SerialTaskQueue::push(), queueWhichWaitsForIOVsToFinish_, readLuminosityBlock(), edm::waiting_task::chain::runLast(), schedule_, edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, edm::WaitingTaskHolder::taskHasFailed(), edm::waiting_task::chain::then(), and createJobs::tmp.

Referenced by handleNextEventForStreamAsync(), and processLumis().

1314  {
1315  if (iHolder.taskHasFailed()) {
1316  return;
1317  }
1318 
1319  // We must be careful with the status object here and in code this function calls. IF we want
1320  // endRun to be called, then we must call resetResources before the things waiting on
1321  // iHolder are allowed to proceed. Otherwise, there will be race condition (possibly causing
1322  // endRun to be called much later than it should be, because status is holding iRunResource).
1323  // Note that this must be done explicitly. Relying on the destructor does not work well
1324  // because the LimitedTaskQueue for the lumiWork holds the shared_ptr in each of its internal
1325  // queues, plus it is difficult to guarantee the destructor is called before iHolder gets
1326  // destroyed inside this function and lumiWork.
1327  auto status =
1328  std::make_shared<LuminosityBlockProcessingStatus>(this, preallocations_.numberOfStreams(), iRunResource);
1329  chain::first([&](auto nextTask) {
1330  auto asyncEventSetup = [](ActivityRegistry* actReg,
1331  auto* espController,
1332  auto& queue,
1333  WaitingTaskHolder task,
1334  auto& status,
1335  IOVSyncValue const& iSync) {
1336  queue.pause();
1337  CMS_SA_ALLOW try {
1338  SendSourceTerminationSignalIfException sentry(actReg);
1339  // Pass in iSync to let the EventSetup system know which run and lumi
1340  // need to be processed and prepare IOVs for it.
1341  // Pass in the endIOVWaitingTasks so the lumi can notify them when the
1342  // lumi is done and no longer needs its EventSetup IOVs.
1343  espController->eventSetupForInstanceAsync(
1344  iSync, task, status->endIOVWaitingTasks(), status->eventSetupImpls());
1345  sentry.completedSuccessfully();
1346  } catch (...) {
1347  task.doneWaiting(std::current_exception());
1348  }
1349  };
1350  if (espController_->doWeNeedToWaitForIOVsToFinish(iSync)) {
1351  // We only get here inside this block if there is an EventSetup
1352  // module not able to handle concurrent IOVs (usually an ESSource)
1353  // and the new sync value is outside the current IOV of that module.
1354  auto group = nextTask.group();
1356  *group, [this, task = std::move(nextTask), iSync, status, asyncEventSetup]() mutable {
1357  asyncEventSetup(
1359  });
1360  } else {
1361  asyncEventSetup(
1363  }
1364  }) | chain::then([this, status](std::exception_ptr const* iPtr, auto nextTask) {
1365  //the call to doneWaiting will cause the count to decrement
1366  auto copyTask = nextTask;
1367  if (iPtr) {
1368  nextTask.doneWaiting(*iPtr);
1369  }
1370  auto group = copyTask.group();
1371  lumiQueue_->pushAndPause(
1372  *group, [this, task = std::move(copyTask), status](edm::LimitedTaskQueue::Resumer iResumer) mutable {
1373  if (task.taskHasFailed()) {
1374  status->resetResources();
1375  return;
1376  }
1377 
1378  status->setResumer(std::move(iResumer));
1379 
1380  auto group = task.group();
1382  *group, [this, postQueueTask = std::move(task), status = std::move(status)]() mutable {
1383  //make the services available
1385  // Caught exception is propagated via WaitingTaskHolder
1386  CMS_SA_ALLOW try {
1387  readLuminosityBlock(*status);
1388 
1389  LuminosityBlockPrincipal& lumiPrincipal = *status->lumiPrincipal();
1390  {
1391  SendSourceTerminationSignalIfException sentry(actReg_.get());
1392 
1393  input_->doBeginLumi(lumiPrincipal, &processContext_);
1394  sentry.completedSuccessfully();
1395  }
1396 
1397  Service<RandomNumberGenerator> rng;
1398  if (rng.isAvailable()) {
1399  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr, false);
1400  rng->preBeginLumi(lb);
1401  }
1402 
1403  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1404 
1405  using namespace edm::waiting_task::chain;
1406  chain::first([this, status, &lumiPrincipal](auto nextTask) {
1407  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1408  {
1409  LumiTransitionInfo transitionInfo(lumiPrincipal, es, &status->eventSetupImpls());
1410  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
1411  beginGlobalTransitionAsync<Traits>(
1412  nextTask, *schedule_, transitionInfo, serviceToken_, subProcesses_);
1413  }
1414  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1415  looper_->prefetchAsync(
1416  nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
1417  }) | ifThen(looper_, [this, status, &es](auto nextTask) {
1418  status->globalBeginDidSucceed();
1419  //make the services available
1420  ServiceRegistry::Operate operateLooper(serviceToken_);
1421  looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
1422  }) | then([this, status](std::exception_ptr const* iPtr, auto holder) mutable {
1423  if (iPtr) {
1424  status->resetResources();
1425  holder.doneWaiting(*iPtr);
1426  } else {
1427  if (not looper_) {
1428  status->globalBeginDidSucceed();
1429  }
1430  EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
1431  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
1432 
1433  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1434  streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
1435  streamQueues_[i].pause();
1436 
1437  auto& event = principalCache_.eventPrincipal(i);
1438  //We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
1439  // held by the container as this lambda may not finish executing before all the tasks it
1440  // spawns have already started to run.
1441  auto eventSetupImpls = &status->eventSetupImpls();
1442  auto lp = status->lumiPrincipal().get();
1443  streamLumiStatus_[i] = std::move(status);
1445  event.setLuminosityBlockPrincipal(lp);
1446  LumiTransitionInfo transitionInfo(*lp, es, eventSetupImpls);
1447  using namespace edm::waiting_task::chain;
1448  chain::first([this, i, &transitionInfo](auto nextTask) {
1449  beginStreamTransitionAsync<Traits>(
1450  std::move(nextTask), *schedule_, i, transitionInfo, serviceToken_, subProcesses_);
1451  }) | then([this, i](std::exception_ptr const* exceptionFromBeginStreamLumi, auto nextTask) {
1452  if (exceptionFromBeginStreamLumi) {
1453  WaitingTaskHolder tmp(nextTask);
1454  tmp.doneWaiting(*exceptionFromBeginStreamLumi);
1455  streamEndLumiAsync(nextTask, i);
1456  } else {
1458  }
1459  }) | runLast(holder);
1460  });
1461  }
1462  }
1463  }) | runLast(postQueueTask);
1464 
1465  } catch (...) {
1466  status->resetResources();
1467  postQueueTask.doneWaiting(std::current_exception());
1468  }
1469  }); // task in sourceResourcesAcquirer
1470  });
1471  }) | chain::runLast(std::move(iHolder));
1472  }
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
ProcessContext processContext_
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
list status
Definition: mps_update.py:107
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
std::vector< edm::SerialTaskQueue > streamQueues_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
bool taskHasFailed() const noexcept
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
SerialTaskQueueChain & serialQueueChain() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
tuple group
Definition: watchdog.py:82
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
constexpr element_type const * get() const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
tmp
align.sh
Definition: createJobs.py:716
PrincipalCache principalCache_
void edm::EventProcessor::beginProcessBlock ( bool &  beginProcessBlockSucceeded)

Definition at line 1008 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::ProcessBlockPrincipal::fillProcessBlockPrincipal(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processConfiguration_, schedule_, serviceToken_, subProcesses_, and taskGroup_.

1008  {
1009  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1010  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
1011 
1012  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
1013  FinalWaitingTask globalWaitTask;
1014 
1015  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1016  beginGlobalTransitionAsync<Traits>(
1017  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1018 
1019  do {
1020  taskGroup_.wait();
1021  } while (not globalWaitTask.done());
1022 
1023  if (globalWaitTask.exceptionPtr() != nullptr) {
1024  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1025  }
1026  beginProcessBlockSucceeded = true;
1027  }
tbb::task_group taskGroup_
std::vector< SubProcess > subProcesses_
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
ServiceToken serviceToken_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool &  globalBeginSucceeded,
bool &  eventSetupForInstanceSucceeded 
)

Definition at line 1102 of file EventProcessor.cc.

References actReg_, edm::BeginRun, edm::RunPrincipal::beginTime(), edm::FinalWaitingTask::done(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, forceESCacheClearOnNewRun_, edm::waiting_task::chain::ifThen(), input_, looper_, looperBeginJobRun_, eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), taskGroup_, and edm::waiting_task::chain::then().

1105  {
1106  globalBeginSucceeded = false;
1107  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1108  {
1109  SendSourceTerminationSignalIfException sentry(actReg_.get());
1110 
1111  input_->doBeginRun(runPrincipal, &processContext_);
1112  sentry.completedSuccessfully();
1113  }
1114 
1115  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
1117  espController_->forceCacheClear();
1118  }
1119  {
1120  SendSourceTerminationSignalIfException sentry(actReg_.get());
1122  eventSetupForInstanceSucceeded = true;
1123  sentry.completedSuccessfully();
1124  }
1125  auto const& es = esp_->eventSetupImpl();
1126  if (looper_ && looperBeginJobRun_ == false) {
1127  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1128 
1129  FinalWaitingTask waitTask;
1130  using namespace edm::waiting_task::chain;
1131  chain::first([this, &es](auto nextTask) {
1132  looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_);
1133  }) | then([this, &es](auto nextTask) mutable {
1134  looper_->beginOfJob(es);
1135  looperBeginJobRun_ = true;
1136  looper_->doStartingNewLoop();
1137  }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask));
1138 
1139  do {
1140  taskGroup_.wait();
1141  } while (not waitTask.done());
1142  if (waitTask.exceptionPtr() != nullptr) {
1143  std::rethrow_exception(*(waitTask.exceptionPtr()));
1144  }
1145  }
1146  {
1147  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
1148  FinalWaitingTask globalWaitTask;
1149 
1150  using namespace edm::waiting_task::chain;
1151  chain::first([&runPrincipal, &es, this](auto waitTask) {
1152  RunTransitionInfo transitionInfo(runPrincipal, es);
1153  beginGlobalTransitionAsync<Traits>(
1154  std::move(waitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1155  }) | then([&globalBeginSucceeded, run](auto waitTask) mutable {
1156  globalBeginSucceeded = true;
1157  FDEBUG(1) << "\tbeginRun " << run << "\n";
1158  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1159  looper_->prefetchAsync(waitTask, serviceToken_, Transition::BeginRun, runPrincipal, es);
1160  }) | ifThen(looper_, [this, &runPrincipal, &es](auto waitTask) {
1161  looper_->doBeginRun(runPrincipal, es, &processContext_);
1162  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1163 
1164  do {
1165  taskGroup_.wait();
1166  } while (not globalWaitTask.done());
1167  if (globalWaitTask.exceptionPtr() != nullptr) {
1168  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1169  }
1170  }
1171  {
1172  //To wait, the ref count has to be 1+#streams
1173  FinalWaitingTask streamLoopWaitTask;
1174 
1175  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
1176 
1177  RunTransitionInfo transitionInfo(runPrincipal, es);
1178  beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1179  *schedule_,
1181  transitionInfo,
1182  serviceToken_,
1183  subProcesses_);
1184  do {
1185  taskGroup_.wait();
1186  } while (not streamLoopWaitTask.done());
1187  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1188  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1189  }
1190  }
1191  FDEBUG(1) << "\tstreamBeginRun " << run << "\n";
1192  if (looper_) {
1193  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1194  }
1195  }
ProcessContext processContext_
tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 287 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

287  {
289  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 290 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode & returnCode)
private

Definition at line 778 of file EventProcessor.cc.

References epSignal, and edm::shutdown_flag.

Referenced by nextTransitionType().

778  {
779  bool returnValue = false;
780 
781  // Look for a shutdown signal
782  if (shutdown_flag.load(std::memory_order_acquire)) {
783  returnValue = true;
785  }
786  return returnValue;
787  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 772 of file EventProcessor.cc.

References schedule_.

772 { schedule_->clearCounters(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)

Definition at line 908 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, fileBlockValid(), and input_.

908  {
909  if (fileBlockValid()) {
910  SendSourceTerminationSignalIfException sentry(actReg_.get());
911  input_->closeFile(fb_.get(), cleaningUpAfterException);
912  sentry.completedSuccessfully();
913  }
914  FDEBUG(1) << "\tcloseInputFile\n";
915  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )

Definition at line 925 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), processBlockHelper_, schedule_, and subProcesses_.

925  {
926  schedule_->closeOutputFiles();
927  for_all(subProcesses_, [](auto& subProcess) { subProcess.closeOutputFiles(); });
928  processBlockHelper_->clearAfterOutputFilesClose();
929  FDEBUG(1) << "\tcloseOutputFiles\n";
930  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void edm::EventProcessor::continueLumiAsync ( edm::WaitingTaskHolder  iHolder)

Definition at line 1474 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), h, handleNextEventForStreamAsync(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, mps_update::status, and streamLumiStatus_.

Referenced by processLumis().

1474  {
1475  {
1476  //all streams are sharing the same status at the moment
1477  auto status = streamLumiStatus_[0]; //read from streamLumiActive_ happened in calling routine
1478  status->needToContinueLumi();
1479  status->startProcessingEvents();
1480  }
1481 
1482  unsigned int streamIndex = 0;
1483  tbb::task_arena arena{tbb::task_arena::attach()};
1484  for (; streamIndex < preallocations_.numberOfStreams() - 1; ++streamIndex) {
1485  arena.enqueue([this, streamIndex, h = iHolder]() { handleNextEventForStreamAsync(h, streamIndex); });
1486  }
1487  iHolder.group()->run(
1488  [this, streamIndex, h = std::move(iHolder)]() { handleNextEventForStreamAsync(h, streamIndex); });
1489  }
list status
Definition: mps_update.py:107
PreallocationConfiguration preallocations_
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
def move
Definition: eostools.py:511
tbb::task_group * group() const noexcept
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
void edm::EventProcessor::deleteLumiFromCache ( LuminosityBlockProcessingStatus & iStatus)

Definition at line 1772 of file EventProcessor.cc.

References edm::LuminosityBlockProcessingStatus::lumiPrincipal(), alignCSCRings::s, and subProcesses_.

Referenced by globalEndLumiAsync().

1772  {
1773  for (auto& s : subProcesses_) {
1774  s.deleteLumiFromCache(*iStatus.lumiPrincipal());
1775  }
1776  iStatus.lumiPrincipal()->clearPrincipal();
1777  //FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1778  }
std::vector< SubProcess > subProcesses_
void edm::EventProcessor::deleteRunFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run 
)

Definition at line 1749 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

Referenced by endUnfinishedRun().

1749  {
1750  principalCache_.deleteRun(phid, run);
1751  for_all(subProcesses_, [run, phid](auto& subProcess) { subProcess.deleteRunFromCache(phid, run); });
1752  FDEBUG(1) << "\tdeleteRunFromCache " << run << "\n";
1753  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )

Definition at line 999 of file EventProcessor.cc.

References FDEBUG.

Referenced by runToCompletion().

999  {
1000  FDEBUG(1) << "\tdoErrorStuff\n";
1001  LogError("StateMachine") << "The EventProcessor state machine encountered an unexpected event\n"
1002  << "and went to the error state\n"
1003  << "Will attempt to terminate processing normally\n"
1004  << "(IF using the looper the next loop will be attempted)\n"
1005  << "This likely indicates a bug in an input module or corrupted input or both\n";
1006  }
Log< level::Error, false > LogError
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. It throws if any module's endJob throws an exception.

Definition at line 693 of file EventProcessor.cc.

References actReg_, c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::first(), watchdog::group, edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, cmsLHEtoEOSManager::l, edm::waiting_task::chain::lastTask(), looper(), looper_, mutex, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by PythonEventProcessor::~PythonEventProcessor().

693  {
694  // Collects exceptions, so we don't throw before all operations are performed.
695  ExceptionCollector c(
696  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
697 
698  //make the services available
700 
701  using namespace edm::waiting_task::chain;
702 
703  edm::FinalWaitingTask waitTask;
704  tbb::task_group group;
705 
706  {
707  //handle endStream transitions
708  edm::WaitingTaskHolder taskHolder(group, &waitTask);
709  std::mutex collectorMutex;
710  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
711  first([this, i, &c, &collectorMutex](auto nextTask) {
712  std::exception_ptr ep;
713  try {
715  this->schedule_->endStream(i);
716  } catch (...) {
717  ep = std::current_exception();
718  }
719  if (ep) {
720  std::lock_guard<std::mutex> l(collectorMutex);
721  c.call([&ep]() { std::rethrow_exception(ep); });
722  }
723  }) | then([this, i, &c, &collectorMutex](auto nextTask) {
724  for (auto& subProcess : subProcesses_) {
725  first([this, i, &c, &collectorMutex, &subProcess](auto nextTask) {
726  std::exception_ptr ep;
727  try {
729  subProcess.doEndStream(i);
730  } catch (...) {
731  ep = std::current_exception();
732  }
733  if (ep) {
734  std::lock_guard<std::mutex> l(collectorMutex);
735  c.call([&ep]() { std::rethrow_exception(ep); });
736  }
737  }) | lastTask(nextTask);
738  }
739  }) | lastTask(taskHolder);
740  }
741  }
742  group.wait();
743 
744  auto actReg = actReg_.get();
745  c.call([actReg]() { actReg->preEndJobSignal_(); });
746  schedule_->endJob(c);
747  for (auto& subProcess : subProcesses_) {
748  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
749  }
750  c.call(std::bind(&InputSource::doEndJob, input_.get()));
751  if (looper_) {
752  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
753  }
754  c.call([actReg]() { actReg->postEndJobSignal_(); });
755  if (c.hasThrown()) {
756  c.rethrow();
757  }
758  }
const edm::EventSetup & c
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:209
static std::mutex mutex
Definition: Proxy.cc:8
PreallocationConfiguration preallocations_
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
tuple group
Definition: watchdog.py:82
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
std::shared_ptr< EDLooperBase const > looper() const
bool edm::EventProcessor::endOfLoop ( )

Definition at line 960 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

Referenced by runToCompletion().

960  {
961  if (looper_) {
962  ModuleChanger changer(schedule_.get(), preg_.get(), esp_->recordsToProxyIndices());
963  looper_->setModuleChanger(&changer);
964  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetupImpl());
965  looper_->setModuleChanger(nullptr);
967  return true;
968  else
969  return false;
970  }
971  FDEBUG(1) << "\tendOfLoop\n";
972  return true;
973  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
list status
Definition: mps_update.py:107
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void edm::EventProcessor::endProcessBlock ( bool  cleaningUpAfterException,
bool  beginProcessBlockSucceeded 
)

Definition at line 1065 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::New, principalCache_, edm::PrincipalCache::processBlockPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1065  {
1066  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
1067 
1068  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1069  FinalWaitingTask globalWaitTask;
1070 
1071  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1072  endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
1073  *schedule_,
1074  transitionInfo,
1075  serviceToken_,
1076  subProcesses_,
1077  cleaningUpAfterException);
1078  do {
1079  taskGroup_.wait();
1080  } while (not globalWaitTask.done());
1081  if (globalWaitTask.exceptionPtr() != nullptr) {
1082  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1083  }
1084 
1085  if (beginProcessBlockSucceeded) {
1086  FinalWaitingTask writeWaitTask;
1088  do {
1089  taskGroup_.wait();
1090  } while (not writeWaitTask.done());
1091  if (writeWaitTask.exceptionPtr()) {
1092  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1093  }
1094  }
1095 
1096  processBlockPrincipal.clearPrincipal();
1097  for (auto& s : subProcesses_) {
1098  s.clearProcessBlockPrincipal(ProcessBlockType::New);
1099  }
1100  }
tbb::task_group taskGroup_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
PrincipalCache principalCache_
void edm::EventProcessor::endRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException 
)

Definition at line 1224 of file EventProcessor.cc.

References actReg_, edm::FinalWaitingTask::done(), edm::EndRun, edm::RunPrincipal::endTime(), esp_, espController_, edm::WaitingTask::exceptionPtr(), FDEBUG, first, edm::waiting_task::chain::ifThen(), input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), eostools::move(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::RunPrincipal::run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), schedule_, serviceToken_, edm::RunPrincipal::setEndTime(), subProcesses_, edm::eventsetup::synchronousEventSetupForInstance(), and taskGroup_.

Referenced by endUnfinishedRun().

1227  {
1228  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1229  runPrincipal.setEndTime(input_->timestamp());
1230 
1231  IOVSyncValue ts(
1232  EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
1233  runPrincipal.endTime());
1234  {
1235  SendSourceTerminationSignalIfException sentry(actReg_.get());
1237  sentry.completedSuccessfully();
1238  }
1239  auto const& es = esp_->eventSetupImpl();
1240  if (globalBeginSucceeded) {
1241  //To wait, the ref count has to be 1+#streams
1242  FinalWaitingTask streamLoopWaitTask;
1243 
1244  using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
1245 
1246  RunTransitionInfo transitionInfo(runPrincipal, es);
1247  endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
1248  *schedule_,
1250  transitionInfo,
1251  serviceToken_,
1252  subProcesses_,
1253  cleaningUpAfterException);
1254  do {
1255  taskGroup_.wait();
1256  } while (not streamLoopWaitTask.done());
1257  if (streamLoopWaitTask.exceptionPtr() != nullptr) {
1258  std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
1259  }
1260  }
1261  FDEBUG(1) << "\tstreamEndRun " << run << "\n";
1262  if (looper_) {
1263  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1264  }
1265  {
1266  FinalWaitingTask globalWaitTask;
1267 
1268  using namespace edm::waiting_task::chain;
1269  chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) {
1270  RunTransitionInfo transitionInfo(runPrincipal, es);
1271  using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
1272  endGlobalTransitionAsync<Traits>(
1273  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1274  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1275  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndRun, runPrincipal, es);
1276  }) | ifThen(looper_, [this, &runPrincipal, &es](auto nextTask) {
1277  looper_->doEndRun(runPrincipal, es, &processContext_);
1278  }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask));
1279 
1280  do {
1281  taskGroup_.wait();
1282  } while (not globalWaitTask.done());
1283  if (globalWaitTask.exceptionPtr() != nullptr) {
1284  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1285  }
1286  }
1287  FDEBUG(1) << "\tendRun " << run << "\n";
1288  }
ProcessContext processContext_
tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:71
ServiceToken serviceToken_
void synchronousEventSetupForInstance(IOVSyncValue const &syncValue, tbb::task_group &iGroup, eventsetup::EventSetupsController &espController)
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::endUnfinishedLumi ( )

Definition at line 1618 of file EventProcessor.cc.

References edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), mps_fire::i, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamEndLumiAsync(), streamLumiActive_, streamLumiStatus_, and taskGroup_.

1618  {
1619  if (streamLumiActive_.load() > 0) {
1620  FinalWaitingTask globalWaitTask;
1621  {
1622  WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask};
1623  for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
1624  if (streamLumiStatus_[i]) {
1625  streamEndLumiAsync(globalTaskHolder, i);
1626  }
1627  }
1628  }
1629  do {
1630  taskGroup_.wait();
1631  } while (not globalWaitTask.done());
1632  if (globalWaitTask.exceptionPtr() != nullptr) {
1633  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1634  }
1635  }
1636  }
tbb::task_group taskGroup_
PreallocationConfiguration preallocations_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::endUnfinishedRun ( ProcessHistoryID const &  phid,
RunNumber_t  run,
bool  globalBeginSucceeded,
bool  cleaningUpAfterException,
bool  eventSetupForInstanceSucceeded 
)

Definition at line 1197 of file EventProcessor.cc.

References deleteRunFromCache(), edm::FinalWaitingTask::done(), endRun(), edm::WaitingTask::exceptionPtr(), edm::RunPrincipal::mergeableRunProductMetadata(), edm::MergeableRunProductMetadata::postWriteRun(), edm::MergeableRunProductMetadata::preWriteRun(), principalCache_, run(), edm::PrincipalCache::runPrincipal(), submitPVValidationJobs::t, taskGroup_, and writeRunAsync().

1201  {
1202  if (eventSetupForInstanceSucceeded) {
1203  //If we skip empty runs, this would be called conditionally
1204  endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);
1205 
1206  if (globalBeginSucceeded) {
1207  FinalWaitingTask t;
1208  RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
1209  MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
1210  mergeableRunProductMetadata->preWriteRun();
1211  writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
1212  do {
1213  taskGroup_.wait();
1214  } while (not t.done());
1215  mergeableRunProductMetadata->postWriteRun();
1216  if (t.exceptionPtr()) {
1217  std::rethrow_exception(*t.exceptionPtr());
1218  }
1219  }
1220  }
1221  deleteRunFromCache(phid, run);
1222  }
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
tbb::task_group taskGroup_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
MergeableRunProductMetadata * mergeableRunProductMetadata()
Definition: RunPrincipal.h:81
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::fileBlockValid ( )
inline

Definition at line 196 of file EventProcessor.h.

References fb_.

Referenced by closeInputFile(), openOutputFiles(), respondToCloseInputFile(), and respondToOpenInputFile().

196 { return fb_.get() != nullptr; }
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 762 of file EventProcessor.cc.

References schedule_.

762  {
763  return schedule_->getAllModuleDescriptions();
764  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 760 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

760 { return serviceToken_; }
ServiceToken serviceToken_
void edm::EventProcessor::globalEndLumiAsync ( edm::WaitingTaskHolder  iTask,
std::shared_ptr< LuminosityBlockProcessingStatus iLumiStatus 
)

Definition at line 1500 of file EventProcessor.cc.

References CMS_SA_ALLOW, deleteLumiFromCache(), edm::EndLuminosityBlock, esp_, first, handleEndLumiExceptions(), edm::waiting_task::chain::ifThen(), looper_, edm::EventID::maxEventNumber(), eostools::move(), processContext_, queueWhichWaitsForIOVsToFinish_, edm::SerialTaskQueue::resume(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, mps_update::status, subProcesses_, edm::waiting_task::chain::then(), and writeLumiAsync().

Referenced by streamEndLumiAsync().

1501  {
1502  // Get some needed info out of the status object before moving
1503  // it into finalTaskForThisLumi.
1504  auto& lp = *(iLumiStatus->lumiPrincipal());
1505  bool didGlobalBeginSucceed = iLumiStatus->didGlobalBeginSucceed();
1506  bool cleaningUpAfterException = iLumiStatus->cleaningUpAfterException();
1507  EventSetupImpl const& es = iLumiStatus->eventSetupImpl(esp_->subProcessIndex());
1508  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = &iLumiStatus->eventSetupImpls();
1509 
1510  using namespace edm::waiting_task::chain;
1511  chain::first([this, &lp, &es, &eventSetupImpls, cleaningUpAfterException](auto nextTask) {
1512  IOVSyncValue ts(EventID(lp.run(), lp.luminosityBlock(), EventID::maxEventNumber()), lp.beginTime());
1513 
1514  LumiTransitionInfo transitionInfo(lp, es, eventSetupImpls);
1515  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
1516  endGlobalTransitionAsync<Traits>(
1517  std::move(nextTask), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
1518  }) | then([this, didGlobalBeginSucceed, &lumiPrincipal = lp](auto nextTask) {
1519  //Only call writeLumi if beginLumi succeeded
1520  if (didGlobalBeginSucceed) {
1521  writeLumiAsync(std::move(nextTask), lumiPrincipal);
1522  }
1523  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1524  looper_->prefetchAsync(std::move(nextTask), serviceToken_, Transition::EndLuminosityBlock, lp, es);
1525  }) | ifThen(looper_, [this, &lp, &es](auto nextTask) {
1526  //any thrown exception auto propagates to nextTask via the chain
1528  looper_->doEndLuminosityBlock(lp, es, &processContext_);
1529  }) | then([status = std::move(iLumiStatus), this](std::exception_ptr const* iPtr, auto nextTask) mutable {
1530  std::exception_ptr ptr;
1531  if (iPtr) {
1532  ptr = *iPtr;
1533  }
1535 
1536  // Try hard to clean up resources so the
1537  // process can terminate in a controlled
1538  // fashion even after exceptions have occurred.
1539  // Caught exception is passed to handleEndLumiExceptions()
1540  CMS_SA_ALLOW try { deleteLumiFromCache(*status); } catch (...) {
1541  if (not ptr) {
1542  ptr = std::current_exception();
1543  }
1544  }
1545  // Caught exception is passed to handleEndLumiExceptions()
1546  CMS_SA_ALLOW try {
1547  status->resumeGlobalLumiQueue();
1549  } catch (...) {
1550  if (not ptr) {
1551  ptr = std::current_exception();
1552  }
1553  }
1554  // Caught exception is passed to handleEndLumiExceptions()
1555  CMS_SA_ALLOW try {
1556  // This call to status.resetResources() must occur before iTask is destroyed.
1557  // Otherwise there will be a data race which could result in endRun
1558  // being delayed until it is too late to successfully call it.
1559  status->resetResources();
1560  status.reset();
1561  } catch (...) {
1562  if (not ptr) {
1563  ptr = std::current_exception();
1564  }
1565  }
1566 
1567  if (ptr) {
1568  handleEndLumiExceptions(&ptr, nextTask);
1569  }
1570  }) | runLast(std::move(iTask));
1571  }
ProcessContext processContext_
#define CMS_SA_ALLOW
list status
Definition: mps_update.py:107
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
ServiceToken serviceToken_
bool resume()
Resumes processing if the queue was paused.
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void edm::EventProcessor::handleEndLumiExceptions ( std::exception_ptr const *  iPtr,
WaitingTaskHolder holder 
)

Definition at line 1491 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), setDeferredException(), setExceptionMessageLumis(), and createJobs::tmp.

Referenced by globalEndLumiAsync(), and streamEndLumiAsync().

1491  {
1492  if (setDeferredException(*iPtr)) {
1493  WaitingTaskHolder tmp(holder);
1494  tmp.doneWaiting(*iPtr);
1495  } else {
1497  }
1498  }
tmp
align.sh
Definition: createJobs.py:716
bool setDeferredException(std::exception_ptr)
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)
private

Definition at line 1842 of file EventProcessor.cc.

References beginLumiAsync(), CMS_SA_ALLOW, deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::WaitingTaskHolder::doneWaiting(), alignCSCRings::e, edm::WaitingTaskHolder::group(), edm::InputSource::IsLumi, lastTransitionType(), edm::make_waiting_task(), eostools::move(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, sourceResourcesAcquirer_, mps_update::status, streamEndLumiAsync(), and streamLumiStatus_.

Referenced by beginLumiAsync(), and continueLumiAsync().

1842  {
1843  sourceResourcesAcquirer_.serialQueueChain().push(*iTask.group(), [this, iTask, iStreamIndex]() mutable {
1845  //we do not want to extend the lifetime of the shared_ptr to the end of this function
1846  // as streamEndLumiAsync may clear the value from streamLumiStatus_[iStreamIndex]
1847  auto status = streamLumiStatus_[iStreamIndex].get();
1848  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1849  CMS_SA_ALLOW try {
1850  if (readNextEventForStream(iStreamIndex, *status)) {
1851  auto recursionTask = make_waiting_task([this, iTask, iStreamIndex](std::exception_ptr const* iPtr) mutable {
1852  if (iPtr) {
1853  // Try to end the stream properly even if an exception was
1854  // thrown on an event.
1855  bool expected = false;
1856  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1857  // This is the case where the exception in iPtr is the primary
1858  // exception and we want to see its message.
1859  deferredExceptionPtr_ = *iPtr;
1860  WaitingTaskHolder tempHolder(iTask);
1861  tempHolder.doneWaiting(*iPtr);
1862  }
1863  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1864  //the stream will stop now
1865  return;
1866  }
1867  handleNextEventForStreamAsync(std::move(iTask), iStreamIndex);
1868  });
1869 
1870  processEventAsync(WaitingTaskHolder(*iTask.group(), recursionTask), iStreamIndex);
1871  } else {
1872  //the stream will stop now
1873  if (status->isLumiEnding()) {
1874  if (lastTransitionType() == InputSource::IsLumi and not status->haveStartedNextLumi()) {
1875  status->startNextLumi();
1876  beginLumiAsync(status->nextSyncValue(), status->runResource(), iTask);
1877  }
1878  streamEndLumiAsync(std::move(iTask), iStreamIndex);
1879  } else {
1880  iTask.doneWaiting(std::exception_ptr{});
1881  }
1882  }
1883  } catch (...) {
1884  // It is unlikely we will ever get in here ...
1885  // But if we do try to clean up and propagate the exception
1886  if (streamLumiStatus_[iStreamIndex]) {
1887  streamEndLumiAsync(iTask, iStreamIndex);
1888  }
1889  bool expected = false;
1890  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1891  auto e = std::current_exception();
1893  iTask.doneWaiting(e);
1894  }
1895  }
1896  });
1897  }
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
#define CMS_SA_ALLOW
SharedResourcesAcquirer sourceResourcesAcquirer_
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
list status
Definition: mps_update.py:107
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastTransitionType() const
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
SerialTaskQueueChain & serialQueueChain() const
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 342 of file EventProcessor.cc.

References act_table_, actReg_, cms::cuda::assert(), branchIDListHelper(), branchIDListHelper_, edm::errors::Configuration, deleteNonConsumedUnscheduledModules_, edm::dumpOptionsToLogFile(), esp_, espController_, Exception, FDEBUG, dtDQMClient_cfg::fileMode, fileModeNoMerge_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::get_underlying_safe(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::PrincipalCache::insertForInput(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, lumiQueue_, edm::makeInput(), mergeableRunProductProcesses_, eostools::move(), edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfStreams(), or, edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, preg(), preg_, principalCache_, printDependencies_, processBlockHelper_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::MergeableRunProductProcesses::setProcessesWithMergeableRunProducts(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), streamLumiStatus_, streamQueues_, AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, thinnedAssociationsHelper(), thinnedAssociationsHelper_, unpackBuffers-CaloStage2::token, and edm::validateTopLevelParameterSets().

Referenced by EventProcessor().

344  {
345  //std::cerr << processDesc->dump() << std::endl;
346 
347  // register the empty parentage vector, once and for all
349 
350  // register the empty parameter set, once and for all.
351  ParameterSet().registerIt();
352 
353  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
354 
355  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
356  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
357  bool const hasSubProcesses = !subProcessVParameterSet.empty();
358 
359  // Validates the parameters in the 'options', 'maxEvents', 'maxLuminosityBlocks',
360  // and 'maxSecondsUntilRampdown' top level parameter sets. Default values are also
361  // set in here if the parameters were not explicitly set.
362  validateTopLevelParameterSets(parameterSet.get());
363 
364  // Now set some parameters specific to the main process.
365  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options"));
366  auto const& fileMode = optionsPset.getUntrackedParameter<std::string>("fileMode");
367  if (fileMode != "NOMERGE" and fileMode != "FULLMERGE") {
368  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
369  << fileMode << ".\n"
370  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
371  } else {
372  fileModeNoMerge_ = (fileMode == "NOMERGE");
373  }
374  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
375 
376  //threading
377  unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
378 
379  // Even if numberOfThreads was set to zero in the Python configuration, the code
380  // in cmsRun.cpp should have reset it to something else.
381  assert(nThreads != 0);
382 
383  unsigned int nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
384  if (nStreams == 0) {
385  nStreams = nThreads;
386  }
387  unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
388  if (nConcurrentRuns != 1) {
389  throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
390  << "Although the plan is to change this in the future, currently nConcurrentRuns must always be 1.\n";
391  }
392  unsigned int nConcurrentLumis =
393  optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
394  if (nConcurrentLumis == 0) {
395  nConcurrentLumis = 2;
396  }
397  if (nConcurrentLumis > nStreams) {
398  nConcurrentLumis = nStreams;
399  }
400  std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
401  if (!loopers.empty()) {
402  //For now loopers make us run only 1 transition at a time
403  if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
404  edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
405  "of concurrent runs, and the number of concurrent lumis "
406  "are all being reset to 1. Loopers cannot currently support "
407  "values greater than 1.";
408  nStreams = 1;
409  nConcurrentLumis = 1;
410  nConcurrentRuns = 1;
411  }
412  }
413  bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
414  if (dumpOptions) {
415  dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
416  } else {
417  if (nThreads > 1 or nStreams > 1) {
418  edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
419  }
420  }
421  // The number of concurrent IOVs is configured individually for each record in
422  // the class NumberOfConcurrentIOVs to values less than or equal to this.
423  unsigned int maxConcurrentIOVs = nConcurrentLumis;
424 
425  //Check that relationships between threading parameters make sense
426  /*
427  if(nThreads<nStreams) {
428  //bad
429  }
430  if(nConcurrentRuns>nStreams) {
431  //bad
432  }
433  if(nConcurrentRuns>nConcurrentLumis) {
434  //bad
435  }
436  */
437  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter"));
438 
439  printDependencies_ = optionsPset.getUntrackedParameter<bool>("printDependencies");
441  optionsPset.getUntrackedParameter<bool>("deleteNonConsumedUnscheduledModules");
442 
443  // Now do general initialization
444  ScheduleItems items;
445 
446  //initialize the services
447  auto& serviceSets = processDesc->getServicesPSets();
448  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
449  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
450 
451  //make the services available
453 
454  if (nStreams > 1) {
456  handler->willBeUsingThreads();
457  }
458 
459  // initialize miscellaneous items
460  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
461 
462  // initialize the event setup provider
463  ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
464  esp_ = espController_->makeProvider(
465  *parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);
466 
467  // initialize the looper, if any
468  if (!loopers.empty()) {
469  looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
470  looper_->setActionTable(items.act_table_.get());
471  looper_->attachTo(*items.actReg_);
472 
473  // in presence of looper do not delete modules
474  deleteNonConsumedUnscheduledModules_ = false;
475  }
476 
477  preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
478 
479  lumiQueue_ = std::make_unique<LimitedTaskQueue>(nConcurrentLumis);
480  streamQueues_.resize(nStreams);
481  streamLumiStatus_.resize(nStreams);
482 
483  processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
484 
485  // initialize the input source
486  input_ = makeInput(*parameterSet,
487  *common,
488  items.preg(),
489  items.branchIDListHelper(),
491  items.thinnedAssociationsHelper(),
492  items.actReg_,
493  items.processConfiguration(),
495 
496  // initialize the Schedule
497  schedule_ =
498  items.initSchedule(*parameterSet, hasSubProcesses, preallocations_, &processContext_, *processBlockHelper_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_ = items.preg();
505  branchIDListHelper_ = items.branchIDListHelper();
506  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
507  processConfiguration_ = items.processConfiguration();
509  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
510 
511  FDEBUG(2) << parameterSet << std::endl;
512 
514  for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
515  // Reusable event principal
516  auto ep = std::make_shared<EventPrincipal>(preg(),
520  historyAppender_.get(),
521  index,
522  true /*primary process*/,
525  }
526 
527  for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
528  auto lp =
529  std::make_unique<LuminosityBlockPrincipal>(preg(), *processConfiguration_, historyAppender_.get(), index);
531  }
532 
533  {
534  auto pb = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
536 
537  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg(), *processConfiguration_);
539  }
540 
541  // fill the subprocesses, if there are any
542  subProcesses_.reserve(subProcessVParameterSet.size());
543  for (auto& subProcessPSet : subProcessVParameterSet) {
544  subProcesses_.emplace_back(subProcessPSet,
545  *parameterSet,
546  preg(),
550  SubProcessParentageHelper(),
552  *actReg_,
553  token,
556  &processContext_);
557  }
558  }
ProcessContext processContext_
void setProcessesWithMergeableRunProducts(ProductRegistry const &productRegistry)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
MergeableRunProductProcesses mergeableRunProductProcesses_
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params, std::vector< std::string > const &loopers)
std::vector< edm::SerialTaskQueue > streamQueues_
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:511
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ProcessBlockHelper > const &processBlockHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Log< level::Info, false > LogInfo
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
std::shared_ptr< ActivityRegistry > actReg_
Log< level::Warning, false > LogWarning
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
bool insertMapped(value_type const &v)
PrincipalCache principalCache_
void dumpOptionsToLogFile(unsigned int nThreads, unsigned int nStreams, unsigned int nConcurrentLumis, unsigned int nConcurrentRuns)
void edm::EventProcessor::inputProcessBlocks ( )

Definition at line 1029 of file EventProcessor.cc.

References edm::Principal::clearPrincipal(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), edm::PrincipalCache::Input, input_, edm::PrincipalCache::inputProcessBlockPrincipal(), principalCache_, readProcessBlock(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, taskGroup_, and writeProcessBlockAsync().

1029  {
1030  input_->fillProcessBlockHelper();
1031  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
1032  while (input_->nextProcessBlock(processBlockPrincipal)) {
1033  readProcessBlock(processBlockPrincipal);
1034 
1035  using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1036  FinalWaitingTask globalWaitTask;
1037 
1038  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
1039  beginGlobalTransitionAsync<Traits>(
1040  WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_);
1041 
1042  do {
1043  taskGroup_.wait();
1044  } while (not globalWaitTask.done());
1045  if (globalWaitTask.exceptionPtr() != nullptr) {
1046  std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
1047  }
1048 
1049  FinalWaitingTask writeWaitTask;
1051  do {
1052  taskGroup_.wait();
1053  } while (not writeWaitTask.done());
1054  if (writeWaitTask.exceptionPtr()) {
1055  std::rethrow_exception(*writeWaitTask.exceptionPtr());
1056  }
1057 
1058  processBlockPrincipal.clearPrincipal();
1059  for (auto& s : subProcesses_) {
1060  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
1061  }
1062  }
1063  }
tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void readProcessBlock(ProcessBlockPrincipal &)
PrincipalCache principalCache_
InputSource::ItemType edm::EventProcessor::lastTransitionType ( ) const
inline

Definition at line 186 of file EventProcessor.h.

References deferredExceptionPtrIsSet_, edm::InputSource::IsStop, and lastSourceTransition_.

Referenced by handleNextEventForStreamAsync(), and processLumis().

186  {
188  return InputSource::IsStop;
189  }
190  return lastSourceTransition_;
191  }
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 297 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

297 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 298 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

298 { return get_underlying_safe(looper_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::LuminosityBlockNumber_t edm::EventProcessor::nextLuminosityBlockID ( )

Definition at line 819 of file EventProcessor.cc.

References input_.

Referenced by readNextEventForStream().

819 { return input_->luminosityBlock(); }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > edm::EventProcessor::nextRunID ( )

Definition at line 815 of file EventProcessor.cc.

References input_.

815  {
816  return std::make_pair(input_->reducedProcessHistoryID(), input_->run());
817  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
InputSource::ItemType edm::EventProcessor::nextTransitionType ( )

Definition at line 789 of file EventProcessor.cc.

References actReg_, checkForAsyncStopRequest(), deferredExceptionPtrIsSet_, epSuccess, edm::ExternalSignal, input_, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, lastSourceTransition_, and runEdmFileComparison::returnCode.

Referenced by readNextEventForStream().

789  {
790  if (deferredExceptionPtrIsSet_.load()) {
792  return InputSource::IsStop;
793  }
794 
795  SendSourceTerminationSignalIfException sentry(actReg_.get());
796  InputSource::ItemType itemType;
797  //For now, do nothing with InputSource::IsSynchronize
798  do {
799  itemType = input_->nextItemType();
800  } while (itemType == InputSource::IsSynchronize);
801 
802  lastSourceTransition_ = itemType;
803  sentry.completedSuccessfully();
804 
806 
807  if (checkForAsyncStopRequest(returnCode)) {
808  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
810  }
811 
812  return lastSourceTransition_;
813  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType lastSourceTransition_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::openOutputFiles ( )

Definition at line 917 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

917  {
918  if (fileBlockValid()) {
919  schedule_->openOutputFiles(*fb_);
920  for_all(subProcesses_, [this](auto& subProcess) { subProcess.openOutputFiles(*fb_); });
921  }
922  FDEBUG(1) << "\topenOutputFiles\n";
923  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 285 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), and readRun().

285 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 286 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

286 { return get_underlying_safe(preg_); }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )

Definition at line 981 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

Referenced by runToCompletion().

981  {
982  looper_->prepareForNextLoop(esp_.get());
983  FDEBUG(1) << "\tprepareForNextLoop\n";
984  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
constexpr element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 140 of file EventProcessor.h.

References processConfiguration_.

140 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1913 of file EventProcessor.cc.

References edm::WaitingTaskHolder::group(), and processEventAsyncImpl().

Referenced by handleNextEventForStreamAsync().

1913  {
1914  iHolder.group()->run([=]() { processEventAsyncImpl(iHolder, iStreamIndex); });
1915  }
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void edm::EventProcessor::processEventAsyncImpl ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 1917 of file EventProcessor.cc.

References esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, first, edm::waiting_task::chain::ifThen(), info(), edm::Service< T >::isAvailable(), looper_, eostools::move(), principalCache_, processEventWithLooper(), edm::waiting_task::chain::runLast(), schedule_, serviceToken_, streamLumiStatus_, subProcesses_, and edm::waiting_task::chain::then().

Referenced by processEventAsync().

1917  {
1918  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1919 
1921  Service<RandomNumberGenerator> rng;
1922  if (rng.isAvailable()) {
1923  Event ev(*pep, ModuleDescription(), nullptr);
1924  rng->postEventRead(ev);
1925  }
1926 
1927  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1928  using namespace edm::waiting_task::chain;
1929  chain::first([this, &es, pep, iStreamIndex](auto nextTask) {
1930  EventTransitionInfo info(*pep, es);
1931  schedule_->processOneEventAsync(std::move(nextTask), iStreamIndex, info, serviceToken_);
1932  }) | ifThen(not subProcesses_.empty(), [this, pep, iStreamIndex](auto nextTask) {
1933  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
1934  subProcess.doEventAsync(nextTask, *pep, &streamLumiStatus_[iStreamIndex]->eventSetupImpls());
1935  }
1936  }) | ifThen(looper_, [this, iStreamIndex, pep](auto nextTask) {
1937  //NOTE: behavior change. Previously, if an exception happened, the looper was still called. Now it will not be called.
1938  ServiceRegistry::Operate operateLooper(serviceToken_);
1939  processEventWithLooper(*pep, iStreamIndex);
1940  }) | then([pep](auto nextTask) {
1941  FDEBUG(1) << "\tprocessEvent\n";
1942  pep->clearEventPrincipal();
1943  }) | runLast(iHolder);
1944  }
static const TGPicture * info(bool iBackgroundIsBlack)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:19
constexpr auto then(O &&iO)
Definition: chain_first.h:277
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
PrincipalCache principalCache_
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal,
unsigned int  iStreamIndex 
)
private

Definition at line 1946 of file EventProcessor.cc.

References esp_, input_, edm::InputSource::IsStop, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), lastSourceTransition_, looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), streamLumiStatus_, and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsyncImpl().

1946  {
1947  bool randomAccess = input_->randomAccess();
1948  ProcessingController::ForwardState forwardState = input_->forwardState();
1949  ProcessingController::ReverseState reverseState = input_->reverseState();
1950  ProcessingController pc(forwardState, reverseState, randomAccess);
1951 
1953  do {
1954  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
1955  EventSetupImpl const& es = streamLumiStatus_[iStreamIndex]->eventSetupImpl(esp_->subProcessIndex());
1956  status = looper_->doDuringLoop(iPrincipal, es, pc, &streamContext);
1957 
1958  bool succeeded = true;
1959  if (randomAccess) {
1960  if (pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1961  input_->skipEvents(-2);
1962  } else if (pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1963  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1964  }
1965  }
1966  pc.setLastOperationSucceeded(succeeded);
1967  } while (!pc.lastOperationSucceeded());
1968  if (status != EDLooperBase::kContinue) {
1969  shouldWeStop_ = true;
1971  }
1972  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
list status
Definition: mps_update.py:107
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
InputSource::ItemType lastSourceTransition_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
InputSource::ItemType edm::EventProcessor::processLumis ( std::shared_ptr< void > const &  iRunResource)

Definition at line 1290 of file EventProcessor.cc.

References cms::cuda::assert(), beginLumiAsync(), continueLumiAsync(), edm::FinalWaitingTask::done(), edm::WaitingTask::exceptionPtr(), input_, lastTransitionType(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, streamLumiActive_, and taskGroup_.

1290  {
1291  FinalWaitingTask waitTask;
1292  if (streamLumiActive_ > 0) {
1294  // Continue after opening a new input file
1296  } else {
1297  beginLumiAsync(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1298  input_->luminosityBlockAuxiliary()->beginTime()),
1299  iRunResource,
1300  WaitingTaskHolder{taskGroup_, &waitTask});
1301  }
1302  do {
1303  taskGroup_.wait();
1304  } while (not waitTask.done());
1305 
1306  if (waitTask.exceptionPtr() != nullptr) {
1307  std::rethrow_exception(*(waitTask.exceptionPtr()));
1308  }
1309  return lastTransitionType();
1310  }
tbb::task_group taskGroup_
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
PreallocationConfiguration preallocations_
InputSource::ItemType lastTransitionType() const
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::atomic< unsigned int > streamLumiActive_
int edm::EventProcessor::readAndMergeLumi ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1698 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), or, and preg().

Referenced by readNextEventForStream().

1698  {
1699  auto& lumiPrincipal = *iStatus.lumiPrincipal();
1700  assert(lumiPrincipal.aux().sameIdentity(*input_->luminosityBlockAuxiliary()) or
1701  input_->processHistoryRegistry().reducedProcessHistoryID(lumiPrincipal.aux().processHistoryID()) ==
1702  input_->processHistoryRegistry().reducedProcessHistoryID(
1703  input_->luminosityBlockAuxiliary()->processHistoryID()));
1704  bool lumiOK = lumiPrincipal.adjustToNewProductRegistry(*preg());
1705  assert(lumiOK);
1706  lumiPrincipal.mergeAuxiliary(*input_->luminosityBlockAuxiliary());
1707  {
1708  SendSourceTerminationSignalIfException sentry(actReg_.get());
1709  input_->readAndMergeLumi(*iStatus.lumiPrincipal());
1710  sentry.completedSuccessfully();
1711  }
1712  return input_->luminosityBlock();
1713  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readAndMergeRun ( )

Definition at line 1667 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1667  {
1668  principalCache_.merge(input_->runAuxiliary(), preg());
1669  auto runPrincipal = principalCache_.runPrincipalPtr();
1670  {
1671  SendSourceTerminationSignalIfException sentry(actReg_.get());
1672  input_->readAndMergeRun(*runPrincipal);
1673  sentry.completedSuccessfully();
1674  }
1675  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1676  return std::make_pair(runPrincipal->reducedProcessHistoryID(), input_->run());
1677  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(be >=bs)
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1899 of file EventProcessor.cc.

References actReg_, edmPickEvents::event, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, processContext_, and streamLumiStatus_.

Referenced by readNextEventForStream().

1899  {
1900  //TODO this will have to become per stream
1901  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1902  StreamContext streamContext(event.streamID(), &processContext_);
1903 
1904  SendSourceTerminationSignalIfException sentry(actReg_.get());
1905  input_->readEvent(event, streamContext);
1906 
1907  streamLumiStatus_[iStreamIndex]->updateLastTimestamp(input_->timestamp());
1908  sentry.completedSuccessfully();
1909 
1910  FDEBUG(1) << "\treadEvent\n";
1911  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )

Definition at line 890 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, edm::PrincipalCache::preReadFile(), principalCache_, and findQualityFiles::size.

890  {
891  FDEBUG(1) << " \treadFile\n";
892  size_t size = preg_->size();
893  SendSourceTerminationSignalIfException sentry(actReg_.get());
894 
896 
897  fb_ = input_->readFile();
898  if (size < preg_->size()) {
900  }
903  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
904  }
905  sentry.completedSuccessfully();
906  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::readLuminosityBlock ( LuminosityBlockProcessingStatus iStatus)

Definition at line 1679 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::getAvailableLumiPrincipalPtr(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::errors::LogicError, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), eostools::move(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

Referenced by beginLumiAsync().

1679  {
1681  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readLuminosityBlock\n"
1682  << "Illegal attempt to insert lumi into cache\n"
1683  << "Run is invalid\n"
1684  << "Contact a Framework Developer\n";
1685  }
1687  assert(lbp);
1688  lbp->setAux(*input_->luminosityBlockAuxiliary());
1689  {
1690  SendSourceTerminationSignalIfException sentry(actReg_.get());
1691  input_->readLuminosityBlock(*lbp, *historyAppender_);
1692  sentry.completedSuccessfully();
1693  }
1694  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1695  iStatus.lumiPrincipal() = std::move(lbp);
1696  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
def move
Definition: eostools.py:511
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
LuminosityBlockProcessingStatus iLumiStatus 
)
private

Definition at line 1780 of file EventProcessor.cc.

References CMS_SA_ALLOW, edm::LuminosityBlockProcessingStatus::continuingLumi(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::LuminosityBlockProcessingStatus::endLumi(), edm::LuminosityBlockProcessingStatus::haveContinuedLumi(), input_, edm::InputSource::IsEvent, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, lastSourceTransition_, edm::LuminosityBlockProcessingStatus::lumiPrincipal(), nextLuminosityBlockID(), nextTransitionType(), or, readAndMergeLumi(), readEvent(), serviceToken_, edm::LuminosityBlockProcessingStatus::setNextSyncValue(), shouldWeStop(), sourceMutex_, edm::LuminosityBlockProcessingStatus::stopProcessingEvents(), and edm::LuminosityBlockProcessingStatus::wasEventProcessingStopped().

Referenced by handleNextEventForStreamAsync().

1780  {
1781  if (deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1782  iStatus.endLumi();
1783  return false;
1784  }
1785 
1786  if (iStatus.wasEventProcessingStopped()) {
1787  return false;
1788  }
1789 
1790  if (shouldWeStop()) {
1792  iStatus.stopProcessingEvents();
1793  iStatus.endLumi();
1794  return false;
1795  }
1796 
1798  // Caught exception is propagated to EventProcessor::runToCompletion() via deferredExceptionPtr_
1799  CMS_SA_ALLOW try {
1800  //need to use lock in addition to the serial task queue because
1801  // of delayed provenance reading and reading data in response to
1802  // edm::Refs etc
1803  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1804 
1805  auto itemType = iStatus.continuingLumi() ? InputSource::IsLumi : nextTransitionType();
1806  if (InputSource::IsLumi == itemType) {
1807  iStatus.haveContinuedLumi();
1808  while (itemType == InputSource::IsLumi and iStatus.lumiPrincipal()->run() == input_->run() and
1809  iStatus.lumiPrincipal()->luminosityBlock() == nextLuminosityBlockID()) {
1810  readAndMergeLumi(iStatus);
1811  itemType = nextTransitionType();
1812  }
1813  if (InputSource::IsLumi == itemType) {
1814  iStatus.setNextSyncValue(IOVSyncValue(EventID(input_->run(), input_->luminosityBlock(), 0),
1815  input_->luminosityBlockAuxiliary()->beginTime()));
1816  }
1817  }
1818  if (InputSource::IsEvent != itemType) {
1819  iStatus.stopProcessingEvents();
1820 
1821  //IsFile may continue processing the lumi and
1822  // looper_ can cause the input source to declare a new IsRun which is actually
1823  // just a continuation of the previous run
1824  if (InputSource::IsStop == itemType or InputSource::IsLumi == itemType or
1825  (InputSource::IsRun == itemType and iStatus.lumiPrincipal()->run() != input_->run())) {
1826  iStatus.endLumi();
1827  }
1828  return false;
1829  }
1830  readEvent(iStreamIndex);
1831  } catch (...) {
1832  bool expected = false;
1833  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1834  deferredExceptionPtr_ = std::current_exception();
1835  iStatus.endLumi();
1836  }
1837  return false;
1838  }
1839  return true;
1840  }
void readEvent(unsigned int iStreamIndex)
#define CMS_SA_ALLOW
InputSource::ItemType nextTransitionType()
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType lastSourceTransition_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
std::exception_ptr deferredExceptionPtr_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
bool shouldWeStop() const
void edm::EventProcessor::readProcessBlock ( ProcessBlockPrincipal processBlockPrincipal)

Definition at line 1638 of file EventProcessor.cc.

References actReg_, and input_.

Referenced by inputProcessBlocks().

1638  {
1639  SendSourceTerminationSignalIfException sentry(actReg_.get());
1640  input_->readProcessBlock(processBlockPrincipal);
1641  sentry.completedSuccessfully();
1642  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ActivityRegistry > actReg_
std::pair< ProcessHistoryID, RunNumber_t > edm::EventProcessor::readRun ( )

Definition at line 1644 of file EventProcessor.cc.

References actReg_, cms::cuda::assert(), Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, mergeableRunProductProcesses_, preg(), principalCache_, and processConfiguration_.

1644  {
1646  throw edm::Exception(edm::errors::LogicError) << "EventProcessor::readRun\n"
1647  << "Illegal attempt to insert run into cache\n"
1648  << "Contact a Framework Developer\n";
1649  }
1650  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(),
1651  preg(),
1653  historyAppender_.get(),
1654  0,
1655  true,
1657  {
1658  SendSourceTerminationSignalIfException sentry(actReg_.get());
1659  input_->readRun(*rp, *historyAppender_);
1660  sentry.completedSuccessfully();
1661  }
1662  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1663  principalCache_.insert(rp);
1664  return std::make_pair(rp->reducedProcessHistoryID(), input_->run());
1665  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
assert(be >=bs)
MergeableRunProductProcesses mergeableRunProductProcesses_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void insert(std::unique_ptr< ProcessBlockPrincipal >)
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )

Definition at line 942 of file EventProcessor.cc.

References fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

942  {
943  if (fileBlockValid()) {
944  schedule_->respondToCloseInputFile(*fb_);
945  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToCloseInputFile(*fb_); });
946  }
947  FDEBUG(1) << "\trespondToCloseInputFile\n";
948  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::respondToOpenInputFile ( )

Definition at line 932 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, fileBlockValid(), edm::for_all(), schedule_, and subProcesses_.

932  {
933  if (fileBlockValid()) {
935  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
936  schedule_->respondToOpenInputFile(*fb_);
937  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
938  }
939  FDEBUG(1) << "\trespondToOpenInputFile\n";
940  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )

Definition at line 975 of file EventProcessor.cc.

References FDEBUG, and input_.

Referenced by runToCompletion().

975  {
976  input_->repeat();
977  input_->rewind();
978  FDEBUG(1) << "\trewind\n";
979  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventProcessor::StatusCode edm::EventProcessor::run ( )
inline

Definition at line 373 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), Types.LuminosityBlockID::cppID(), endUnfinishedRun(), and writeRunAsync().

373 { return runToCompletion(); }
StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )

Definition at line 821 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, doErrorStuff(), alignCSCRings::e, endOfLoop(), epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, fileModeNoMerge_, personalPlayback::fp, edm::InputSource::IsStop, prepareForNextLoop(), runEdmFileComparison::returnCode, rewindInput(), serviceToken_, startingNewLoop(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

821  {
824  {
825  beginJob(); //make sure this was called
826 
827  // make the services available
829 
831  try {
832  FilesProcessor fp(fileModeNoMerge_);
833 
834  convertException::wrap([&]() {
835  bool firstTime = true;
836  do {
837  if (not firstTime) {
839  rewindInput();
840  } else {
841  firstTime = false;
842  }
843  startingNewLoop();
844 
845  auto trans = fp.processFiles(*this);
846 
847  fp.normalEnd();
848 
849  if (deferredExceptionPtrIsSet_.load()) {
850  std::rethrow_exception(deferredExceptionPtr_);
851  }
852  if (trans != InputSource::IsStop) {
853  //problem with the source
854  doErrorStuff();
855 
856  throw cms::Exception("BadTransition") << "Unexpected transition change " << trans;
857  }
858  } while (not endOfLoop());
859  }); // convertException::wrap
860 
861  } // Try block
862  catch (cms::Exception& e) {
864  std::string message(
865  "Another exception was caught while trying to clean up lumis after the primary fatal exception.");
866  e.addAdditionalInfo(message);
867  if (e.alreadyPrinted()) {
868  LogAbsolute("Additional Exceptions") << message;
869  }
870  }
871  if (!exceptionMessageRuns_.empty()) {
873  if (e.alreadyPrinted()) {
874  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
875  }
876  }
877  if (!exceptionMessageFiles_.empty()) {
879  if (e.alreadyPrinted()) {
880  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
881  }
882  }
883  throw;
884  }
885  }
886 
887  return returnCode;
888  }
std::atomic< bool > exceptionMessageLumis_
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:177
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
Log< level::System, true > LogAbsolute
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
bool edm::EventProcessor::setDeferredException ( std::exception_ptr  iException)

Definition at line 1995 of file EventProcessor.cc.

References deferredExceptionPtr_, and deferredExceptionPtrIsSet_.

Referenced by handleEndLumiExceptions().

1995  {
1996  bool expected = false;
1997  if (deferredExceptionPtrIsSet_.compare_exchange_strong(expected, true)) {
1998  deferredExceptionPtr_ = iException;
1999  return true;
2000  }
2001  return false;
2002  }
std::atomic< bool > deferredExceptionPtrIsSet_
std::exception_ptr deferredExceptionPtr_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)

Definition at line 1989 of file EventProcessor.cc.

References exceptionMessageFiles_.

1989 { exceptionMessageFiles_ = message; }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( )

Definition at line 1993 of file EventProcessor.cc.

References exceptionMessageLumis_.

Referenced by handleEndLumiExceptions().

1993 { exceptionMessageLumis_ = true; }
std::atomic< bool > exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)

Definition at line 1991 of file EventProcessor.cc.

References exceptionMessageRuns_.

1991 { exceptionMessageRuns_ = message; }
std::string exceptionMessageRuns_
bool edm::EventProcessor::shouldWeCloseOutput ( ) const

Definition at line 986 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

986  {
987  FDEBUG(1) << "\tshouldWeCloseOutput\n";
988  if (!subProcesses_.empty()) {
989  for (auto const& subProcess : subProcesses_) {
990  if (subProcess.shouldWeCloseOutput()) {
991  return true;
992  }
993  }
994  return false;
995  }
996  return schedule_->shouldWeCloseOutput();
997  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const

Definition at line 1974 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

1974  {
1975  FDEBUG(1) << "\tshouldWeStop\n";
1976  if (shouldWeStop_)
1977  return true;
1978  if (!subProcesses_.empty()) {
1979  for (auto const& subProcess : subProcesses_) {
1980  if (subProcess.terminate()) {
1981  return true;
1982  }
1983  }
1984  return false;
1985  }
1986  return schedule_->terminate();
1987  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )

Definition at line 950 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

Referenced by runToCompletion().

950  {
951  shouldWeStop_ = false;
952  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
953  // until after we've called beginOfJob
954  if (looper_ && looperBeginJobRun_) {
955  looper_->doStartingNewLoop();
956  }
957  FDEBUG(1) << "\tstartingNewLoop\n";
958  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::streamEndLumiAsync ( edm::WaitingTaskHolder  iTask,
unsigned int  iStreamIndex 
)

Definition at line 1573 of file EventProcessor.cc.

References esp_, globalEndLumiAsync(), edm::WaitingTaskHolder::group(), handleEndLumiExceptions(), edm::make_waiting_task(), edm::EventID::maxEventNumber(), eostools::move(), schedule_, serviceToken_, mps_update::status, streamLumiActive_, streamLumiStatus_, streamQueues_, subProcesses_, and submitPVValidationJobs::t.

Referenced by beginLumiAsync(), endUnfinishedLumi(), and handleNextEventForStreamAsync().

1573  {
1574  auto t = edm::make_waiting_task([this, iStreamIndex, iTask](std::exception_ptr const* iPtr) mutable {
1575  if (iPtr) {
1576  handleEndLumiExceptions(iPtr, iTask);
1577  }
1578  auto status = streamLumiStatus_[iStreamIndex];
1579  //reset status before releasing queue else get race condition
1580  streamLumiStatus_[iStreamIndex].reset();
1582  streamQueues_[iStreamIndex].resume();
1583 
1584  //are we the last one?
1585  if (status->streamFinishedLumi()) {
1587  }
1588  });
1589 
1590  edm::WaitingTaskHolder lumiDoneTask{*iTask.group(), t};
1591 
1592  //Need to be sure the lumi status is released before lumiDoneTask can ever be called.
1593  // therefore we do not want to hold the shared_ptr
1594  auto lumiStatus = streamLumiStatus_[iStreamIndex].get();
1595  lumiStatus->setEndTime();
1596 
1597  EventSetupImpl const& es = lumiStatus->eventSetupImpl(esp_->subProcessIndex());
1598 
1599  bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException();
1600  auto eventSetupImpls = &lumiStatus->eventSetupImpls();
1601 
1602  if (lumiStatus->didGlobalBeginSucceed()) {
1603  auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
1604  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1605  lumiPrincipal.endTime());
1606  using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
1607  LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
1608  endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
1609  *schedule_,
1610  iStreamIndex,
1611  transitionInfo,
1612  serviceToken_,
1613  subProcesses_,
1614  cleaningUpAfterException);
1615  }
1616  }
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
list status
Definition: mps_update.py:107
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::vector< edm::SerialTaskQueue > streamQueues_
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:96
tbb::task_group * group() const noexcept
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::atomic< unsigned int > streamLumiActive_
void edm::EventProcessor::taskCleanup ( )

Definition at line 578 of file EventProcessor.cc.

References cms::cuda::assert(), edm::FinalWaitingTask::done(), espController_, and taskGroup_.

578  {
580  espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task});
581  taskGroup_.wait();
582  assert(task.done());
583  }
tbb::task_group taskGroup_
assert(be >=bs)
bool done() const
Definition: WaitingTask.h:82
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 291 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

291  {
293  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 294 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

294  {
296  }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
void edm::EventProcessor::throwAboutModulesRequiringLuminosityBlockSynchronization ( ) const
private

Definition at line 2004 of file EventProcessor.cc.

References newFWLiteAna::found, and schedule_.

Referenced by beginJob().

2004  {
2005  cms::Exception ex("ModulesSynchingOnLumis");
2006  ex << "The framework is configured to use at least two streams, but the following modules\n"
2007  << "require synchronizing on LuminosityBlock boundaries:";
2008  bool found = false;
2009  for (auto worker : schedule_->allWorkers()) {
2010  if (worker->wantsGlobalLuminosityBlocks() and worker->globalLuminosityBlocksQueue()) {
2011  found = true;
2012  ex << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2013  }
2014  }
2015  if (found) {
2016  ex << "\n\nThe situation can be fixed by either\n"
2017  << " * modifying the modules to support concurrent LuminosityBlocks (preferred), or\n"
2018  << " * setting 'process.options.numberOfConcurrentLuminosityBlocks = 1' in the configuration file";
2019  throw ex;
2020  }
2021  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 766 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

766 { return schedule_->totalEvents(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 770 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

770 { return schedule_->totalEventsFailed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 768 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

768 { return schedule_->totalEventsPassed(); }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::warnAboutLegacyModules ( ) const
private

Definition at line 2023 of file EventProcessor.cc.

References edm::Worker::kLegacy, alignCSCRings::s, and schedule_.

Referenced by beginJob().

2023  {
2024  std::unique_ptr<LogSystem> s;
2025  for (auto worker : schedule_->allWorkers()) {
2026  if (worker->moduleConcurrencyType() == Worker::kLegacy) {
2027  if (not s) {
2028  s = std::make_unique<LogSystem>("LegacyModules");
2029  (*s) << "The following legacy modules are configured. Support for legacy modules\n"
2030  "is going to end soon. These modules need to be converted to have type\n"
2031  "edm::global, edm::stream, edm::one, or in rare cases edm::limited.";
2032  }
2033  (*s) << "\n " << worker->description()->moduleName() << " " << worker->description()->moduleLabel();
2034  }
2035  }
2036  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumiAsync ( WaitingTaskHolder  task,
LuminosityBlockPrincipal lumiPrincipal 
)

Definition at line 1755 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), edm::waiting_task::chain::lastTask(), edm::LuminosityBlockPrincipal::luminosityBlock(), edm::RunPrincipal::mergeableRunProductMetadata(), eostools::move(), processContext_, edm::LuminosityBlockPrincipal::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, subProcesses_, edm::LuminosityBlockPrincipal::willBeContinued(), and edm::MergeableRunProductMetadata::writeLumi().

Referenced by globalEndLumiAsync().

1755  {
1756  using namespace edm::waiting_task;
1757  if (not lumiPrincipal.willBeContinued()) {
1758  chain::first([&](auto nextTask) {
1760 
1761  lumiPrincipal.runPrincipal().mergeableRunProductMetadata()->writeLumi(lumiPrincipal.luminosityBlock());
1762  schedule_->writeLumiAsync(nextTask, lumiPrincipal, &processContext_, actReg_.get());
1763  }) | chain::ifThen(not subProcesses_.empty(), [this, &lumiPrincipal](auto nextTask) {
1765  for (auto& s : subProcesses_) {
1766  s.writeLumiAsync(nextTask, lumiPrincipal);
1767  }
1768  }) | chain::lastTask(std::move(task));
1769  }
1770  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
auto lastTask(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:299
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::writeProcessBlockAsync ( WaitingTaskHolder  task,
ProcessBlockType  processBlockType 
)

Definition at line 1715 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, edm::PrincipalCache::processBlockPrincipal(), processContext_, edm::waiting_task::chain::runLast(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endProcessBlock(), and inputProcessBlocks().

1715  {
1716  using namespace edm::waiting_task;
1717  chain::first([&](auto nextTask) {
1719  schedule_->writeProcessBlockAsync(
1720  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
1721  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
1723  for (auto& s : subProcesses_) {
1724  s.writeProcessBlockAsync(nextTask, processBlockType);
1725  }
1726  }) | chain::runLast(std::move(task));
1727  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessBlockPrincipal & processBlockPrincipal() const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::writeRunAsync ( WaitingTaskHolder  task,
ProcessHistoryID const &  phid,
RunNumber_t  run,
MergeableRunProductMetadata const *  mergeableRunProductMetadata 
)

Definition at line 1729 of file EventProcessor.cc.

References actReg_, first, edm::waiting_task::chain::ifThen(), eostools::move(), principalCache_, processContext_, run(), edm::waiting_task::chain::runLast(), edm::PrincipalCache::runPrincipal(), alignCSCRings::s, schedule_, serviceToken_, and subProcesses_.

Referenced by endUnfinishedRun().

1732  {
1733  using namespace edm::waiting_task;
1734  chain::first([&](auto nextTask) {
1736  schedule_->writeRunAsync(nextTask,
1738  &processContext_,
1739  actReg_.get(),
1740  mergeableRunProductMetadata);
1741  }) | chain::ifThen(not subProcesses_.empty(), [this, phid, run, mergeableRunProductMetadata](auto nextTask) {
1743  for (auto& s : subProcesses_) {
1744  s.writeRunAsync(nextTask, phid, run, mergeableRunProductMetadata);
1745  }
1746  }) | chain::runLast(std::move(task));
1747  }
ProcessContext processContext_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
def move
Definition: eostools.py:511
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 323 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 359 of file EventProcessor.h.

Referenced by runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 360 of file EventProcessor.h.

Referenced by runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 347 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 314 of file EventProcessor.h.

Referenced by branchIDListHelper(), init(), and respondToOpenInputFile().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
bool edm::EventProcessor::deleteNonConsumedUnscheduledModules_ = true
private

Definition at line 368 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private
ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 365 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 350 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::atomic<bool> edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 352 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 351 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::shared_ptr<FileBlock> > edm::EventProcessor::fb_
private
bool edm::EventProcessor::fileModeNoMerge_
private

Definition at line 349 of file EventProcessor.h.

Referenced by init(), and runToCompletion().

bool edm::EventProcessor::firstEventInBlock_ = true
private

Definition at line 361 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 355 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 353 of file EventProcessor.h.

Referenced by endOfLoop().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 335 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
InputSource::ItemType edm::EventProcessor::lastSourceTransition_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 354 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::unique_ptr<edm::LimitedTaskQueue> edm::EventProcessor::lumiQueue_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginLumiAsync(), and init().

MergeableRunProductProcesses edm::EventProcessor::mergeableRunProductProcesses_
private

Definition at line 327 of file EventProcessor.h.

Referenced by init(), and readRun().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 326 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 313 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), preg(), and readFile().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 367 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::shared_ptr<ProcessBlockHelper> > edm::EventProcessor::processBlockHelper_
private

Definition at line 315 of file EventProcessor.h.

Referenced by beginJob(), closeOutputFiles(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 324 of file EventProcessor.h.

Referenced by beginJob(), beginProcessBlock(), init(), processConfiguration(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::SerialTaskQueue edm::EventProcessor::queueWhichWaitsForIOVsToFinish_
private

Definition at line 322 of file EventProcessor.h.

Referenced by beginLumiAsync(), and globalEndLumiAsync().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::shouldWeStop_
private

Definition at line 348 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 345 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 344 of file EventProcessor.h.

Referenced by beginLumiAsync(), and handleNextEventForStreamAsync().

std::atomic<unsigned int> edm::EventProcessor::streamLumiActive_ {0}
private
std::vector<std::shared_ptr<LuminosityBlockProcessingStatus> > edm::EventProcessor::streamLumiStatus_
private
std::vector<edm::SerialTaskQueue> edm::EventProcessor::streamQueues_
private

Definition at line 329 of file EventProcessor.h.

Referenced by beginLumiAsync(), init(), and streamEndLumiAsync().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
tbb::task_group edm::EventProcessor::taskGroup_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().