CMS 3D CMS Logo

List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const override
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void beginRun (statemachine::Run const &run) override
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException) override
 
virtual void closeOutputFiles () override
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void deleteRunFromCache (statemachine::Run const &run) override
 
virtual void doErrorStuff () override
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
 
virtual bool endOfLoop () override
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException) override
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles () override
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop () override
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi () override
 
virtual statemachine::Run readAndMergeRun () override
 
virtual void readAndProcessEvent () override
 
virtual void readFile () override
 
virtual int readLuminosityBlock () override
 
virtual statemachine::Run readRun () override
 
virtual void respondToCloseInputFile () override
 
virtual void respondToOpenInputFile () override
 
virtual void rewindInput () override
 
StatusCode run ()
 
virtual StatusCode runToCompletion () override
 
virtual void setExceptionMessageFiles (std::string &message) override
 
virtual void setExceptionMessageLumis (std::string &message) override
 
virtual void setExceptionMessageRuns (std::string &message) override
 
virtual bool shouldWeCloseOutput () const override
 
virtual bool shouldWeStop () const override
 
virtual void startingNewLoop () override
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void writeRun (statemachine::Run const &run) override
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr< statemachine::Machine > createStateMachine ()
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
std::string fileMode_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 326 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 327 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 227 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

231  :
232  actReg_(),
233  preg_(),
235  serviceToken_(),
236  input_(),
237  espController_(new eventsetup::EventSetupsController),
238  esp_(),
239  act_table_(),
241  schedule_(),
242  subProcesses_(),
243  historyAppender_(new HistoryAppender),
244  fb_(),
245  looper_(),
247  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
248  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
249  principalCache_(),
250  beginJobCalled_(false),
251  shouldWeStop_(false),
253  fileMode_(),
259  forceLooperToEnd_(false),
260  looperBeginJobRun_(false),
264  setCpuAffinity_(false),
266  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
267  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 272 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

274  :
275  actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
286  historyAppender_(new HistoryAppender),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
293  beginJobCalled_(false),
294  shouldWeStop_(false),
296  fileMode_(),
302  forceLooperToEnd_(false),
303  looperBeginJobRun_(false),
307  setCpuAffinity_(false),
311  {
312  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
313  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
314  processDesc->addServices(defaultServices, forcedServices);
316  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 318 of file EventProcessor.cc.

References init().

320  :
321  actReg_(),
322  preg_(),
324  serviceToken_(),
325  input_(),
326  espController_(new eventsetup::EventSetupsController),
327  esp_(),
328  act_table_(),
330  schedule_(),
331  subProcesses_(),
332  historyAppender_(new HistoryAppender),
333  fb_(),
334  looper_(),
336  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
337  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
338  principalCache_(),
339  beginJobCalled_(false),
340  shouldWeStop_(false),
342  fileMode_(),
348  forceLooperToEnd_(false),
349  looperBeginJobRun_(false),
353  setCpuAffinity_(false),
357  {
358  init(processDesc, token, legacy);
359  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 362 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

362  :
363  actReg_(),
364  preg_(),
366  serviceToken_(),
367  input_(),
368  espController_(new eventsetup::EventSetupsController),
369  esp_(),
370  act_table_(),
372  schedule_(),
373  subProcesses_(),
374  historyAppender_(new HistoryAppender),
375  fb_(),
376  looper_(),
378  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
379  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
380  principalCache_(),
381  beginJobCalled_(false),
382  shouldWeStop_(false),
384  fileMode_(),
390  forceLooperToEnd_(false),
391  looperBeginJobRun_(false),
395  setCpuAffinity_(false),
399  {
400  if(isPython) {
401  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
402  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
404  }
405  else {
406  auto processDesc = std::make_shared<ProcessDesc>(config);
408  }
409  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 588 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

588  {
589  // Make the services available while everything is being deleted.
590  ServiceToken token = getToken();
591  ServiceRegistry::Operate op(token);
592 
593  // manually destroy all these things that may need the services around
594  // propagate_const<T> has no reset() function
595  espController_ = nullptr;
596  esp_ = nullptr;
597  schedule_ = nullptr;
598  input_ = nullptr;
599  looper_ = nullptr;
600  actReg_ = nullptr;
601 
604  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
override virtual

Implements edm::IEventProcessor.

Definition at line 2219 of file EventProcessor.cc.

References alreadyHandlingException_.

2219  {
2221  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 607 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by forkProcess(), and runToCompletion().

607  {
608  if(beginJobCalled_) return;
609  beginJobCalled_=true;
610  bk::beginJob();
611 
612  // StateSentry toerror(this); // should we add this ?
613  //make the services available
615 
616  service::SystemBounds bounds(preallocations_.numberOfStreams(),
620  actReg_->preallocateSignal_(bounds);
622 
623  //NOTE: this may throw
625  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
626 
627  //NOTE: This implementation assumes 'Job' means one call
628  // the EventProcessor::run
629  // If it really means once per 'application' then this code will
630  // have to be changed.
631  // Also have to deal with case where have 'run' then new Module
632  // added and do 'run'
633  // again. In that case the newly added Module needs its 'beginJob'
634  // to be called.
635 
636  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
637  // For now we delay calling beginOfJob until first beginOfRun
638  //if(looper_) {
639  // looper_->beginOfJob(es);
640  //}
641  try {
642  convertException::wrap([&]() {
643  input_->doBeginJob();
644  });
645  }
646  catch(cms::Exception& ex) {
647  ex.addContext("Calling beginJob for the source");
648  throw;
649  }
650  schedule_->beginJob(*preg_);
651  // toerror.succeeded(); // should we add this?
652  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
653  actReg_->postBeginJobSignal_();
654 
655  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
656  schedule_->beginStream(i);
657  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
658  }
659  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
override virtual

Implements edm::IEventProcessor.

Definition at line 1740 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1740  {
1741  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1742  {
1743  SendSourceTerminationSignalIfException sentry(actReg_.get());
1744 
1745  input_->doBeginLumi(lumiPrincipal, &processContext_);
1746  sentry.completedSuccessfully();
1747  }
1748 
1750  if(rng.isAvailable()) {
1751  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1752  rng->preBeginLumi(lb);
1753  }
1754 
1755  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1756  // lumi blocks know their start and end times why not also start and end events?
1757  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1758  {
1759  SendSourceTerminationSignalIfException sentry(actReg_.get());
1760  espController_->eventSetupForInstance(ts);
1761  sentry.completedSuccessfully();
1762  }
1763  EventSetup const& es = esp_->eventSetup();
1764  {
1765  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1766  auto globalWaitTask = make_empty_waiting_task();
1767  globalWaitTask->increment_ref_count();
1768  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1769  *schedule_,
1770  lumiPrincipal,
1771  ts,
1772  es,
1773  subProcesses_);
1774  globalWaitTask->wait_for_all();
1775  if(globalWaitTask->exceptionPtr() != nullptr) {
1776  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1777  }
1778  }
1779  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1780  if(looper_) {
1781  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1782  }
1783  {
1784  //To wait, the ref count has to be 1+#streams
1785  auto streamLoopWaitTask = make_empty_waiting_task();
1786  streamLoopWaitTask->increment_ref_count();
1787 
1788  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1789 
1790  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1791  *schedule_,
1793  lumiPrincipal,
1794  ts,
1795  es,
1796  subProcesses_);
1797  streamLoopWaitTask->wait_for_all();
1798  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1799  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1800  }
1801  }
1802 
1803  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1804  if(looper_) {
1805  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1806  }
1807  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
override virtual

Implements edm::IEventProcessor.

Definition at line 1613 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1613  {
1614  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1615  {
1616  SendSourceTerminationSignalIfException sentry(actReg_.get());
1617 
1618  input_->doBeginRun(runPrincipal, &processContext_);
1619  sentry.completedSuccessfully();
1620  }
1621 
1622  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1623  runPrincipal.beginTime());
1625  espController_->forceCacheClear();
1626  }
1627  {
1628  SendSourceTerminationSignalIfException sentry(actReg_.get());
1629  espController_->eventSetupForInstance(ts);
1630  sentry.completedSuccessfully();
1631  }
1632  EventSetup const& es = esp_->eventSetup();
1633  if(looper_ && looperBeginJobRun_== false) {
1634  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1635  looper_->beginOfJob(es);
1636  looperBeginJobRun_ = true;
1637  looper_->doStartingNewLoop();
1638  }
1639  {
1640  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1641  auto globalWaitTask = make_empty_waiting_task();
1642  globalWaitTask->increment_ref_count();
1643  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1644  *schedule_,
1645  runPrincipal,
1646  ts,
1647  es,
1648  subProcesses_);
1649  globalWaitTask->wait_for_all();
1650  if(globalWaitTask->exceptionPtr() != nullptr) {
1651  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1652  }
1653  }
1654  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1655  if(looper_) {
1656  looper_->doBeginRun(runPrincipal, es, &processContext_);
1657  }
1658  {
1659  //To wait, the ref count has to be 1+#streams
1660  auto streamLoopWaitTask = make_empty_waiting_task();
1661  streamLoopWaitTask->increment_ref_count();
1662 
1663  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1664 
1665  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1666  *schedule_,
1668  runPrincipal,
1669  ts,
1670  es,
1671  subProcesses_);
1672 
1673  streamLoopWaitTask->wait_for_all();
1674  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1675  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1676  }
1677  }
1678  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1679  if(looper_) {
1680  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1681  }
1682  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1294 of file EventProcessor.cc.

References edm::IEventProcessor::epSignal, and edm::shutdown_flag.

Referenced by readNextEventForStream(), and runToCompletion().

1294  {
1295  bool returnValue = false;
1296 
1297  // Look for a shutdown signal
1298  if(shutdown_flag.load(std::memory_order_acquire)) {
1299  returnValue = true;
1300  returnCode = epSignal;
1301  }
1302  return returnValue;
1303  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1256 of file EventProcessor.cc.

References schedule_.

1256  {
1257  schedule_->clearCounters();
1258  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1513 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

1513  {
1514  if (fb_.get() != nullptr) {
1515  SendSourceTerminationSignalIfException sentry(actReg_.get());
1516  input_->closeFile(fb_.get(), cleaningUpAfterException);
1517  sentry.completedSuccessfully();
1518  }
1519  FDEBUG(1) << "\tcloseInputFile\n";
1520  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1530 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1530  {
1531  if (fb_.get() != nullptr) {
1532  schedule_->closeOutputFiles();
1533  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1534  }
1535  FDEBUG(1) << "\tcloseOutputFiles\n";
1536  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1262 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, emptyRunLumiMode_, Exception, fileMode_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by runToCompletion().

1262  {
1263  statemachine::FileMode fileMode;
1264  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1265  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1266  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1267  else {
1268  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1269  << fileMode_ << ".\n"
1270  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1271  }
1272 
1273  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1274  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1275  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1276  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1277  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1278  else {
1279  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1280  << emptyRunLumiMode_ << ".\n"
1281  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1282  }
1283 
1284  auto machine = std::make_unique<statemachine::Machine>(
1285  this,
1286  fileMode,
1287  emptyRunLumiMode);
1288 
1289  machine->initiate();
1290  return machine;
1291  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1948 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1948  {
1950  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1951  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1952  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1936 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), and subProcesses_.

1936  {
1937  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1938  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1939  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1940  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1602 of file EventProcessor.cc.

References FDEBUG, and stateMachineWasInErrorState_.

1602  {
1603  FDEBUG(1) << "\tdoErrorStuff\n";
1604  LogError("StateMachine")
1605  << "The EventProcessor state machine encountered an unexpected event\n"
1606  << "and went to the error state\n"
1607  << "Will attempt to terminate processing normally\n"
1608  << "(IF using the looper the next loop will be attempted)\n"
1609  << "This likely indicates a bug in an input module or corrupted input or both\n";
1611  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1241 of file EventProcessor.cc.

References schedule_.

1241  {
1242  schedule_->enableEndPaths(active);
1243  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 662 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

662  {
663  // Collects exceptions, so we don't throw before all operations are performed.
664  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
665 
666  //make the services available
668 
669  //NOTE: this really should go elsewhere in the future
670  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
671  c.call([this,i](){this->schedule_->endStream(i);});
672  for(auto& subProcess : subProcesses_) {
673  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
674  }
675  }
676  auto actReg = actReg_.get();
677  c.call([actReg](){actReg->preEndJobSignal_();});
678  schedule_->endJob(c);
679  for(auto& subProcess : subProcesses_) {
680  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
681  }
682  c.call(std::bind(&InputSource::doEndJob, input_.get()));
683  if(looper_) {
684  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
685  }
686  c.call([actReg](){actReg->postEndJobSignal_();});
687  if(c.hasThrown()) {
688  c.rethrow();
689  }
690  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1809 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, edm::Principal::setAtEndTransition(), edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1809  {
1810  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1811  {
1812  SendSourceTerminationSignalIfException sentry(actReg_.get());
1813 
1814  lumiPrincipal.setEndTime(input_->timestamp());
1815  lumiPrincipal.setComplete();
1816  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1817  sentry.completedSuccessfully();
1818  }
1819  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1820  // lumi blocks know their start and end times why not also start and end events?
1821  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1822  lumiPrincipal.endTime());
1823  {
1824  SendSourceTerminationSignalIfException sentry(actReg_.get());
1825  espController_->eventSetupForInstance(ts);
1826  sentry.completedSuccessfully();
1827  }
1828  EventSetup const& es = esp_->eventSetup();
1829  {
1830  //To wait, the ref count has to be 1+#streams
1831  auto streamLoopWaitTask = make_empty_waiting_task();
1832  streamLoopWaitTask->increment_ref_count();
1833 
1834  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1835 
1836  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1837  *schedule_,
1839  lumiPrincipal,
1840  ts,
1841  es,
1842  subProcesses_,
1843  cleaningUpAfterException);
1844  streamLoopWaitTask->wait_for_all();
1845  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1846  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1847  }
1848  }
1849  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1850  if(looper_) {
1851  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1852  }
1853  {
1854  lumiPrincipal.setAtEndTransition(true);
1855  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1856  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1857  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1858  }
1859  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1860  if(looper_) {
1861  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1862  }
1863  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1565 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

1565  {
1566  if(looper_) {
1567  ModuleChanger changer(schedule_.get(),preg_.get());
1568  looper_->setModuleChanger(&changer);
1569  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1570  looper_->setModuleChanger(nullptr);
1571  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1572  else return false;
1573  }
1574  FDEBUG(1) << "\tendOfLoop\n";
1575  return true;
1576  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1246 of file EventProcessor.cc.

References schedule_.

1246  {
1247  return schedule_->endPathsEnabled();
1248  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1684 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, edm::Principal::setAtEndTransition(), edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1684  {
1685  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1686  {
1687  SendSourceTerminationSignalIfException sentry(actReg_.get());
1688 
1689  runPrincipal.setEndTime(input_->timestamp());
1690  runPrincipal.setComplete();
1691  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1692  sentry.completedSuccessfully();
1693  }
1694 
1696  runPrincipal.endTime());
1697  {
1698  SendSourceTerminationSignalIfException sentry(actReg_.get());
1699  espController_->eventSetupForInstance(ts);
1700  sentry.completedSuccessfully();
1701  }
1702  EventSetup const& es = esp_->eventSetup();
1703  {
1704  //To wait, the ref count has to be 1+#streams
1705  auto streamLoopWaitTask = make_empty_waiting_task();
1706  streamLoopWaitTask->increment_ref_count();
1707 
1708  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1709 
1710  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1711  *schedule_,
1713  runPrincipal,
1714  ts,
1715  es,
1716  subProcesses_,
1717  cleaningUpAfterException);
1718 
1719  streamLoopWaitTask->wait_for_all();
1720  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1721  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1722  }
1723  }
1724  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1725  if(looper_) {
1726  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1727  }
1728  {
1729  runPrincipal.setAtEndTransition(true);
1730  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1731  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1732  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1733  }
1734  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1735  if(looper_) {
1736  looper_->doEndRun(runPrincipal, es, &processContext_);
1737  }
1738  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 920 of file EventProcessor.cc.

References actReg_, beginJob(), continueAfterChildFailure_, edm::eventsetup::EventSetupRecord::doGet(), MillePedeFileConverter_cfg::e, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, RecoTauDiscriminantConfiguration::mask, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), possiblyContinueAfterForkChildFailure(), readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().

920  {
921 
922  if(0 == numberOfForkedChildren_) {return true;}
923  assert(0<numberOfForkedChildren_);
924  //do what we want done in common
925  {
926  beginJob(); //make sure this was run
927  // make the services available
929 
930  InputSource::ItemType itemType;
931  itemType = input_->nextItemType();
932 
933  assert(itemType == InputSource::IsFile);
934  {
935  readFile();
936  }
937  itemType = input_->nextItemType();
938  assert(itemType == InputSource::IsRun);
939 
940  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
941  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
942  input_->runAuxiliary()->beginTime());
943  espController_->eventSetupForInstance(ts);
944  EventSetup const& es = esp_->eventSetup();
945 
946  //now get all the data available in the EventSetup
947  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
948  es.fillAvailableRecordKeys(recordKeys);
949  std::vector<eventsetup::DataKey> dataKeys;
950  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
951  itKey != itEnd;
952  ++itKey) {
953  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
954  //see if this is on our exclusion list
955  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
956  ExcludedData const* excludedData(nullptr);
957  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
958  excludedData = &(itExcludeRec->second);
959  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
960  //skip all items in this record
961  continue;
962  }
963  }
964  if(0 != recordPtr) {
965  dataKeys.clear();
966  recordPtr->fillRegisteredDataKeys(dataKeys);
967  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
968  itDataKey != itDataKeyEnd;
969  ++itDataKey) {
970  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
971  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
972  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
973  continue;
974  }
975  try {
976  recordPtr->doGet(*itDataKey);
977  } catch(cms::Exception& e) {
978  LogWarning("ForkingEventSetupPreFetching") << e.what();
979  }
980  }
981  }
982  }
983  }
984  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
985  {
986  // make the services available
988  Service<JobReport> jobReport;
989  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
990 
991  //Now actually do the forking
992  actReg_->preForkReleaseResourcesSignal_();
993  input_->doPreForkReleaseResources();
994  schedule_->preForkReleaseResources();
995  }
996  installCustomHandler(SIGCHLD, ep_sigchld);
997 
998 
999  unsigned int childIndex = 0;
1000  unsigned int const kMaxChildren = numberOfForkedChildren_;
1001  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1002  std::vector<pid_t> childrenIds;
1003  childrenIds.reserve(kMaxChildren);
1004  std::vector<int> childrenSockets;
1005  childrenSockets.reserve(kMaxChildren);
1006  std::vector<int> childrenPipes;
1007  childrenPipes.reserve(kMaxChildren);
1008  std::vector<int> childrenSocketsCopy;
1009  childrenSocketsCopy.reserve(kMaxChildren);
1010  std::vector<int> childrenPipesCopy;
1011  childrenPipesCopy.reserve(kMaxChildren);
1012  int pipes[] {0, 0};
1013 
1014  {
1015  // make the services available
1017  Service<JobReport> jobReport;
1018  int sockets[2], fd_flags;
1019  for(; childIndex < kMaxChildren; ++childIndex) {
1020  // Create a UNIX_DGRAM socket pair
1021  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1022  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1023  exit(EXIT_FAILURE);
1024  }
1025  if (pipe(pipes)) {
1026  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1027  exit(EXIT_FAILURE);
1028  }
1029  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1030  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1031  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1032  exit(EXIT_FAILURE);
1033  }
1034  // Mark socket as non-block. Child must be careful to do select prior
1035  // to reading from socket.
1036  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1037  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1038  exit(EXIT_FAILURE);
1039  }
1040  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1041  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1042  exit(EXIT_FAILURE);
1043  }
1044  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1045  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1046  exit(EXIT_FAILURE);
1047  }
1048  // Linux man page notes there are some edge cases where reading from a
1049  // fd can block, even after a select.
1050  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1051  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1052  exit(EXIT_FAILURE);
1053  }
1054  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1055  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1056  exit(EXIT_FAILURE);
1057  }
1058 
1059  childrenPipesCopy = childrenPipes;
1060  childrenSocketsCopy = childrenSockets;
1061 
1062  pid_t value = fork();
1063  if(value == 0) {
1064  // Close the parent's side of the socket and pipe which will talk to us.
1065  close(pipes[0]);
1066  close(sockets[0]);
1067  // Close our copies of the parent's other communication pipes.
1068  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1069  close(*it);
1070  }
1071  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1072  close(*it);
1073  }
1074 
1075  // this is the child process, redirect stdout and stderr to a log file
1076  fflush(stdout);
1077  fflush(stderr);
1078  std::stringstream stout;
1079  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1080  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1081  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1082  }
1083  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1084  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1085  }
1086 
1087  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1088  if(setCpuAffinity_) {
1089  // CPU affinity is handled differently on macosx.
1090  // We disable it and print a message until someone reads:
1091  //
1092  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1093  //
1094  // and implements it.
1095 #ifdef __APPLE__
1096  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1097 #else
1098  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1099  cpu_set_t mask;
1100  CPU_ZERO(&mask);
1101  CPU_SET(childIndex, &mask);
1102  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1103  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1104  exit(-1);
1105  }
1106 #endif
1107  }
1108  break;
1109  } else {
1110  //this is the parent
1111  close(pipes[1]);
1112  close(sockets[1]);
1113  }
1114  if(value < 0) {
1115  LogError("ForkingChild") << "failed to create a child";
1116  exit(-1);
1117  }
1118  childrenIds.push_back(value);
1119  childrenSockets.push_back(sockets[0]);
1120  childrenPipes.push_back(pipes[0]);
1121  }
1122 
1123  if(childIndex < kMaxChildren) {
1124  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1125  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1126 
1127  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1128  input_->doPostForkReacquireResources(receiver);
1129  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1130  //NOTE: sources have to reset themselves by listening to the post fork message
1131  //rewindInput();
1132  return true;
1133  }
1134  jobReport->parentAfterFork(jobReportFile);
1135  }
1136 
1137  //this is the original, which is now the master for all the children
1138 
1139  //Need to wait for signals from the children or externally
1140  // To wait we must
1141  // 1) block the signals we want to wait on so we do not have a race condition
1142  // 2) check that we haven't already met our ending criteria
1143  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1144  sigset_t blockingSigSet;
1145  sigset_t unblockingSigSet;
1146  sigset_t oldSigSet;
1147  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1148  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1149  sigaddset(&blockingSigSet, SIGCHLD);
1150  sigaddset(&blockingSigSet, SIGUSR2);
1151  sigaddset(&blockingSigSet, SIGINT);
1152  sigdelset(&unblockingSigSet, SIGCHLD);
1153  sigdelset(&unblockingSigSet, SIGUSR2);
1154  sigdelset(&unblockingSigSet, SIGINT);
1155  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1156 
1157  // If there are too many fd's (unlikely, but possible) for select, denote this
1158  // because the sender will fail.
1159  bool too_many_fds = false;
1160  if (pipes[1]+1 > FD_SETSIZE) {
1161  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1162  too_many_fds = true;
1163  }
1164 
1165  //create a thread that sends the units of work to workers
1166  // we create it after all signals were blocked so that this
1167  // thread is never interrupted by a signal
1168  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1169  boost::thread senderThread(sender);
1170 
1171  if(not too_many_fds) {
1172  //NOTE: a child could have failed before we got here and even after this call
1173  // which is why the 'if' is conditional on continueAfterChildFailure_
1175  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1176  sigsuspend(&unblockingSigSet);
1178  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1179  }
1180  }
1181  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1182 
1183  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1184  if(child_failed) {
1185  LogError("ForkingStopping") << "child failed";
1186  }
1187  if(shutdown_flag) {
1188  LogSystem("ForkingStopping") << "asked to shutdown";
1189  }
1190 
1191  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1192  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1193  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1194  it != itEnd; ++it) {
1195  /* int result = */ kill(*it, SIGUSR2);
1196  }
1197  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1198  while(num_children_done != kMaxChildren) {
1199  sigsuspend(&unblockingSigSet);
1200  }
1201  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1202  }
1203  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1204  senderThread.join();
1205  if(child_failed && !continueAfterChildFailure_) {
1206  if (child_fail_signal) {
1207  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1208  } else if (child_fail_exit_status) {
1209  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1210  } else {
1211  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1212  }
1213  }
1214  if(too_many_fds) {
1215  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1216  }
1217  return false;
1218  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
volatile std::atomic< bool > shutdown_flag
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
virtual void readFile() override
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1221 of file EventProcessor.cc.

References schedule_.

1221  {
1222  return schedule_->getAllModuleDescriptions();
1223  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 693 of file EventProcessor.cc.

References AlCaHLTBitMon_ParallelJobs::p, and serviceToken_.

Referenced by ~EventProcessor().

693  {
694  return serviceToken_;
695  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1251 of file EventProcessor.cc.

References schedule_.

1251  {
1252  schedule_->getTriggerReport(rep);
1253  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 2005 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvent().

2008  {
2009  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
2010  if(iPtr) {
2011  bool expected = false;
2012  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2013  deferredExceptionPtr_ = *iPtr;
2014  {
2015  WaitingTaskHolder h(iTask);
2016  h.doneWaiting(*iPtr);
2017  }
2018  }
2019  //the stream will stop now
2020  iTask->decrement_ref_count();
2021  return;
2022  }
2023 
2024  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2025  });
2026 
2027  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
2029 
2030  try {
2031  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2032  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2033  } else {
2034  //the stream will stop now
2035  tbb::task::destroy(*recursionTask);
2036  iTask->decrement_ref_count();
2037  }
2038  } catch(...) {
2039  WaitingTaskHolder h(recursionTask);
2040  h.doneWaiting(std::current_exception());
2041  }
2042  });
2043  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 412 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

414  {
415 
416  //std::cerr << processDesc->dump() << std::endl;
417 
418  // register the empty parentage vector, once and for all
420 
421  // register the empty parameter set, once and for all.
422  ParameterSet().registerIt();
423 
424  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
425 
426  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
427  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
428  bool const hasSubProcesses = !subProcessVParameterSet.empty();
429 
430  // Now set some parameters specific to the main process.
431  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
432  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
433  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
434  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
435  //threading
436  unsigned int nThreads=1;
437  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
438  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
439  if(nThreads == 0) {
440  nThreads = 1;
441  }
442  }
443  /* TODO: when we support having each stream run in a different thread use this default
444  unsigned int nStreams =nThreads;
445  */
446  unsigned int nStreams =1;
447  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
448  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
449  if(nStreams==0) {
450  nStreams = nThreads;
451  }
452  }
453  if(nThreads >1) {
454  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
455  }
456 
457  /*
458  bool nRunsSet = false;
459  */
460  unsigned int nConcurrentRuns =1;
461  /*
462  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
463  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
464  }
465  */
466  unsigned int nConcurrentLumis =1;
467  /*
468  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
469  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
470  } else {
471  nConcurrentLumis = nConcurrentRuns;
472  }
473  */
474  //Check that relationships between threading parameters make sense
475  /*
476  if(nThreads<nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nStreams) {
480  //bad
481  }
482  if(nConcurrentRuns>nConcurrentLumis) {
483  //bad
484  }
485  */
486  //forking
487  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
488  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
489  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
490  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
491  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
492  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493  for(auto const& ps : excluded) {
494  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
495  ps.getUntrackedParameter<std::string>("label", ""));
496  }
497  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
498 
499  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
500 
501  // Now do general initialization
502  ScheduleItems items;
503 
504  //initialize the services
505  auto& serviceSets = processDesc->getServicesPSets();
506  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
507  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
508 
509  //make the services available
511 
512  if(nStreams>1) {
514  handler->willBeUsingThreads();
515  }
516 
517  // initialize miscellaneous items
518  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
519 
520  // initialize the event setup provider
521  esp_ = espController_->makeProvider(*parameterSet);
522 
523  // initialize the looper, if any
524  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
525  if(looper_) {
526  looper_->setActionTable(items.act_table_.get());
527  looper_->attachTo(*items.actReg_);
528 
529  //For now loopers make us run only 1 transition at a time
530  nStreams=1;
531  nConcurrentLumis=1;
532  nConcurrentRuns=1;
533  }
534 
535  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
536 
537  // initialize the input source
538  input_ = makeInput(*parameterSet,
539  *common,
540  items.preg(),
541  items.branchIDListHelper(),
542  items.thinnedAssociationsHelper(),
543  items.actReg_,
544  items.processConfiguration(),
546 
547  // initialize the Schedule
548  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
549 
550  // set the data members
551  act_table_ = std::move(items.act_table_);
552  actReg_ = items.actReg_;
553  preg_ = items.preg();
554  branchIDListHelper_ = items.branchIDListHelper();
555  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
556  processConfiguration_ = items.processConfiguration();
558  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
559 
560  FDEBUG(2) << parameterSet << std::endl;
561 
563  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
564  // Reusable event principal
565  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
568  }
569 
570  // fill the subprocesses, if there are any
571  subProcesses_.reserve(subProcessVParameterSet.size());
572  for(auto& subProcessPSet : subProcessVParameterSet) {
573  subProcesses_.emplace_back(subProcessPSet,
574  *parameterSet,
575  preg(),
578  SubProcessParentageHelper(),
580  *actReg_,
581  token,
584  &processContext_);
585  }
586  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:747
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1522 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1522  {
1523  if (fb_.get() != nullptr) {
1524  schedule_->openOutputFiles(*fb_);
1525  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1526  }
1527  FDEBUG(1) << "\topenOutputFiles\n";
1528  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 904 of file EventProcessor.cc.

References continueAfterChildFailure_.

Referenced by forkProcess().

904  {
905  if(child_failed && continueAfterChildFailure_) {
906  if (child_fail_signal) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
908  child_fail_signal=0;
909  } else if (child_fail_exit_status) {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
911  child_fail_exit_status=0;
912  } else {
913  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
914  }
915  child_failed =false;
916  }
917  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), readRun(), and runToCompletion().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1584 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

1584  {
1585  looper_->prepareForNextLoop(esp_.get());
1586  FDEBUG(1) << "\tprepareForNextLoop\n";
1587  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2102 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by handleNextEventForStreamAsync(), and readAndProcessEvent().

2103  {
2104  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2105  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2107  if(rng.isAvailable()) {
2108  Event ev(*pep, ModuleDescription(), nullptr);
2109  rng->postEventRead(ev);
2110  }
2111  assert(pep->luminosityBlockPrincipalPtrValid());
2112  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2113  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2114 
2115  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2116  tbb::task::allocate_root(),
2117  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2118  {
2120 
2121  //NOTE: If we have a looper we only have one Stream
2122  if(looper_) {
2123  processEventWithLooper(*pep);
2124  }
2125 
2126  FDEBUG(1) << "\tprocessEvent\n";
2127  pep->clearEventPrincipal();
2128  if(iPtr) {
2129  iHolder.doneWaiting(*iPtr);
2130  } else {
2131  iHolder.doneWaiting(std::exception_ptr());
2132  }
2133  }
2134  )
2135  );
2136  WaitingTaskHolder afterProcessTask;
2137  if(subProcesses_.empty()) {
2138  afterProcessTask = std::move(finalizeEventTask);
2139  } else {
2140  //Need to run SubProcesses after schedule has finished
2141  // with the event
2142  afterProcessTask = WaitingTaskHolder(
2143  make_waiting_task(tbb::task::allocate_root(),
2144  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2145  {
2146  if(not iPtr) {
2148 
2149  //when run with 1 thread, we want the order to be what
2150  // it was before. This requires reversing the order since
2151  // tasks are run last one in first one out
2152  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2153  subProcess.doEventAsync(finalizeEventTask,*pep);
2154  }
2155  } else {
2156  finalizeEventTask.doneWaiting(*iPtr);
2157  }
2158  })
2159  );
2160  }
2161 
2162  schedule_->processOneEventAsync(std::move(afterProcessTask),
2163  iStreamIndex,*pep, esp_->eventSetup());
2164 
2165  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 2167 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsync().

2167  {
2168  bool randomAccess = input_->randomAccess();
2169  ProcessingController::ForwardState forwardState = input_->forwardState();
2170  ProcessingController::ReverseState reverseState = input_->reverseState();
2171  ProcessingController pc(forwardState, reverseState, randomAccess);
2172 
2174  do {
2175 
2176  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2177  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2178 
2179  bool succeeded = true;
2180  if(randomAccess) {
2181  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2182  input_->skipEvents(-2);
2183  }
2184  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2185  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2186  }
2187  }
2188  pc.setLastOperationSucceeded(succeeded);
2189  } while(!pc.lastOperationSucceeded());
2190  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2191  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1920 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1920  {
1921  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1922  {
1923  SendSourceTerminationSignalIfException sentry(actReg_.get());
1924  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1925  sentry.completedSuccessfully();
1926  }
1927  return input_->luminosityBlock();
1928  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1883 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1883  {
1884  principalCache_.merge(input_->runAuxiliary(), preg());
1885  auto runPrincipal =principalCache_.runPrincipalPtr();
1886  {
1887  SendSourceTerminationSignalIfException sentry(actReg_.get());
1888  input_->readAndMergeRun(*runPrincipal);
1889  sentry.completedSuccessfully();
1890  }
1891  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1892  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1893  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2045 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processEventAsync(), and readEvent().

2045  {
2046  if(numberOfForkedChildren_>0) {
2047  //Have to do something special for forking since
2048  // after each event the system may have to skip
2049  // some transitions. This is handled in runToCompletion
2050  readEvent(0);
2051  auto eventLoopWaitTask = make_empty_waiting_task();
2052  eventLoopWaitTask->increment_ref_count();
2053  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2054  eventLoopWaitTask->wait_for_all();
2055  return;
2056  }
2059 
2060  std::atomic<bool> finishedProcessingEvents{false};
2061  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2062 
2063  //The state machine already found the event so
2064  // we have to avoid looking again
2065  firstEventInBlock_ = true;
2066 
2067  //To wait, the ref count has to be 1+#streams
2068  auto eventLoopWaitTask = make_empty_waiting_task();
2069  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2070  eventLoopWaitTask->increment_ref_count();
2071 
2072  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2073  unsigned int iStreamIndex = 0;
2074  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2075  eventLoopWaitTask->increment_ref_count();
2076  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2077  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2078  }) );
2079  }
2080  eventLoopWaitTask->increment_ref_count();
2081  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2082  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2083  }));
2084 
2085  //One of the processing threads saw an exception
2087  std::rethrow_exception(deferredExceptionPtr_);
2088  }
2089  }
void readEvent(unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2090 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readAndProcessEvent(), and readNextEventForStream().

2090  {
2091  //TODO this will have to become per stream
2092  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2093  StreamContext streamContext(event.streamID(), &processContext_);
2094 
2095  SendSourceTerminationSignalIfException sentry(actReg_.get());
2096  input_->readEvent(event, streamContext);
2097  sentry.completedSuccessfully();
2098 
2099  FDEBUG(1) << "\treadEvent\n";
2100  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1495 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), or, edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by forkProcess(), Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1495  {
1496  FDEBUG(1) << " \treadFile\n";
1497  size_t size = preg_->size();
1498  SendSourceTerminationSignalIfException sentry(actReg_.get());
1499 
1500  fb_ = input_->readFile();
1501  if(size < preg_->size()) {
1503  }
1505  if((numberOfForkedChildren_ > 0) or
1508  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1509  }
1510  sentry.completedSuccessfully();
1511  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1895 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1895  {
1898  << "EventProcessor::readRun\n"
1899  << "Illegal attempt to insert lumi into cache\n"
1900  << "Contact a Framework Developer\n";
1901  }
1904  << "EventProcessor::readRun\n"
1905  << "Illegal attempt to insert lumi into cache\n"
1906  << "Run is invalid\n"
1907  << "Contact a Framework Developer\n";
1908  }
1909  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1910  {
1911  SendSourceTerminationSignalIfException sentry(actReg_.get());
1912  input_->readLuminosityBlock(*lbp, *historyAppender_);
1913  sentry.completedSuccessfully();
1914  }
1915  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1916  principalCache_.insert(lbp);
1917  return input_->luminosityBlock();
1918  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1954 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1955  {
1956  if(shouldWeStop()) {
1957  return false;
1958  }
1959 
1960  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1961  return false;
1962  }
1963 
1964  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1965  return false;
1966  }
1967 
1969  try {
1970  //need to use lock in addition to the serial task queue because
1971  // of delayed provenance reading and reading data in response to
1972  // edm::Refs etc
1973  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1974  if(not firstEventInBlock_) {
1975  //The state machine already called input_->nextItemType
1976  // and found an event. We can't call input_->nextItemType
1977  // again since it would move to the next transition
1978  InputSource::ItemType itemType = input_->nextItemType();
1979  if (InputSource::IsEvent !=itemType) {
1981  finishedProcessingEvents->store(true,std::memory_order_release);
1982  //std::cerr<<"next item type "<<itemType<<"\n";
1983  return false;
1984  }
1986  //std::cerr<<"task told to async stop\n";
1987  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1988  return false;
1989  }
1990  } else {
1991  firstEventInBlock_ = false;
1992  }
1993  readEvent(iStreamIndex);
1994  } catch (...) {
1995  bool expected =false;
1996  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1997  deferredExceptionPtr_ = std::current_exception();
1998 
1999  }
2000  return false;
2001  }
2002  return true;
2003  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
virtual bool shouldWeStop() const override
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
statemachine::Run edm::EventProcessor::readRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1865 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1865  {
1868  << "EventProcessor::readRun\n"
1869  << "Illegal attempt to insert run into cache\n"
1870  << "Contact a Framework Developer\n";
1871  }
1872  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1873  {
1874  SendSourceTerminationSignalIfException sentry(actReg_.get());
1875  input_->readRun(*rp, *historyAppender_);
1876  sentry.completedSuccessfully();
1877  }
1878  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1879  principalCache_.insert(rp);
1880  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1881  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1547 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1547  {
1548  if (fb_.get() != nullptr) {
1549  schedule_->respondToCloseInputFile(*fb_);
1550  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1551  }
1552  FDEBUG(1) << "\trespondToCloseInputFile\n";
1553  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1538 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1538  {
1539  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1540  if (fb_.get() != nullptr) {
1541  schedule_->respondToOpenInputFile(*fb_);
1542  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1543  }
1544  FDEBUG(1) << "\trespondToOpenInputFile\n";
1545  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1578 of file EventProcessor.cc.

References FDEBUG, and input_.

1578  {
1579  input_->repeat();
1580  input_->rewind();
1581  FDEBUG(1) << "\trewind\n";
1582  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 337 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

337  {
338  return runToCompletion();
339  }
virtual StatusCode runToCompletion() override
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1307 of file EventProcessor.cc.

References actReg_, cms::Exception::addAdditionalInfo(), edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), alreadyHandlingException_, cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), checkForAsyncStopRequest(), createStateMachine(), MillePedeFileConverter_cfg::e, edm::IEventProcessor::epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, edm::ExternalSignal, FDEBUG, forceLooperToEnd_, input_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, edm::errors::LogicError, eostools::move(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, cmsPerfStripChart::operate(), preg(), preg_, principalCache_, runEdmFileComparison::returnCode, serviceToken_, findQualityFiles::size, stateMachineWasInErrorState_, terminateMachine(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

1307  {
1308 
1311  std::unique_ptr<statemachine::Machine> machine;
1312  {
1313  beginJob(); //make sure this was called
1314 
1315  //StatusCode returnCode = epSuccess;
1317 
1318  // make the services available
1320 
1321  machine = createStateMachine();
1324  try {
1325  convertException::wrap([&]() {
1326 
1327  InputSource::ItemType itemType;
1328 
1329  while(true) {
1330 
1331  bool more = true;
1332  if(numberOfForkedChildren_ > 0) {
1333  size_t size = preg_->size();
1334  {
1335  SendSourceTerminationSignalIfException sentry(actReg_.get());
1336  more = input_->skipForForking();
1337  sentry.completedSuccessfully();
1338  }
1339  if(more) {
1340  if(size < preg_->size()) {
1342  }
1344  }
1345  }
1346  {
1347  SendSourceTerminationSignalIfException sentry(actReg_.get());
1348  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1349  sentry.completedSuccessfully();
1350  }
1351 
1352  FDEBUG(1) << "itemType = " << itemType << "\n";
1353 
1354  if(checkForAsyncStopRequest(returnCode)) {
1355  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1356  forceLooperToEnd_ = true;
1357  machine->process_event(statemachine::Stop());
1358  forceLooperToEnd_ = false;
1359  break;
1360  }
1361 
1362  if(itemType == InputSource::IsEvent) {
1363  machine->process_event(statemachine::Event());
1365  forceLooperToEnd_ = true;
1366  machine->process_event(statemachine::Stop());
1367  forceLooperToEnd_ = false;
1369  break;
1370  }
1372  }
1373 
1374  if(itemType == InputSource::IsEvent) {
1375  }
1376  else if(itemType == InputSource::IsStop) {
1377  machine->process_event(statemachine::Stop());
1378  }
1379  else if(itemType == InputSource::IsFile) {
1380  machine->process_event(statemachine::File());
1381  }
1382  else if(itemType == InputSource::IsRun) {
1383  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1384  }
1385  else if(itemType == InputSource::IsLumi) {
1386  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1387  }
1388  else if(itemType == InputSource::IsSynchronize) {
1389  //For now, we don't have to do anything
1390  }
1391  // This should be impossible
1392  else {
1394  << "Unknown next item type passed to EventProcessor\n"
1395  << "Please report this error to the Framework group\n";
1396  }
1397  if(machine->terminated()) {
1398  break;
1399  }
1400  } // End of loop over state machine events
1401  }); // convertException::wrap
1402  } // Try block
1403  // Some comments on exception handling related to the boost state machine:
1404  //
1405  // Some states used in the machine are special because they
1406  // perform actions while the machine is being terminated, actions
1407  // such as close files, call endRun, call endLumi etc ... Each of these
1408  // states has two functions that perform these actions. The functions
1409  // are almost identical. The major difference is that one version
1410  // catches all exceptions and the other lets exceptions pass through.
1411  // The destructor catches them and the other function named "exit" lets
1412  // them pass through. On a normal termination, boost will always call
1413  // "exit" and then the state destructor. In our state classes, the
1414  // the destructors do nothing if the exit function already took
1415  // care of things. Here's the interesting part. When boost is
1416  // handling an exception the "exit" function is not called (a boost
1417  // feature).
1418  //
1419  // If an exception occurs while the boost machine is in control
1420  // (which usually means inside a process_event call), then
1421  // the boost state machine destroys its states and "terminates" itself.
1422  // This already done before we hit the catch blocks below. In this case
1423  // the call to terminateMachine below only destroys an already
1424  // terminated state machine. Because exit is not called, the state destructors
1425  // handle cleaning up lumis, runs, and files. The destructors swallow
1426  // all exceptions and only pass through the exceptions messages, which
1427  // are tacked onto the original exception below.
1428  //
1429  // If an exception occurs when the boost state machine is not
1430  // in control (outside the process_event functions), then boost
1431  // cannot destroy its own states. The terminateMachine function
1432  // below takes care of that. The flag "alreadyHandlingException"
1433  // is set true so that the state exit functions do nothing (and
1434  // cannot throw more exceptions while handling the first). Then the
1435  // state destructors take care of this because exit did nothing.
1436  //
1437  // In both cases above, the EventProcessor::endOfLoop function is
1438  // not called because it can throw exceptions.
1439  //
1440  // One tricky aspect of the state machine is that things that can
1441  // throw should not be invoked by the state machine while another
1442  // exception is being handled.
1443  // Another tricky aspect is that it appears to be important to
1444  // terminate the state machine before invoking its destructor.
1445  // We've seen crashes that are not understood when that is not
1446  // done. Maintainers of this code should be careful about this.
1447 
1448  catch (cms::Exception & e) {
1450  terminateMachine(std::move(machine));
1451  alreadyHandlingException_ = false;
1452  if (!exceptionMessageLumis_.empty()) {
1454  if (e.alreadyPrinted()) {
1455  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1456  }
1457  }
1458  if (!exceptionMessageRuns_.empty()) {
1460  if (e.alreadyPrinted()) {
1461  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1462  }
1463  }
1464  if (!exceptionMessageFiles_.empty()) {
1466  if (e.alreadyPrinted()) {
1467  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1468  }
1469  }
1470  throw;
1471  }
1472 
1473  if(machine->terminated()) {
1474  FDEBUG(1) << "The state machine reports it has been terminated\n";
1475  machine.reset();
1476  }
1477 
1479  throw cms::Exception("BadState")
1480  << "The boost state machine in the EventProcessor exited after\n"
1481  << "entering the Error state.\n";
1482  }
1483 
1484  }
1485  if(machine.get() != nullptr) {
1486  terminateMachine(std::move(machine));
1488  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1489  << "Please report this error to the Framework group\n";
1490  }
1491 
1492  return returnCode;
1493  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2207 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

2207  {
2209  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2215 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

2215  {
2217  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2211 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

2211  {
2213  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1589 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1589  {
1590  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1591  if(!subProcesses_.empty()) {
1592  for(auto const& subProcess : subProcesses_) {
1593  if(subProcess.shouldWeCloseOutput()) {
1594  return true;
1595  }
1596  }
1597  return false;
1598  }
1599  return schedule_->shouldWeCloseOutput();
1600  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2193 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2193  {
2194  FDEBUG(1) << "\tshouldWeStop\n";
2195  if(shouldWeStop_) return true;
2196  if(!subProcesses_.empty()) {
2197  for(auto const& subProcess : subProcesses_) {
2198  if(subProcess.terminate()) {
2199  return true;
2200  }
2201  }
2202  return false;
2203  }
2204  return schedule_->terminate();
2205  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1555 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

1555  {
1556  shouldWeStop_ = false;
1557  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1558  // until after we've called beginOfJob
1559  if(looper_ && looperBeginJobRun_) {
1560  looper_->doStartingNewLoop();
1561  }
1562  FDEBUG(1) << "\tstartingNewLoop\n";
1563  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine > iMachine)
private

Definition at line 2223 of file EventProcessor.cc.

References FDEBUG, and forceLooperToEnd_.

Referenced by runToCompletion().

2223  {
2224  if(iMachine.get() != nullptr) {
2225  if(!iMachine->terminated()) {
2226  forceLooperToEnd_ = true;
2227  iMachine->process_event(statemachine::Stop());
2228  forceLooperToEnd_ = false;
2229  }
2230  else {
2231  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2232  }
2233  if(iMachine->terminated()) {
2234  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2235  }
2236  }
2237  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1226 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

1226  {
1227  return schedule_->totalEvents();
1228  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1236 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

1236  {
1237  return schedule_->totalEventsFailed();
1238  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1231 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

1231  {
1232  return schedule_->totalEventsPassed();
1233  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1942 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1942  {
1944  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1945  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1946  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1930 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1930  {
1931  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1932  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1933  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1934  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 309 of file EventProcessor.h.

Referenced by alreadyHandlingException(), and runToCompletion().

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 321 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 323 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 317 of file EventProcessor.h.

Referenced by forkProcess(), init(), and possiblyContinueAfterForkChildFailure().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 305 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), forkProcess(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 328 of file EventProcessor.h.

Referenced by forkProcess(), and init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 308 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 307 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
std::string edm::EventProcessor::fileMode_
private

Definition at line 304 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 324 of file EventProcessor.h.

Referenced by readAndProcessEvent(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 312 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 310 of file EventProcessor.h.

Referenced by endOfLoop(), runToCompletion(), and terminateMachine().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 322 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 314 of file EventProcessor.h.

Referenced by forkProcess(), init(), readAndProcessEvent(), readFile(), and runToCompletion().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 315 of file EventProcessor.h.

Referenced by forkProcess() and init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), readFile(), and runToCompletion().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob() and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 316 of file EventProcessor.h.

Referenced by forkProcess() and init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 303 of file EventProcessor.h.

Referenced by doErrorStuff() and runToCompletion().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init().