CMS 3D CMS Logo

List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const override
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void beginRun (statemachine::Run const &run) override
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException) override
 
virtual void closeOutputFiles () override
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void deleteRunFromCache (statemachine::Run const &run) override
 
virtual void doErrorStuff () override
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
 
virtual bool endOfLoop () override
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException) override
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles () override
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop () override
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi () override
 
virtual statemachine::Run readAndMergeRun () override
 
virtual void readAndProcessEvent () override
 
virtual void readFile () override
 
virtual int readLuminosityBlock () override
 
virtual statemachine::Run readRun () override
 
virtual void respondToCloseInputFile () override
 
virtual void respondToOpenInputFile () override
 
virtual void rewindInput () override
 
StatusCode run ()
 
virtual StatusCode runToCompletion () override
 
virtual void setExceptionMessageFiles (std::string &message) override
 
virtual void setExceptionMessageLumis (std::string &message) override
 
virtual void setExceptionMessageRuns (std::string &message) override
 
virtual bool shouldWeCloseOutput () const override
 
virtual bool shouldWeStop () const override
 
virtual void startingNewLoop () override
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void writeRun (statemachine::Run const &run) override
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr< statemachine::Machine > createStateMachine ()
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
std::string fileMode_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 326 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 327 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 227 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

231  :
232  actReg_(),
233  preg_(),
235  serviceToken_(),
236  input_(),
237  espController_(new eventsetup::EventSetupsController),
238  esp_(),
239  act_table_(),
241  schedule_(),
242  subProcesses_(),
243  historyAppender_(new HistoryAppender),
244  fb_(),
245  looper_(),
247  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
248  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
249  principalCache_(),
250  beginJobCalled_(false),
251  shouldWeStop_(false),
253  fileMode_(),
259  forceLooperToEnd_(false),
260  looperBeginJobRun_(false),
264  setCpuAffinity_(false),
266  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
267  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 272 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

274  :
275  actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
286  historyAppender_(new HistoryAppender),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
293  beginJobCalled_(false),
294  shouldWeStop_(false),
296  fileMode_(),
302  forceLooperToEnd_(false),
303  looperBeginJobRun_(false),
307  setCpuAffinity_(false),
311  {
312  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
313  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
314  processDesc->addServices(defaultServices, forcedServices);
316  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 318 of file EventProcessor.cc.

References init().

320  :
321  actReg_(),
322  preg_(),
324  serviceToken_(),
325  input_(),
326  espController_(new eventsetup::EventSetupsController),
327  esp_(),
328  act_table_(),
330  schedule_(),
331  subProcesses_(),
332  historyAppender_(new HistoryAppender),
333  fb_(),
334  looper_(),
336  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
337  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
338  principalCache_(),
339  beginJobCalled_(false),
340  shouldWeStop_(false),
342  fileMode_(),
348  forceLooperToEnd_(false),
349  looperBeginJobRun_(false),
353  setCpuAffinity_(false),
357  {
358  init(processDesc, token, legacy);
359  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 362 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

362  :
363  actReg_(),
364  preg_(),
366  serviceToken_(),
367  input_(),
368  espController_(new eventsetup::EventSetupsController),
369  esp_(),
370  act_table_(),
372  schedule_(),
373  subProcesses_(),
374  historyAppender_(new HistoryAppender),
375  fb_(),
376  looper_(),
378  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
379  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
380  principalCache_(),
381  beginJobCalled_(false),
382  shouldWeStop_(false),
384  fileMode_(),
390  forceLooperToEnd_(false),
391  looperBeginJobRun_(false),
395  setCpuAffinity_(false),
399  {
400  if(isPython) {
401  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
402  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
404  }
405  else {
406  auto processDesc = std::make_shared<ProcessDesc>(config);
408  }
409  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 586 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

586  {
587  // Make the services available while everything is being deleted.
588  ServiceToken token = getToken();
589  ServiceRegistry::Operate op(token);
590 
591  // manually destroy all these thing that may need the services around
592  // propagate_const<T> has no reset() function
593  espController_ = nullptr;
594  esp_ = nullptr;
595  schedule_ = nullptr;
596  input_ = nullptr;
597  looper_ = nullptr;
598  actReg_ = nullptr;
599 
602  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
override virtual

Implements edm::IEventProcessor.

Definition at line 2222 of file EventProcessor.cc.

References alreadyHandlingException_.

2222  {
2224  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 605 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by forkProcess(), and runToCompletion().

605  {
606  if(beginJobCalled_) return;
607  beginJobCalled_=true;
608  bk::beginJob();
609 
610  // StateSentry toerror(this); // should we add this ?
611  //make the services available
613 
614  service::SystemBounds bounds(preallocations_.numberOfStreams(),
618  actReg_->preallocateSignal_(bounds);
620 
621  //NOTE: this may throw
623  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
624 
625  //NOTE: This implementation assumes 'Job' means one call
626  // the EventProcessor::run
627  // If it really means once per 'application' then this code will
628  // have to be changed.
629  // Also have to deal with case where have 'run' then new Module
630  // added and do 'run'
631  // again. In that case the newly added Module needs its 'beginJob'
632  // to be called.
633 
634  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
635  // For now we delay calling beginOfJob until first beginOfRun
636  //if(looper_) {
637  // looper_->beginOfJob(es);
638  //}
639  try {
640  convertException::wrap([&]() {
641  input_->doBeginJob();
642  });
643  }
644  catch(cms::Exception& ex) {
645  ex.addContext("Calling beginJob for the source");
646  throw;
647  }
648  schedule_->beginJob(*preg_);
649  // toerror.succeeded(); // should we add this?
650  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
651  actReg_->postBeginJobSignal_();
652 
653  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
654  schedule_->beginStream(i);
655  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
656  }
657  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
override virtual

Implements edm::IEventProcessor.

Definition at line 1738 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1738  {
1739  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1740  {
1741  SendSourceTerminationSignalIfException sentry(actReg_.get());
1742 
1743  input_->doBeginLumi(lumiPrincipal, &processContext_);
1744  sentry.completedSuccessfully();
1745  }
1746 
1748  if(rng.isAvailable()) {
1749  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1750  rng->preBeginLumi(lb);
1751  }
1752 
1753  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1754  // lumi blocks know their start and end times why not also start and end events?
1755  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1756  {
1757  SendSourceTerminationSignalIfException sentry(actReg_.get());
1758  espController_->eventSetupForInstance(ts);
1759  sentry.completedSuccessfully();
1760  }
1761  EventSetup const& es = esp_->eventSetup();
1762  {
1763  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1764  auto globalWaitTask = make_empty_waiting_task();
1765  globalWaitTask->increment_ref_count();
1766  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1767  *schedule_,
1768  lumiPrincipal,
1769  ts,
1770  es,
1771  subProcesses_);
1772  globalWaitTask->wait_for_all();
1773  if(globalWaitTask->exceptionPtr() != nullptr) {
1774  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1775  }
1776  }
1777  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1778  if(looper_) {
1779  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1780  }
1781  {
1782  //To wait, the ref count has to b 1+#streams
1783  auto streamLoopWaitTask = make_empty_waiting_task();
1784  streamLoopWaitTask->increment_ref_count();
1785 
1786  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1787 
1788  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1789  *schedule_,
1791  lumiPrincipal,
1792  ts,
1793  es,
1794  subProcesses_);
1795  streamLoopWaitTask->wait_for_all();
1796  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1797  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1798  }
1799  }
1800 
1801  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1802  if(looper_) {
1803  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1804  }
1805  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
override virtual

Implements edm::IEventProcessor.

Definition at line 1611 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1611  {
1612  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1613  {
1614  SendSourceTerminationSignalIfException sentry(actReg_.get());
1615 
1616  input_->doBeginRun(runPrincipal, &processContext_);
1617  sentry.completedSuccessfully();
1618  }
1619 
1620  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1621  runPrincipal.beginTime());
1623  espController_->forceCacheClear();
1624  }
1625  {
1626  SendSourceTerminationSignalIfException sentry(actReg_.get());
1627  espController_->eventSetupForInstance(ts);
1628  sentry.completedSuccessfully();
1629  }
1630  EventSetup const& es = esp_->eventSetup();
1631  if(looper_ && looperBeginJobRun_== false) {
1632  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1633  looper_->beginOfJob(es);
1634  looperBeginJobRun_ = true;
1635  looper_->doStartingNewLoop();
1636  }
1637  {
1638  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1639  auto globalWaitTask = make_empty_waiting_task();
1640  globalWaitTask->increment_ref_count();
1641  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1642  *schedule_,
1643  runPrincipal,
1644  ts,
1645  es,
1646  subProcesses_);
1647  globalWaitTask->wait_for_all();
1648  if(globalWaitTask->exceptionPtr() != nullptr) {
1649  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1650  }
1651  }
1652  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1653  if(looper_) {
1654  looper_->doBeginRun(runPrincipal, es, &processContext_);
1655  }
1656  {
1657  //To wait, the ref count has to be 1+#streams
1658  auto streamLoopWaitTask = make_empty_waiting_task();
1659  streamLoopWaitTask->increment_ref_count();
1660 
1661  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1662 
1663  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1664  *schedule_,
1666  runPrincipal,
1667  ts,
1668  es,
1669  subProcesses_);
1670 
1671  streamLoopWaitTask->wait_for_all();
1672  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1673  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1674  }
1675  }
1676  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1677  if(looper_) {
1678  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1679  }
1680  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1292 of file EventProcessor.cc.

References edm::IEventProcessor::epSignal, and edm::shutdown_flag.

Referenced by readNextEventForStream(), and runToCompletion().

1292  {
1293  bool returnValue = false;
1294 
1295  // Look for a shutdown signal
1296  if(shutdown_flag.load(std::memory_order_acquire)) {
1297  returnValue = true;
1298  returnCode = epSignal;
1299  }
1300  return returnValue;
1301  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1254 of file EventProcessor.cc.

References schedule_.

1254  {
1255  schedule_->clearCounters();
1256  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1511 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

1511  {
1512  if (fb_.get() != nullptr) {
1513  SendSourceTerminationSignalIfException sentry(actReg_.get());
1514  input_->closeFile(fb_.get(), cleaningUpAfterException);
1515  sentry.completedSuccessfully();
1516  }
1517  FDEBUG(1) << "\tcloseInputFile\n";
1518  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1528 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1528  {
1529  if (fb_.get() != nullptr) {
1530  schedule_->closeOutputFiles();
1531  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1532  }
1533  FDEBUG(1) << "\tcloseOutputFiles\n";
1534  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1260 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, emptyRunLumiMode_, Exception, fileMode_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by runToCompletion().

1260  {
1261  statemachine::FileMode fileMode;
1262  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1263  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1264  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1265  else {
1266  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1267  << fileMode_ << ".\n"
1268  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1269  }
1270 
1271  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1272  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1273  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1274  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1275  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1276  else {
1277  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1278  << emptyRunLumiMode_ << ".\n"
1279  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1280  }
1281 
1282  auto machine = std::make_unique<statemachine::Machine>(
1283  this,
1284  fileMode,
1285  emptyRunLumiMode);
1286 
1287  machine->initiate();
1288  return machine;
1289  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1946 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1946  {
1948  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1949  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1950  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1934 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), and subProcesses_.

1934  {
1935  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1936  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1937  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1938  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1600 of file EventProcessor.cc.

References FDEBUG, and stateMachineWasInErrorState_.

1600  {
1601  FDEBUG(1) << "\tdoErrorStuff\n";
1602  LogError("StateMachine")
1603  << "The EventProcessor state machine encountered an unexpected event\n"
1604  << "and went to the error state\n"
1605  << "Will attempt to terminate processing normally\n"
1606  << "(IF using the looper the next loop will be attempted)\n"
1607  << "This likely indicates a bug in an input module or corrupted input or both\n";
1609  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1239 of file EventProcessor.cc.

References schedule_.

1239  {
1240  schedule_->enableEndPaths(active);
1241  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 660 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

660  {
661  // Collects exceptions, so we don't throw before all operations are performed.
662  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
663 
664  //make the services available
666 
667  //NOTE: this really should go elsewhere in the future
668  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
669  c.call([this,i](){this->schedule_->endStream(i);});
670  for(auto& subProcess : subProcesses_) {
671  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
672  }
673  }
674  auto actReg = actReg_.get();
675  c.call([actReg](){actReg->preEndJobSignal_();});
676  schedule_->endJob(c);
677  for(auto& subProcess : subProcesses_) {
678  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
679  }
680  c.call(std::bind(&InputSource::doEndJob, input_.get()));
681  if(looper_) {
682  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
683  }
684  c.call([actReg](){actReg->postEndJobSignal_();});
685  if(c.hasThrown()) {
686  c.rethrow();
687  }
688  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1807 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, edm::Principal::setAtEndTransition(), edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1807  {
1808  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1809  {
1810  SendSourceTerminationSignalIfException sentry(actReg_.get());
1811 
1812  lumiPrincipal.setEndTime(input_->timestamp());
1813  lumiPrincipal.setComplete();
1814  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1815  sentry.completedSuccessfully();
1816  }
1817  //NOTE: Using the max event number for the end of a lumi block is a bad idea.
1818  // Lumi blocks know their start and end times; why not also their start and end events?
1819  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1820  lumiPrincipal.endTime());
1821  {
1822  SendSourceTerminationSignalIfException sentry(actReg_.get());
1823  espController_->eventSetupForInstance(ts);
1824  sentry.completedSuccessfully();
1825  }
1826  EventSetup const& es = esp_->eventSetup();
1827  {
1828  //To wait, the ref count has to be 1+#streams
1829  auto streamLoopWaitTask = make_empty_waiting_task();
1830  streamLoopWaitTask->increment_ref_count();
1831 
1832  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1833 
1834  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1835  *schedule_,
1837  lumiPrincipal,
1838  ts,
1839  es,
1840  subProcesses_,
1841  cleaningUpAfterException);
1842  streamLoopWaitTask->wait_for_all();
1843  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1844  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1845  }
1846  }
1847  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1848  if(looper_) {
1849  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1850  }
1851  {
1852  lumiPrincipal.setAtEndTransition(true);
1853  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1854  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1855  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1856  }
1857  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1858  if(looper_) {
1859  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1860  }
1861  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1563 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

1563  {
1564  if(looper_) {
1565  ModuleChanger changer(schedule_.get(),preg_.get());
1566  looper_->setModuleChanger(&changer);
1567  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1568  looper_->setModuleChanger(nullptr);
1569  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1570  else return false;
1571  }
1572  FDEBUG(1) << "\tendOfLoop\n";
1573  return true;
1574  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1244 of file EventProcessor.cc.

References schedule_.

1244  {
1245  return schedule_->endPathsEnabled();
1246  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1682 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, edm::Principal::setAtEndTransition(), edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1682  {
1683  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1684  {
1685  SendSourceTerminationSignalIfException sentry(actReg_.get());
1686 
1687  runPrincipal.setEndTime(input_->timestamp());
1688  runPrincipal.setComplete();
1689  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1690  sentry.completedSuccessfully();
1691  }
1692 
1694  runPrincipal.endTime());
1695  {
1696  SendSourceTerminationSignalIfException sentry(actReg_.get());
1697  espController_->eventSetupForInstance(ts);
1698  sentry.completedSuccessfully();
1699  }
1700  EventSetup const& es = esp_->eventSetup();
1701  {
1702  //To wait, the ref count has to be 1+#streams
1703  auto streamLoopWaitTask = make_empty_waiting_task();
1704  streamLoopWaitTask->increment_ref_count();
1705 
1706  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1707 
1708  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1709  *schedule_,
1711  runPrincipal,
1712  ts,
1713  es,
1714  subProcesses_,
1715  cleaningUpAfterException);
1716 
1717  streamLoopWaitTask->wait_for_all();
1718  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1719  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1720  }
1721  }
1722  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1723  if(looper_) {
1724  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1725  }
1726  {
1727  runPrincipal.setAtEndTransition(true);
1728  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1729  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1730  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1731  }
1732  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1733  if(looper_) {
1734  looper_->doEndRun(runPrincipal, es, &processContext_);
1735  }
1736  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 918 of file EventProcessor.cc.

References actReg_, beginJob(), continueAfterChildFailure_, edm::eventsetup::EventSetupRecord::doGet(), MillePedeFileConverter_cfg::e, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, RecoTauDiscriminantConfiguration::mask, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), possiblyContinueAfterForkChildFailure(), readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().

918  {
919 
920  if(0 == numberOfForkedChildren_) {return true;}
921  assert(0<numberOfForkedChildren_);
922  //do what we want done in common
923  {
924  beginJob(); //make sure this was run
925  // make the services available
927 
928  InputSource::ItemType itemType;
929  itemType = input_->nextItemType();
930 
931  assert(itemType == InputSource::IsFile);
932  {
933  readFile();
934  }
935  itemType = input_->nextItemType();
936  assert(itemType == InputSource::IsRun);
937 
938  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
939  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
940  input_->runAuxiliary()->beginTime());
941  espController_->eventSetupForInstance(ts);
942  EventSetup const& es = esp_->eventSetup();
943 
944  //now get all the data available in the EventSetup
945  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
946  es.fillAvailableRecordKeys(recordKeys);
947  std::vector<eventsetup::DataKey> dataKeys;
948  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
949  itKey != itEnd;
950  ++itKey) {
951  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
952  //see if this is on our exclusion list
953  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
954  ExcludedData const* excludedData(nullptr);
955  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
956  excludedData = &(itExcludeRec->second);
957  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
958  //skip all items in this record
959  continue;
960  }
961  }
962  if(0 != recordPtr) {
963  dataKeys.clear();
964  recordPtr->fillRegisteredDataKeys(dataKeys);
965  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
966  itDataKey != itDataKeyEnd;
967  ++itDataKey) {
968  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
969  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
970  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
971  continue;
972  }
973  try {
974  recordPtr->doGet(*itDataKey);
975  } catch(cms::Exception& e) {
976  LogWarning("ForkingEventSetupPreFetching") << e.what();
977  }
978  }
979  }
980  }
981  }
982  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
983  {
984  // make the services available
986  Service<JobReport> jobReport;
987  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
988 
989  //Now actually do the forking
990  actReg_->preForkReleaseResourcesSignal_();
991  input_->doPreForkReleaseResources();
992  schedule_->preForkReleaseResources();
993  }
994  installCustomHandler(SIGCHLD, ep_sigchld);
995 
996 
997  unsigned int childIndex = 0;
998  unsigned int const kMaxChildren = numberOfForkedChildren_;
999  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1000  std::vector<pid_t> childrenIds;
1001  childrenIds.reserve(kMaxChildren);
1002  std::vector<int> childrenSockets;
1003  childrenSockets.reserve(kMaxChildren);
1004  std::vector<int> childrenPipes;
1005  childrenPipes.reserve(kMaxChildren);
1006  std::vector<int> childrenSocketsCopy;
1007  childrenSocketsCopy.reserve(kMaxChildren);
1008  std::vector<int> childrenPipesCopy;
1009  childrenPipesCopy.reserve(kMaxChildren);
1010  int pipes[] {0, 0};
1011 
1012  {
1013  // make the services available
1015  Service<JobReport> jobReport;
1016  int sockets[2], fd_flags;
1017  for(; childIndex < kMaxChildren; ++childIndex) {
1018  // Create a UNIX_DGRAM socket pair
1019  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1020  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1021  exit(EXIT_FAILURE);
1022  }
1023  if (pipe(pipes)) {
1024  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1025  exit(EXIT_FAILURE);
1026  }
1027  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1028  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1029  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1030  exit(EXIT_FAILURE);
1031  }
1032  // Mark socket as non-block. Child must be careful to do select prior
1033  // to reading from socket.
1034  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1035  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1036  exit(EXIT_FAILURE);
1037  }
1038  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1039  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1040  exit(EXIT_FAILURE);
1041  }
1042  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1043  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1044  exit(EXIT_FAILURE);
1045  }
1046  // Linux man page notes there are some edge cases where reading from a
1047  // fd can block, even after a select.
1048  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1049  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1050  exit(EXIT_FAILURE);
1051  }
1052  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1053  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1054  exit(EXIT_FAILURE);
1055  }
1056 
1057  childrenPipesCopy = childrenPipes;
1058  childrenSocketsCopy = childrenSockets;
1059 
1060  pid_t value = fork();
1061  if(value == 0) {
1062  // Close the parent's side of the socket and pipe which will talk to us.
1063  close(pipes[0]);
1064  close(sockets[0]);
1065  // Close our copies of the parent's other communication pipes.
1066  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1067  close(*it);
1068  }
1069  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1070  close(*it);
1071  }
1072 
1073  // this is the child process, redirect stdout and stderr to a log file
1074  fflush(stdout);
1075  fflush(stderr);
1076  std::stringstream stout;
1077  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1078  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1079  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1080  }
1081  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1082  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1083  }
1084 
1085  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1086  if(setCpuAffinity_) {
1087  // CPU affinity is handled differently on macosx.
1088  // We disable it and print a message until someone reads:
1089  //
1090  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1091  //
1092  // and implements it.
1093 #ifdef __APPLE__
1094  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1095 #else
1096  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1097  cpu_set_t mask;
1098  CPU_ZERO(&mask);
1099  CPU_SET(childIndex, &mask);
1100  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1101  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1102  exit(-1);
1103  }
1104 #endif
1105  }
1106  break;
1107  } else {
1108  //this is the parent
1109  close(pipes[1]);
1110  close(sockets[1]);
1111  }
1112  if(value < 0) {
1113  LogError("ForkingChild") << "failed to create a child";
1114  exit(-1);
1115  }
1116  childrenIds.push_back(value);
1117  childrenSockets.push_back(sockets[0]);
1118  childrenPipes.push_back(pipes[0]);
1119  }
1120 
1121  if(childIndex < kMaxChildren) {
1122  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1123  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1124 
1125  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1126  input_->doPostForkReacquireResources(receiver);
1127  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1128  //NOTE: sources have to reset themselves by listening to the post fork message
1129  //rewindInput();
1130  return true;
1131  }
1132  jobReport->parentAfterFork(jobReportFile);
1133  }
1134 
1135  //this is the original, which is now the master for all the children
1136 
1137  //Need to wait for signals from the children or externally
1138  // To wait we must
1139  // 1) block the signals we want to wait on so we do not have a race condition
1140  // 2) check that we haven't already met our ending criteria
1141  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1142  sigset_t blockingSigSet;
1143  sigset_t unblockingSigSet;
1144  sigset_t oldSigSet;
1145  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1146  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1147  sigaddset(&blockingSigSet, SIGCHLD);
1148  sigaddset(&blockingSigSet, SIGUSR2);
1149  sigaddset(&blockingSigSet, SIGINT);
1150  sigdelset(&unblockingSigSet, SIGCHLD);
1151  sigdelset(&unblockingSigSet, SIGUSR2);
1152  sigdelset(&unblockingSigSet, SIGINT);
1153  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1154 
1155  // If there are too many fd's (unlikely, but possible) for select, denote this
1156  // because the sender will fail.
1157  bool too_many_fds = false;
1158  if (pipes[1]+1 > FD_SETSIZE) {
1159  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1160  too_many_fds = true;
1161  }
1162 
1163  //create a thread that sends the units of work to workers
1164  // we create it after all signals were blocked so that this
1165  // thread is never interrupted by a signal
1166  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1167  boost::thread senderThread(sender);
1168 
1169  if(not too_many_fds) {
1170  //NOTE: a child could have failed before we got here and even after this call
1171  // which is why the 'if' is conditional on continueAfterChildFailure_
1173  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1174  sigsuspend(&unblockingSigSet);
1176  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1177  }
1178  }
1179  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1180 
1181  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1182  if(child_failed) {
1183  LogError("ForkingStopping") << "child failed";
1184  }
1185  if(shutdown_flag) {
1186  LogSystem("ForkingStopping") << "asked to shutdown";
1187  }
1188 
1189  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1190  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1191  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1192  it != itEnd; ++it) {
1193  /* int result = */ kill(*it, SIGUSR2);
1194  }
1195  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1196  while(num_children_done != kMaxChildren) {
1197  sigsuspend(&unblockingSigSet);
1198  }
1199  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1200  }
1201  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1202  senderThread.join();
1203  if(child_failed && !continueAfterChildFailure_) {
1204  if (child_fail_signal) {
1205  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1206  } else if (child_fail_exit_status) {
1207  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1208  } else {
1209  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1210  }
1211  }
1212  if(too_many_fds) {
1213  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1214  }
1215  return false;
1216  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
volatile std::atomic< bool > shutdown_flag
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
virtual void readFile() override
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1219 of file EventProcessor.cc.

References schedule_.

1219  {
1220  return schedule_->getAllModuleDescriptions();
1221  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 691 of file EventProcessor.cc.

References AlCaHLTBitMon_ParallelJobs::p, and serviceToken_.

Referenced by ~EventProcessor().

691  {
692  return serviceToken_;
693  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1249 of file EventProcessor.cc.

References schedule_.

1249  {
1250  schedule_->getTriggerReport(rep);
1251  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 2008 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvent().

2011  {
2012  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
2013  if(iPtr) {
2014  bool expected = false;
2015  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2016  deferredExceptionPtr_ = *iPtr;
2017  {
2018  WaitingTaskHolder h(iTask);
2019  h.doneWaiting(*iPtr);
2020  }
2021  }
2022  //the stream will stop now
2023  iTask->decrement_ref_count();
2024  return;
2025  }
2026 
2027  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2028  });
2029 
2030  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
2032 
2033  try {
2034  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2035  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2036  } else {
2037  //the stream will stop now
2038  tbb::task::destroy(*recursionTask);
2039  iTask->decrement_ref_count();
2040  }
2041  } catch(...) {
2042  WaitingTaskHolder h(recursionTask);
2043  h.doneWaiting(std::current_exception());
2044  }
2045  });
2046  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 412 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

414  {
415 
416  //std::cerr << processDesc->dump() << std::endl;
417 
418  // register the empty parentage vector , once and for all
420 
421  // register the empty parameter set, once and for all.
422  ParameterSet().registerIt();
423 
424  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
425 
426  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
427  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
428  bool const hasSubProcesses = !subProcessVParameterSet.empty();
429 
430  // Now set some parameters specific to the main process.
431  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
432  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
433  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
434  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
435  //threading
436  unsigned int nThreads=1;
437  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
438  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
439  if(nThreads == 0) {
440  nThreads = 1;
441  }
442  }
443  /* TODO: when we support having each stream run in a different thread use this default
444  unsigned int nStreams =nThreads;
445  */
446  unsigned int nStreams =1;
447  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
448  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
449  if(nStreams==0) {
450  nStreams = nThreads;
451  }
452  // PG: Log the number of streams
453  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
454  }
455  /*
456  bool nRunsSet = false;
457  */
458  unsigned int nConcurrentRuns =1;
459  /*
460  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
461  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
462  }
463  */
464  unsigned int nConcurrentLumis =1;
465  /*
466  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
467  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
468  } else {
469  nConcurrentLumis = nConcurrentRuns;
470  }
471  */
472  //Check that relationships between threading parameters make sense
473  /*
474  if(nThreads<nStreams) {
475  //bad
476  }
477  if(nConcurrentRuns>nStreams) {
478  //bad
479  }
480  if(nConcurrentRuns>nConcurrentLumis) {
481  //bad
482  }
483  */
484  //forking
485  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
486  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
487  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
488  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
489  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
490  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
491  for(auto const& ps : excluded) {
492  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
493  ps.getUntrackedParameter<std::string>("label", ""));
494  }
495  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
496 
497  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
498 
499  // Now do general initialization
500  ScheduleItems items;
501 
502  //initialize the services
503  auto& serviceSets = processDesc->getServicesPSets();
504  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
505  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
506 
507  //make the services available
509 
510  if(nStreams>1) {
512  handler->willBeUsingThreads();
513  }
514 
515  // initialize miscellaneous items
516  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
517 
518  // initialize the event setup provider
519  esp_ = espController_->makeProvider(*parameterSet);
520 
521  // initialize the looper, if any
522  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
523  if(looper_) {
524  looper_->setActionTable(items.act_table_.get());
525  looper_->attachTo(*items.actReg_);
526 
527  //For now loopers make us run only 1 transition at a time
528  nStreams=1;
529  nConcurrentLumis=1;
530  nConcurrentRuns=1;
531  }
532 
533  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
534 
535  // initialize the input source
536  input_ = makeInput(*parameterSet,
537  *common,
538  items.preg(),
539  items.branchIDListHelper(),
540  items.thinnedAssociationsHelper(),
541  items.actReg_,
542  items.processConfiguration(),
544 
545  // initialize the Schedule
546  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
547 
548  // set the data members
549  act_table_ = std::move(items.act_table_);
550  actReg_ = items.actReg_;
551  preg_ = items.preg();
552  branchIDListHelper_ = items.branchIDListHelper();
553  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
554  processConfiguration_ = items.processConfiguration();
556  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
557 
558  FDEBUG(2) << parameterSet << std::endl;
559 
561  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
562  // Reusable event principal
563  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
566  }
567 
568  // fill the subprocesses, if there are any
569  subProcesses_.reserve(subProcessVParameterSet.size());
570  for(auto& subProcessPSet : subProcessVParameterSet) {
571  subProcesses_.emplace_back(subProcessPSet,
572  *parameterSet,
573  preg(),
576  SubProcessParentageHelper(),
578  *actReg_,
579  token,
582  &processContext_);
583  }
584  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:747
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1520 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1520  {
1521  if (fb_.get() != nullptr) {
1522  schedule_->openOutputFiles(*fb_);
1523  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1524  }
1525  FDEBUG(1) << "\topenOutputFiles\n";
1526  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 902 of file EventProcessor.cc.

References continueAfterChildFailure_.

Referenced by forkProcess().

902  {
903  if(child_failed && continueAfterChildFailure_) {
904  if (child_fail_signal) {
905  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
906  child_fail_signal=0;
907  } else if (child_fail_exit_status) {
908  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
909  child_fail_exit_status=0;
910  } else {
911  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
912  }
913  child_failed =false;
914  }
915  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), readRun(), and runToCompletion().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1582 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

1582  {
1583  looper_->prepareForNextLoop(esp_.get());
1584  FDEBUG(1) << "\tprepareForNextLoop\n";
1585  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2105 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by handleNextEventForStreamAsync(), and readAndProcessEvent().

2106  {
2107  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2108  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2110  if(rng.isAvailable()) {
2111  Event ev(*pep, ModuleDescription(), nullptr);
2112  rng->postEventRead(ev);
2113  }
2114  assert(pep->luminosityBlockPrincipalPtrValid());
2115  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2116  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2117 
2118  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2119  tbb::task::allocate_root(),
2120  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2121  {
2123 
2124  //NOTE: If we have a looper we only have one Stream
2125  if(looper_) {
2126  processEventWithLooper(*pep);
2127  }
2128 
2129  FDEBUG(1) << "\tprocessEvent\n";
2130  pep->clearEventPrincipal();
2131  if(iPtr) {
2132  iHolder.doneWaiting(*iPtr);
2133  } else {
2134  iHolder.doneWaiting(std::exception_ptr());
2135  }
2136  }
2137  )
2138  );
2139  WaitingTaskHolder afterProcessTask;
2140  if(subProcesses_.empty()) {
2141  afterProcessTask = std::move(finalizeEventTask);
2142  } else {
2143  //Need to run SubProcesses after schedule has finished
2144  // with the event
2145  afterProcessTask = WaitingTaskHolder(
2146  make_waiting_task(tbb::task::allocate_root(),
2147  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2148  {
2149  if(not iPtr) {
2151 
2152  //when run with 1 thread, we want the order to be what
2153  // it was before. This requires reversing the order since
2154  // tasks are run last one in first one out
2155  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2156  subProcess.doEventAsync(finalizeEventTask,*pep);
2157  }
2158  } else {
2159  finalizeEventTask.doneWaiting(*iPtr);
2160  }
2161  })
2162  );
2163  }
2164 
2165  schedule_->processOneEventAsync(std::move(afterProcessTask),
2166  iStreamIndex,*pep, esp_->eventSetup());
2167 
2168  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 2170 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsync().

2170  {
2171  bool randomAccess = input_->randomAccess();
2172  ProcessingController::ForwardState forwardState = input_->forwardState();
2173  ProcessingController::ReverseState reverseState = input_->reverseState();
2174  ProcessingController pc(forwardState, reverseState, randomAccess);
2175 
2177  do {
2178 
2179  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2180  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2181 
2182  bool succeeded = true;
2183  if(randomAccess) {
2184  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2185  input_->skipEvents(-2);
2186  }
2187  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2188  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2189  }
2190  }
2191  pc.setLastOperationSucceeded(succeeded);
2192  } while(!pc.lastOperationSucceeded());
2193  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2194  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1918 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1918  {
1919  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1920  {
1921  SendSourceTerminationSignalIfException sentry(actReg_.get());
1922  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1923  sentry.completedSuccessfully();
1924  }
1925  return input_->luminosityBlock();
1926  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1881 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1881  {
1882  principalCache_.merge(input_->runAuxiliary(), preg());
1883  auto runPrincipal =principalCache_.runPrincipalPtr();
1884  {
1885  SendSourceTerminationSignalIfException sentry(actReg_.get());
1886  input_->readAndMergeRun(*runPrincipal);
1887  sentry.completedSuccessfully();
1888  }
1889  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1890  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1891  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2048 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processEventAsync(), and readEvent().

2048  {
2049  if(numberOfForkedChildren_>0) {
2050  //Have to do something special for forking since
2051  // after each event the system may have to skip
2052  // some transitions. This is handled in runToCompletion
2053  readEvent(0);
2054  auto eventLoopWaitTask = make_empty_waiting_task();
2055  eventLoopWaitTask->increment_ref_count();
2056  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2057  eventLoopWaitTask->wait_for_all();
2058  return;
2059  }
2062 
2063  std::atomic<bool> finishedProcessingEvents{false};
2064  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2065 
2066  //The state machine already found the event so
2067  // we have to avoid looking again
2068  firstEventInBlock_ = true;
2069 
2070  //To wait, the ref count has to be 1+#streams
2071  auto eventLoopWaitTask = make_empty_waiting_task();
2072  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2073  eventLoopWaitTask->increment_ref_count();
2074 
2075  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2076  unsigned int iStreamIndex = 0;
2077  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2078  eventLoopWaitTask->increment_ref_count();
2079  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2080  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2081  }) );
2082  }
2083  eventLoopWaitTask->increment_ref_count();
2084  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2085  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2086  }));
2087 
2088  //One of the processing threads saw an exception
2090  std::rethrow_exception(deferredExceptionPtr_);
2091  }
2092  }
void readEvent(unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2093 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readAndProcessEvent(), and readNextEventForStream().

2093  {
2094  //TODO this will have to become per stream
2095  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2096  StreamContext streamContext(event.streamID(), &processContext_);
2097 
2098  SendSourceTerminationSignalIfException sentry(actReg_.get());
2099  input_->readEvent(event, streamContext);
2100  sentry.completedSuccessfully();
2101 
2102  FDEBUG(1) << "\treadEvent\n";
2103  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1493 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), or, edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by forkProcess(), Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1493  {
1494  FDEBUG(1) << " \treadFile\n";
1495  size_t size = preg_->size();
1496  SendSourceTerminationSignalIfException sentry(actReg_.get());
1497 
1498  fb_ = input_->readFile();
1499  if(size < preg_->size()) {
1501  }
1503  if((numberOfForkedChildren_ > 0) or
1506  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1507  }
1508  sentry.completedSuccessfully();
1509  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1893 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1893  {
1896  << "EventProcessor::readRun\n"
1897  << "Illegal attempt to insert lumi into cache\n"
1898  << "Contact a Framework Developer\n";
1899  }
1902  << "EventProcessor::readRun\n"
1903  << "Illegal attempt to insert lumi into cache\n"
1904  << "Run is invalid\n"
1905  << "Contact a Framework Developer\n";
1906  }
1907  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1908  {
1909  SendSourceTerminationSignalIfException sentry(actReg_.get());
1910  input_->readLuminosityBlock(*lbp, *historyAppender_);
1911  sentry.completedSuccessfully();
1912  }
1913  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1914  principalCache_.insert(lbp);
1915  return input_->luminosityBlock();
1916  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1952 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), preallocations_, readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1953  {
1954  if(shouldWeStop()) {
1955  return false;
1956  }
1957 
1958  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1959  return false;
1960  }
1961 
1962  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1963  return false;
1964  }
1965 
1969  handler->initializeThisThreadForUse();
1970  }
1971 
1972  try {
1973  //need to use lock in addition to the serial task queue because
1974  // of delayed provenance reading and reading data in response to
1975  // edm::Refs etc
1976  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1977  if(not firstEventInBlock_) {
1978  //The state machine already called input_->nextItemType
1979  // and found an event. We can't call input_->nextItemType
1980  // again since it would move to the next transition
1981  InputSource::ItemType itemType = input_->nextItemType();
1982  if (InputSource::IsEvent !=itemType) {
1984  finishedProcessingEvents->store(true,std::memory_order_release);
1985  //std::cerr<<"next item type "<<itemType<<"\n";
1986  return false;
1987  }
1989  //std::cerr<<"task told to async stop\n";
1990  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1991  return false;
1992  }
1993  } else {
1994  firstEventInBlock_ = false;
1995  }
1996  readEvent(iStreamIndex);
1997  } catch (...) {
1998  bool expected =false;
1999  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2000  deferredExceptionPtr_ = std::current_exception();
2001 
2002  }
2003  return false;
2004  }
2005  return true;
2006  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
virtual bool shouldWeStop() const override
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
statemachine::Run edm::EventProcessor::readRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1863 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1863  {
1866  << "EventProcessor::readRun\n"
1867  << "Illegal attempt to insert run into cache\n"
1868  << "Contact a Framework Developer\n";
1869  }
1870  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1871  {
1872  SendSourceTerminationSignalIfException sentry(actReg_.get());
1873  input_->readRun(*rp, *historyAppender_);
1874  sentry.completedSuccessfully();
1875  }
1876  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1877  principalCache_.insert(rp);
1878  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1879  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1545 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1545  {
1546  if (fb_.get() != nullptr) {
1547  schedule_->respondToCloseInputFile(*fb_);
1548  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1549  }
1550  FDEBUG(1) << "\trespondToCloseInputFile\n";
1551  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1536 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1536  {
1537  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1538  if (fb_.get() != nullptr) {
1539  schedule_->respondToOpenInputFile(*fb_);
1540  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1541  }
1542  FDEBUG(1) << "\trespondToOpenInputFile\n";
1543  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1576 of file EventProcessor.cc.

References FDEBUG, and input_.

1576  {
1577  input_->repeat();
1578  input_->rewind();
1579  FDEBUG(1) << "\trewind\n";
1580  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 337 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

337  {
338  return runToCompletion();
339  }
virtual StatusCode runToCompletion() override
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1305 of file EventProcessor.cc.

References actReg_, cms::Exception::addAdditionalInfo(), edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), alreadyHandlingException_, cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), checkForAsyncStopRequest(), createStateMachine(), MillePedeFileConverter_cfg::e, edm::IEventProcessor::epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, edm::ExternalSignal, FDEBUG, forceLooperToEnd_, input_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, edm::errors::LogicError, eostools::move(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, cmsPerfStripChart::operate(), preg(), preg_, principalCache_, runEdmFileComparison::returnCode, serviceToken_, findQualityFiles::size, stateMachineWasInErrorState_, terminateMachine(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

1305  {
1306 
1309  std::unique_ptr<statemachine::Machine> machine;
1310  {
1311  beginJob(); //make sure this was called
1312 
1313  //StatusCode returnCode = epSuccess;
1315 
1316  // make the services available
1318 
1319  machine = createStateMachine();
1322  try {
1323  convertException::wrap([&]() {
1324 
1325  InputSource::ItemType itemType;
1326 
1327  while(true) {
1328 
1329  bool more = true;
1330  if(numberOfForkedChildren_ > 0) {
1331  size_t size = preg_->size();
1332  {
1333  SendSourceTerminationSignalIfException sentry(actReg_.get());
1334  more = input_->skipForForking();
1335  sentry.completedSuccessfully();
1336  }
1337  if(more) {
1338  if(size < preg_->size()) {
1340  }
1342  }
1343  }
1344  {
1345  SendSourceTerminationSignalIfException sentry(actReg_.get());
1346  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1347  sentry.completedSuccessfully();
1348  }
1349 
1350  FDEBUG(1) << "itemType = " << itemType << "\n";
1351 
1352  if(checkForAsyncStopRequest(returnCode)) {
1353  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1354  forceLooperToEnd_ = true;
1355  machine->process_event(statemachine::Stop());
1356  forceLooperToEnd_ = false;
1357  break;
1358  }
1359 
1360  if(itemType == InputSource::IsEvent) {
1361  machine->process_event(statemachine::Event());
1363  forceLooperToEnd_ = true;
1364  machine->process_event(statemachine::Stop());
1365  forceLooperToEnd_ = false;
1367  break;
1368  }
1370  }
1371 
1372  if(itemType == InputSource::IsEvent) {
1373  }
1374  else if(itemType == InputSource::IsStop) {
1375  machine->process_event(statemachine::Stop());
1376  }
1377  else if(itemType == InputSource::IsFile) {
1378  machine->process_event(statemachine::File());
1379  }
1380  else if(itemType == InputSource::IsRun) {
1381  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1382  }
1383  else if(itemType == InputSource::IsLumi) {
1384  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1385  }
1386  else if(itemType == InputSource::IsSynchronize) {
1387  //For now, we don't have to do anything
1388  }
1389  // This should be impossible
1390  else {
1392  << "Unknown next item type passed to EventProcessor\n"
1393  << "Please report this error to the Framework group\n";
1394  }
1395  if(machine->terminated()) {
1396  break;
1397  }
1398  } // End of loop over state machine events
1399  }); // convertException::wrap
1400  } // Try block
1401  // Some comments on exception handling related to the boost state machine:
1402  //
1403  // Some states used in the machine are special because they
1404  // perform actions while the machine is being terminated, actions
1405  // such as close files, call endRun, call endLumi etc ... Each of these
1406  // states has two functions that perform these actions. The functions
1407  // are almost identical. The major difference is that one version
1408  // catches all exceptions and the other lets exceptions pass through.
1409  // The destructor catches them and the other function named "exit" lets
1410  // them pass through. On a normal termination, boost will always call
1411  // "exit" and then the state destructor. In our state classes, the
1412  // destructors do nothing if the exit function already took
1413  // care of things. Here's the interesting part. When boost is
1414  // handling an exception the "exit" function is not called (a boost
1415  // feature).
1416  //
1417  // If an exception occurs while the boost machine is in control
1418  // (which usually means inside a process_event call), then
1419  // the boost state machine destroys its states and "terminates" itself.
1420  // This is already done before we hit the catch blocks below. In this case
1421  // the call to terminateMachine below only destroys an already
1422  // terminated state machine. Because exit is not called, the state destructors
1423  // handle cleaning up lumis, runs, and files. The destructors swallow
1424  // all exceptions and only pass through the exceptions messages, which
1425  // are tacked onto the original exception below.
1426  //
1427  // If an exception occurs when the boost state machine is not
1428  // in control (outside the process_event functions), then boost
1429  // cannot destroy its own states. The terminateMachine function
1430  // below takes care of that. The flag "alreadyHandlingException"
1431  // is set true so that the state exit functions do nothing (and
1432  // cannot throw more exceptions while handling the first). Then the
1433  // state destructors take care of this because exit did nothing.
1434  //
1435  // In both cases above, the EventProcessor::endOfLoop function is
1436  // not called because it can throw exceptions.
1437  //
1438  // One tricky aspect of the state machine is that things that can
1439  // throw should not be invoked by the state machine while another
1440  // exception is being handled.
1441  // Another tricky aspect is that it appears to be important to
1442  // terminate the state machine before invoking its destructor.
1443  // We've seen crashes that are not understood when that is not
1444  // done. Maintainers of this code should be careful about this.
1445 
1446  catch (cms::Exception & e) {
1448  terminateMachine(std::move(machine));
1449  alreadyHandlingException_ = false;
1450  if (!exceptionMessageLumis_.empty()) {
1452  if (e.alreadyPrinted()) {
1453  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1454  }
1455  }
1456  if (!exceptionMessageRuns_.empty()) {
1458  if (e.alreadyPrinted()) {
1459  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1460  }
1461  }
1462  if (!exceptionMessageFiles_.empty()) {
1464  if (e.alreadyPrinted()) {
1465  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1466  }
1467  }
1468  throw;
1469  }
1470 
1471  if(machine->terminated()) {
1472  FDEBUG(1) << "The state machine reports it has been terminated\n";
1473  machine.reset();
1474  }
1475 
1477  throw cms::Exception("BadState")
1478  << "The boost state machine in the EventProcessor exited after\n"
1479  << "entering the Error state.\n";
1480  }
1481 
1482  }
1483  if(machine.get() != nullptr) {
1484  terminateMachine(std::move(machine));
1486  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1487  << "Please report this error to the Framework group\n";
1488  }
1489 
1490  return returnCode;
1491  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2210 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

2210  {
2212  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2218 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

2218  {
2220  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2214 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

2214  {
2216  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1587 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1587  {
1588  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1589  if(!subProcesses_.empty()) {
1590  for(auto const& subProcess : subProcesses_) {
1591  if(subProcess.shouldWeCloseOutput()) {
1592  return true;
1593  }
1594  }
1595  return false;
1596  }
1597  return schedule_->shouldWeCloseOutput();
1598  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2196 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2196  {
2197  FDEBUG(1) << "\tshouldWeStop\n";
2198  if(shouldWeStop_) return true;
2199  if(!subProcesses_.empty()) {
2200  for(auto const& subProcess : subProcesses_) {
2201  if(subProcess.terminate()) {
2202  return true;
2203  }
2204  }
2205  return false;
2206  }
2207  return schedule_->terminate();
2208  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1553 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

1553  {
1554  shouldWeStop_ = false;
1555  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1556  // until after we've called beginOfJob
1557  if(looper_ && looperBeginJobRun_) {
1558  looper_->doStartingNewLoop();
1559  }
1560  FDEBUG(1) << "\tstartingNewLoop\n";
1561  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine > iMachine)
private

Definition at line 2226 of file EventProcessor.cc.

References FDEBUG, and forceLooperToEnd_.

Referenced by runToCompletion().

2226  {
2227  if(iMachine.get() != nullptr) {
2228  if(!iMachine->terminated()) {
2229  forceLooperToEnd_ = true;
2230  iMachine->process_event(statemachine::Stop());
2231  forceLooperToEnd_ = false;
2232  }
2233  else {
2234  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2235  }
2236  if(iMachine->terminated()) {
2237  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2238  }
2239  }
2240  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1224 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

1224  {
1225  return schedule_->totalEvents();
1226  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1234 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

1234  {
1235  return schedule_->totalEventsFailed();
1236  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1229 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

1229  {
1230  return schedule_->totalEventsPassed();
1231  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1940 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1940  {
1942  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1943  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1944  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1928 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1928  {
1929  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1930  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1931  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1932  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 309 of file EventProcessor.h.

Referenced by alreadyHandlingException(), and runToCompletion().

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 321 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 323 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 317 of file EventProcessor.h.

Referenced by forkProcess(), init(), and possiblyContinueAfterForkChildFailure().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 305 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), forkProcess(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 328 of file EventProcessor.h.

Referenced by forkProcess(), and init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 308 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 307 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
std::string edm::EventProcessor::fileMode_
private

Definition at line 304 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 324 of file EventProcessor.h.

Referenced by readAndProcessEvent(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 312 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 310 of file EventProcessor.h.

Referenced by endOfLoop(), runToCompletion(), and terminateMachine().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 322 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 314 of file EventProcessor.h.

Referenced by forkProcess(), init(), readAndProcessEvent(), readFile(), and runToCompletion().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 315 of file EventProcessor.h.

Referenced by forkProcess() and init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), readFile(), and runToCompletion().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob() and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 316 of file EventProcessor.h.

Referenced by forkProcess() and init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 303 of file EventProcessor.h.

Referenced by doErrorStuff() and runToCompletion().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init().