CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const override
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void beginRun (statemachine::Run const &run) override
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException) override
 
virtual void closeOutputFiles () override
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void deleteRunFromCache (statemachine::Run const &run) override
 
virtual void doErrorStuff () override
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
 
virtual bool endOfLoop () override
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException) override
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles () override
 
EventProcessoroperator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop () override
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi () override
 
virtual statemachine::Run readAndMergeRun () override
 
virtual void readAndProcessEvent () override
 
virtual void readFile () override
 
virtual int readLuminosityBlock () override
 
virtual statemachine::Run readRun () override
 
virtual void respondToCloseInputFile () override
 
virtual void respondToOpenInputFile () override
 
virtual void rewindInput () override
 
StatusCode run ()
 
virtual StatusCode runToCompletion () override
 
virtual void setExceptionMessageFiles (std::string &message) override
 
virtual void setExceptionMessageLumis (std::string &message) override
 
virtual void setExceptionMessageRuns (std::string &message) override
 
virtual bool shouldWeCloseOutput () const override
 
virtual bool shouldWeStop () const override
 
virtual void startingNewLoop () override
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void writeRun (statemachine::Run const &run) override
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr
< statemachine::Machine
createStateMachine ()
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase
const > 
looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr
< ProductRegistry const > 
preg () const
 
std::shared_ptr
< ProductRegistry > & 
preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const
< std::shared_ptr
< BranchIDListHelper > > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const
< std::shared_ptr
< eventsetup::EventSetupProvider > > 
esp_
 
edm::propagate_const
< std::unique_ptr
< eventsetup::EventSetupsController > > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const
< std::unique_ptr< FileBlock > > 
fb_
 
std::string fileMode_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const
< std::unique_ptr
< HistoryAppender > > 
historyAppender_
 
edm::propagate_const
< std::unique_ptr< InputSource > > 
input_
 
edm::propagate_const
< std::shared_ptr
< EDLooperBase > > 
looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const
< std::shared_ptr
< ProductRegistry > > 
preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const
< std::unique_ptr< Schedule > > 
schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
std::shared_ptr
< std::recursive_mutex > 
sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcesssubProcesses_
 
edm::propagate_const
< std::shared_ptr
< ThinnedAssociationsHelper > > 
thinnedAssociationsHelper_
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 326 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 327 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 223 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

227  :
228  actReg_(),
229  preg_(),
231  serviceToken_(),
232  input_(),
233  espController_(new eventsetup::EventSetupsController),
234  esp_(),
235  act_table_(),
237  schedule_(),
238  subProcesses_(),
239  historyAppender_(new HistoryAppender),
240  fb_(),
241  looper_(),
243  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
244  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
245  principalCache_(),
246  beginJobCalled_(false),
247  shouldWeStop_(false),
249  fileMode_(),
255  forceLooperToEnd_(false),
256  looperBeginJobRun_(false),
260  setCpuAffinity_(false),
262  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
263  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
264  processDesc->addServices(defaultServices, forcedServices);
265  init(processDesc, iToken, iLegacy);
266  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 268 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

270  :
271  actReg_(),
272  preg_(),
274  serviceToken_(),
275  input_(),
276  espController_(new eventsetup::EventSetupsController),
277  esp_(),
278  act_table_(),
280  schedule_(),
281  subProcesses_(),
282  historyAppender_(new HistoryAppender),
283  fb_(),
284  looper_(),
286  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
287  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
288  principalCache_(),
289  beginJobCalled_(false),
290  shouldWeStop_(false),
292  fileMode_(),
298  forceLooperToEnd_(false),
299  looperBeginJobRun_(false),
303  setCpuAffinity_(false),
307  {
308  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
309  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
310  processDesc->addServices(defaultServices, forcedServices);
312  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 314 of file EventProcessor.cc.

References init().

316  :
317  actReg_(),
318  preg_(),
320  serviceToken_(),
321  input_(),
322  espController_(new eventsetup::EventSetupsController),
323  esp_(),
324  act_table_(),
326  schedule_(),
327  subProcesses_(),
328  historyAppender_(new HistoryAppender),
329  fb_(),
330  looper_(),
332  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
333  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
334  principalCache_(),
335  beginJobCalled_(false),
336  shouldWeStop_(false),
338  fileMode_(),
344  forceLooperToEnd_(false),
345  looperBeginJobRun_(false),
349  setCpuAffinity_(false),
353  {
354  init(processDesc, token, legacy);
355  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 358 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

358  :
359  actReg_(),
360  preg_(),
362  serviceToken_(),
363  input_(),
364  espController_(new eventsetup::EventSetupsController),
365  esp_(),
366  act_table_(),
368  schedule_(),
369  subProcesses_(),
370  historyAppender_(new HistoryAppender),
371  fb_(),
372  looper_(),
374  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
375  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
376  principalCache_(),
377  beginJobCalled_(false),
378  shouldWeStop_(false),
380  fileMode_(),
386  forceLooperToEnd_(false),
387  looperBeginJobRun_(false),
391  setCpuAffinity_(false),
395  {
396  if(isPython) {
397  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
398  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
400  }
401  else {
402  auto processDesc = std::make_shared<ProcessDesc>(config);
404  }
405  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 581 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

581  {
582  // Make the services available while everything is being deleted.
584  ServiceRegistry::Operate op(token);
585 
586  // manually destroy all these thing that may need the services around
587  // propagate_const<T> has no reset() function
588  espController_ = nullptr;
589  esp_ = nullptr;
590  schedule_ = nullptr;
591  input_ = nullptr;
592  looper_ = nullptr;
593  actReg_ = nullptr;
594 
597  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2143 of file EventProcessor.cc.

2143  {
2145  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 600 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

600  {
601  if(beginJobCalled_) return;
602  beginJobCalled_=true;
603  bk::beginJob();
604 
605  // StateSentry toerror(this); // should we add this ?
606  //make the services available
608 
609  service::SystemBounds bounds(preallocations_.numberOfStreams(),
613  actReg_->preallocateSignal_(bounds);
615 
616  //NOTE: this may throw
618  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
619 
620  //NOTE: This implementation assumes 'Job' means one call
621  // the EventProcessor::run
622  // If it really means once per 'application' then this code will
623  // have to be changed.
624  // Also have to deal with case where have 'run' then new Module
625  // added and do 'run'
626  // again. In that case the newly added Module needs its 'beginJob'
627  // to be called.
628 
629  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
630  // For now we delay calling beginOfJob until first beginOfRun
631  //if(looper_) {
632  // looper_->beginOfJob(es);
633  //}
634  try {
635  convertException::wrap([&]() {
636  input_->doBeginJob();
637  });
638  }
639  catch(cms::Exception& ex) {
640  ex.addContext("Calling beginJob for the source");
641  throw;
642  }
643  schedule_->beginJob(*preg_);
644  // toerror.succeeded(); // should we add this?
645  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
646  actReg_->postBeginJobSignal_();
647 
648  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
649  schedule_->beginStream(i);
650  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
651  }
652  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1696 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, edm::for_all(), i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), rng, and edm::LuminosityBlockPrincipal::run().

1696  {
1697  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1698  {
1699  SendSourceTerminationSignalIfException sentry(actReg_.get());
1700 
1701  input_->doBeginLumi(lumiPrincipal, &processContext_);
1702  sentry.completedSuccessfully();
1703  }
1704 
1706  if(rng.isAvailable()) {
1707  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1708  rng->preBeginLumi(lb);
1709  }
1710 
1711  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1712  // lumi blocks know their start and end times why not also start and end events?
1713  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1714  {
1715  SendSourceTerminationSignalIfException sentry(actReg_.get());
1716  espController_->eventSetupForInstance(ts);
1717  sentry.completedSuccessfully();
1718  }
1719  EventSetup const& es = esp_->eventSetup();
1720  {
1721  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1722  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1723  for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1724  }
1725  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1726  if(looper_) {
1727  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1728  }
1729  {
1730  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1731  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1732  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1733  for_all(subProcesses_, [i, &lumiPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts); });
1734  }
1735  }
1736  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1737  if(looper_) {
1738  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1739  }
1740  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::Service< edm::RandomNumberGenerator > rng
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1606 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, edm::for_all(), i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1606  {
1607  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1608  {
1609  SendSourceTerminationSignalIfException sentry(actReg_.get());
1610 
1611  input_->doBeginRun(runPrincipal, &processContext_);
1612  sentry.completedSuccessfully();
1613  }
1614 
1615  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1616  runPrincipal.beginTime());
1618  espController_->forceCacheClear();
1619  }
1620  {
1621  SendSourceTerminationSignalIfException sentry(actReg_.get());
1622  espController_->eventSetupForInstance(ts);
1623  sentry.completedSuccessfully();
1624  }
1625  EventSetup const& es = esp_->eventSetup();
1626  if(looper_ && looperBeginJobRun_== false) {
1627  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1628  looper_->beginOfJob(es);
1629  looperBeginJobRun_ = true;
1630  looper_->doStartingNewLoop();
1631  }
1632  {
1633  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1634  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1635  for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1636  }
1637  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1638  if(looper_) {
1639  looper_->doBeginRun(runPrincipal, es, &processContext_);
1640  }
1641  {
1642  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1643  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1644  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1645  for_all(subProcesses_, [i, &runPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginRun(i, runPrincipal, ts); });
1646  }
1647  }
1648  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1649  if(looper_) {
1650  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1651  }
1652  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1287 of file EventProcessor.cc.

References edm::shutdown_flag.

1287  {
1288  bool returnValue = false;
1289 
1290  // Look for a shutdown signal
1291  if(shutdown_flag.load(std::memory_order_acquire)) {
1292  returnValue = true;
1293  returnCode = epSignal;
1294  }
1295  return returnValue;
1296  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1249 of file EventProcessor.cc.

1249  {
1250  schedule_->clearCounters();
1251  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1506 of file EventProcessor.cc.

References FDEBUG.

1506  {
1507  if (fb_.get() != nullptr) {
1508  SendSourceTerminationSignalIfException sentry(actReg_.get());
1509  input_->closeFile(fb_.get(), cleaningUpAfterException);
1510  sentry.completedSuccessfully();
1511  }
1512  FDEBUG(1) << "\tcloseInputFile\n";
1513  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1523 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1523  {
1524  if (fb_.get() != nullptr) {
1525  schedule_->closeOutputFiles();
1526  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1527  }
1528  FDEBUG(1) << "\tcloseOutputFiles\n";
1529  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1255 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1255  {
1257  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1258  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1259  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1260  else {
1261  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1262  << fileMode_ << ".\n"
1263  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1264  }
1265 
1266  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1267  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1268  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1269  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1270  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1271  else {
1272  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1273  << emptyRunLumiMode_ << ".\n"
1274  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1275  }
1276 
1277  auto machine = std::make_unique<statemachine::Machine>(
1278  this,
1279  fileMode,
1280  emptyRunLumiMode);
1281 
1282  machine->initiate();
1283  return machine;
1284  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1867 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1867  {
1869  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1870  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1871  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1855 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1855  {
1856  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1857  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1858  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1859  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1595 of file EventProcessor.cc.

References FDEBUG.

1595  {
1596  FDEBUG(1) << "\tdoErrorStuff\n";
1597  LogError("StateMachine")
1598  << "The EventProcessor state machine encountered an unexpected event\n"
1599  << "and went to the error state\n"
1600  << "Will attempt to terminate processing normally\n"
1601  << "(IF using the looper the next loop will be attempted)\n"
1602  << "This likely indicates a bug in an input module or corrupted input or both\n";
1604  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1234 of file EventProcessor.cc.

1234  {
1235  schedule_->enableEndPaths(active);
1236  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 655 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

655  {
656  // Collects exceptions, so we don't throw before all operations are performed.
657  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
658 
659  //make the services available
661 
662  //NOTE: this really should go elsewhere in the future
663  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
664  c.call([this,i](){this->schedule_->endStream(i);});
665  for(auto& subProcess : subProcesses_) {
666  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
667  }
668  }
669  auto actReg = actReg_.get();
670  c.call([actReg](){actReg->preEndJobSignal_();});
671  schedule_->endJob(c);
672  for(auto& subProcess : subProcesses_) {
673  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
674  }
675  c.call(std::bind(&InputSource::doEndJob, input_.get()));
676  if(looper_) {
677  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
678  }
679  c.call([actReg](){actReg->postEndJobSignal_();});
680  if(c.hasThrown()) {
681  c.rethrow();
682  }
683  }
int i
Definition: DBlmapReader.cc:9
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1742 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, edm::for_all(), i, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::LuminosityBlockPrincipal::run(), edm::LuminosityBlockPrincipal::setComplete(), and edm::LuminosityBlockPrincipal::setEndTime().

Referenced by Types.EventRange::cppID().

1742  {
1743  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1744  {
1745  SendSourceTerminationSignalIfException sentry(actReg_.get());
1746 
1747  lumiPrincipal.setEndTime(input_->timestamp());
1748  lumiPrincipal.setComplete();
1749  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1750  sentry.completedSuccessfully();
1751  }
1752  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1753  // lumi blocks know their start and end times why not also start and end events?
1754  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1755  lumiPrincipal.endTime());
1756  {
1757  SendSourceTerminationSignalIfException sentry(actReg_.get());
1758  espController_->eventSetupForInstance(ts);
1759  sentry.completedSuccessfully();
1760  }
1761  EventSetup const& es = esp_->eventSetup();
1762  {
1763  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1764  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1765  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1766  for_all(subProcesses_, [i, &lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException); });
1767  }
1768  }
1769  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1770  if(looper_) {
1771  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1772  }
1773  {
1774  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1775  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1776  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1777  }
1778  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1779  if(looper_) {
1780  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1781  }
1782  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1558 of file EventProcessor.cc.

References FDEBUG, and mps_update::status.

1558  {
1559  if(looper_) {
1560  ModuleChanger changer(schedule_.get(),preg_.get());
1561  looper_->setModuleChanger(&changer);
1562  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1563  looper_->setModuleChanger(nullptr);
1564  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1565  else return false;
1566  }
1567  FDEBUG(1) << "\tendOfLoop\n";
1568  return true;
1569  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
tuple status
Definition: mps_update.py:57
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1239 of file EventProcessor.cc.

1239  {
1240  return schedule_->endPathsEnabled();
1241  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1654 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, edm::for_all(), i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::RunPrincipal::setComplete(), and edm::RunPrincipal::setEndTime().

1654  {
1655  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1656  {
1657  SendSourceTerminationSignalIfException sentry(actReg_.get());
1658 
1659  runPrincipal.setEndTime(input_->timestamp());
1660  runPrincipal.setComplete();
1661  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1662  sentry.completedSuccessfully();
1663  }
1664 
1666  runPrincipal.endTime());
1667  {
1668  SendSourceTerminationSignalIfException sentry(actReg_.get());
1669  espController_->eventSetupForInstance(ts);
1670  sentry.completedSuccessfully();
1671  }
1672  EventSetup const& es = esp_->eventSetup();
1673  {
1674  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1675  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1676  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1677  for_all(subProcesses_, [i, &runPrincipal, &ts, cleaningUpAfterException](auto& subProcess) { subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1678  });
1679  }
1680  }
1681  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1682  if(looper_) {
1683  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1684  }
1685  {
1686  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1687  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1688  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1689  }
1690  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1691  if(looper_) {
1692  looper_->doEndRun(runPrincipal, es, &processContext_);
1693  }
1694  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 913 of file EventProcessor.cc.

References assert(), bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

913  {
914 
915  if(0 == numberOfForkedChildren_) {return true;}
917  //do what we want done in common
918  {
919  beginJob(); //make sure this was run
920  // make the services available
922 
923  InputSource::ItemType itemType;
924  itemType = input_->nextItemType();
925 
926  assert(itemType == InputSource::IsFile);
927  {
928  readFile();
929  }
930  itemType = input_->nextItemType();
931  assert(itemType == InputSource::IsRun);
932 
933  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
934  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
935  input_->runAuxiliary()->beginTime());
936  espController_->eventSetupForInstance(ts);
937  EventSetup const& es = esp_->eventSetup();
938 
939  //now get all the data available in the EventSetup
940  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
941  es.fillAvailableRecordKeys(recordKeys);
942  std::vector<eventsetup::DataKey> dataKeys;
943  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
944  itKey != itEnd;
945  ++itKey) {
946  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
947  //see if this is on our exclusion list
948  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
949  ExcludedData const* excludedData(nullptr);
950  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
951  excludedData = &(itExcludeRec->second);
952  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
953  //skip all items in this record
954  continue;
955  }
956  }
957  if(0 != recordPtr) {
958  dataKeys.clear();
959  recordPtr->fillRegisteredDataKeys(dataKeys);
960  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
961  itDataKey != itDataKeyEnd;
962  ++itDataKey) {
963  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
964  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
965  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
966  continue;
967  }
968  try {
969  recordPtr->doGet(*itDataKey);
970  } catch(cms::Exception& e) {
971  LogWarning("ForkingEventSetupPreFetching") << e.what();
972  }
973  }
974  }
975  }
976  }
977  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
978  {
979  // make the services available
981  Service<JobReport> jobReport;
982  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
983 
984  //Now actually do the forking
985  actReg_->preForkReleaseResourcesSignal_();
986  input_->doPreForkReleaseResources();
987  schedule_->preForkReleaseResources();
988  }
989  installCustomHandler(SIGCHLD, ep_sigchld);
990 
991 
992  unsigned int childIndex = 0;
993  unsigned int const kMaxChildren = numberOfForkedChildren_;
994  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
995  std::vector<pid_t> childrenIds;
996  childrenIds.reserve(kMaxChildren);
997  std::vector<int> childrenSockets;
998  childrenSockets.reserve(kMaxChildren);
999  std::vector<int> childrenPipes;
1000  childrenPipes.reserve(kMaxChildren);
1001  std::vector<int> childrenSocketsCopy;
1002  childrenSocketsCopy.reserve(kMaxChildren);
1003  std::vector<int> childrenPipesCopy;
1004  childrenPipesCopy.reserve(kMaxChildren);
1005  int pipes[] {0, 0};
1006 
1007  {
1008  // make the services available
1010  Service<JobReport> jobReport;
1011  int sockets[2], fd_flags;
1012  for(; childIndex < kMaxChildren; ++childIndex) {
1013  // Create a UNIX_DGRAM socket pair
1014  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1015  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1016  exit(EXIT_FAILURE);
1017  }
1018  if (pipe(pipes)) {
1019  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1023  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1024  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1025  exit(EXIT_FAILURE);
1026  }
1027  // Mark socket as non-block. Child must be careful to do select prior
1028  // to reading from socket.
1029  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1030  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1031  exit(EXIT_FAILURE);
1032  }
1033  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1034  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1035  exit(EXIT_FAILURE);
1036  }
1037  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1038  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  // Linux man page notes there are some edge cases where reading from a
1042  // fd can block, even after a select.
1043  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1044  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1045  exit(EXIT_FAILURE);
1046  }
1047  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1048  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1049  exit(EXIT_FAILURE);
1050  }
1051 
1052  childrenPipesCopy = childrenPipes;
1053  childrenSocketsCopy = childrenSockets;
1054 
1055  pid_t value = fork();
1056  if(value == 0) {
1057  // Close the parent's side of the socket and pipe which will talk to us.
1058  close(pipes[0]);
1059  close(sockets[0]);
1060  // Close our copies of the parent's other communication pipes.
1061  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1062  close(*it);
1063  }
1064  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1065  close(*it);
1066  }
1067 
1068  // this is the child process, redirect stdout and stderr to a log file
1069  fflush(stdout);
1070  fflush(stderr);
1071  std::stringstream stout;
1072  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1073  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1074  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1075  }
1076  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1077  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1078  }
1079 
1080  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1081  if(setCpuAffinity_) {
1082  // CPU affinity is handled differently on macosx.
1083  // We disable it and print a message until someone reads:
1084  //
1085  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1086  //
1087  // and implements it.
1088 #ifdef __APPLE__
1089  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1090 #else
1091  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1092  cpu_set_t mask;
1093  CPU_ZERO(&mask);
1094  CPU_SET(childIndex, &mask);
1095  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1096  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1097  exit(-1);
1098  }
1099 #endif
1100  }
1101  break;
1102  } else {
1103  //this is the parent
1104  close(pipes[1]);
1105  close(sockets[1]);
1106  }
1107  if(value < 0) {
1108  LogError("ForkingChild") << "failed to create a child";
1109  exit(-1);
1110  }
1111  childrenIds.push_back(value);
1112  childrenSockets.push_back(sockets[0]);
1113  childrenPipes.push_back(pipes[0]);
1114  }
1115 
1116  if(childIndex < kMaxChildren) {
1117  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1118  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1119 
1120  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1121  input_->doPostForkReacquireResources(receiver);
1122  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1123  //NOTE: sources have to reset themselves by listening to the post fork message
1124  //rewindInput();
1125  return true;
1126  }
1127  jobReport->parentAfterFork(jobReportFile);
1128  }
1129 
1130  //this is the original, which is now the master for all the children
1131 
1132  //Need to wait for signals from the children or externally
1133  // To wait we must
1134  // 1) block the signals we want to wait on so we do not have a race condition
1135  // 2) check that we haven't already meet our ending criteria
1136  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1137  sigset_t blockingSigSet;
1138  sigset_t unblockingSigSet;
1139  sigset_t oldSigSet;
1140  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1141  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1142  sigaddset(&blockingSigSet, SIGCHLD);
1143  sigaddset(&blockingSigSet, SIGUSR2);
1144  sigaddset(&blockingSigSet, SIGINT);
1145  sigdelset(&unblockingSigSet, SIGCHLD);
1146  sigdelset(&unblockingSigSet, SIGUSR2);
1147  sigdelset(&unblockingSigSet, SIGINT);
1148  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1149 
1150  // If there are too many fd's (unlikely, but possible) for select, denote this
1151  // because the sender will fail.
1152  bool too_many_fds = false;
1153  if (pipes[1]+1 > FD_SETSIZE) {
1154  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1155  too_many_fds = true;
1156  }
1157 
1158  //create a thread that sends the units of work to workers
1159  // we create it after all signals were blocked so that this
1160  // thread is never interupted by a signal
1161  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1162  boost::thread senderThread(sender);
1163 
1164  if(not too_many_fds) {
1165  //NOTE: a child could have failed before we got here and even after this call
1166  // which is why the 'if' is conditional on continueAfterChildFailure_
1168  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1169  sigsuspend(&unblockingSigSet);
1171  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1172  }
1173  }
1174  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1175 
1176  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1177  if(child_failed) {
1178  LogError("ForkingStopping") << "child failed";
1179  }
1180  if(shutdown_flag) {
1181  LogSystem("ForkingStopping") << "asked to shutdown";
1182  }
1183 
1184  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1185  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1186  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1187  it != itEnd; ++it) {
1188  /* int result = */ kill(*it, SIGUSR2);
1189  }
1190  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1191  while(num_children_done != kMaxChildren) {
1192  sigsuspend(&unblockingSigSet);
1193  }
1194  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1195  }
1196  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1197  senderThread.join();
1198  if(child_failed && !continueAfterChildFailure_) {
1199  if (child_fail_signal) {
1200  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1201  } else if (child_fail_exit_status) {
1202  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1203  } else {
1204  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1205  }
1206  }
1207  if(too_many_fds) {
1208  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1209  }
1210  return false;
1211  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
assert(m_qm.get())
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
virtual void readFile() override
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1214 of file EventProcessor.cc.

1214  {
1215  return schedule_->getAllModuleDescriptions();
1216  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 686 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

686  {
687  return serviceToken_;
688  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1244 of file EventProcessor.cc.

1244  {
1245  schedule_->getTriggerReport(rep);
1246  }
string rep
Definition: cuy.py:1188
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1929 of file EventProcessor.cc.

References pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), and cmsPerfStripChart::operate().

1932  {
1933  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
1934  if(iPtr) {
1935  bool expected = false;
1936  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1937  deferredExceptionPtr_ = *iPtr;
1938  {
1939  WaitingTaskHolder h(iTask);
1940  h.doneWaiting(*iPtr);
1941  }
1942  }
1943  //the stream will stop now
1944  iTask->decrement_ref_count();
1945  return;
1946  }
1947 
1948  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
1949  });
1950 
1951  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
1953 
1954  try {
1955  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
1956  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
1957  } else {
1958  //the stream will stop now
1959  tbb::task::destroy(*recursionTask);
1960  iTask->decrement_ref_count();
1961  }
1962  } catch(...) {
1963  WaitingTaskHolder h(recursionTask);
1964  h.doneWaiting(std::current_exception());
1965  }
1966  });
1967  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 408 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), thinnedAssociationsHelper_, and unpackBuffers-CaloStage2::token.

Referenced by EventProcessor().

410  {
411 
412  //std::cerr << processDesc->dump() << std::endl;
413 
414  // register the empty parentage vector , once and for all
416 
417  // register the empty parameter set, once and for all.
418  ParameterSet().registerIt();
419 
420  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
421 
422  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
423  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
424  bool const hasSubProcesses = !subProcessVParameterSet.empty();
425 
426  // Now set some parameters specific to the main process.
427  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
428  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
429  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
430  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
431  //threading
432  unsigned int nThreads=1;
433  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
434  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
435  if(nThreads == 0) {
436  nThreads = 1;
437  }
438  }
439  /* TODO: when we support having each stream run in a different thread use this default
440  unsigned int nStreams =nThreads;
441  */
442  unsigned int nStreams =1;
443  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
444  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
445  if(nStreams==0) {
446  nStreams = nThreads;
447  }
448  // PG: Log the number of streams
449  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
450  }
451  /*
452  bool nRunsSet = false;
453  */
454  unsigned int nConcurrentRuns =1;
455  /*
456  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
457  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
458  }
459  */
460  unsigned int nConcurrentLumis =1;
461  /*
462  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
463  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
464  } else {
465  nConcurrentLumis = nConcurrentRuns;
466  }
467  */
468  //Check that relationships between threading parameters makes sense
469  /*
470  if(nThreads<nStreams) {
471  //bad
472  }
473  if(nConcurrentRuns>nStreams) {
474  //bad
475  }
476  if(nConcurrentRuns>nConcurrentLumis) {
477  //bad
478  }
479  */
480  //forking
481  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
482  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
483  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
484  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
485  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
486  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
487  for(auto const& ps : excluded) {
488  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
489  ps.getUntrackedParameter<std::string>("label", ""));
490  }
491  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
492 
493  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
494 
495  // Now do general initialization
496  ScheduleItems items;
497 
498  //initialize the services
499  auto& serviceSets = processDesc->getServicesPSets();
500  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
501  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
502 
503  //make the services available
505 
506  if(nStreams>1) {
508  handler->willBeUsingThreads();
509  }
510 
511  // intialize miscellaneous items
512  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
513 
514  // intialize the event setup provider
515  esp_ = espController_->makeProvider(*parameterSet);
516 
517  // initialize the looper, if any
518  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
519  if(looper_) {
520  looper_->setActionTable(items.act_table_.get());
521  looper_->attachTo(*items.actReg_);
522 
523  //For now loopers make us run only 1 transition at a time
524  nStreams=1;
525  nConcurrentLumis=1;
526  nConcurrentRuns=1;
527  }
528 
529  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
530 
531  // initialize the input source
532  input_ = makeInput(*parameterSet,
533  *common,
534  items.preg(),
535  items.branchIDListHelper(),
536  items.thinnedAssociationsHelper(),
537  items.actReg_,
538  items.processConfiguration(),
540 
541  // intialize the Schedule
542  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
543 
544  // set the data members
545  act_table_ = std::move(items.act_table_);
546  actReg_ = items.actReg_;
547  preg_ = items.preg();
548  branchIDListHelper_ = items.branchIDListHelper();
549  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
550  processConfiguration_ = items.processConfiguration();
552  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
553 
554  FDEBUG(2) << parameterSet << std::endl;
555 
557  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
558  // Reusable event principal
559  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
562  }
563 
564  // fill the subprocesses, if there are any
565  subProcesses_.reserve(subProcessVParameterSet.size());
566  for(auto& subProcessPSet : subProcessVParameterSet) {
567  subProcesses_.emplace_back(subProcessPSet,
568  *parameterSet,
569  preg(),
573  *actReg_,
574  token,
577  &processContext_);
578  }
579  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:510
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:610
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1515 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1515  {
1516  if (fb_.get() != nullptr) {
1517  schedule_->openOutputFiles(*fb_);
1518  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1519  }
1520  FDEBUG(1) << "\topenOutputFiles\n";
1521  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 897 of file EventProcessor.cc.

897  {
898  if(child_failed && continueAfterChildFailure_) {
899  if (child_fail_signal) {
900  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
901  child_fail_signal=0;
902  } else if (child_fail_exit_status) {
903  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
904  child_fail_exit_status=0;
905  } else {
906  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
907  }
908  child_failed =false;
909  }
910  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), and init().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1577 of file EventProcessor.cc.

References FDEBUG.

1577  {
1578  looper_->prepareForNextLoop(esp_.get());
1579  FDEBUG(1) << "\tprepareForNextLoop\n";
1580  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 124 of file EventProcessor.h.

References processConfiguration_.

124 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2026 of file EventProcessor.cc.

References assert(), edm::WaitingTaskHolder::doneWaiting(), ev, FDEBUG, edm::Service< T >::isAvailable(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), and rng.

2027  {
2028  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2029  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2031  if(rng.isAvailable()) {
2032  Event ev(*pep, ModuleDescription(), nullptr);
2033  rng->postEventRead(ev);
2034  }
2035  assert(pep->luminosityBlockPrincipalPtrValid());
2036  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2037  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2038 
2039  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2040  tbb::task::allocate_root(),
2041  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2042  {
2044 
2045  //NOTE: If we have a looper we only have one Stream
2046  if(looper_) {
2047  processEventWithLooper(*pep);
2048  }
2049 
2050  FDEBUG(1) << "\tprocessEvent\n";
2051  pep->clearEventPrincipal();
2052  if(iPtr) {
2053  iHolder.doneWaiting(*iPtr);
2054  } else {
2055  iHolder.doneWaiting(std::exception_ptr());
2056  }
2057  }
2058  )
2059  );
2060  WaitingTaskHolder afterProcessTask;
2061  if(subProcesses_.empty()) {
2062  afterProcessTask = std::move(finalizeEventTask);
2063  } else {
2064  //Need to run SubProcesses after schedule has finished
2065  // with the event
2066  afterProcessTask = WaitingTaskHolder(
2067  make_waiting_task(tbb::task::allocate_root(),
2068  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2069  {
2070  if(not iPtr) {
2072 
2073  //when run with 1 thread, we want to the order to be what
2074  // it was before. This requires reversing the order since
2075  // tasks are run last one in first one out
2076  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2077  subProcess.doEventAsync(finalizeEventTask,*pep);
2078  }
2079  } else {
2080  finalizeEventTask.doneWaiting(*iPtr);
2081  }
2082  })
2083  );
2084  }
2085 
2086  schedule_->processOneEventAsync(std::move(afterProcessTask),
2087  iStreamIndex,*pep, esp_->eventSetup());
2088 
2089  }
edm::Service< edm::RandomNumberGenerator > rng
assert(m_qm.get())
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Definition: Event.h:16
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
def move
Definition: eostools.py:510
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
PrincipalCache principalCache_
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 2091 of file EventProcessor.cc.

References edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

2091  {
2092  bool randomAccess = input_->randomAccess();
2093  ProcessingController::ForwardState forwardState = input_->forwardState();
2094  ProcessingController::ReverseState reverseState = input_->reverseState();
2095  ProcessingController pc(forwardState, reverseState, randomAccess);
2096 
2098  do {
2099 
2100  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2101  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2102 
2103  bool succeeded = true;
2104  if(randomAccess) {
2105  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2106  input_->skipEvents(-2);
2107  }
2108  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2109  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2110  }
2111  }
2112  pc.setLastOperationSucceeded(succeeded);
2113  } while(!pc.lastOperationSucceeded());
2114  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2115  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
tuple status
Definition: mps_update.py:57
int edm::EventProcessor::readAndMergeLumi ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1839 of file EventProcessor.cc.

References edm::preg.

1839  {
1840  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1841  {
1842  SendSourceTerminationSignalIfException sentry(actReg_.get());
1843  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1844  sentry.completedSuccessfully();
1845  }
1846  return input_->luminosityBlock();
1847  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1802 of file EventProcessor.cc.

References assert(), edm::preg, and PDRates::Run.

1802  {
1803  principalCache_.merge(input_->runAuxiliary(), preg());
1804  auto runPrincipal =principalCache_.runPrincipalPtr();
1805  {
1806  SendSourceTerminationSignalIfException sentry(actReg_.get());
1807  input_->readAndMergeRun(*runPrincipal);
1808  sentry.completedSuccessfully();
1809  }
1810  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1811  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1812  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(m_qm.get())
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1969 of file EventProcessor.cc.

References edm::make_empty_waiting_task(), and edm::make_waiting_task().

1969  {
1970  if(numberOfForkedChildren_>0) {
1971  //Have to do something special for forking since
1972  // after each event the system may have to skip
1973  // some transitions. This is handled in runToCompletion
1974  readEvent(0);
1975  auto eventLoopWaitTask = make_empty_waiting_task();
1976  eventLoopWaitTask->increment_ref_count();
1977  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
1978  eventLoopWaitTask->wait_for_all();
1979  return;
1980  }
1983 
1984  std::atomic<bool> finishedProcessingEvents{false};
1985  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
1986 
1987  //The state machine already found the event so
1988  // we have to avoid looking again
1989  firstEventInBlock_ = true;
1990 
1991  //To wait, the ref count has to b 1+#streams
1992  auto eventLoopWaitTask = make_empty_waiting_task();
1993  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
1994  eventLoopWaitTask->increment_ref_count();
1995 
1996  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1997  unsigned int iStreamIndex = 0;
1998  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1999  eventLoopWaitTask->increment_ref_count();
2000  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2001  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2002  }) );
2003  }
2004  eventLoopWaitTask->increment_ref_count();
2005  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2006  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2007  }));
2008 
2009  //One of the processing threads saw an exception
2011  std::rethrow_exception(deferredExceptionPtr_);
2012  }
2013  }
void readEvent(unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2014 of file EventProcessor.cc.

References event(), and FDEBUG.

2014  {
2015  //TODO this will have to become per stream
2016  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2017  StreamContext streamContext(event.streamID(), &processContext_);
2018 
2019  SendSourceTerminationSignalIfException sentry(actReg_.get());
2020  input_->readEvent(event, streamContext);
2021  sentry.completedSuccessfully();
2022 
2023  FDEBUG(1) << "\treadEvent\n";
2024  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1488 of file EventProcessor.cc.

References FDEBUG, or, edm::preg, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1488  {
1489  FDEBUG(1) << " \treadFile\n";
1490  size_t size = preg_->size();
1491  SendSourceTerminationSignalIfException sentry(actReg_.get());
1492 
1493  fb_ = input_->readFile();
1494  if(size < preg_->size()) {
1496  }
1498  if((numberOfForkedChildren_ > 0) or
1501  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1502  }
1503  sentry.completedSuccessfully();
1504  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1814 of file EventProcessor.cc.

References Exception, edm::errors::LogicError, and edm::preg.

1814  {
1817  << "EventProcessor::readRun\n"
1818  << "Illegal attempt to insert lumi into cache\n"
1819  << "Contact a Framework Developer\n";
1820  }
1823  << "EventProcessor::readRun\n"
1824  << "Illegal attempt to insert lumi into cache\n"
1825  << "Run is invalid\n"
1826  << "Contact a Framework Developer\n";
1827  }
1828  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1829  {
1830  SendSourceTerminationSignalIfException sentry(actReg_.get());
1831  input_->readLuminosityBlock(*lbp, *historyAppender_);
1832  sentry.completedSuccessfully();
1833  }
1834  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1835  principalCache_.insert(lbp);
1836  return input_->luminosityBlock();
1837  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1873 of file EventProcessor.cc.

References cmsPerfStripChart::operate().

1874  {
1875  if(shouldWeStop()) {
1876  return false;
1877  }
1878 
1879  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1880  return false;
1881  }
1882 
1883  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1884  return false;
1885  }
1886 
1890  handler->initializeThisThreadForUse();
1891  }
1892 
1893  try {
1894  //need to use lock in addition to the serial task queue because
1895  // of delayed provenance reading and reading data in response to
1896  // edm::Refs etc
1897  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1898  if(not firstEventInBlock_) {
1899  //The state machine already called input_->nextItemType
1900  // and found an event. We can't call input_->nextItemType
1901  // again since it would move to the next transition
1902  InputSource::ItemType itemType = input_->nextItemType();
1903  if (InputSource::IsEvent !=itemType) {
1905  finishedProcessingEvents->store(true,std::memory_order_release);
1906  //std::cerr<<"next item type "<<itemType<<"\n";
1907  return false;
1908  }
1910  //std::cerr<<"task told to async stop\n";
1911  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1912  return false;
1913  }
1914  } else {
1915  firstEventInBlock_ = false;
1916  }
1917  readEvent(iStreamIndex);
1918  } catch (...) {
1919  bool expected =false;
1920  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1921  deferredExceptionPtr_ = std::current_exception();
1922 
1923  }
1924  return false;
1925  }
1926  return true;
1927  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
virtual bool shouldWeStop() const override
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
statemachine::Run edm::EventProcessor::readRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1784 of file EventProcessor.cc.

References assert(), Exception, edm::errors::LogicError, edm::preg, and PDRates::Run.

1784  {
1787  << "EventProcessor::readRun\n"
1788  << "Illegal attempt to insert run into cache\n"
1789  << "Contact a Framework Developer\n";
1790  }
1791  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1792  {
1793  SendSourceTerminationSignalIfException sentry(actReg_.get());
1794  input_->readRun(*rp, *historyAppender_);
1795  sentry.completedSuccessfully();
1796  }
1797  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1798  principalCache_.insert(rp);
1799  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1800  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(m_qm.get())
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1540 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1540  {
1541  if (fb_.get() != nullptr) {
1542  schedule_->respondToCloseInputFile(*fb_);
1543  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1544  }
1545  FDEBUG(1) << "\trespondToCloseInputFile\n";
1546  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1531 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1531  {
1532  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1533  if (fb_.get() != nullptr) {
1534  schedule_->respondToOpenInputFile(*fb_);
1535  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1536  }
1537  FDEBUG(1) << "\trespondToOpenInputFile\n";
1538  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1571 of file EventProcessor.cc.

References FDEBUG.

1571  {
1572  input_->repeat();
1573  input_->rewind();
1574  FDEBUG(1) << "\trewind\n";
1575  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 337 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

337  {
338  return runToCompletion();
339  }
virtual StatusCode runToCompletion() override
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1300 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, Exception, FDEBUG, edm::errors::LogicError, eostools::move(), cmsPerfStripChart::operate(), edm::preg, runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by PythonEventProcessor::run(), and run().

1300  {
1301 
1304  std::unique_ptr<statemachine::Machine> machine;
1305  {
1306  beginJob(); //make sure this was called
1307 
1308  //StatusCode returnCode = epSuccess;
1310 
1311  // make the services available
1313 
1314  machine = createStateMachine();
1317  try {
1318  convertException::wrap([&]() {
1319 
1320  InputSource::ItemType itemType;
1321 
1322  while(true) {
1323 
1324  bool more = true;
1325  if(numberOfForkedChildren_ > 0) {
1326  size_t size = preg_->size();
1327  {
1328  SendSourceTerminationSignalIfException sentry(actReg_.get());
1329  more = input_->skipForForking();
1330  sentry.completedSuccessfully();
1331  }
1332  if(more) {
1333  if(size < preg_->size()) {
1335  }
1337  }
1338  }
1339  {
1340  SendSourceTerminationSignalIfException sentry(actReg_.get());
1341  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1342  sentry.completedSuccessfully();
1343  }
1344 
1345  FDEBUG(1) << "itemType = " << itemType << "\n";
1346 
1347  if(checkForAsyncStopRequest(returnCode)) {
1348  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1349  forceLooperToEnd_ = true;
1350  machine->process_event(statemachine::Stop());
1351  forceLooperToEnd_ = false;
1352  break;
1353  }
1354 
1355  if(itemType == InputSource::IsEvent) {
1356  machine->process_event(statemachine::Event());
1358  forceLooperToEnd_ = true;
1359  machine->process_event(statemachine::Stop());
1360  forceLooperToEnd_ = false;
1362  break;
1363  }
1365  }
1366 
1367  if(itemType == InputSource::IsEvent) {
1368  }
1369  else if(itemType == InputSource::IsStop) {
1370  machine->process_event(statemachine::Stop());
1371  }
1372  else if(itemType == InputSource::IsFile) {
1373  machine->process_event(statemachine::File());
1374  }
1375  else if(itemType == InputSource::IsRun) {
1376  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1377  }
1378  else if(itemType == InputSource::IsLumi) {
1379  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1380  }
1381  else if(itemType == InputSource::IsSynchronize) {
1382  //For now, we don't have to do anything
1383  }
1384  // This should be impossible
1385  else {
1387  << "Unknown next item type passed to EventProcessor\n"
1388  << "Please report this error to the Framework group\n";
1389  }
1390  if(machine->terminated()) {
1391  break;
1392  }
1393  } // End of loop over state machine events
1394  }); // convertException::wrap
1395  } // Try block
1396  // Some comments on exception handling related to the boost state machine:
1397  //
1398  // Some states used in the machine are special because they
1399  // perform actions while the machine is being terminated, actions
1400  // such as close files, call endRun, call endLumi etc ... Each of these
1401  // states has two functions that perform these actions. The functions
1402  // are almost identical. The major difference is that one version
1403  // catches all exceptions and the other lets exceptions pass through.
1404  // The destructor catches them and the other function named "exit" lets
1405  // them pass through. On a normal termination, boost will always call
1406  // "exit" and then the state destructor. In our state classes, the
1407  // the destructors do nothing if the exit function already took
1408  // care of things. Here's the interesting part. When boost is
1409  // handling an exception the "exit" function is not called (a boost
1410  // feature).
1411  //
1412  // If an exception occurs while the boost machine is in control
1413  // (which usually means inside a process_event call), then
1414  // the boost state machine destroys its states and "terminates" itself.
1415  // This already done before we hit the catch blocks below. In this case
1416  // the call to terminateMachine below only destroys an already
1417  // terminated state machine. Because exit is not called, the state destructors
1418  // handle cleaning up lumis, runs, and files. The destructors swallow
1419  // all exceptions and only pass through the exceptions messages, which
1420  // are tacked onto the original exception below.
1421  //
1422  // If an exception occurs when the boost state machine is not
1423  // in control (outside the process_event functions), then boost
1424  // cannot destroy its own states. The terminateMachine function
1425  // below takes care of that. The flag "alreadyHandlingException"
1426  // is set true so that the state exit functions do nothing (and
1427  // cannot throw more exceptions while handling the first). Then the
1428  // state destructors take care of this because exit did nothing.
1429  //
1430  // In both cases above, the EventProcessor::endOfLoop function is
1431  // not called because it can throw exceptions.
1432  //
1433  // One tricky aspect of the state machine is that things that can
1434  // throw should not be invoked by the state machine while another
1435  // exception is being handled.
1436  // Another tricky aspect is that it appears to be important to
1437  // terminate the state machine before invoking its destructor.
1438  // We've seen crashes that are not understood when that is not
1439  // done. Maintainers of this code should be careful about this.
1440 
1441  catch (cms::Exception & e) {
1443  terminateMachine(std::move(machine));
1444  alreadyHandlingException_ = false;
1445  if (!exceptionMessageLumis_.empty()) {
1447  if (e.alreadyPrinted()) {
1448  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1449  }
1450  }
1451  if (!exceptionMessageRuns_.empty()) {
1453  if (e.alreadyPrinted()) {
1454  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1455  }
1456  }
1457  if (!exceptionMessageFiles_.empty()) {
1459  if (e.alreadyPrinted()) {
1460  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1461  }
1462  }
1463  throw;
1464  }
1465 
1466  if(machine->terminated()) {
1467  FDEBUG(1) << "The state machine reports it has been terminated\n";
1468  machine.reset();
1469  }
1470 
1472  throw cms::Exception("BadState")
1473  << "The boost state machine in the EventProcessor exited after\n"
1474  << "entering the Error state.\n";
1475  }
1476 
1477  }
1478  if(machine.get() != nullptr) {
1479  terminateMachine(std::move(machine));
1481  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1482  << "Please report this error to the Framework group\n";
1483  }
1484 
1485  return returnCode;
1486  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
def move
Definition: eostools.py:510
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2131 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2131  {
2133  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2139 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2139  {
2141  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2135 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2135  {
2137  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1582 of file EventProcessor.cc.

References FDEBUG.

1582  {
1583  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1584  if(!subProcesses_.empty()) {
1585  for(auto const& subProcess : subProcesses_) {
1586  if(subProcess.shouldWeCloseOutput()) {
1587  return true;
1588  }
1589  }
1590  return false;
1591  }
1592  return schedule_->shouldWeCloseOutput();
1593  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2117 of file EventProcessor.cc.

References FDEBUG.

2117  {
2118  FDEBUG(1) << "\tshouldWeStop\n";
2119  if(shouldWeStop_) return true;
2120  if(!subProcesses_.empty()) {
2121  for(auto const& subProcess : subProcesses_) {
2122  if(subProcess.terminate()) {
2123  return true;
2124  }
2125  }
2126  return false;
2127  }
2128  return schedule_->terminate();
2129  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1548 of file EventProcessor.cc.

References FDEBUG.

1548  {
1549  shouldWeStop_ = false;
1550  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1551  // until after we've called beginOfJob
1552  if(looper_ && looperBeginJobRun_) {
1553  looper_->doStartingNewLoop();
1554  }
1555  FDEBUG(1) << "\tstartingNewLoop\n";
1556  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine iMachine)
private

Definition at line 2147 of file EventProcessor.cc.

References FDEBUG.

2147  {
2148  if(iMachine.get() != nullptr) {
2149  if(!iMachine->terminated()) {
2150  forceLooperToEnd_ = true;
2151  iMachine->process_event(statemachine::Stop());
2152  forceLooperToEnd_ = false;
2153  }
2154  else {
2155  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2156  }
2157  if(iMachine->terminated()) {
2158  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2159  }
2160  }
2161  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1219 of file EventProcessor.cc.

Referenced by PythonEventProcessor::totalEvents().

1219  {
1220  return schedule_->totalEvents();
1221  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1229 of file EventProcessor.cc.

Referenced by PythonEventProcessor::totalEventsFailed().

1229  {
1230  return schedule_->totalEventsFailed();
1231  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1224 of file EventProcessor.cc.

Referenced by PythonEventProcessor::totalEventsPassed().

1224  {
1225  return schedule_->totalEventsPassed();
1226  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1861 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1861  {
1863  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1864  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1865  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1849 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1849  {
1850  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1851  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1852  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1853  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 275 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 309 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 321 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 323 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by branchIDListHelper(), and init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 317 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 296 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 295 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 305 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

Definition at line 282 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 328 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 306 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 308 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 307 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

Definition at line 291 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 324 of file EventProcessor.h.

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 312 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 310 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

Definition at line 280 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 311 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 322 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 314 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 315 of file EventProcessor.h.

Referenced by init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 319 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), init(), and preg().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 300 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init(), and processConfiguration().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 285 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 279 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 303 of file EventProcessor.h.

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().