CMS 3D CMS Logo

List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const override
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void beginRun (statemachine::Run const &run) override
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException) override
 
virtual void closeOutputFiles () override
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void deleteRunFromCache (statemachine::Run const &run) override
 
virtual void doErrorStuff () override
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
 
virtual bool endOfLoop () override
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException) override
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles () override
 
EventProcessoroperator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop () override
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi () override
 
virtual statemachine::Run readAndMergeRun () override
 
virtual void readAndProcessEvent () override
 
virtual void readFile () override
 
virtual int readLuminosityBlock () override
 
virtual statemachine::Run readRun () override
 
virtual void respondToCloseInputFile () override
 
virtual void respondToOpenInputFile () override
 
virtual void rewindInput () override
 
StatusCode run ()
 
virtual StatusCode runToCompletion () override
 
virtual void setExceptionMessageFiles (std::string &message) override
 
virtual void setExceptionMessageLumis (std::string &message) override
 
virtual void setExceptionMessageRuns (std::string &message) override
 
virtual bool shouldWeCloseOutput () const override
 
virtual bool shouldWeStop () const override
 
virtual void startingNewLoop () override
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void writeRun (statemachine::Run const &run) override
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedDataExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr< statemachine::MachinecreateStateMachine ()
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
std::string fileMode_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcesssubProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 326 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 327 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 226 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

230  :
231  actReg_(),
232  preg_(),
234  serviceToken_(),
235  input_(),
236  espController_(new eventsetup::EventSetupsController),
237  esp_(),
238  act_table_(),
240  schedule_(),
241  subProcesses_(),
242  historyAppender_(new HistoryAppender),
243  fb_(),
244  looper_(),
246  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
247  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
248  principalCache_(),
249  beginJobCalled_(false),
250  shouldWeStop_(false),
252  fileMode_(),
258  forceLooperToEnd_(false),
259  looperBeginJobRun_(false),
263  setCpuAffinity_(false),
265  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
266  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
267  processDesc->addServices(defaultServices, forcedServices);
268  init(processDesc, iToken, iLegacy);
269  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 271 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

273  :
274  actReg_(),
275  preg_(),
277  serviceToken_(),
278  input_(),
279  espController_(new eventsetup::EventSetupsController),
280  esp_(),
281  act_table_(),
283  schedule_(),
284  subProcesses_(),
285  historyAppender_(new HistoryAppender),
286  fb_(),
287  looper_(),
289  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
290  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
291  principalCache_(),
292  beginJobCalled_(false),
293  shouldWeStop_(false),
295  fileMode_(),
301  forceLooperToEnd_(false),
302  looperBeginJobRun_(false),
306  setCpuAffinity_(false),
310  {
311  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
312  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
313  processDesc->addServices(defaultServices, forcedServices);
315  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 317 of file EventProcessor.cc.

References init().

319  :
320  actReg_(),
321  preg_(),
323  serviceToken_(),
324  input_(),
325  espController_(new eventsetup::EventSetupsController),
326  esp_(),
327  act_table_(),
329  schedule_(),
330  subProcesses_(),
331  historyAppender_(new HistoryAppender),
332  fb_(),
333  looper_(),
335  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
336  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
337  principalCache_(),
338  beginJobCalled_(false),
339  shouldWeStop_(false),
341  fileMode_(),
347  forceLooperToEnd_(false),
348  looperBeginJobRun_(false),
352  setCpuAffinity_(false),
356  {
357  init(processDesc, token, legacy);
358  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 361 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

361  :
362  actReg_(),
363  preg_(),
365  serviceToken_(),
366  input_(),
367  espController_(new eventsetup::EventSetupsController),
368  esp_(),
369  act_table_(),
371  schedule_(),
372  subProcesses_(),
373  historyAppender_(new HistoryAppender),
374  fb_(),
375  looper_(),
377  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
378  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
379  principalCache_(),
380  beginJobCalled_(false),
381  shouldWeStop_(false),
383  fileMode_(),
389  forceLooperToEnd_(false),
390  looperBeginJobRun_(false),
394  setCpuAffinity_(false),
398  {
399  if(isPython) {
400  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
401  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
403  }
404  else {
405  auto processDesc = std::make_shared<ProcessDesc>(config);
407  }
408  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 585 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

585  {
586  // Make the services available while everything is being deleted.
587  ServiceToken token = getToken();
588  ServiceRegistry::Operate op(token);
589 
590  // manually destroy all these thing that may need the services around
591  // propagate_const<T> has no reset() function
592  espController_ = nullptr;
593  esp_ = nullptr;
594  schedule_ = nullptr;
595  input_ = nullptr;
596  looper_ = nullptr;
597  actReg_ = nullptr;
598 
601  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2199 of file EventProcessor.cc.

References alreadyHandlingException_.

2199  {
2201  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 604 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by forkProcess(), and runToCompletion().

604  {
605  if(beginJobCalled_) return;
606  beginJobCalled_=true;
607  bk::beginJob();
608 
609  // StateSentry toerror(this); // should we add this ?
610  //make the services available
612 
613  service::SystemBounds bounds(preallocations_.numberOfStreams(),
617  actReg_->preallocateSignal_(bounds);
619 
620  //NOTE: this may throw
622  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
623 
624  //NOTE: This implementation assumes 'Job' means one call
625  // the EventProcessor::run
626  // If it really means once per 'application' then this code will
627  // have to be changed.
628  // Also have to deal with case where have 'run' then new Module
629  // added and do 'run'
630  // again. In that case the newly added Module needs its 'beginJob'
631  // to be called.
632 
633  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
634  // For now we delay calling beginOfJob until first beginOfRun
635  //if(looper_) {
636  // looper_->beginOfJob(es);
637  //}
638  try {
639  convertException::wrap([&]() {
640  input_->doBeginJob();
641  });
642  }
643  catch(cms::Exception& ex) {
644  ex.addContext("Calling beginJob for the source");
645  throw;
646  }
647  schedule_->beginJob(*preg_);
648  // toerror.succeeded(); // should we add this?
649  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
650  actReg_->postBeginJobSignal_();
651 
652  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
653  schedule_->beginStream(i);
654  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
655  }
656  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1726 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1726  {
1727  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1728  {
1729  SendSourceTerminationSignalIfException sentry(actReg_.get());
1730 
1731  input_->doBeginLumi(lumiPrincipal, &processContext_);
1732  sentry.completedSuccessfully();
1733  }
1734 
1736  if(rng.isAvailable()) {
1737  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1738  rng->preBeginLumi(lb);
1739  }
1740 
1741  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1742  // lumi blocks know their start and end times why not also start and end events?
1743  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1744  {
1745  SendSourceTerminationSignalIfException sentry(actReg_.get());
1746  espController_->eventSetupForInstance(ts);
1747  sentry.completedSuccessfully();
1748  }
1749  EventSetup const& es = esp_->eventSetup();
1750  {
1751  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1752  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1753  for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1754  }
1755  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1756  if(looper_) {
1757  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1758  }
1759  {
1760  //To wait, the ref count has to b 1+#streams
1761  auto streamLoopWaitTask = make_empty_waiting_task();
1762  streamLoopWaitTask->increment_ref_count();
1763 
1764  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1765 
1766  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1767  *schedule_,
1769  lumiPrincipal,
1770  ts,
1771  es,
1772  subProcesses_);
1773  streamLoopWaitTask->wait_for_all();
1774  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1775  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1776  }
1777  }
1778 
1779  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1780  if(looper_) {
1781  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1782  }
1783  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1610 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, edm::for_all(), forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1610  {
1611  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1612  {
1613  SendSourceTerminationSignalIfException sentry(actReg_.get());
1614 
1615  input_->doBeginRun(runPrincipal, &processContext_);
1616  sentry.completedSuccessfully();
1617  }
1618 
1619  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1620  runPrincipal.beginTime());
1622  espController_->forceCacheClear();
1623  }
1624  {
1625  SendSourceTerminationSignalIfException sentry(actReg_.get());
1626  espController_->eventSetupForInstance(ts);
1627  sentry.completedSuccessfully();
1628  }
1629  EventSetup const& es = esp_->eventSetup();
1630  if(looper_ && looperBeginJobRun_== false) {
1631  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1632  looper_->beginOfJob(es);
1633  looperBeginJobRun_ = true;
1634  looper_->doStartingNewLoop();
1635  }
1636  {
1637  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1638  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1639  for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1640  }
1641  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1642  if(looper_) {
1643  looper_->doBeginRun(runPrincipal, es, &processContext_);
1644  }
1645  {
1646  //To wait, the ref count has to be 1+#streams
1647  auto streamLoopWaitTask = make_empty_waiting_task();
1648  streamLoopWaitTask->increment_ref_count();
1649 
1650  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1651 
1652  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1653  *schedule_,
1655  runPrincipal,
1656  ts,
1657  es,
1658  subProcesses_);
1659 
1660  streamLoopWaitTask->wait_for_all();
1661  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1662  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1663  }
1664  }
1665  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1666  if(looper_) {
1667  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1668  }
1669  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1291 of file EventProcessor.cc.

References edm::IEventProcessor::epSignal, and edm::shutdown_flag.

Referenced by readNextEventForStream(), and runToCompletion().

1291  {
1292  bool returnValue = false;
1293 
1294  // Look for a shutdown signal
1295  if(shutdown_flag.load(std::memory_order_acquire)) {
1296  returnValue = true;
1297  returnCode = epSignal;
1298  }
1299  return returnValue;
1300  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1253 of file EventProcessor.cc.

References schedule_.

1253  {
1254  schedule_->clearCounters();
1255  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1510 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

1510  {
1511  if (fb_.get() != nullptr) {
1512  SendSourceTerminationSignalIfException sentry(actReg_.get());
1513  input_->closeFile(fb_.get(), cleaningUpAfterException);
1514  sentry.completedSuccessfully();
1515  }
1516  FDEBUG(1) << "\tcloseInputFile\n";
1517  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1527 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1527  {
1528  if (fb_.get() != nullptr) {
1529  schedule_->closeOutputFiles();
1530  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1531  }
1532  FDEBUG(1) << "\tcloseOutputFiles\n";
1533  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1259 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, emptyRunLumiMode_, Exception, fileMode_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by runToCompletion().

1259  {
1260  statemachine::FileMode fileMode;
1261  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1262  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1263  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1264  else {
1265  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1266  << fileMode_ << ".\n"
1267  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1268  }
1269 
1270  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1271  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1272  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1273  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1274  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1275  else {
1276  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1277  << emptyRunLumiMode_ << ".\n"
1278  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1279  }
1280 
1281  auto machine = std::make_unique<statemachine::Machine>(
1282  this,
1283  fileMode,
1284  emptyRunLumiMode);
1285 
1286  machine->initiate();
1287  return machine;
1288  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1923 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1923  {
1925  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1926  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1927  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1911 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), and subProcesses_.

1911  {
1912  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1913  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1914  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1915  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1599 of file EventProcessor.cc.

References FDEBUG, and stateMachineWasInErrorState_.

1599  {
1600  FDEBUG(1) << "\tdoErrorStuff\n";
1601  LogError("StateMachine")
1602  << "The EventProcessor state machine encountered an unexpected event\n"
1603  << "and went to the error state\n"
1604  << "Will attempt to terminate processing normally\n"
1605  << "(IF using the looper the next loop will be attempted)\n"
1606  << "This likely indicates a bug in an input module or corrupted input or both\n";
1608  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1238 of file EventProcessor.cc.

References schedule_.

1238  {
1239  schedule_->enableEndPaths(active);
1240  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 659 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

659  {
660  // Collects exceptions, so we don't throw before all operations are performed.
661  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
662 
663  //make the services available
665 
666  //NOTE: this really should go elsewhere in the future
667  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
668  c.call([this,i](){this->schedule_->endStream(i);});
669  for(auto& subProcess : subProcesses_) {
670  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
671  }
672  }
673  auto actReg = actReg_.get();
674  c.call([actReg](){actReg->preEndJobSignal_();});
675  schedule_->endJob(c);
676  for(auto& subProcess : subProcesses_) {
677  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
678  }
679  c.call(std::bind(&InputSource::doEndJob, input_.get()));
680  if(looper_) {
681  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
682  }
683  c.call([actReg](){actReg->postEndJobSignal_();});
684  if(c.hasThrown()) {
685  c.rethrow();
686  }
687  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1785 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1785  {
1786  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1787  {
1788  SendSourceTerminationSignalIfException sentry(actReg_.get());
1789 
1790  lumiPrincipal.setEndTime(input_->timestamp());
1791  lumiPrincipal.setComplete();
1792  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1793  sentry.completedSuccessfully();
1794  }
1795  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1796  // lumi blocks know their start and end times why not also start and end events?
1797  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1798  lumiPrincipal.endTime());
1799  {
1800  SendSourceTerminationSignalIfException sentry(actReg_.get());
1801  espController_->eventSetupForInstance(ts);
1802  sentry.completedSuccessfully();
1803  }
1804  EventSetup const& es = esp_->eventSetup();
1805  {
1806  //To wait, the ref count has to b 1+#streams
1807  auto streamLoopWaitTask = make_empty_waiting_task();
1808  streamLoopWaitTask->increment_ref_count();
1809 
1810  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1811 
1812  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1813  *schedule_,
1815  lumiPrincipal,
1816  ts,
1817  es,
1818  subProcesses_,
1819  cleaningUpAfterException);
1820  streamLoopWaitTask->wait_for_all();
1821  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1822  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1823  }
1824  }
1825  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1826  if(looper_) {
1827  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1828  }
1829  {
1830  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1831  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1832  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1833  }
1834  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1835  if(looper_) {
1836  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1837  }
1838  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1562 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

1562  {
1563  if(looper_) {
1564  ModuleChanger changer(schedule_.get(),preg_.get());
1565  looper_->setModuleChanger(&changer);
1566  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1567  looper_->setModuleChanger(nullptr);
1568  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1569  else return false;
1570  }
1571  FDEBUG(1) << "\tendOfLoop\n";
1572  return true;
1573  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1243 of file EventProcessor.cc.

References schedule_.

1243  {
1244  return schedule_->endPathsEnabled();
1245  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1671 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1671  {
1672  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1673  {
1674  SendSourceTerminationSignalIfException sentry(actReg_.get());
1675 
1676  runPrincipal.setEndTime(input_->timestamp());
1677  runPrincipal.setComplete();
1678  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1679  sentry.completedSuccessfully();
1680  }
1681 
1683  runPrincipal.endTime());
1684  {
1685  SendSourceTerminationSignalIfException sentry(actReg_.get());
1686  espController_->eventSetupForInstance(ts);
1687  sentry.completedSuccessfully();
1688  }
1689  EventSetup const& es = esp_->eventSetup();
1690  {
1691  //To wait, the ref count has to be 1+#streams
1692  auto streamLoopWaitTask = make_empty_waiting_task();
1693  streamLoopWaitTask->increment_ref_count();
1694 
1695  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1696 
1697  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1698  *schedule_,
1700  runPrincipal,
1701  ts,
1702  es,
1703  subProcesses_,
1704  cleaningUpAfterException);
1705 
1706  streamLoopWaitTask->wait_for_all();
1707  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1708  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1709  }
1710  }
1711  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1712  if(looper_) {
1713  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1714  }
1715  {
1716  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1717  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1718  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1719  }
1720  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1721  if(looper_) {
1722  looper_->doEndRun(runPrincipal, es, &processContext_);
1723  }
1724  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 917 of file EventProcessor.cc.

References actReg_, beginJob(), continueAfterChildFailure_, edm::eventsetup::EventSetupRecord::doGet(), MillePedeFileConverter_cfg::e, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, RecoTauDiscriminantConfiguration::mask, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), possiblyContinueAfterForkChildFailure(), readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().

917  {
918 
919  if(0 == numberOfForkedChildren_) {return true;}
920  assert(0<numberOfForkedChildren_);
921  //do what we want done in common
922  {
923  beginJob(); //make sure this was run
924  // make the services available
926 
927  InputSource::ItemType itemType;
928  itemType = input_->nextItemType();
929 
930  assert(itemType == InputSource::IsFile);
931  {
932  readFile();
933  }
934  itemType = input_->nextItemType();
935  assert(itemType == InputSource::IsRun);
936 
937  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
938  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
939  input_->runAuxiliary()->beginTime());
940  espController_->eventSetupForInstance(ts);
941  EventSetup const& es = esp_->eventSetup();
942 
943  //now get all the data available in the EventSetup
944  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
945  es.fillAvailableRecordKeys(recordKeys);
946  std::vector<eventsetup::DataKey> dataKeys;
947  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
948  itKey != itEnd;
949  ++itKey) {
950  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
951  //see if this is on our exclusion list
952  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
953  ExcludedData const* excludedData(nullptr);
954  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
955  excludedData = &(itExcludeRec->second);
956  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
957  //skip all items in this record
958  continue;
959  }
960  }
961  if(0 != recordPtr) {
962  dataKeys.clear();
963  recordPtr->fillRegisteredDataKeys(dataKeys);
964  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965  itDataKey != itDataKeyEnd;
966  ++itDataKey) {
967  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
968  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
970  continue;
971  }
972  try {
973  recordPtr->doGet(*itDataKey);
974  } catch(cms::Exception& e) {
975  LogWarning("ForkingEventSetupPreFetching") << e.what();
976  }
977  }
978  }
979  }
980  }
981  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
982  {
983  // make the services available
985  Service<JobReport> jobReport;
986  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
987 
988  //Now actually do the forking
989  actReg_->preForkReleaseResourcesSignal_();
990  input_->doPreForkReleaseResources();
991  schedule_->preForkReleaseResources();
992  }
993  installCustomHandler(SIGCHLD, ep_sigchld);
994 
995 
996  unsigned int childIndex = 0;
997  unsigned int const kMaxChildren = numberOfForkedChildren_;
998  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999  std::vector<pid_t> childrenIds;
1000  childrenIds.reserve(kMaxChildren);
1001  std::vector<int> childrenSockets;
1002  childrenSockets.reserve(kMaxChildren);
1003  std::vector<int> childrenPipes;
1004  childrenPipes.reserve(kMaxChildren);
1005  std::vector<int> childrenSocketsCopy;
1006  childrenSocketsCopy.reserve(kMaxChildren);
1007  std::vector<int> childrenPipesCopy;
1008  childrenPipesCopy.reserve(kMaxChildren);
1009  int pipes[] {0, 0};
1010 
1011  {
1012  // make the services available
1014  Service<JobReport> jobReport;
1015  int sockets[2], fd_flags;
1016  for(; childIndex < kMaxChildren; ++childIndex) {
1017  // Create a UNIX_DGRAM socket pair
1018  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (pipe(pipes)) {
1023  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1027  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1028  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1029  exit(EXIT_FAILURE);
1030  }
1031  // Mark socket as non-block. Child must be careful to do select prior
1032  // to reading from socket.
1033  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1034  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1035  exit(EXIT_FAILURE);
1036  }
1037  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1038  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  // Linux man page notes there are some edge cases where reading from a
1046  // fd can block, even after a select.
1047  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1048  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1049  exit(EXIT_FAILURE);
1050  }
1051  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1052  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055 
1056  childrenPipesCopy = childrenPipes;
1057  childrenSocketsCopy = childrenSockets;
1058 
1059  pid_t value = fork();
1060  if(value == 0) {
1061  // Close the parent's side of the socket and pipe which will talk to us.
1062  close(pipes[0]);
1063  close(sockets[0]);
1064  // Close our copies of the parent's other communication pipes.
1065  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1066  close(*it);
1067  }
1068  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1069  close(*it);
1070  }
1071 
1072  // this is the child process, redirect stdout and stderr to a log file
1073  fflush(stdout);
1074  fflush(stderr);
1075  std::stringstream stout;
1076  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1077  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1078  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1079  }
1080  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1082  }
1083 
1084  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1085  if(setCpuAffinity_) {
1086  // CPU affinity is handled differently on macosx.
1087  // We disable it and print a message until someone reads:
1088  //
1089  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1090  //
1091  // and implements it.
1092 #ifdef __APPLE__
1093  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1094 #else
1095  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1096  cpu_set_t mask;
1097  CPU_ZERO(&mask);
1098  CPU_SET(childIndex, &mask);
1099  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1100  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1101  exit(-1);
1102  }
1103 #endif
1104  }
1105  break;
1106  } else {
1107  //this is the parent
1108  close(pipes[1]);
1109  close(sockets[1]);
1110  }
1111  if(value < 0) {
1112  LogError("ForkingChild") << "failed to create a child";
1113  exit(-1);
1114  }
1115  childrenIds.push_back(value);
1116  childrenSockets.push_back(sockets[0]);
1117  childrenPipes.push_back(pipes[0]);
1118  }
1119 
1120  if(childIndex < kMaxChildren) {
1121  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1123 
1124  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125  input_->doPostForkReacquireResources(receiver);
1126  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1127  //NOTE: sources have to reset themselves by listening to the post fork message
1128  //rewindInput();
1129  return true;
1130  }
1131  jobReport->parentAfterFork(jobReportFile);
1132  }
1133 
1134  //this is the original, which is now the master for all the children
1135 
1136  //Need to wait for signals from the children or externally
1137  // To wait we must
1138  // 1) block the signals we want to wait on so we do not have a race condition
1139  // 2) check that we haven't already meet our ending criteria
1140  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1141  sigset_t blockingSigSet;
1142  sigset_t unblockingSigSet;
1143  sigset_t oldSigSet;
1144  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1145  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1146  sigaddset(&blockingSigSet, SIGCHLD);
1147  sigaddset(&blockingSigSet, SIGUSR2);
1148  sigaddset(&blockingSigSet, SIGINT);
1149  sigdelset(&unblockingSigSet, SIGCHLD);
1150  sigdelset(&unblockingSigSet, SIGUSR2);
1151  sigdelset(&unblockingSigSet, SIGINT);
1152  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1153 
1154  // If there are too many fd's (unlikely, but possible) for select, denote this
1155  // because the sender will fail.
1156  bool too_many_fds = false;
1157  if (pipes[1]+1 > FD_SETSIZE) {
1158  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1159  too_many_fds = true;
1160  }
1161 
1162  //create a thread that sends the units of work to workers
1163  // we create it after all signals were blocked so that this
1164  // thread is never interupted by a signal
1165  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1166  boost::thread senderThread(sender);
1167 
1168  if(not too_many_fds) {
1169  //NOTE: a child could have failed before we got here and even after this call
1170  // which is why the 'if' is conditional on continueAfterChildFailure_
1172  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1173  sigsuspend(&unblockingSigSet);
1175  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1176  }
1177  }
1178  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1179 
1180  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1181  if(child_failed) {
1182  LogError("ForkingStopping") << "child failed";
1183  }
1184  if(shutdown_flag) {
1185  LogSystem("ForkingStopping") << "asked to shutdown";
1186  }
1187 
1188  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1190  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191  it != itEnd; ++it) {
1192  /* int result = */ kill(*it, SIGUSR2);
1193  }
1194  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195  while(num_children_done != kMaxChildren) {
1196  sigsuspend(&unblockingSigSet);
1197  }
1198  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1199  }
1200  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1201  senderThread.join();
1202  if(child_failed && !continueAfterChildFailure_) {
1203  if (child_fail_signal) {
1204  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1205  } else if (child_fail_exit_status) {
1206  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1207  } else {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1209  }
1210  }
1211  if(too_many_fds) {
1212  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1213  }
1214  return false;
1215  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
volatile std::atomic< bool > shutdown_flag
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
virtual void readFile() override
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1218 of file EventProcessor.cc.

References schedule_.

1218  {
1219  return schedule_->getAllModuleDescriptions();
1220  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 690 of file EventProcessor.cc.

References AlCaHLTBitMon_ParallelJobs::p, and serviceToken_.

Referenced by ~EventProcessor().

690  {
691  return serviceToken_;
692  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1248 of file EventProcessor.cc.

References schedule_.

1248  {
1249  schedule_->getTriggerReport(rep);
1250  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1985 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvent().

1988  {
1989  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
1990  if(iPtr) {
1991  bool expected = false;
1992  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1993  deferredExceptionPtr_ = *iPtr;
1994  {
1995  WaitingTaskHolder h(iTask);
1996  h.doneWaiting(*iPtr);
1997  }
1998  }
1999  //the stream will stop now
2000  iTask->decrement_ref_count();
2001  return;
2002  }
2003 
2004  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2005  });
2006 
2007  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
2009 
2010  try {
2011  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2012  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2013  } else {
2014  //the stream will stop now
2015  tbb::task::destroy(*recursionTask);
2016  iTask->decrement_ref_count();
2017  }
2018  } catch(...) {
2019  WaitingTaskHolder h(recursionTask);
2020  h.doneWaiting(std::current_exception());
2021  }
2022  });
2023  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 411 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

413  {
414 
415  //std::cerr << processDesc->dump() << std::endl;
416 
417  // register the empty parentage vector , once and for all
419 
420  // register the empty parameter set, once and for all.
421  ParameterSet().registerIt();
422 
423  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
424 
425  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
426  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
427  bool const hasSubProcesses = !subProcessVParameterSet.empty();
428 
429  // Now set some parameters specific to the main process.
430  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
431  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
432  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
433  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
434  //threading
435  unsigned int nThreads=1;
436  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
437  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
438  if(nThreads == 0) {
439  nThreads = 1;
440  }
441  }
442  /* TODO: when we support having each stream run in a different thread use this default
443  unsigned int nStreams =nThreads;
444  */
445  unsigned int nStreams =1;
446  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
447  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
448  if(nStreams==0) {
449  nStreams = nThreads;
450  }
451  // PG: Log the number of streams
452  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
453  }
454  /*
455  bool nRunsSet = false;
456  */
457  unsigned int nConcurrentRuns =1;
458  /*
459  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
460  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
461  }
462  */
463  unsigned int nConcurrentLumis =1;
464  /*
465  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
466  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
467  } else {
468  nConcurrentLumis = nConcurrentRuns;
469  }
470  */
471  //Check that relationships between threading parameters makes sense
472  /*
473  if(nThreads<nStreams) {
474  //bad
475  }
476  if(nConcurrentRuns>nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nConcurrentLumis) {
480  //bad
481  }
482  */
483  //forking
484  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
485  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
486  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
487  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
488  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
489  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
490  for(auto const& ps : excluded) {
491  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
492  ps.getUntrackedParameter<std::string>("label", ""));
493  }
494  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
495 
496  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
497 
498  // Now do general initialization
499  ScheduleItems items;
500 
501  //initialize the services
502  auto& serviceSets = processDesc->getServicesPSets();
503  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
504  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
505 
506  //make the services available
508 
509  if(nStreams>1) {
511  handler->willBeUsingThreads();
512  }
513 
514  // intialize miscellaneous items
515  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
516 
517  // intialize the event setup provider
518  esp_ = espController_->makeProvider(*parameterSet);
519 
520  // initialize the looper, if any
521  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
522  if(looper_) {
523  looper_->setActionTable(items.act_table_.get());
524  looper_->attachTo(*items.actReg_);
525 
526  //For now loopers make us run only 1 transition at a time
527  nStreams=1;
528  nConcurrentLumis=1;
529  nConcurrentRuns=1;
530  }
531 
532  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
533 
534  // initialize the input source
535  input_ = makeInput(*parameterSet,
536  *common,
537  items.preg(),
538  items.branchIDListHelper(),
539  items.thinnedAssociationsHelper(),
540  items.actReg_,
541  items.processConfiguration(),
543 
544  // intialize the Schedule
545  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
546 
547  // set the data members
548  act_table_ = std::move(items.act_table_);
549  actReg_ = items.actReg_;
550  preg_ = items.preg();
551  branchIDListHelper_ = items.branchIDListHelper();
552  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
553  processConfiguration_ = items.processConfiguration();
555  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
556 
557  FDEBUG(2) << parameterSet << std::endl;
558 
560  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
561  // Reusable event principal
562  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
565  }
566 
567  // fill the subprocesses, if there are any
568  subProcesses_.reserve(subProcessVParameterSet.size());
569  for(auto& subProcessPSet : subProcessVParameterSet) {
570  subProcesses_.emplace_back(subProcessPSet,
571  *parameterSet,
572  preg(),
575  SubProcessParentageHelper(),
577  *actReg_,
578  token,
581  &processContext_);
582  }
583  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:692
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1519 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1519  {
1520  if (fb_.get() != nullptr) {
1521  schedule_->openOutputFiles(*fb_);
1522  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1523  }
1524  FDEBUG(1) << "\topenOutputFiles\n";
1525  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 901 of file EventProcessor.cc.

References continueAfterChildFailure_.

Referenced by forkProcess().

901  {
902  if(child_failed && continueAfterChildFailure_) {
903  if (child_fail_signal) {
904  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
905  child_fail_signal=0;
906  } else if (child_fail_exit_status) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
908  child_fail_exit_status=0;
909  } else {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
911  }
912  child_failed =false;
913  }
914  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), readRun(), and runToCompletion().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1581 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

1581  {
1582  looper_->prepareForNextLoop(esp_.get());
1583  FDEBUG(1) << "\tprepareForNextLoop\n";
1584  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2082 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by handleNextEventForStreamAsync(), and readAndProcessEvent().

2083  {
2084  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2085  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2087  if(rng.isAvailable()) {
2088  Event ev(*pep, ModuleDescription(), nullptr);
2089  rng->postEventRead(ev);
2090  }
2091  assert(pep->luminosityBlockPrincipalPtrValid());
2092  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2093  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2094 
2095  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2096  tbb::task::allocate_root(),
2097  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2098  {
2100 
2101  //NOTE: If we have a looper we only have one Stream
2102  if(looper_) {
2103  processEventWithLooper(*pep);
2104  }
2105 
2106  FDEBUG(1) << "\tprocessEvent\n";
2107  pep->clearEventPrincipal();
2108  if(iPtr) {
2109  iHolder.doneWaiting(*iPtr);
2110  } else {
2111  iHolder.doneWaiting(std::exception_ptr());
2112  }
2113  }
2114  )
2115  );
2116  WaitingTaskHolder afterProcessTask;
2117  if(subProcesses_.empty()) {
2118  afterProcessTask = std::move(finalizeEventTask);
2119  } else {
2120  //Need to run SubProcesses after schedule has finished
2121  // with the event
2122  afterProcessTask = WaitingTaskHolder(
2123  make_waiting_task(tbb::task::allocate_root(),
2124  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2125  {
2126  if(not iPtr) {
2128 
2129  //when run with 1 thread, we want to the order to be what
2130  // it was before. This requires reversing the order since
2131  // tasks are run last one in first one out
2132  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2133  subProcess.doEventAsync(finalizeEventTask,*pep);
2134  }
2135  } else {
2136  finalizeEventTask.doneWaiting(*iPtr);
2137  }
2138  })
2139  );
2140  }
2141 
2142  schedule_->processOneEventAsync(std::move(afterProcessTask),
2143  iStreamIndex,*pep, esp_->eventSetup());
2144 
2145  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 2147 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsync().

2147  {
2148  bool randomAccess = input_->randomAccess();
2149  ProcessingController::ForwardState forwardState = input_->forwardState();
2150  ProcessingController::ReverseState reverseState = input_->reverseState();
2151  ProcessingController pc(forwardState, reverseState, randomAccess);
2152 
2154  do {
2155 
2156  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2157  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2158 
2159  bool succeeded = true;
2160  if(randomAccess) {
2161  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2162  input_->skipEvents(-2);
2163  }
2164  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2165  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2166  }
2167  }
2168  pc.setLastOperationSucceeded(succeeded);
2169  } while(!pc.lastOperationSucceeded());
2170  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2171  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1895 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1895  {
1896  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1897  {
1898  SendSourceTerminationSignalIfException sentry(actReg_.get());
1899  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1900  sentry.completedSuccessfully();
1901  }
1902  return input_->luminosityBlock();
1903  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1858 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1858  {
1859  principalCache_.merge(input_->runAuxiliary(), preg());
1860  auto runPrincipal =principalCache_.runPrincipalPtr();
1861  {
1862  SendSourceTerminationSignalIfException sentry(actReg_.get());
1863  input_->readAndMergeRun(*runPrincipal);
1864  sentry.completedSuccessfully();
1865  }
1866  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1867  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1868  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2025 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processEventAsync(), and readEvent().

2025  {
2026  if(numberOfForkedChildren_>0) {
2027  //Have to do something special for forking since
2028  // after each event the system may have to skip
2029  // some transitions. This is handled in runToCompletion
2030  readEvent(0);
2031  auto eventLoopWaitTask = make_empty_waiting_task();
2032  eventLoopWaitTask->increment_ref_count();
2033  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2034  eventLoopWaitTask->wait_for_all();
2035  return;
2036  }
2039 
2040  std::atomic<bool> finishedProcessingEvents{false};
2041  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2042 
2043  //The state machine already found the event so
2044  // we have to avoid looking again
2045  firstEventInBlock_ = true;
2046 
2047  //To wait, the ref count has to b 1+#streams
2048  auto eventLoopWaitTask = make_empty_waiting_task();
2049  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2050  eventLoopWaitTask->increment_ref_count();
2051 
2052  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2053  unsigned int iStreamIndex = 0;
2054  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2055  eventLoopWaitTask->increment_ref_count();
2056  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2057  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2058  }) );
2059  }
2060  eventLoopWaitTask->increment_ref_count();
2061  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2062  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2063  }));
2064 
2065  //One of the processing threads saw an exception
2067  std::rethrow_exception(deferredExceptionPtr_);
2068  }
2069  }
void readEvent(unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2070 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readAndProcessEvent(), and readNextEventForStream().

2070  {
2071  //TODO this will have to become per stream
2072  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2073  StreamContext streamContext(event.streamID(), &processContext_);
2074 
2075  SendSourceTerminationSignalIfException sentry(actReg_.get());
2076  input_->readEvent(event, streamContext);
2077  sentry.completedSuccessfully();
2078 
2079  FDEBUG(1) << "\treadEvent\n";
2080  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1492 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), or, edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by forkProcess(), Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1492  {
1493  FDEBUG(1) << " \treadFile\n";
1494  size_t size = preg_->size();
1495  SendSourceTerminationSignalIfException sentry(actReg_.get());
1496 
1497  fb_ = input_->readFile();
1498  if(size < preg_->size()) {
1500  }
1502  if((numberOfForkedChildren_ > 0) or
1505  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1506  }
1507  sentry.completedSuccessfully();
1508  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1870 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1870  {
1873  << "EventProcessor::readRun\n"
1874  << "Illegal attempt to insert lumi into cache\n"
1875  << "Contact a Framework Developer\n";
1876  }
1879  << "EventProcessor::readRun\n"
1880  << "Illegal attempt to insert lumi into cache\n"
1881  << "Run is invalid\n"
1882  << "Contact a Framework Developer\n";
1883  }
1884  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1885  {
1886  SendSourceTerminationSignalIfException sentry(actReg_.get());
1887  input_->readLuminosityBlock(*lbp, *historyAppender_);
1888  sentry.completedSuccessfully();
1889  }
1890  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1891  principalCache_.insert(lbp);
1892  return input_->luminosityBlock();
1893  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1929 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), preallocations_, readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1930  {
1931  if(shouldWeStop()) {
1932  return false;
1933  }
1934 
1935  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1936  return false;
1937  }
1938 
1939  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1940  return false;
1941  }
1942 
1946  handler->initializeThisThreadForUse();
1947  }
1948 
1949  try {
1950  //need to use lock in addition to the serial task queue because
1951  // of delayed provenance reading and reading data in response to
1952  // edm::Refs etc
1953  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1954  if(not firstEventInBlock_) {
1955  //The state machine already called input_->nextItemType
1956  // and found an event. We can't call input_->nextItemType
1957  // again since it would move to the next transition
1958  InputSource::ItemType itemType = input_->nextItemType();
1959  if (InputSource::IsEvent !=itemType) {
1961  finishedProcessingEvents->store(true,std::memory_order_release);
1962  //std::cerr<<"next item type "<<itemType<<"\n";
1963  return false;
1964  }
1966  //std::cerr<<"task told to async stop\n";
1967  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1968  return false;
1969  }
1970  } else {
1971  firstEventInBlock_ = false;
1972  }
1973  readEvent(iStreamIndex);
1974  } catch (...) {
1975  bool expected =false;
1976  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1977  deferredExceptionPtr_ = std::current_exception();
1978 
1979  }
1980  return false;
1981  }
1982  return true;
1983  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
virtual bool shouldWeStop() const override
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
statemachine::Run edm::EventProcessor::readRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1840 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1840  {
1843  << "EventProcessor::readRun\n"
1844  << "Illegal attempt to insert run into cache\n"
1845  << "Contact a Framework Developer\n";
1846  }
1847  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1848  {
1849  SendSourceTerminationSignalIfException sentry(actReg_.get());
1850  input_->readRun(*rp, *historyAppender_);
1851  sentry.completedSuccessfully();
1852  }
1853  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1854  principalCache_.insert(rp);
1855  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1856  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1544 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1544  {
1545  if (fb_.get() != nullptr) {
1546  schedule_->respondToCloseInputFile(*fb_);
1547  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1548  }
1549  FDEBUG(1) << "\trespondToCloseInputFile\n";
1550  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1535 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1535  {
1536  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1537  if (fb_.get() != nullptr) {
1538  schedule_->respondToOpenInputFile(*fb_);
1539  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1540  }
1541  FDEBUG(1) << "\trespondToOpenInputFile\n";
1542  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1575 of file EventProcessor.cc.

References FDEBUG, and input_.

1575  {
1576  input_->repeat();
1577  input_->rewind();
1578  FDEBUG(1) << "\trewind\n";
1579  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 337 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

337  {
338  return runToCompletion();
339  }
virtual StatusCode runToCompletion() override
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1304 of file EventProcessor.cc.

References actReg_, cms::Exception::addAdditionalInfo(), edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), alreadyHandlingException_, cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), checkForAsyncStopRequest(), createStateMachine(), MillePedeFileConverter_cfg::e, edm::IEventProcessor::epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, edm::ExternalSignal, FDEBUG, forceLooperToEnd_, input_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, edm::errors::LogicError, eostools::move(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, cmsPerfStripChart::operate(), preg(), preg_, principalCache_, runEdmFileComparison::returnCode, serviceToken_, findQualityFiles::size, stateMachineWasInErrorState_, terminateMachine(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

1304  {
1305 
1308  std::unique_ptr<statemachine::Machine> machine;
1309  {
1310  beginJob(); //make sure this was called
1311 
1312  //StatusCode returnCode = epSuccess;
1314 
1315  // make the services available
1317 
1318  machine = createStateMachine();
1321  try {
1322  convertException::wrap([&]() {
1323 
1324  InputSource::ItemType itemType;
1325 
1326  while(true) {
1327 
1328  bool more = true;
1329  if(numberOfForkedChildren_ > 0) {
1330  size_t size = preg_->size();
1331  {
1332  SendSourceTerminationSignalIfException sentry(actReg_.get());
1333  more = input_->skipForForking();
1334  sentry.completedSuccessfully();
1335  }
1336  if(more) {
1337  if(size < preg_->size()) {
1339  }
1341  }
1342  }
1343  {
1344  SendSourceTerminationSignalIfException sentry(actReg_.get());
1345  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1346  sentry.completedSuccessfully();
1347  }
1348 
1349  FDEBUG(1) << "itemType = " << itemType << "\n";
1350 
1351  if(checkForAsyncStopRequest(returnCode)) {
1352  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1353  forceLooperToEnd_ = true;
1354  machine->process_event(statemachine::Stop());
1355  forceLooperToEnd_ = false;
1356  break;
1357  }
1358 
1359  if(itemType == InputSource::IsEvent) {
1360  machine->process_event(statemachine::Event());
1362  forceLooperToEnd_ = true;
1363  machine->process_event(statemachine::Stop());
1364  forceLooperToEnd_ = false;
1366  break;
1367  }
1369  }
1370 
1371  if(itemType == InputSource::IsEvent) {
1372  }
1373  else if(itemType == InputSource::IsStop) {
1374  machine->process_event(statemachine::Stop());
1375  }
1376  else if(itemType == InputSource::IsFile) {
1377  machine->process_event(statemachine::File());
1378  }
1379  else if(itemType == InputSource::IsRun) {
1380  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1381  }
1382  else if(itemType == InputSource::IsLumi) {
1383  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1384  }
1385  else if(itemType == InputSource::IsSynchronize) {
1386  //For now, we don't have to do anything
1387  }
1388  // This should be impossible
1389  else {
1391  << "Unknown next item type passed to EventProcessor\n"
1392  << "Please report this error to the Framework group\n";
1393  }
1394  if(machine->terminated()) {
1395  break;
1396  }
1397  } // End of loop over state machine events
1398  }); // convertException::wrap
1399  } // Try block
1400  // Some comments on exception handling related to the boost state machine:
1401  //
1402  // Some states used in the machine are special because they
1403  // perform actions while the machine is being terminated, actions
1404  // such as close files, call endRun, call endLumi etc ... Each of these
1405  // states has two functions that perform these actions. The functions
1406  // are almost identical. The major difference is that one version
1407  // catches all exceptions and the other lets exceptions pass through.
1408  // The destructor catches them and the other function named "exit" lets
1409  // them pass through. On a normal termination, boost will always call
1410  // "exit" and then the state destructor. In our state classes, the
1411  // the destructors do nothing if the exit function already took
1412  // care of things. Here's the interesting part. When boost is
1413  // handling an exception the "exit" function is not called (a boost
1414  // feature).
1415  //
1416  // If an exception occurs while the boost machine is in control
1417  // (which usually means inside a process_event call), then
1418  // the boost state machine destroys its states and "terminates" itself.
1419  // This already done before we hit the catch blocks below. In this case
1420  // the call to terminateMachine below only destroys an already
1421  // terminated state machine. Because exit is not called, the state destructors
1422  // handle cleaning up lumis, runs, and files. The destructors swallow
1423  // all exceptions and only pass through the exceptions messages, which
1424  // are tacked onto the original exception below.
1425  //
1426  // If an exception occurs when the boost state machine is not
1427  // in control (outside the process_event functions), then boost
1428  // cannot destroy its own states. The terminateMachine function
1429  // below takes care of that. The flag "alreadyHandlingException"
1430  // is set true so that the state exit functions do nothing (and
1431  // cannot throw more exceptions while handling the first). Then the
1432  // state destructors take care of this because exit did nothing.
1433  //
1434  // In both cases above, the EventProcessor::endOfLoop function is
1435  // not called because it can throw exceptions.
1436  //
1437  // One tricky aspect of the state machine is that things that can
1438  // throw should not be invoked by the state machine while another
1439  // exception is being handled.
1440  // Another tricky aspect is that it appears to be important to
1441  // terminate the state machine before invoking its destructor.
1442  // We've seen crashes that are not understood when that is not
1443  // done. Maintainers of this code should be careful about this.
1444 
1445  catch (cms::Exception & e) {
1447  terminateMachine(std::move(machine));
1448  alreadyHandlingException_ = false;
1449  if (!exceptionMessageLumis_.empty()) {
1451  if (e.alreadyPrinted()) {
1452  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1453  }
1454  }
1455  if (!exceptionMessageRuns_.empty()) {
1457  if (e.alreadyPrinted()) {
1458  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1459  }
1460  }
1461  if (!exceptionMessageFiles_.empty()) {
1463  if (e.alreadyPrinted()) {
1464  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1465  }
1466  }
1467  throw;
1468  }
1469 
1470  if(machine->terminated()) {
1471  FDEBUG(1) << "The state machine reports it has been terminated\n";
1472  machine.reset();
1473  }
1474 
1476  throw cms::Exception("BadState")
1477  << "The boost state machine in the EventProcessor exited after\n"
1478  << "entering the Error state.\n";
1479  }
1480 
1481  }
1482  if(machine.get() != nullptr) {
1483  terminateMachine(std::move(machine));
1485  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1486  << "Please report this error to the Framework group\n";
1487  }
1488 
1489  return returnCode;
1490  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2187 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

2187  {
2189  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2195 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

2195  {
2197  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2191 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

2191  {
2193  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1586 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1586  {
1587  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1588  if(!subProcesses_.empty()) {
1589  for(auto const& subProcess : subProcesses_) {
1590  if(subProcess.shouldWeCloseOutput()) {
1591  return true;
1592  }
1593  }
1594  return false;
1595  }
1596  return schedule_->shouldWeCloseOutput();
1597  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2173 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2173  {
2174  FDEBUG(1) << "\tshouldWeStop\n";
2175  if(shouldWeStop_) return true;
2176  if(!subProcesses_.empty()) {
2177  for(auto const& subProcess : subProcesses_) {
2178  if(subProcess.terminate()) {
2179  return true;
2180  }
2181  }
2182  return false;
2183  }
2184  return schedule_->terminate();
2185  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1552 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

1552  {
1553  shouldWeStop_ = false;
1554  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1555  // until after we've called beginOfJob
1556  if(looper_ && looperBeginJobRun_) {
1557  looper_->doStartingNewLoop();
1558  }
1559  FDEBUG(1) << "\tstartingNewLoop\n";
1560  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine iMachine)
private

Definition at line 2203 of file EventProcessor.cc.

References FDEBUG, and forceLooperToEnd_.

Referenced by runToCompletion().

2203  {
2204  if(iMachine.get() != nullptr) {
2205  if(!iMachine->terminated()) {
2206  forceLooperToEnd_ = true;
2207  iMachine->process_event(statemachine::Stop());
2208  forceLooperToEnd_ = false;
2209  }
2210  else {
2211  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2212  }
2213  if(iMachine->terminated()) {
2214  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2215  }
2216  }
2217  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1223 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

1223  {
1224  return schedule_->totalEvents();
1225  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1233 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

1233  {
1234  return schedule_->totalEventsFailed();
1235  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1228 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

1228  {
1229  return schedule_->totalEventsPassed();
1230  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1917 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1917  {
1919  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1920  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1921  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1905 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1905  {
1906  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1907  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1908  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1909  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 309 of file EventProcessor.h.

Referenced by alreadyHandlingException(), and runToCompletion().

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 321 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 323 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 317 of file EventProcessor.h.

Referenced by forkProcess(), init(), and possiblyContinueAfterForkChildFailure().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 305 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), forkProcess(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 328 of file EventProcessor.h.

Referenced by forkProcess(), and init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 308 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 307 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
std::string edm::EventProcessor::fileMode_
private

Definition at line 304 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 324 of file EventProcessor.h.

Referenced by readAndProcessEvent(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 312 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 310 of file EventProcessor.h.

Referenced by endOfLoop(), runToCompletion(), and terminateMachine().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 322 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 314 of file EventProcessor.h.

Referenced by forkProcess(), init(), readAndProcessEvent(), readFile(), and runToCompletion().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 315 of file EventProcessor.h.

Referenced by forkProcess(), and init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), readFile(), and runToCompletion().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 316 of file EventProcessor.h.

Referenced by forkProcess(), and init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 303 of file EventProcessor.h.

Referenced by doErrorStuff(), and runToCompletion().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init().