CMS 3D CMS Logo

List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const override
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void beginRun (statemachine::Run const &run) override
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException) override
 
virtual void closeOutputFiles () override
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void deleteRunFromCache (statemachine::Run const &run) override
 
virtual void doErrorStuff () override
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override
 
virtual bool endOfLoop () override
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException) override
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles () override
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop () override
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi () override
 
virtual statemachine::Run readAndMergeRun () override
 
virtual void readAndProcessEvent () override
 
virtual void readFile () override
 
virtual int readLuminosityBlock () override
 
virtual statemachine::Run readRun () override
 
virtual void respondToCloseInputFile () override
 
virtual void respondToOpenInputFile () override
 
virtual void rewindInput () override
 
StatusCode run ()
 
virtual StatusCode runToCompletion () override
 
virtual void setExceptionMessageFiles (std::string &message) override
 
virtual void setExceptionMessageLumis (std::string &message) override
 
virtual void setExceptionMessageRuns (std::string &message) override
 
virtual bool shouldWeCloseOutput () const override
 
virtual bool shouldWeStop () const override
 
virtual void startingNewLoop () override
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override
 
virtual void writeRun (statemachine::Run const &run) override
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair< std::string, std::string > > ExcludedData
 
typedef std::map< std::string, ExcludedData > ExcludedDataMap
 

Private Member Functions

std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr< statemachine::Machine > createStateMachine ()
 
void handleNextEventForStreamAsync (WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase const > looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr< ProductRegistry const > preg () const
 
std::shared_ptr< ProductRegistry > & preg ()
 
void processEventAsync (WaitingTaskHolder iHolder, unsigned int iStreamIndex)
 
void processEventWithLooper (EventPrincipal &)
 
void readEvent (unsigned int iStreamIndex)
 
bool readNextEventForStream (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr< ExceptionToActionTable const > act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
 
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
 
std::string fileMode_
 
bool firstEventInBlock_ =true
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
 
edm::propagate_const< std::unique_ptr< InputSource > > input_
 
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr< ProcessConfiguration const > processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
std::shared_ptr< std::recursive_mutex > sourceMutex_
 
SharedResourcesAcquirer sourceResourcesAcquirer_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 326 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 327 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 227 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

231  :
232  actReg_(),
233  preg_(),
235  serviceToken_(),
236  input_(),
237  espController_(new eventsetup::EventSetupsController),
238  esp_(),
239  act_table_(),
241  schedule_(),
242  subProcesses_(),
243  historyAppender_(new HistoryAppender),
244  fb_(),
245  looper_(),
247  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
248  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
249  principalCache_(),
250  beginJobCalled_(false),
251  shouldWeStop_(false),
253  fileMode_(),
259  forceLooperToEnd_(false),
260  looperBeginJobRun_(false),
264  setCpuAffinity_(false),
266  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
267  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
268  processDesc->addServices(defaultServices, forcedServices);
269  init(processDesc, iToken, iLegacy);
270  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 272 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

274  :
275  actReg_(),
276  preg_(),
278  serviceToken_(),
279  input_(),
280  espController_(new eventsetup::EventSetupsController),
281  esp_(),
282  act_table_(),
284  schedule_(),
285  subProcesses_(),
286  historyAppender_(new HistoryAppender),
287  fb_(),
288  looper_(),
290  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
291  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
292  principalCache_(),
293  beginJobCalled_(false),
294  shouldWeStop_(false),
296  fileMode_(),
302  forceLooperToEnd_(false),
303  looperBeginJobRun_(false),
307  setCpuAffinity_(false),
311  {
312  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
313  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
314  processDesc->addServices(defaultServices, forcedServices);
316  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc >  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 318 of file EventProcessor.cc.

References init().

320  :
321  actReg_(),
322  preg_(),
324  serviceToken_(),
325  input_(),
326  espController_(new eventsetup::EventSetupsController),
327  esp_(),
328  act_table_(),
330  schedule_(),
331  subProcesses_(),
332  historyAppender_(new HistoryAppender),
333  fb_(),
334  looper_(),
336  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
337  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
338  principalCache_(),
339  beginJobCalled_(false),
340  shouldWeStop_(false),
342  fileMode_(),
348  forceLooperToEnd_(false),
349  looperBeginJobRun_(false),
353  setCpuAffinity_(false),
357  {
358  init(processDesc, token, legacy);
359  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 362 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

362  :
363  actReg_(),
364  preg_(),
366  serviceToken_(),
367  input_(),
368  espController_(new eventsetup::EventSetupsController),
369  esp_(),
370  act_table_(),
372  schedule_(),
373  subProcesses_(),
374  historyAppender_(new HistoryAppender),
375  fb_(),
376  looper_(),
378  sourceResourcesAcquirer_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().first),
379  sourceMutex_(SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader().second),
380  principalCache_(),
381  beginJobCalled_(false),
382  shouldWeStop_(false),
384  fileMode_(),
390  forceLooperToEnd_(false),
391  looperBeginJobRun_(false),
395  setCpuAffinity_(false),
399  {
400  if(isPython) {
401  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
402  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
404  }
405  else {
406  auto processDesc = std::make_shared<ProcessDesc>(config);
408  }
409  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
U second(std::pair< T, U > const &p)
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static SharedResourcesRegistry * instance()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::shared_ptr< edm::ParameterSet > parameterSet() const
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
T first(std::pair< T, U > const &p)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 588 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, and schedule_.

588  {
589  // Make the services available while everything is being deleted.
590  ServiceToken token = getToken();
591  ServiceRegistry::Operate op(token);
592 
593  // manually destroy all these things that may need the services around
594  // propagate_const<T> has no reset() function
595  espController_ = nullptr;
596  esp_ = nullptr;
597  schedule_ = nullptr;
598  input_ = nullptr;
599  looper_ = nullptr;
600  actReg_ = nullptr;
601 
604  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:44
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
override virtual

Implements edm::IEventProcessor.

Definition at line 2220 of file EventProcessor.cc.

References alreadyHandlingException_.

2220  {
2222  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 607 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), mps_fire::i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processConfiguration_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

Referenced by forkProcess(), and runToCompletion().

607  {
608  if(beginJobCalled_) return;
609  beginJobCalled_=true;
610  bk::beginJob();
611 
612  // StateSentry toerror(this); // should we add this ?
613  //make the services available
615 
616  service::SystemBounds bounds(preallocations_.numberOfStreams(),
620  actReg_->preallocateSignal_(bounds);
621  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
623 
624  //NOTE: this may throw
626  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
627 
628  //NOTE: This implementation assumes 'Job' means one call
629  // the EventProcessor::run
630  // If it really means once per 'application' then this code will
631  // have to be changed.
632  // Also have to deal with case where have 'run' then new Module
633  // added and do 'run'
634  // again. In that case the newly added Module needs its 'beginJob'
635  // to be called.
636 
637  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
638  // For now we delay calling beginOfJob until first beginOfRun
639  //if(looper_) {
640  // looper_->beginOfJob(es);
641  //}
642  try {
643  convertException::wrap([&]() {
644  input_->doBeginJob();
645  });
646  }
647  catch(cms::Exception& ex) {
648  ex.addContext("Calling beginJob for the source");
649  throw;
650  }
651  schedule_->beginJob(*preg_);
652  // toerror.succeeded(); // should we add this?
653  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
654  actReg_->postBeginJobSignal_();
655 
656  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
657  schedule_->beginStream(i);
658  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
659  }
660  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
override virtual

Implements edm::IEventProcessor.

Definition at line 1741 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, espController_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, and subProcesses_.

1741  {
1742  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1743  {
1744  SendSourceTerminationSignalIfException sentry(actReg_.get());
1745 
1746  input_->doBeginLumi(lumiPrincipal, &processContext_);
1747  sentry.completedSuccessfully();
1748  }
1749 
1751  if(rng.isAvailable()) {
1752  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1753  rng->preBeginLumi(lb);
1754  }
1755 
1756  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1757  // lumi blocks know their start and end times why not also start and end events?
1758  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1759  {
1760  SendSourceTerminationSignalIfException sentry(actReg_.get());
1761  espController_->eventSetupForInstance(ts);
1762  sentry.completedSuccessfully();
1763  }
1764  EventSetup const& es = esp_->eventSetup();
1765  {
1766  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1767  auto globalWaitTask = make_empty_waiting_task();
1768  globalWaitTask->increment_ref_count();
1769  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1770  *schedule_,
1771  lumiPrincipal,
1772  ts,
1773  es,
1774  subProcesses_);
1775  globalWaitTask->wait_for_all();
1776  if(globalWaitTask->exceptionPtr() != nullptr) {
1777  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1778  }
1779  }
1780  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1781  if(looper_) {
1782  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1783  }
1784  {
1785  //To wait, the ref count has to be 1+#streams
1786  auto streamLoopWaitTask = make_empty_waiting_task();
1787  streamLoopWaitTask->increment_ref_count();
1788 
1789  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1790 
1791  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1792  *schedule_,
1794  lumiPrincipal,
1795  ts,
1796  es,
1797  subProcesses_);
1798  streamLoopWaitTask->wait_for_all();
1799  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1800  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1801  }
1802  }
1803 
1804  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1805  if(looper_) {
1806  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1807  }
1808  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
override virtual

Implements edm::IEventProcessor.

Definition at line 1614 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, espController_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, edm::make_empty_waiting_task(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1614  {
1615  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1616  {
1617  SendSourceTerminationSignalIfException sentry(actReg_.get());
1618 
1619  input_->doBeginRun(runPrincipal, &processContext_);
1620  sentry.completedSuccessfully();
1621  }
1622 
1623  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1624  runPrincipal.beginTime());
1626  espController_->forceCacheClear();
1627  }
1628  {
1629  SendSourceTerminationSignalIfException sentry(actReg_.get());
1630  espController_->eventSetupForInstance(ts);
1631  sentry.completedSuccessfully();
1632  }
1633  EventSetup const& es = esp_->eventSetup();
1634  if(looper_ && looperBeginJobRun_== false) {
1635  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1636  looper_->beginOfJob(es);
1637  looperBeginJobRun_ = true;
1638  looper_->doStartingNewLoop();
1639  }
1640  {
1641  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1642  auto globalWaitTask = make_empty_waiting_task();
1643  globalWaitTask->increment_ref_count();
1644  beginGlobalTransitionAsync<Traits>(WaitingTaskHolder(globalWaitTask.get()),
1645  *schedule_,
1646  runPrincipal,
1647  ts,
1648  es,
1649  subProcesses_);
1650  globalWaitTask->wait_for_all();
1651  if(globalWaitTask->exceptionPtr() != nullptr) {
1652  std::rethrow_exception(* (globalWaitTask->exceptionPtr()) );
1653  }
1654  }
1655  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1656  if(looper_) {
1657  looper_->doBeginRun(runPrincipal, es, &processContext_);
1658  }
1659  {
1660  //To wait, the ref count has to be 1+#streams
1661  auto streamLoopWaitTask = make_empty_waiting_task();
1662  streamLoopWaitTask->increment_ref_count();
1663 
1664  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1665 
1666  beginStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1667  *schedule_,
1669  runPrincipal,
1670  ts,
1671  es,
1672  subProcesses_);
1673 
1674  streamLoopWaitTask->wait_for_all();
1675  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1676  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1677  }
1678  }
1679  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1680  if(looper_) {
1681  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1682  }
1683  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 262 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 263 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1295 of file EventProcessor.cc.

References edm::IEventProcessor::epSignal, and edm::shutdown_flag.

Referenced by readNextEventForStream(), and runToCompletion().

1295  {
1296  bool returnValue = false;
1297 
1298  // Look for a shutdown signal
1299  if(shutdown_flag.load(std::memory_order_acquire)) {
1300  returnValue = true;
1301  returnCode = epSignal;
1302  }
1303  return returnValue;
1304  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1257 of file EventProcessor.cc.

References schedule_.

1257  {
1258  schedule_->clearCounters();
1259  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1514 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, and input_.

1514  {
1515  if (fb_.get() != nullptr) {
1516  SendSourceTerminationSignalIfException sentry(actReg_.get());
1517  input_->closeFile(fb_.get(), cleaningUpAfterException);
1518  sentry.completedSuccessfully();
1519  }
1520  FDEBUG(1) << "\tcloseInputFile\n";
1521  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1531 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1531  {
1532  if (fb_.get() != nullptr) {
1533  schedule_->closeOutputFiles();
1534  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1535  }
1536  FDEBUG(1) << "\tcloseOutputFiles\n";
1537  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1263 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, emptyRunLumiMode_, Exception, fileMode_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by runToCompletion().

1263  {
1264  statemachine::FileMode fileMode;
1265  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1266  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1267  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1268  else {
1269  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1270  << fileMode_ << ".\n"
1271  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1272  }
1273 
1274  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1275  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1276  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1277  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1278  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1279  else {
1280  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1281  << emptyRunLumiMode_ << ".\n"
1282  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1283  }
1284 
1285  auto machine = std::make_unique<statemachine::Machine>(
1286  this,
1287  fileMode,
1288  emptyRunLumiMode);
1289 
1290  machine->initiate();
1291  return machine;
1292  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1949 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, edm::for_all(), principalCache_, and subProcesses_.

1949  {
1951  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1952  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1953  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1937 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, edm::for_all(), principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), and subProcesses_.

1937  {
1938  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1939  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1940  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1941  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1603 of file EventProcessor.cc.

References FDEBUG, and stateMachineWasInErrorState_.

1603  {
1604  FDEBUG(1) << "\tdoErrorStuff\n";
1605  LogError("StateMachine")
1606  << "The EventProcessor state machine encountered an unexpected event\n"
1607  << "and went to the error state\n"
1608  << "Will attempt to terminate processing normally\n"
1609  << "(IF using the looper the next loop will be attempted)\n"
1610  << "This likely indicates a bug in an input module or corrupted input or both\n";
1612  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1242 of file EventProcessor.cc.

References schedule_.

1242  {
1243  schedule_->enableEndPaths(active);
1244  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 663 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), mps_fire::i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

Referenced by PythonEventProcessor::~PythonEventProcessor().

663  {
664  // Collects exceptions, so we don't throw before all operations are performed.
665  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
666 
667  //make the services available
669 
670  //NOTE: this really should go elsewhere in the future
671  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
672  c.call([this,i](){this->schedule_->endStream(i);});
673  for(auto& subProcess : subProcesses_) {
674  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
675  }
676  }
677  auto actReg = actReg_.get();
678  c.call([actReg](){actReg->preEndJobSignal_();});
679  schedule_->endJob(c);
680  for(auto& subProcess : subProcesses_) {
681  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
682  }
683  c.call(std::bind(&InputSource::doEndJob, input_.get()));
684  if(looper_) {
685  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
686  }
687  c.call([actReg](){actReg->postEndJobSignal_();});
688  if(c.hasThrown()) {
689  c.rethrow();
690  }
691  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1810 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, edm::LuminosityBlockPrincipal::run(), schedule_, edm::Principal::setAtEndTransition(), edm::LuminosityBlockPrincipal::setComplete(), edm::LuminosityBlockPrincipal::setEndTime(), and subProcesses_.

Referenced by Types.EventRange::cppID().

1810  {
1811  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1812  {
1813  SendSourceTerminationSignalIfException sentry(actReg_.get());
1814 
1815  lumiPrincipal.setEndTime(input_->timestamp());
1816  lumiPrincipal.setComplete();
1817  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1818  sentry.completedSuccessfully();
1819  }
1820  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1821  // lumi blocks know their start and end times why not also start and end events?
1822  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1823  lumiPrincipal.endTime());
1824  {
1825  SendSourceTerminationSignalIfException sentry(actReg_.get());
1826  espController_->eventSetupForInstance(ts);
1827  sentry.completedSuccessfully();
1828  }
1829  EventSetup const& es = esp_->eventSetup();
1830  {
1831  //To wait, the ref count has to be 1+#streams
1832  auto streamLoopWaitTask = make_empty_waiting_task();
1833  streamLoopWaitTask->increment_ref_count();
1834 
1835  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1836 
1837  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1838  *schedule_,
1840  lumiPrincipal,
1841  ts,
1842  es,
1843  subProcesses_,
1844  cleaningUpAfterException);
1845  streamLoopWaitTask->wait_for_all();
1846  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1847  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1848  }
1849  }
1850  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1851  if(looper_) {
1852  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1853  }
1854  {
1855  lumiPrincipal.setAtEndTransition(true);
1856  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1857  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1858  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1859  }
1860  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1861  if(looper_) {
1862  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1863  }
1864  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1566 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, preg_, schedule_, and mps_update::status.

1566  {
1567  if(looper_) {
1568  ModuleChanger changer(schedule_.get(),preg_.get());
1569  looper_->setModuleChanger(&changer);
1570  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1571  looper_->setModuleChanger(nullptr);
1572  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1573  else return false;
1574  }
1575  FDEBUG(1) << "\tendOfLoop\n";
1576  return true;
1577  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1247 of file EventProcessor.cc.

References schedule_.

1247  {
1248  return schedule_->endPathsEnabled();
1249  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1685 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, espController_, FDEBUG, edm::for_all(), input_, looper_, edm::make_empty_waiting_task(), edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), edm::PreallocationConfiguration::numberOfStreams(), preallocations_, principalCache_, processContext_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, edm::Principal::setAtEndTransition(), edm::RunPrincipal::setComplete(), edm::RunPrincipal::setEndTime(), and subProcesses_.

1685  {
1686  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 
1690  runPrincipal.setEndTime(input_->timestamp());
1691  runPrincipal.setComplete();
1692  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1693  sentry.completedSuccessfully();
1694  }
1695 
1697  runPrincipal.endTime());
1698  {
1699  SendSourceTerminationSignalIfException sentry(actReg_.get());
1700  espController_->eventSetupForInstance(ts);
1701  sentry.completedSuccessfully();
1702  }
1703  EventSetup const& es = esp_->eventSetup();
1704  {
1705  //To wait, the ref count has to be 1+#streams
1706  auto streamLoopWaitTask = make_empty_waiting_task();
1707  streamLoopWaitTask->increment_ref_count();
1708 
1709  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1710 
1711  endStreamsTransitionAsync<Traits>(streamLoopWaitTask.get(),
1712  *schedule_,
1714  runPrincipal,
1715  ts,
1716  es,
1717  subProcesses_,
1718  cleaningUpAfterException);
1719 
1720  streamLoopWaitTask->wait_for_all();
1721  if(streamLoopWaitTask->exceptionPtr() != nullptr) {
1722  std::rethrow_exception(* (streamLoopWaitTask->exceptionPtr()) );
1723  }
1724  }
1725  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1726  if(looper_) {
1727  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1728  }
1729  {
1730  runPrincipal.setAtEndTransition(true);
1731  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1732  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1733  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1734  }
1735  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1736  if(looper_) {
1737  looper_->doEndRun(runPrincipal, es, &processContext_);
1738  }
1739  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 921 of file EventProcessor.cc.

References actReg_, beginJob(), continueAfterChildFailure_, edm::eventsetup::EventSetupRecord::doGet(), MillePedeFileConverter_cfg::e, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, RecoTauDiscriminantConfiguration::mask, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), possiblyContinueAfterForkChildFailure(), readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().

921  {
922 
923  if(0 == numberOfForkedChildren_) {return true;}
924  assert(0<numberOfForkedChildren_);
925  //do what we want done in common
926  {
927  beginJob(); //make sure this was run
928  // make the services available
930 
931  InputSource::ItemType itemType;
932  itemType = input_->nextItemType();
933 
934  assert(itemType == InputSource::IsFile);
935  {
936  readFile();
937  }
938  itemType = input_->nextItemType();
939  assert(itemType == InputSource::IsRun);
940 
941  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
942  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
943  input_->runAuxiliary()->beginTime());
944  espController_->eventSetupForInstance(ts);
945  EventSetup const& es = esp_->eventSetup();
946 
947  //now get all the data available in the EventSetup
948  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
949  es.fillAvailableRecordKeys(recordKeys);
950  std::vector<eventsetup::DataKey> dataKeys;
951  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
952  itKey != itEnd;
953  ++itKey) {
954  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
955  //see if this is on our exclusion list
956  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
957  ExcludedData const* excludedData(nullptr);
958  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
959  excludedData = &(itExcludeRec->second);
960  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
961  //skip all items in this record
962  continue;
963  }
964  }
965  if(0 != recordPtr) {
966  dataKeys.clear();
967  recordPtr->fillRegisteredDataKeys(dataKeys);
968  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
969  itDataKey != itDataKeyEnd;
970  ++itDataKey) {
971  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
972  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
973  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
974  continue;
975  }
976  try {
977  recordPtr->doGet(*itDataKey);
978  } catch(cms::Exception& e) {
979  LogWarning("ForkingEventSetupPreFetching") << e.what();
980  }
981  }
982  }
983  }
984  }
985  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
986  {
987  // make the services available
989  Service<JobReport> jobReport;
990  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
991 
992  //Now actually do the forking
993  actReg_->preForkReleaseResourcesSignal_();
994  input_->doPreForkReleaseResources();
995  schedule_->preForkReleaseResources();
996  }
997  installCustomHandler(SIGCHLD, ep_sigchld);
998 
999 
1000  unsigned int childIndex = 0;
1001  unsigned int const kMaxChildren = numberOfForkedChildren_;
1002  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1003  std::vector<pid_t> childrenIds;
1004  childrenIds.reserve(kMaxChildren);
1005  std::vector<int> childrenSockets;
1006  childrenSockets.reserve(kMaxChildren);
1007  std::vector<int> childrenPipes;
1008  childrenPipes.reserve(kMaxChildren);
1009  std::vector<int> childrenSocketsCopy;
1010  childrenSocketsCopy.reserve(kMaxChildren);
1011  std::vector<int> childrenPipesCopy;
1012  childrenPipesCopy.reserve(kMaxChildren);
1013  int pipes[] {0, 0};
1014 
1015  {
1016  // make the services available
1018  Service<JobReport> jobReport;
1019  int sockets[2], fd_flags;
1020  for(; childIndex < kMaxChildren; ++childIndex) {
1021  // Create a UNIX_DGRAM socket pair
1022  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1023  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  if (pipe(pipes)) {
1027  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1028  exit(EXIT_FAILURE);
1029  }
1030  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1031  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1032  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033  exit(EXIT_FAILURE);
1034  }
1035  // Mark socket as non-block. Child must be careful to do select prior
1036  // to reading from socket.
1037  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1038  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1042  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1046  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1047  exit(EXIT_FAILURE);
1048  }
1049  // Linux man page notes there are some edge cases where reading from a
1050  // fd can block, even after a select.
1051  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1052  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1056  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1057  exit(EXIT_FAILURE);
1058  }
1059 
1060  childrenPipesCopy = childrenPipes;
1061  childrenSocketsCopy = childrenSockets;
1062 
1063  pid_t value = fork();
1064  if(value == 0) {
1065  // Close the parent's side of the socket and pipe which will talk to us.
1066  close(pipes[0]);
1067  close(sockets[0]);
1068  // Close our copies of the parent's other communication pipes.
1069  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1070  close(*it);
1071  }
1072  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1073  close(*it);
1074  }
1075 
1076  // this is the child process, redirect stdout and stderr to a log file
1077  fflush(stdout);
1078  fflush(stderr);
1079  std::stringstream stout;
1080  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1081  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1082  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1083  }
1084  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1085  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1086  }
1087 
1088  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1089  if(setCpuAffinity_) {
1090  // CPU affinity is handled differently on macosx.
1091  // We disable it and print a message until someone reads:
1092  //
1093  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1094  //
1095  // and implements it.
1096 #ifdef __APPLE__
1097  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1098 #else
1099  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1100  cpu_set_t mask;
1101  CPU_ZERO(&mask);
1102  CPU_SET(childIndex, &mask);
1103  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1104  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1105  exit(-1);
1106  }
1107 #endif
1108  }
1109  break;
1110  } else {
1111  //this is the parent
1112  close(pipes[1]);
1113  close(sockets[1]);
1114  }
1115  if(value < 0) {
1116  LogError("ForkingChild") << "failed to create a child";
1117  exit(-1);
1118  }
1119  childrenIds.push_back(value);
1120  childrenSockets.push_back(sockets[0]);
1121  childrenPipes.push_back(pipes[0]);
1122  }
1123 
1124  if(childIndex < kMaxChildren) {
1125  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1126  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1127 
1128  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1129  input_->doPostForkReacquireResources(receiver);
1130  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1131  //NOTE: sources have to reset themselves by listening to the post fork message
1132  //rewindInput();
1133  return true;
1134  }
1135  jobReport->parentAfterFork(jobReportFile);
1136  }
1137 
1138  //this is the original, which is now the master for all the children
1139 
1140  //Need to wait for signals from the children or externally
1141  // To wait we must
1142  // 1) block the signals we want to wait on so we do not have a race condition
1143  // 2) check that we haven't already met our ending criteria
1144  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1145  sigset_t blockingSigSet;
1146  sigset_t unblockingSigSet;
1147  sigset_t oldSigSet;
1148  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1149  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1150  sigaddset(&blockingSigSet, SIGCHLD);
1151  sigaddset(&blockingSigSet, SIGUSR2);
1152  sigaddset(&blockingSigSet, SIGINT);
1153  sigdelset(&unblockingSigSet, SIGCHLD);
1154  sigdelset(&unblockingSigSet, SIGUSR2);
1155  sigdelset(&unblockingSigSet, SIGINT);
1156  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1157 
1158  // If there are too many fd's (unlikely, but possible) for select, denote this
1159  // because the sender will fail.
1160  bool too_many_fds = false;
1161  if (pipes[1]+1 > FD_SETSIZE) {
1162  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1163  too_many_fds = true;
1164  }
1165 
1166  //create a thread that sends the units of work to workers
1167  // we create it after all signals were blocked so that this
1168  // thread is never interrupted by a signal
1169  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1170  boost::thread senderThread(sender);
1171 
1172  if(not too_many_fds) {
1173  //NOTE: a child could have failed before we got here and even after this call
1174  // which is why the 'if' is conditional on continueAfterChildFailure_
1176  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1177  sigsuspend(&unblockingSigSet);
1179  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1180  }
1181  }
1182  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1183 
1184  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1185  if(child_failed) {
1186  LogError("ForkingStopping") << "child failed";
1187  }
1188  if(shutdown_flag) {
1189  LogSystem("ForkingStopping") << "asked to shutdown";
1190  }
1191 
1192  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1193  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1194  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1195  it != itEnd; ++it) {
1196  /* int result = */ kill(*it, SIGUSR2);
1197  }
1198  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1199  while(num_children_done != kMaxChildren) {
1200  sigsuspend(&unblockingSigSet);
1201  }
1202  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1203  }
1204  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1205  senderThread.join();
1206  if(child_failed && !continueAfterChildFailure_) {
1207  if (child_fail_signal) {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1209  } else if (child_fail_exit_status) {
1210  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1211  } else {
1212  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1213  }
1214  }
1215  if(too_many_fds) {
1216  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1217  }
1218  return false;
1219  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
volatile std::atomic< bool > shutdown_flag
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: value.py:1
def pipe(cmdline, input=None)
Definition: pipe.py:5
virtual void readFile() override
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1222 of file EventProcessor.cc.

References schedule_.

1222  {
1223  return schedule_->getAllModuleDescriptions();
1224  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 694 of file EventProcessor.cc.

References AlCaHLTBitMon_ParallelJobs::p, and serviceToken_.

Referenced by ~EventProcessor().

694  {
695  return serviceToken_;
696  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1252 of file EventProcessor.cc.

References schedule_.

1252  {
1253  schedule_->getTriggerReport(rep);
1254  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
rep
Definition: cuy.py:1188
void edm::EventProcessor::handleNextEventForStreamAsync ( WaitingTask iTask,
unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 2006 of file EventProcessor.cc.

References deferredExceptionPtr_, deferredExceptionPtrIsSet_, pyrootRender::destroy(), edm::WaitingTaskHolder::doneWaiting(), h, edm::make_waiting_task(), cmsPerfStripChart::operate(), processEventAsync(), edm::SerialTaskQueueChain::push(), readNextEventForStream(), edm::SharedResourcesAcquirer::serialQueueChain(), serviceToken_, and sourceResourcesAcquirer_.

Referenced by readAndProcessEvent().

2009  {
2010  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
2011  if(iPtr) {
2012  bool expected = false;
2013  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2014  deferredExceptionPtr_ = *iPtr;
2015  {
2016  WaitingTaskHolder h(iTask);
2017  h.doneWaiting(*iPtr);
2018  }
2019  }
2020  //the stream will stop now
2021  iTask->decrement_ref_count();
2022  return;
2023  }
2024 
2025  handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
2026  });
2027 
2028  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
2030 
2031  try {
2032  if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
2033  processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
2034  } else {
2035  //the stream will stop now
2036  tbb::task::destroy(*recursionTask);
2037  iTask->decrement_ref_count();
2038  }
2039  } catch(...) {
2040  WaitingTaskHolder h(recursionTask);
2041  h.doneWaiting(std::current_exception());
2042  }
2043  });
2044  }
SharedResourcesAcquirer sourceResourcesAcquirer_
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
bool readNextEventForStream(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def destroy(e)
Definition: pyrootRender.py:13
void push(const T &iAction)
asynchronously pushes functor iAction into queue
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
SerialTaskQueueChain & serialQueueChain() const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 412 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, trackingPlots::common, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, diffTreeTool::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), and thinnedAssociationsHelper_.

Referenced by EventProcessor().

414  {
415 
416  //std::cerr << processDesc->dump() << std::endl;
417 
418  // register the empty parentage vector , once and for all
420 
421  // register the empty parameter set, once and for all.
422  ParameterSet().registerIt();
423 
424  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
425 
426  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
427  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
428  bool const hasSubProcesses = !subProcessVParameterSet.empty();
429 
430  // Now set some parameters specific to the main process.
431  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
432  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
433  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
434  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
435  //threading
436  unsigned int nThreads=1;
437  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
438  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
439  if(nThreads == 0) {
440  nThreads = 1;
441  }
442  }
443  /* TODO: when we support having each stream run in a different thread use this default
444  unsigned int nStreams =nThreads;
445  */
446  unsigned int nStreams =1;
447  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
448  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
449  if(nStreams==0) {
450  nStreams = nThreads;
451  }
452  }
453  if(nThreads >1) {
454  edm::LogInfo("ThreadStreamSetup") <<"setting # threads "<<nThreads<<"\nsetting # streams "<<nStreams;
455  }
456 
457  /*
458  bool nRunsSet = false;
459  */
460  unsigned int nConcurrentRuns =1;
461  /*
462  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
463  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
464  }
465  */
466  unsigned int nConcurrentLumis =1;
467  /*
468  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
469  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
470  } else {
471  nConcurrentLumis = nConcurrentRuns;
472  }
473  */
474  //Check that relationships between threading parameters make sense
475  /*
476  if(nThreads<nStreams) {
477  //bad
478  }
479  if(nConcurrentRuns>nStreams) {
480  //bad
481  }
482  if(nConcurrentRuns>nConcurrentLumis) {
483  //bad
484  }
485  */
486  //forking
487  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
488  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
489  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
490  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
491  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
492  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
493  for(auto const& ps : excluded) {
494  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
495  ps.getUntrackedParameter<std::string>("label", ""));
496  }
497  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
498 
499  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
500 
501  // Now do general initialization
502  ScheduleItems items;
503 
504  //initialize the services
505  auto& serviceSets = processDesc->getServicesPSets();
506  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
507  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
508 
509  //make the services available
511 
512  if(nStreams>1) {
514  handler->willBeUsingThreads();
515  }
516 
517  // initialize miscellaneous items
518  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
519 
520  // initialize the event setup provider
521  esp_ = espController_->makeProvider(*parameterSet);
522 
523  // initialize the looper, if any
524  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
525  if(looper_) {
526  looper_->setActionTable(items.act_table_.get());
527  looper_->attachTo(*items.actReg_);
528 
529  //For now loopers make us run only 1 transition at a time
530  nStreams=1;
531  nConcurrentLumis=1;
532  nConcurrentRuns=1;
533  }
534 
535  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
536 
537  // initialize the input source
538  input_ = makeInput(*parameterSet,
539  *common,
540  items.preg(),
541  items.branchIDListHelper(),
542  items.thinnedAssociationsHelper(),
543  items.actReg_,
544  items.processConfiguration(),
546 
547  // initialize the Schedule
548  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
549 
550  // set the data members
551  act_table_ = std::move(items.act_table_);
552  actReg_ = items.actReg_;
553  preg_ = items.preg();
554  branchIDListHelper_ = items.branchIDListHelper();
555  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
556  processConfiguration_ = items.processConfiguration();
558  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
559 
560  FDEBUG(2) << parameterSet << std::endl;
561 
563  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
564  // Reusable event principal
565  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
568  }
569 
570  // fill the subprocesses, if there are any
571  subProcesses_.reserve(subProcessVParameterSet.size());
572  for(auto& subProcessPSet : subProcessVParameterSet) {
573  subProcesses_.emplace_back(subProcessPSet,
574  *parameterSet,
575  preg(),
578  SubProcessParentageHelper(),
580  *actReg_,
581  token,
584  &processContext_);
585  }
586  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
Definition: common.py:1
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:748
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 266 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by endJob().

266 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 267 of file EventProcessor.h.

References edm::get_underlying_safe().

267 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1523 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1523  {
1524  if (fb_.get() != nullptr) {
1525  schedule_->openOutputFiles(*fb_);
1526  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1527  }
1528  FDEBUG(1) << "\topenOutputFiles\n";
1529  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 905 of file EventProcessor.cc.

References continueAfterChildFailure_.

Referenced by forkProcess().

905  {
906  if(child_failed && continueAfterChildFailure_) {
907  if (child_fail_signal) {
908  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
909  child_fail_signal=0;
910  } else if (child_fail_exit_status) {
911  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
912  child_fail_exit_status=0;
913  } else {
914  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
915  }
916  child_failed =false;
917  }
918  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by beginJob(), init(), readAndMergeLumi(), readAndMergeRun(), readFile(), readLuminosityBlock(), readRun(), and runToCompletion().

260 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe().

261 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1585 of file EventProcessor.cc.

References esp_, FDEBUG, edm::propagate_const< T >::get(), and looper_.

1585  {
1586  looper_->prepareForNextLoop(esp_.get());
1587  FDEBUG(1) << "\tprepareForNextLoop\n";
1588  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline
void edm::EventProcessor::processEventAsync ( WaitingTaskHolder  iHolder,
unsigned int  iStreamIndex 
)
private

Definition at line 2103 of file EventProcessor.cc.

References edm::WaitingTaskHolder::doneWaiting(), esp_, ev, edm::PrincipalCache::eventPrincipal(), FDEBUG, edm::Service< T >::isAvailable(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), principalCache_, processEventWithLooper(), groupFilesInBlocks::reverse, schedule_, serviceToken_, and subProcesses_.

Referenced by handleNextEventForStreamAsync(), and readAndProcessEvent().

2104  {
2105  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2106  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2108  if(rng.isAvailable()) {
2109  Event ev(*pep, ModuleDescription(), nullptr);
2110  rng->postEventRead(ev);
2111  }
2112  assert(pep->luminosityBlockPrincipalPtrValid());
2113  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2114  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2115 
2116  WaitingTaskHolder finalizeEventTask( make_waiting_task(
2117  tbb::task::allocate_root(),
2118  [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
2119  {
2121 
2122  //NOTE: If we have a looper we only have one Stream
2123  if(looper_) {
2124  processEventWithLooper(*pep);
2125  }
2126 
2127  FDEBUG(1) << "\tprocessEvent\n";
2128  pep->clearEventPrincipal();
2129  if(iPtr) {
2130  iHolder.doneWaiting(*iPtr);
2131  } else {
2132  iHolder.doneWaiting(std::exception_ptr());
2133  }
2134  }
2135  )
2136  );
2137  WaitingTaskHolder afterProcessTask;
2138  if(subProcesses_.empty()) {
2139  afterProcessTask = std::move(finalizeEventTask);
2140  } else {
2141  //Need to run SubProcesses after schedule has finished
2142  // with the event
2143  afterProcessTask = WaitingTaskHolder(
2144  make_waiting_task(tbb::task::allocate_root(),
2145  [this,pep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable
2146  {
2147  if(not iPtr) {
2149 
2150  //when run with 1 thread, we want the order to be what
2151  // it was before. This requires reversing the order since
2152  // tasks are run last one in first one out
2153  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
2154  subProcess.doEventAsync(finalizeEventTask,*pep);
2155  }
2156  } else {
2157  finalizeEventTask.doneWaiting(*iPtr);
2158  }
2159  })
2160  );
2161  }
2162 
2163  schedule_->processOneEventAsync(std::move(afterProcessTask),
2164  iStreamIndex,*pep, esp_->eventSetup());
2165 
2166  }
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
void processEventWithLooper(EventPrincipal &)
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::processEventWithLooper ( EventPrincipal iPrincipal)
private

Definition at line 2168 of file EventProcessor.cc.

References esp_, input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, processContext_, edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), mps_update::status, edm::EventPrincipal::streamID(), and summarizeEdmComparisonLogfiles::succeeded.

Referenced by processEventAsync().

2168  {
2169  bool randomAccess = input_->randomAccess();
2170  ProcessingController::ForwardState forwardState = input_->forwardState();
2171  ProcessingController::ReverseState reverseState = input_->reverseState();
2172  ProcessingController pc(forwardState, reverseState, randomAccess);
2173 
2175  do {
2176 
2177  StreamContext streamContext(iPrincipal.streamID(), &processContext_);
2178  status = looper_->doDuringLoop(iPrincipal, esp_->eventSetup(), pc, &streamContext);
2179 
2180  bool succeeded = true;
2181  if(randomAccess) {
2182  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2183  input_->skipEvents(-2);
2184  }
2185  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2186  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2187  }
2188  }
2189  pc.setLastOperationSucceeded(succeeded);
2190  } while(!pc.lastOperationSucceeded());
2191  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2192  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
int edm::EventProcessor::readAndMergeLumi ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1921 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::lumiPrincipalPtr(), edm::PrincipalCache::merge(), preg(), and principalCache_.

1921  {
1922  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1923  {
1924  SendSourceTerminationSignalIfException sentry(actReg_.get());
1925  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1926  sentry.completedSuccessfully();
1927  }
1928  return input_->luminosityBlock();
1929  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1884 of file EventProcessor.cc.

References actReg_, input_, edm::PrincipalCache::merge(), preg(), principalCache_, and edm::PrincipalCache::runPrincipalPtr().

1884  {
1885  principalCache_.merge(input_->runAuxiliary(), preg());
1886  auto runPrincipal =principalCache_.runPrincipalPtr();
1887  {
1888  SendSourceTerminationSignalIfException sentry(actReg_.get());
1889  input_->readAndMergeRun(*runPrincipal);
1890  sentry.completedSuccessfully();
1891  }
1892  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1893  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1894  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2046 of file EventProcessor.cc.

References asyncStopRequestedWhileProcessingEvents_, deferredExceptionPtr_, deferredExceptionPtrIsSet_, firstEventInBlock_, handleNextEventForStreamAsync(), edm::InputSource::IsEvent, edm::make_empty_waiting_task(), edm::make_waiting_task(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), preallocations_, processEventAsync(), and readEvent().

2046  {
2047  if(numberOfForkedChildren_>0) {
2048  //Have to do something special for forking since
2049  // after each event the system may have to skip
2050  // some transitions. This is handled in runToCompletion
2051  readEvent(0);
2052  auto eventLoopWaitTask = make_empty_waiting_task();
2053  eventLoopWaitTask->increment_ref_count();
2054  processEventAsync(WaitingTaskHolder(eventLoopWaitTask.get()),0);
2055  eventLoopWaitTask->wait_for_all();
2056  return;
2057  }
2060 
2061  std::atomic<bool> finishedProcessingEvents{false};
2062  auto finishedProcessingEventsPtr = &finishedProcessingEvents;
2063 
2064  //The state machine already found the event so
2065  // we have to avoid looking again
2066  firstEventInBlock_ = true;
2067 
2068  //To wait, the ref count has to be 1+#streams
2069  auto eventLoopWaitTask = make_empty_waiting_task();
2070  auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
2071  eventLoopWaitTask->increment_ref_count();
2072 
2073  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2074  unsigned int iStreamIndex = 0;
2075  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2076  eventLoopWaitTask->increment_ref_count();
2077  tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2078  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2079  }) );
2080  }
2081  eventLoopWaitTask->increment_ref_count();
2082  eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
2083  handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
2084  }));
2085 
2086  //One of the processing threads saw an exception
2088  std::rethrow_exception(deferredExceptionPtr_);
2089  }
2090  }
void readEvent(unsigned int iStreamIndex)
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
InputSource::ItemType nextItemTypeFromProcessingEvents_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
void handleNextEventForStreamAsync(WaitingTask *iTask, unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2091 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::eventPrincipal(), FDEBUG, input_, principalCache_, and processContext_.

Referenced by readAndProcessEvent(), and readNextEventForStream().

2091  {
2092  //TODO this will have to become per stream
2093  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2094  StreamContext streamContext(event.streamID(), &processContext_);
2095 
2096  SendSourceTerminationSignalIfException sentry(actReg_.get());
2097  input_->readEvent(event, streamContext);
2098  sentry.completedSuccessfully();
2099 
2100  FDEBUG(1) << "\treadEvent\n";
2101  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
Definition: event.py:1
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1496 of file EventProcessor.cc.

References actReg_, edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), fb_, FDEBUG, input_, numberOfForkedChildren_, edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), or, edm::FileBlock::ParallelProcesses, preallocations_, preg(), preg_, principalCache_, and findQualityFiles::size.

Referenced by forkProcess(), Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1496  {
1497  FDEBUG(1) << " \treadFile\n";
1498  size_t size = preg_->size();
1499  SendSourceTerminationSignalIfException sentry(actReg_.get());
1500 
1501  fb_ = input_->readFile();
1502  if(size < preg_->size()) {
1504  }
1506  if((numberOfForkedChildren_ > 0) or
1509  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1510  }
1511  sentry.completedSuccessfully();
1512  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1896 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasLumiPrincipal(), edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, processConfiguration_, and edm::PrincipalCache::runPrincipalPtr().

1896  {
1899  << "EventProcessor::readRun\n"
1900  << "Illegal attempt to insert lumi into cache\n"
1901  << "Contact a Framework Developer\n";
1902  }
1905  << "EventProcessor::readRun\n"
1906  << "Illegal attempt to insert lumi into cache\n"
1907  << "Run is invalid\n"
1908  << "Contact a Framework Developer\n";
1909  }
1910  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1911  {
1912  SendSourceTerminationSignalIfException sentry(actReg_.get());
1913  input_->readLuminosityBlock(*lbp, *historyAppender_);
1914  sentry.completedSuccessfully();
1915  }
1916  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1917  principalCache_.insert(lbp);
1918  return input_->luminosityBlock();
1919  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::readNextEventForStream ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1955 of file EventProcessor.cc.

References actReg_, asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, checkForAsyncStopRequest(), deferredExceptionPtr_, deferredExceptionPtrIsSet_, edm::ExternalSignal, firstEventInBlock_, input_, edm::InputSource::IsEvent, nextItemTypeFromProcessingEvents_, cmsPerfStripChart::operate(), readEvent(), serviceToken_, shouldWeStop(), and sourceMutex_.

Referenced by handleNextEventForStreamAsync().

1956  {
1957  if(shouldWeStop()) {
1958  return false;
1959  }
1960 
1961  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1962  return false;
1963  }
1964 
1965  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1966  return false;
1967  }
1968 
1970  try {
1971  //need to use lock in addition to the serial task queue because
1972  // of delayed provenance reading and reading data in response to
1973  // edm::Refs etc
1974  std::lock_guard<std::recursive_mutex> guard(*(sourceMutex_.get()));
1975  if(not firstEventInBlock_) {
1976  //The state machine already called input_->nextItemType
1977  // and found an event. We can't call input_->nextItemType
1978  // again since it would move to the next transition
1979  InputSource::ItemType itemType = input_->nextItemType();
1980  if (InputSource::IsEvent !=itemType) {
1982  finishedProcessingEvents->store(true,std::memory_order_release);
1983  //std::cerr<<"next item type "<<itemType<<"\n";
1984  return false;
1985  }
1987  //std::cerr<<"task told to async stop\n";
1988  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1989  return false;
1990  }
1991  } else {
1992  firstEventInBlock_ = false;
1993  }
1994  readEvent(iStreamIndex);
1995  } catch (...) {
1996  bool expected =false;
1997  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1998  deferredExceptionPtr_ = std::current_exception();
1999 
2000  }
2001  return false;
2002  }
2003  return true;
2004  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
virtual bool shouldWeStop() const override
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< std::recursive_mutex > sourceMutex_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def operate(timelog, memlog, json_f, num)
statemachine::Run edm::EventProcessor::readRun ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1866 of file EventProcessor.cc.

References actReg_, Exception, edm::PrincipalCache::hasRunPrincipal(), historyAppender_, input_, edm::PrincipalCache::insert(), edm::errors::LogicError, preg(), principalCache_, and processConfiguration_.

1866  {
1869  << "EventProcessor::readRun\n"
1870  << "Illegal attempt to insert run into cache\n"
1871  << "Contact a Framework Developer\n";
1872  }
1873  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1874  {
1875  SendSourceTerminationSignalIfException sentry(actReg_.get());
1876  input_->readRun(*rp, *historyAppender_);
1877  sentry.completedSuccessfully();
1878  }
1879  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1880  principalCache_.insert(rp);
1881  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1882  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1548 of file EventProcessor.cc.

References fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1548  {
1549  if (fb_.get() != nullptr) {
1550  schedule_->respondToCloseInputFile(*fb_);
1551  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1552  }
1553  FDEBUG(1) << "\trespondToCloseInputFile\n";
1554  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1539 of file EventProcessor.cc.

References branchIDListHelper_, fb_, FDEBUG, edm::for_all(), schedule_, and subProcesses_.

1539  {
1540  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1541  if (fb_.get() != nullptr) {
1542  schedule_->respondToOpenInputFile(*fb_);
1543  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1544  }
1545  FDEBUG(1) << "\trespondToOpenInputFile\n";
1546  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1579 of file EventProcessor.cc.

References FDEBUG, and input_.

1579  {
1580  input_->repeat();
1581  input_->rewind();
1582  FDEBUG(1) << "\trewind\n";
1583  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 337 of file EventProcessor.h.

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

337  {
338  return runToCompletion();
339  }
virtual StatusCode runToCompletion() override
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1308 of file EventProcessor.cc.

References actReg_, cms::Exception::addAdditionalInfo(), edm::PrincipalCache::adjustEventsToNewProductRegistry(), edm::PrincipalCache::adjustIndexesAfterProductRegistryAddition(), alreadyHandlingException_, cms::Exception::alreadyPrinted(), asyncStopRequestedWhileProcessingEvents_, asyncStopStatusCodeFromProcessingEvents_, beginJob(), checkForAsyncStopRequest(), createStateMachine(), MillePedeFileConverter_cfg::e, edm::IEventProcessor::epSuccess, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, edm::ExternalSignal, FDEBUG, forceLooperToEnd_, input_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::IsSynchronize, edm::errors::LogicError, eostools::move(), nextItemTypeFromProcessingEvents_, numberOfForkedChildren_, cmsPerfStripChart::operate(), preg(), preg_, principalCache_, runEdmFileComparison::returnCode, serviceToken_, findQualityFiles::size, stateMachineWasInErrorState_, terminateMachine(), and edm::convertException::wrap().

Referenced by PythonEventProcessor::run().

1308  {
1309 
1312  std::unique_ptr<statemachine::Machine> machine;
1313  {
1314  beginJob(); //make sure this was called
1315 
1316  //StatusCode returnCode = epSuccess;
1318 
1319  // make the services available
1321 
1322  machine = createStateMachine();
1325  try {
1326  convertException::wrap([&]() {
1327 
1328  InputSource::ItemType itemType;
1329 
1330  while(true) {
1331 
1332  bool more = true;
1333  if(numberOfForkedChildren_ > 0) {
1334  size_t size = preg_->size();
1335  {
1336  SendSourceTerminationSignalIfException sentry(actReg_.get());
1337  more = input_->skipForForking();
1338  sentry.completedSuccessfully();
1339  }
1340  if(more) {
1341  if(size < preg_->size()) {
1343  }
1345  }
1346  }
1347  {
1348  SendSourceTerminationSignalIfException sentry(actReg_.get());
1349  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1350  sentry.completedSuccessfully();
1351  }
1352 
1353  FDEBUG(1) << "itemType = " << itemType << "\n";
1354 
1355  if(checkForAsyncStopRequest(returnCode)) {
1356  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1357  forceLooperToEnd_ = true;
1358  machine->process_event(statemachine::Stop());
1359  forceLooperToEnd_ = false;
1360  break;
1361  }
1362 
1363  if(itemType == InputSource::IsEvent) {
1364  machine->process_event(statemachine::Event());
1366  forceLooperToEnd_ = true;
1367  machine->process_event(statemachine::Stop());
1368  forceLooperToEnd_ = false;
1370  break;
1371  }
1373  }
1374 
1375  if(itemType == InputSource::IsEvent) {
1376  }
1377  else if(itemType == InputSource::IsStop) {
1378  machine->process_event(statemachine::Stop());
1379  }
1380  else if(itemType == InputSource::IsFile) {
1381  machine->process_event(statemachine::File());
1382  }
1383  else if(itemType == InputSource::IsRun) {
1384  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1385  }
1386  else if(itemType == InputSource::IsLumi) {
1387  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1388  }
1389  else if(itemType == InputSource::IsSynchronize) {
1390  //For now, we don't have to do anything
1391  }
1392  // This should be impossible
1393  else {
1395  << "Unknown next item type passed to EventProcessor\n"
1396  << "Please report this error to the Framework group\n";
1397  }
1398  if(machine->terminated()) {
1399  break;
1400  }
1401  } // End of loop over state machine events
1402  }); // convertException::wrap
1403  } // Try block
1404  // Some comments on exception handling related to the boost state machine:
1405  //
1406  // Some states used in the machine are special because they
1407  // perform actions while the machine is being terminated, actions
1408  // such as close files, call endRun, call endLumi etc ... Each of these
1409  // states has two functions that perform these actions. The functions
1410  // are almost identical. The major difference is that one version
1411  // catches all exceptions and the other lets exceptions pass through.
1412  // The destructor catches them and the other function named "exit" lets
1413  // them pass through. On a normal termination, boost will always call
1414  // "exit" and then the state destructor. In our state classes, the
1415  // the destructors do nothing if the exit function already took
1416  // care of things. Here's the interesting part. When boost is
1417  // handling an exception the "exit" function is not called (a boost
1418  // feature).
1419  //
1420  // If an exception occurs while the boost machine is in control
1421  // (which usually means inside a process_event call), then
1422  // the boost state machine destroys its states and "terminates" itself.
1423  // This already done before we hit the catch blocks below. In this case
1424  // the call to terminateMachine below only destroys an already
1425  // terminated state machine. Because exit is not called, the state destructors
1426  // handle cleaning up lumis, runs, and files. The destructors swallow
1427  // all exceptions and only pass through the exceptions messages, which
1428  // are tacked onto the original exception below.
1429  //
1430  // If an exception occurs when the boost state machine is not
1431  // in control (outside the process_event functions), then boost
1432  // cannot destroy its own states. The terminateMachine function
1433  // below takes care of that. The flag "alreadyHandlingException"
1434  // is set true so that the state exit functions do nothing (and
1435  // cannot throw more exceptions while handling the first). Then the
1436  // state destructors take care of this because exit did nothing.
1437  //
1438  // In both cases above, the EventProcessor::endOfLoop function is
1439  // not called because it can throw exceptions.
1440  //
1441  // One tricky aspect of the state machine is that things that can
1442  // throw should not be invoked by the state machine while another
1443  // exception is being handled.
1444  // Another tricky aspect is that it appears to be important to
1445  // terminate the state machine before invoking its destructor.
1446  // We've seen crashes that are not understood when that is not
1447  // done. Maintainers of this code should be careful about this.
1448 
1449  catch (cms::Exception & e) {
1451  terminateMachine(std::move(machine));
1452  alreadyHandlingException_ = false;
1453  if (!exceptionMessageLumis_.empty()) {
1455  if (e.alreadyPrinted()) {
1456  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1457  }
1458  }
1459  if (!exceptionMessageRuns_.empty()) {
1461  if (e.alreadyPrinted()) {
1462  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1463  }
1464  }
1465  if (!exceptionMessageFiles_.empty()) {
1467  if (e.alreadyPrinted()) {
1468  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1469  }
1470  }
1471  throw;
1472  }
1473 
1474  if(machine->terminated()) {
1475  FDEBUG(1) << "The state machine reports it has been terminated\n";
1476  machine.reset();
1477  }
1478 
1480  throw cms::Exception("BadState")
1481  << "The boost state machine in the EventProcessor exited after\n"
1482  << "entering the Error state.\n";
1483  }
1484 
1485  }
1486  if(machine.get() != nullptr) {
1487  terminateMachine(std::move(machine));
1489  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1490  << "Please report this error to the Framework group\n";
1491  }
1492 
1493  return returnCode;
1494  }
size
Write out results.
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
def move(src, dest)
Definition: eostools.py:510
PrincipalCache principalCache_
def operate(timelog, memlog, json_f, num)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2208 of file EventProcessor.cc.

References exceptionMessageFiles_, and python.rootplot.argparse::message.

2208  {
2210  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2216 of file EventProcessor.cc.

References exceptionMessageLumis_, and python.rootplot.argparse::message.

2216  {
2218  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2212 of file EventProcessor.cc.

References exceptionMessageRuns_, and python.rootplot.argparse::message.

2212  {
2214  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1590 of file EventProcessor.cc.

References FDEBUG, schedule_, and subProcesses_.

1590  {
1591  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1592  if(!subProcesses_.empty()) {
1593  for(auto const& subProcess : subProcesses_) {
1594  if(subProcess.shouldWeCloseOutput()) {
1595  return true;
1596  }
1597  }
1598  return false;
1599  }
1600  return schedule_->shouldWeCloseOutput();
1601  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
overridevirtual

Implements edm::IEventProcessor.

Definition at line 2194 of file EventProcessor.cc.

References FDEBUG, schedule_, shouldWeStop_, and subProcesses_.

Referenced by readNextEventForStream().

2194  {
2195  FDEBUG(1) << "\tshouldWeStop\n";
2196  if(shouldWeStop_) return true;
2197  if(!subProcesses_.empty()) {
2198  for(auto const& subProcess : subProcesses_) {
2199  if(subProcess.terminate()) {
2200  return true;
2201  }
2202  }
2203  return false;
2204  }
2205  return schedule_->terminate();
2206  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1556 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

1556  {
1557  shouldWeStop_ = false;
1558  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1559  // until after we've called beginOfJob
1560  if(looper_ && looperBeginJobRun_) {
1561  looper_->doStartingNewLoop();
1562  }
1563  FDEBUG(1) << "\tstartingNewLoop\n";
1564  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine iMachine)
private

Definition at line 2224 of file EventProcessor.cc.

References FDEBUG, and forceLooperToEnd_.

Referenced by runToCompletion().

2224  {
2225  if(iMachine.get() != nullptr) {
2226  if(!iMachine->terminated()) {
2227  forceLooperToEnd_ = true;
2228  iMachine->process_event(statemachine::Stop());
2229  forceLooperToEnd_ = false;
2230  }
2231  else {
2232  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2233  }
2234  if(iMachine->terminated()) {
2235  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2236  }
2237  }
2238  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 264 of file EventProcessor.h.

References edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 265 of file EventProcessor.h.

References edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1227 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEvents().

1227  {
1228  return schedule_->totalEvents();
1229  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1237 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsFailed().

1237  {
1238  return schedule_->totalEventsFailed();
1239  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1232 of file EventProcessor.cc.

References schedule_.

Referenced by PythonEventProcessor::totalEventsPassed().

1232  {
1233  return schedule_->totalEventsPassed();
1234  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1943 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), edm::PrincipalCache::lumiPrincipal(), principalCache_, processContext_, schedule_, and subProcesses_.

1943  {
1945  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1946  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1947  }
ProcessContext processContext_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
overridevirtual

Implements edm::IEventProcessor.

Definition at line 1931 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), principalCache_, processContext_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), schedule_, and subProcesses_.

1931  {
1932  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1933  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1934  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1935  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 309 of file EventProcessor.h.

Referenced by alreadyHandlingException(), and runToCompletion().

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 321 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 323 of file EventProcessor.h.

Referenced by readNextEventForStream(), and runToCompletion().

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 301 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init(), and respondToOpenInputFile().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 317 of file EventProcessor.h.

Referenced by forkProcess(), init(), and possiblyContinueAfterForkChildFailure().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private
std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private
std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 305 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 281 of file EventProcessor.h.

Referenced by beginLumi(), beginRun(), endLumi(), endRun(), forkProcess(), init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 328 of file EventProcessor.h.

Referenced by forkProcess(), and init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 306 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 308 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 307 of file EventProcessor.h.

Referenced by runToCompletion(), and setExceptionMessageRuns().

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private
std::string edm::EventProcessor::fileMode_
private

Definition at line 304 of file EventProcessor.h.

Referenced by createStateMachine(), and init().

bool edm::EventProcessor::firstEventInBlock_ =true
private

Definition at line 324 of file EventProcessor.h.

Referenced by readAndProcessEvent(), and readNextEventForStream().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 312 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 310 of file EventProcessor.h.

Referenced by endOfLoop(), runToCompletion(), and terminateMachine().

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init(), readLuminosityBlock(), and readRun().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private
edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 311 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 322 of file EventProcessor.h.

Referenced by readAndProcessEvent(), readNextEventForStream(), and runToCompletion().

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 314 of file EventProcessor.h.

Referenced by forkProcess(), init(), readAndProcessEvent(), readFile(), and runToCompletion().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 315 of file EventProcessor.h.

Referenced by forkProcess() and init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 286 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private
edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob(), endOfLoop(), init(), readFile(), and runToCompletion().

PrincipalCache edm::EventProcessor::principalCache_
private
bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob() and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 284 of file EventProcessor.h.

Referenced by beginJob(), init(), readLuminosityBlock(), and readRun().

ProcessContext edm::EventProcessor::processContext_
private
edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private
bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 316 of file EventProcessor.h.

Referenced by forkProcess() and init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 302 of file EventProcessor.h.

Referenced by processEventWithLooper(), shouldWeStop(), and startingNewLoop().

std::shared_ptr<std::recursive_mutex> edm::EventProcessor::sourceMutex_
private

Definition at line 299 of file EventProcessor.h.

Referenced by readNextEventForStream().

SharedResourcesAcquirer edm::EventProcessor::sourceResourcesAcquirer_
private

Definition at line 298 of file EventProcessor.h.

Referenced by handleNextEventForStreamAsync().

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 303 of file EventProcessor.h.

Referenced by doErrorStuff() and runToCompletion().

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init().