test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap
 

Private Member Functions

std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::unique_ptr
< statemachine::Machine > 
createStateMachine ()
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase
const > 
looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr
< ProductRegistry const > 
preg () const
 
std::shared_ptr
< ProductRegistry > & 
preg ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::unique_ptr< statemachine::Machine >)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const
< std::shared_ptr
< BranchIDListHelper > > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const
< std::shared_ptr
< eventsetup::EventSetupProvider > > 
esp_
 
edm::propagate_const
< std::unique_ptr
< eventsetup::EventSetupsController > > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const
< std::unique_ptr< FileBlock > > 
fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const
< std::unique_ptr
< HistoryAppender > > 
historyAppender_
 
edm::propagate_const
< std::unique_ptr< InputSource > > 
input_
 
edm::propagate_const
< std::shared_ptr
< EDLooperBase > > 
looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const
< std::shared_ptr
< ProductRegistry > > 
preg_
 
PrincipalCache principalCache_
 
bool printDependencies_ = false
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const
< std::unique_ptr< Schedule > > 
schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
std::vector< SubProcess > subProcesses_
 
edm::propagate_const
< std::shared_ptr
< ThinnedAssociationsHelper > > 
thinnedAssociationsHelper_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 61 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 314 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 315 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 219 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

223  :
224  actReg_(),
225  preg_(),
227  serviceToken_(),
228  input_(),
229  espController_(new eventsetup::EventSetupsController),
230  esp_(),
231  act_table_(),
233  schedule_(),
234  subProcesses_(),
235  historyAppender_(new HistoryAppender),
236  fb_(),
237  looper_(),
239  principalCache_(),
240  beginJobCalled_(false),
241  shouldWeStop_(false),
243  fileMode_(),
249  forceLooperToEnd_(false),
250  looperBeginJobRun_(false),
254  setCpuAffinity_(false),
256  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
257  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
258  processDesc->addServices(defaultServices, forcedServices);
259  init(processDesc, iToken, iLegacy);
260  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 262 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

264  :
265  actReg_(),
266  preg_(),
268  serviceToken_(),
269  input_(),
270  espController_(new eventsetup::EventSetupsController),
271  esp_(),
272  act_table_(),
274  schedule_(),
275  subProcesses_(),
276  historyAppender_(new HistoryAppender),
277  fb_(),
278  looper_(),
280  principalCache_(),
281  beginJobCalled_(false),
282  shouldWeStop_(false),
284  fileMode_(),
290  forceLooperToEnd_(false),
291  looperBeginJobRun_(false),
295  setCpuAffinity_(false),
299  {
300  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
301  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
302  processDesc->addServices(defaultServices, forcedServices);
304  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 306 of file EventProcessor.cc.

References init().

308  :
309  actReg_(),
310  preg_(),
312  serviceToken_(),
313  input_(),
314  espController_(new eventsetup::EventSetupsController),
315  esp_(),
316  act_table_(),
318  schedule_(),
319  subProcesses_(),
320  historyAppender_(new HistoryAppender),
321  fb_(),
322  looper_(),
324  principalCache_(),
325  beginJobCalled_(false),
326  shouldWeStop_(false),
328  fileMode_(),
334  forceLooperToEnd_(false),
335  looperBeginJobRun_(false),
339  setCpuAffinity_(false),
343  {
344  init(processDesc, token, legacy);
345  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 348 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

348  :
349  actReg_(),
350  preg_(),
352  serviceToken_(),
353  input_(),
354  espController_(new eventsetup::EventSetupsController),
355  esp_(),
356  act_table_(),
358  schedule_(),
359  subProcesses_(),
360  historyAppender_(new HistoryAppender),
361  fb_(),
362  looper_(),
364  principalCache_(),
365  beginJobCalled_(false),
366  shouldWeStop_(false),
368  fileMode_(),
374  forceLooperToEnd_(false),
375  looperBeginJobRun_(false),
379  setCpuAffinity_(false),
383  {
384  if(isPython) {
385  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
386  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
388  }
389  else {
390  auto processDesc = std::make_shared<ProcessDesc>(config);
392  }
393  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 569 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and unpackBuffers-CaloStage2::token.

569  {
570  // Make the services available while everything is being deleted.
572  ServiceRegistry::Operate op(token);
573 
574  // manually destroy all these thing that may need the services around
575  // propagate_const<T> has no reset() function
576  espController_ = nullptr;
577  esp_ = nullptr;
578  schedule_ = nullptr;
579  input_ = nullptr;
580  looper_ = nullptr;
581  actReg_ = nullptr;
582 
585  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2081 of file EventProcessor.cc.

2081  {
2083  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 588 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, edm::checkForModuleDependencyCorrectness(), edm::for_all(), i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, printDependencies_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

588  {
589  if(beginJobCalled_) return;
590  beginJobCalled_=true;
591  bk::beginJob();
592 
593  // StateSentry toerror(this); // should we add this ?
594  //make the services available
596 
597  service::SystemBounds bounds(preallocations_.numberOfStreams(),
601  actReg_->preallocateSignal_(bounds);
603 
604  //NOTE: this may throw
606  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
607 
608  //NOTE: This implementation assumes 'Job' means one call
609  // the EventProcessor::run
610  // If it really means once per 'application' then this code will
611  // have to be changed.
612  // Also have to deal with case where have 'run' then new Module
613  // added and do 'run'
614  // again. In that case the newly added Module needs its 'beginJob'
615  // to be called.
616 
617  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
618  // For now we delay calling beginOfJob until first beginOfRun
619  //if(looper_) {
620  // looper_->beginOfJob(es);
621  //}
622  try {
623  convertException::wrap([&]() {
624  input_->doBeginJob();
625  });
626  }
627  catch(cms::Exception& ex) {
628  ex.addContext("Calling beginJob for the source");
629  throw;
630  }
631  schedule_->beginJob(*preg_);
632  // toerror.succeeded(); // should we add this?
633  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
634  actReg_->postBeginJobSignal_();
635 
636  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
637  schedule_->beginStream(i);
638  for_all(subProcesses_, [i](auto& subProcess){ subProcess.doBeginStream(i); });
639  }
640  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1684 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, edm::for_all(), i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), rng, and edm::LuminosityBlockPrincipal::run().

1684  {
1685  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1686  {
1687  SendSourceTerminationSignalIfException sentry(actReg_.get());
1688 
1689  input_->doBeginLumi(lumiPrincipal, &processContext_);
1690  sentry.completedSuccessfully();
1691  }
1692 
1694  if(rng.isAvailable()) {
1695  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1696  rng->preBeginLumi(lb);
1697  }
1698 
1699  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1700  // lumi blocks know their start and end times why not also start and end events?
1701  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1702  {
1703  SendSourceTerminationSignalIfException sentry(actReg_.get());
1704  espController_->eventSetupForInstance(ts);
1705  sentry.completedSuccessfully();
1706  }
1707  EventSetup const& es = esp_->eventSetup();
1708  {
1709  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1710  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1711  for_all(subProcesses_, [&lumiPrincipal, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lumiPrincipal, ts); });
1712  }
1713  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1714  if(looper_) {
1715  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1716  }
1717  {
1718  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1719  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1720  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1721  for_all(subProcesses_, [i, &lumiPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts); });
1722  }
1723  }
1724  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1725  if(looper_) {
1726  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1727  }
1728  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::Service< edm::RandomNumberGenerator > rng
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1594 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, edm::for_all(), i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1594  {
1595  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1596  {
1597  SendSourceTerminationSignalIfException sentry(actReg_.get());
1598 
1599  input_->doBeginRun(runPrincipal, &processContext_);
1600  sentry.completedSuccessfully();
1601  }
1602 
1603  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1604  runPrincipal.beginTime());
1606  espController_->forceCacheClear();
1607  }
1608  {
1609  SendSourceTerminationSignalIfException sentry(actReg_.get());
1610  espController_->eventSetupForInstance(ts);
1611  sentry.completedSuccessfully();
1612  }
1613  EventSetup const& es = esp_->eventSetup();
1614  if(looper_ && looperBeginJobRun_== false) {
1615  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1616  looper_->beginOfJob(es);
1617  looperBeginJobRun_ = true;
1618  looper_->doStartingNewLoop();
1619  }
1620  {
1621  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1622  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1623  for_all(subProcesses_, [&runPrincipal, &ts](auto& subProcess){ subProcess.doBeginRun(runPrincipal, ts); });
1624  }
1625  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1626  if(looper_) {
1627  looper_->doBeginRun(runPrincipal, es, &processContext_);
1628  }
1629  {
1630  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1631  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1632  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1633  for_all(subProcesses_, [i, &runPrincipal, &ts](auto& subProcess){ subProcess.doStreamBeginRun(i, runPrincipal, ts); });
1634  }
1635  }
1636  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1637  if(looper_) {
1638  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1639  }
1640  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inline private

Definition at line 252 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inline private

Definition at line 253 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1275 of file EventProcessor.cc.

References edm::shutdown_flag.

1275  {
1276  bool returnValue = false;
1277 
1278  // Look for a shutdown signal
1279  if(shutdown_flag.load(std::memory_order_acquire)) {
1280  returnValue = true;
1281  returnCode = epSignal;
1282  }
1283  return returnValue;
1284  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1237 of file EventProcessor.cc.

1237  {
1238  schedule_->clearCounters();
1239  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1494 of file EventProcessor.cc.

References FDEBUG.

1494  {
1495  if (fb_.get() != nullptr) {
1496  SendSourceTerminationSignalIfException sentry(actReg_.get());
1497  input_->closeFile(fb_.get(), cleaningUpAfterException);
1498  sentry.completedSuccessfully();
1499  }
1500  FDEBUG(1) << "\tcloseInputFile\n";
1501  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1511 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1511  {
1512  if (fb_.get() != nullptr) {
1513  schedule_->closeOutputFiles();
1514  for_all(subProcesses_, [](auto& subProcess){ subProcess.closeOutputFiles(); });
1515  }
1516  FDEBUG(1) << "\tcloseOutputFiles\n";
1517  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::unique_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1243 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1243  {
1245  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1246  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1247  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1248  else {
1249  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1250  << fileMode_ << ".\n"
1251  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1252  }
1253 
1254  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1255  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1256  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1257  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1258  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1259  else {
1260  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1261  << emptyRunLumiMode_ << ".\n"
1262  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1263  }
1264 
1265  auto machine = std::make_unique<statemachine::Machine>(
1266  this,
1267  fileMode,
1268  emptyRunLumiMode);
1269 
1270  machine->initiate();
1271  return machine;
1272  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1855 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1855  {
1857  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.deleteLumiFromCache(phid, run, lumi); });
1858  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1859  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1843 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1843  {
1844  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1845  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber()); });
1846  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1847  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1583 of file EventProcessor.cc.

References FDEBUG.

1583  {
1584  FDEBUG(1) << "\tdoErrorStuff\n";
1585  LogError("StateMachine")
1586  << "The EventProcessor state machine encountered an unexpected event\n"
1587  << "and went to the error state\n"
1588  << "Will attempt to terminate processing normally\n"
1589  << "(IF using the looper the next loop will be attempted)\n"
1590  << "This likely indicates a bug in an input module or corrupted input or both\n";
1592  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1222 of file EventProcessor.cc.

1222  {
1223  schedule_->enableEndPaths(active);
1224  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 643 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

643  {
644  // Collects exceptions, so we don't throw before all operations are performed.
645  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
646 
647  //make the services available
649 
650  //NOTE: this really should go elsewhere in the future
651  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
652  c.call([this,i](){this->schedule_->endStream(i);});
653  for(auto& subProcess : subProcesses_) {
654  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
655  }
656  }
657  auto actReg = actReg_.get();
658  c.call([actReg](){actReg->preEndJobSignal_();});
659  schedule_->endJob(c);
660  for(auto& subProcess : subProcesses_) {
661  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
662  }
663  c.call(std::bind(&InputSource::doEndJob, input_.get()));
664  if(looper_) {
665  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
666  }
667  c.call([actReg](){actReg->postEndJobSignal_();});
668  if(c.hasThrown()) {
669  c.rethrow();
670  }
671  }
int i
Definition: DBlmapReader.cc:9
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1730 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, edm::for_all(), i, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::LuminosityBlockPrincipal::run(), edm::LuminosityBlockPrincipal::setComplete(), and edm::LuminosityBlockPrincipal::setEndTime().

Referenced by Types.EventRange::cppID().

1730  {
1731  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1732  {
1733  SendSourceTerminationSignalIfException sentry(actReg_.get());
1734 
1735  lumiPrincipal.setEndTime(input_->timestamp());
1736  lumiPrincipal.setComplete();
1737  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1738  sentry.completedSuccessfully();
1739  }
1740  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1741  // lumi blocks know their start and end times why not also start and end events?
1742  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1743  lumiPrincipal.endTime());
1744  {
1745  SendSourceTerminationSignalIfException sentry(actReg_.get());
1746  espController_->eventSetupForInstance(ts);
1747  sentry.completedSuccessfully();
1748  }
1749  EventSetup const& es = esp_->eventSetup();
1750  {
1751  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1752  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1753  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1754  for_all(subProcesses_, [i, &lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException); });
1755  }
1756  }
1757  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1758  if(looper_) {
1759  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1760  }
1761  {
1762  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1763  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1764  for_all(subProcesses_, [&lumiPrincipal, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException); });
1765  }
1766  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1767  if(looper_) {
1768  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1769  }
1770  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1546 of file EventProcessor.cc.

References FDEBUG, and mps_update::status.

1546  {
1547  if(looper_) {
1548  ModuleChanger changer(schedule_.get(),preg_.get());
1549  looper_->setModuleChanger(&changer);
1550  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1551  looper_->setModuleChanger(nullptr);
1552  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1553  else return false;
1554  }
1555  FDEBUG(1) << "\tendOfLoop\n";
1556  return true;
1557  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
tuple status
Definition: mps_update.py:57
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1227 of file EventProcessor.cc.

1227  {
1228  return schedule_->endPathsEnabled();
1229  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1642 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, edm::for_all(), i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::RunPrincipal::setComplete(), and edm::RunPrincipal::setEndTime().

1642  {
1643  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1644  {
1645  SendSourceTerminationSignalIfException sentry(actReg_.get());
1646 
1647  runPrincipal.setEndTime(input_->timestamp());
1648  runPrincipal.setComplete();
1649  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1650  sentry.completedSuccessfully();
1651  }
1652 
1654  runPrincipal.endTime());
1655  {
1656  SendSourceTerminationSignalIfException sentry(actReg_.get());
1657  espController_->eventSetupForInstance(ts);
1658  sentry.completedSuccessfully();
1659  }
1660  EventSetup const& es = esp_->eventSetup();
1661  {
1662  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1663  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1664  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1665  for_all(subProcesses_, [i, &runPrincipal, &ts, cleaningUpAfterException](auto& subProcess) { subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1666  });
1667  }
1668  }
1669  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1670  if(looper_) {
1671  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1672  }
1673  {
1674  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1675  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1676  for_all(subProcesses_, [&runPrincipal, &ts, cleaningUpAfterException](auto& subProcess){subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException); });
1677  }
1678  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1679  if(looper_) {
1680  looper_->doEndRun(runPrincipal, es, &processContext_);
1681  }
1682  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setEndTime(Timestamp const &time)
Definition: RunPrincipal.h:81
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 901 of file EventProcessor.cc.

References assert(), bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

901  {
902 
903  if(0 == numberOfForkedChildren_) {return true;}
905  //do what we want done in common
906  {
907  beginJob(); //make sure this was run
908  // make the services available
910 
911  InputSource::ItemType itemType;
912  itemType = input_->nextItemType();
913 
914  assert(itemType == InputSource::IsFile);
915  {
916  readFile();
917  }
918  itemType = input_->nextItemType();
919  assert(itemType == InputSource::IsRun);
920 
921  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
922  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
923  input_->runAuxiliary()->beginTime());
924  espController_->eventSetupForInstance(ts);
925  EventSetup const& es = esp_->eventSetup();
926 
927  //now get all the data available in the EventSetup
928  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
929  es.fillAvailableRecordKeys(recordKeys);
930  std::vector<eventsetup::DataKey> dataKeys;
931  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
932  itKey != itEnd;
933  ++itKey) {
934  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
935  //see if this is on our exclusion list
936  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
937  ExcludedData const* excludedData(nullptr);
938  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
939  excludedData = &(itExcludeRec->second);
940  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
941  //skip all items in this record
942  continue;
943  }
944  }
945  if(0 != recordPtr) {
946  dataKeys.clear();
947  recordPtr->fillRegisteredDataKeys(dataKeys);
948  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
949  itDataKey != itDataKeyEnd;
950  ++itDataKey) {
951  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
952  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
953  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
954  continue;
955  }
956  try {
957  recordPtr->doGet(*itDataKey);
958  } catch(cms::Exception& e) {
959  LogWarning("ForkingEventSetupPreFetching") << e.what();
960  }
961  }
962  }
963  }
964  }
965  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
966  {
967  // make the services available
969  Service<JobReport> jobReport;
970  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
971 
972  //Now actually do the forking
973  actReg_->preForkReleaseResourcesSignal_();
974  input_->doPreForkReleaseResources();
975  schedule_->preForkReleaseResources();
976  }
977  installCustomHandler(SIGCHLD, ep_sigchld);
978 
979 
980  unsigned int childIndex = 0;
981  unsigned int const kMaxChildren = numberOfForkedChildren_;
982  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
983  std::vector<pid_t> childrenIds;
984  childrenIds.reserve(kMaxChildren);
985  std::vector<int> childrenSockets;
986  childrenSockets.reserve(kMaxChildren);
987  std::vector<int> childrenPipes;
988  childrenPipes.reserve(kMaxChildren);
989  std::vector<int> childrenSocketsCopy;
990  childrenSocketsCopy.reserve(kMaxChildren);
991  std::vector<int> childrenPipesCopy;
992  childrenPipesCopy.reserve(kMaxChildren);
993  int pipes[] {0, 0};
994 
995  {
996  // make the services available
998  Service<JobReport> jobReport;
999  int sockets[2], fd_flags;
1000  for(; childIndex < kMaxChildren; ++childIndex) {
1001  // Create a UNIX_DGRAM socket pair
1002  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1003  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1004  exit(EXIT_FAILURE);
1005  }
1006  if (pipe(pipes)) {
1007  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1008  exit(EXIT_FAILURE);
1009  }
1010  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1011  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1012  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1013  exit(EXIT_FAILURE);
1014  }
1015  // Mark socket as non-block. Child must be careful to do select prior
1016  // to reading from socket.
1017  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1018  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1019  exit(EXIT_FAILURE);
1020  }
1021  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1022  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1023  exit(EXIT_FAILURE);
1024  }
1025  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1026  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1027  exit(EXIT_FAILURE);
1028  }
1029  // Linux man page notes there are some edge cases where reading from a
1030  // fd can block, even after a select.
1031  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1032  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1033  exit(EXIT_FAILURE);
1034  }
1035  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1036  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1037  exit(EXIT_FAILURE);
1038  }
1039 
1040  childrenPipesCopy = childrenPipes;
1041  childrenSocketsCopy = childrenSockets;
1042 
1043  pid_t value = fork();
1044  if(value == 0) {
1045  // Close the parent's side of the socket and pipe which will talk to us.
1046  close(pipes[0]);
1047  close(sockets[0]);
1048  // Close our copies of the parent's other communication pipes.
1049  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1050  close(*it);
1051  }
1052  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1053  close(*it);
1054  }
1055 
1056  // this is the child process, redirect stdout and stderr to a log file
1057  fflush(stdout);
1058  fflush(stderr);
1059  std::stringstream stout;
1060  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1061  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1062  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1063  }
1064  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1065  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1066  }
1067 
1068  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1069  if(setCpuAffinity_) {
1070  // CPU affinity is handled differently on macosx.
1071  // We disable it and print a message until someone reads:
1072  //
1073  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1074  //
1075  // and implements it.
1076 #ifdef __APPLE__
1077  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1078 #else
1079  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1080  cpu_set_t mask;
1081  CPU_ZERO(&mask);
1082  CPU_SET(childIndex, &mask);
1083  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1084  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1085  exit(-1);
1086  }
1087 #endif
1088  }
1089  break;
1090  } else {
1091  //this is the parent
1092  close(pipes[1]);
1093  close(sockets[1]);
1094  }
1095  if(value < 0) {
1096  LogError("ForkingChild") << "failed to create a child";
1097  exit(-1);
1098  }
1099  childrenIds.push_back(value);
1100  childrenSockets.push_back(sockets[0]);
1101  childrenPipes.push_back(pipes[0]);
1102  }
1103 
1104  if(childIndex < kMaxChildren) {
1105  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1106  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1107 
1108  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1109  input_->doPostForkReacquireResources(receiver);
1110  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1111  //NOTE: sources have to reset themselves by listening to the post fork message
1112  //rewindInput();
1113  return true;
1114  }
1115  jobReport->parentAfterFork(jobReportFile);
1116  }
1117 
1118  //this is the original, which is now the master for all the children
1119 
1120  //Need to wait for signals from the children or externally
1121  // To wait we must
1122  // 1) block the signals we want to wait on so we do not have a race condition
1123  // 2) check that we haven't already meet our ending criteria
1124  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1125  sigset_t blockingSigSet;
1126  sigset_t unblockingSigSet;
1127  sigset_t oldSigSet;
1128  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1129  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1130  sigaddset(&blockingSigSet, SIGCHLD);
1131  sigaddset(&blockingSigSet, SIGUSR2);
1132  sigaddset(&blockingSigSet, SIGINT);
1133  sigdelset(&unblockingSigSet, SIGCHLD);
1134  sigdelset(&unblockingSigSet, SIGUSR2);
1135  sigdelset(&unblockingSigSet, SIGINT);
1136  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1137 
1138  // If there are too many fd's (unlikely, but possible) for select, denote this
1139  // because the sender will fail.
1140  bool too_many_fds = false;
1141  if (pipes[1]+1 > FD_SETSIZE) {
1142  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1143  too_many_fds = true;
1144  }
1145 
1146  //create a thread that sends the units of work to workers
1147  // we create it after all signals were blocked so that this
1148  // thread is never interupted by a signal
1149  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1150  boost::thread senderThread(sender);
1151 
1152  if(not too_many_fds) {
1153  //NOTE: a child could have failed before we got here and even after this call
1154  // which is why the 'if' is conditional on continueAfterChildFailure_
1156  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1157  sigsuspend(&unblockingSigSet);
1159  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1160  }
1161  }
1162  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1163 
1164  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1165  if(child_failed) {
1166  LogError("ForkingStopping") << "child failed";
1167  }
1168  if(shutdown_flag) {
1169  LogSystem("ForkingStopping") << "asked to shutdown";
1170  }
1171 
1172  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1173  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1174  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1175  it != itEnd; ++it) {
1176  /* int result = */ kill(*it, SIGUSR2);
1177  }
1178  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1179  while(num_children_done != kMaxChildren) {
1180  sigsuspend(&unblockingSigSet);
1181  }
1182  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1183  }
1184  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1185  senderThread.join();
1186  if(child_failed && !continueAfterChildFailure_) {
1187  if (child_fail_signal) {
1188  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1189  } else if (child_fail_exit_status) {
1190  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1191  } else {
1192  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1193  }
1194  }
1195  if(too_many_fds) {
1196  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1197  }
1198  return false;
1199  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
assert(m_qm.get())
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1202 of file EventProcessor.cc.

1202  {
1203  return schedule_->getAllModuleDescriptions();
1204  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 674 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

674  {
675  return serviceToken_;
676  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1232 of file EventProcessor.cc.

1232  {
1233  schedule_->getTriggerReport(rep);
1234  }
string rep
Definition: cuy.py:1188
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 396 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, printDependencies_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), thinnedAssociationsHelper_, and unpackBuffers-CaloStage2::token.

Referenced by EventProcessor().

398  {
399 
400  //std::cerr << processDesc->dump() << std::endl;
401 
402  // register the empty parentage vector , once and for all
404 
405  // register the empty parameter set, once and for all.
406  ParameterSet().registerIt();
407 
408  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
409 
410  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
411  auto subProcessVParameterSet = popSubProcessVParameterSet(*parameterSet);
412  bool const hasSubProcesses = !subProcessVParameterSet.empty();
413 
414  // Now set some parameters specific to the main process.
415  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
416  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
417  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
418  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
419  //threading
420  unsigned int nThreads=1;
421  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
422  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
423  if(nThreads == 0) {
424  nThreads = 1;
425  }
426  }
427  /* TODO: when we support having each stream run in a different thread use this default
428  unsigned int nStreams =nThreads;
429  */
430  unsigned int nStreams =1;
431  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
432  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
433  if(nStreams==0) {
434  nStreams = nThreads;
435  }
436  // PG: Log the number of streams
437  edm::LogInfo("StreamSetup") <<"setting # streams "<<nStreams;
438  }
439  /*
440  bool nRunsSet = false;
441  */
442  unsigned int nConcurrentRuns =1;
443  /*
444  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
445  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
446  }
447  */
448  unsigned int nConcurrentLumis =1;
449  /*
450  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
451  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
452  } else {
453  nConcurrentLumis = nConcurrentRuns;
454  }
455  */
456  //Check that relationships between threading parameters makes sense
457  /*
458  if(nThreads<nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nStreams) {
462  //bad
463  }
464  if(nConcurrentRuns>nConcurrentLumis) {
465  //bad
466  }
467  */
468  //forking
469  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
470  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
471  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
472  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
473  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
474  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
475  for(auto const& ps : excluded) {
476  eventSetupDataToExcludeFromPrefetching_[ps.getUntrackedParameter<std::string>("record")].emplace(ps.getUntrackedParameter<std::string>("type", "*"),
477  ps.getUntrackedParameter<std::string>("label", ""));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  printDependencies_ = optionsPset.getUntrackedParameter("printDependencies", false);
482 
483  // Now do general initialization
484  ScheduleItems items;
485 
486  //initialize the services
487  auto& serviceSets = processDesc->getServicesPSets();
488  ServiceToken token = items.initServices(serviceSets, *parameterSet, iToken, iLegacy, true);
489  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
490 
491  //make the services available
493 
494  if(nStreams>1) {
496  handler->willBeUsingThreads();
497  }
498 
499  // initialize miscellaneous items
500  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
501 
502  // initialize the event setup provider
503  esp_ = espController_->makeProvider(*parameterSet);
504 
505  // initialize the looper, if any
506  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
507  if(looper_) {
508  looper_->setActionTable(items.act_table_.get());
509  looper_->attachTo(*items.actReg_);
510 
511  //For now loopers make us run only 1 transition at a time
512  nStreams=1;
513  nConcurrentLumis=1;
514  nConcurrentRuns=1;
515  }
516 
517  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
518 
519  // initialize the input source
520  input_ = makeInput(*parameterSet,
521  *common,
522  items.preg(),
523  items.branchIDListHelper(),
524  items.thinnedAssociationsHelper(),
525  items.actReg_,
526  items.processConfiguration(),
528 
529  // initialize the Schedule
530  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
531 
532  // set the data members
533  act_table_ = std::move(items.act_table_);
534  actReg_ = items.actReg_;
535  preg_ = items.preg();
536  branchIDListHelper_ = items.branchIDListHelper();
537  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
538  processConfiguration_ = items.processConfiguration();
540  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
541 
542  FDEBUG(2) << parameterSet << std::endl;
543 
545  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
546  // Reusable event principal
547  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
550  }
551 
552  // fill the subprocesses, if there are any
553  subProcesses_.reserve(subProcessVParameterSet.size());
554  for(auto& subProcessPSet : subProcessVParameterSet) {
555  subProcesses_.emplace_back(subProcessPSet,
556  *parameterSet,
557  preg(),
561  *actReg_,
562  token,
565  &processContext_);
566  }
567  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::vector< SubProcess > subProcesses_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:510
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:574
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 256 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

256 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 257 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

257 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1503 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1503  {
1504  if (fb_.get() != nullptr) {
1505  schedule_->openOutputFiles(*fb_);
1506  for_all(subProcesses_, [this](auto& subProcess){ subProcess.openOutputFiles(*fb_); });
1507  }
1508  FDEBUG(1) << "\topenOutputFiles\n";
1509  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 885 of file EventProcessor.cc.

885  {
886  if(child_failed && continueAfterChildFailure_) {
887  if (child_fail_signal) {
888  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
889  child_fail_signal=0;
890  } else if (child_fail_exit_status) {
891  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
892  child_fail_exit_status=0;
893  } else {
894  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
895  }
896  child_failed =false;
897  }
898  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 250 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), and init().

250 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 251 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

251 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1565 of file EventProcessor.cc.

References FDEBUG.

1565  {
1566  looper_->prepareForNextLoop(esp_.get());
1567  FDEBUG(1) << "\tprepareForNextLoop\n";
1568  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 120 of file EventProcessor.h.

References processConfiguration_.

120 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 2002 of file EventProcessor.cc.

References assert(), ev, FDEBUG, edm::for_all(), edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), rng, edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), mps_update::status, and summarizeEdmComparisonLogfiles::succeeded.

2002  {
2003  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2004  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2006  if(rng.isAvailable()) {
2007  Event ev(*pep, ModuleDescription(), nullptr);
2008  rng->postEventRead(ev);
2009  }
2010  assert(pep->luminosityBlockPrincipalPtrValid());
2011  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2012  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2013 
2014  //We can only update IOVs on Lumi boundaries
2015  //IOVSyncValue ts(pep->id(), pep->time());
2016  //espController_->eventSetupForInstance(ts);
2017  EventSetup const& es = esp_->eventSetup();
2018  {
2019  schedule_->processOneEvent(iStreamIndex,*pep, es);
2020  for_all(subProcesses_, [pep](auto& subProcess) { subProcess.doEvent(*pep); });
2021  }
2022 
2023  //NOTE: If we have a looper we only have one Stream
2024  if(looper_) {
2025  bool randomAccess = input_->randomAccess();
2026  ProcessingController::ForwardState forwardState = input_->forwardState();
2027  ProcessingController::ReverseState reverseState = input_->reverseState();
2028  ProcessingController pc(forwardState, reverseState, randomAccess);
2029 
2031  do {
2032 
2033  StreamContext streamContext(pep->streamID(), &processContext_);
2034  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2035 
2036  bool succeeded = true;
2037  if(randomAccess) {
2038  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2039  input_->skipEvents(-2);
2040  }
2041  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2042  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2043  }
2044  }
2045  pc.setLastOperationSucceeded(succeeded);
2046  } while(!pc.lastOperationSucceeded());
2047  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2048 
2049  }
2050 
2051  FDEBUG(1) << "\tprocessEvent\n";
2052  pep->clearEventPrincipal();
2053  }
ProcessContext processContext_
edm::Service< edm::RandomNumberGenerator > rng
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(m_qm.get())
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Definition: Event.h:16
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PrincipalCache principalCache_
tuple status
Definition: mps_update.py:57
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1884 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1885  {
1886  try {
1887  // make the services available
1891  handler->initializeThisThreadForUse();
1892  }
1893 
1894  if(iStreamIndex==0) {
1895  processEvent(0);
1896  }
1897  do {
1898  if(shouldWeStop()) {
1899  break;
1900  }
1901  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1902  //another thread hit an exception
1903  //std::cerr<<"another thread saw an exception\n";
1904  break;
1905  }
1906  {
1907 
1908 
1909  {
1910  //nextItemType and readEvent need to be in same critical section
1911  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1912 
1913  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1914  //std::cerr<<"finishedProcessingEvents\n";
1915  break;
1916  }
1917 
1918  //If source and DelayedReader share a resource we must serialize them
1919  auto sr = input_->resourceSharedWithDelayedReader().second;
1920  std::unique_lock<std::recursive_mutex> delayedReaderGuard;
1921  if(sr) {
1922  delayedReaderGuard = std::unique_lock<std::recursive_mutex>(*sr);
1923  }
1924  InputSource::ItemType itemType = input_->nextItemType();
1925  if (InputSource::IsEvent !=itemType) {
1927  finishedProcessingEvents->store(true,std::memory_order_release);
1928  //std::cerr<<"next item type "<<itemType<<"\n";
1929  break;
1930  }
1932  //std::cerr<<"task told to async stop\n";
1933  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1934  break;
1935  }
1936  readEvent(iStreamIndex);
1937  }
1938  }
1939  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1940  //another thread hit an exception
1941  //std::cerr<<"another thread saw an exception\n";
1942  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1943 
1944  break;
1945  }
1946  processEvent(iStreamIndex);
1947  }while(true);
1948  } catch (...) {
1949  bool expected =false;
1950  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1951  deferredExceptionPtr_ = std::current_exception();
1952  }
1953  //std::cerr<<"task caught exception\n";
1954  }
1955  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1827 of file EventProcessor.cc.

References edm::preg.

1827  {
1828  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1829  {
1830  SendSourceTerminationSignalIfException sentry(actReg_.get());
1831  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1832  sentry.completedSuccessfully();
1833  }
1834  return input_->luminosityBlock();
1835  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1790 of file EventProcessor.cc.

References assert(), edm::preg, and PDRates::Run.

1790  {
1791  principalCache_.merge(input_->runAuxiliary(), preg());
1792  auto runPrincipal =principalCache_.runPrincipalPtr();
1793  {
1794  SendSourceTerminationSignalIfException sentry(actReg_.get());
1795  input_->readAndMergeRun(*runPrincipal);
1796  sentry.completedSuccessfully();
1797  }
1798  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1799  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1800  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(m_qm.get())
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1957 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

1957  {
1958  if(numberOfForkedChildren_>0) {
1959  readEvent(0);
1960  processEvent(0);
1961  return;
1962  }
1965 
1966  std::atomic<bool> finishedProcessingEvents{false};
1967 
1968  //Task assumes Stream 0 has already read the event that caused us to go here
1969  readEvent(0);
1970 
1971  //To wait, the ref count has to be 1+#streams
1972  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1973  eventLoopWaitTask->increment_ref_count();
1974 
1975  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1976  unsigned int iStreamIndex = 0;
1977  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1978  eventLoopWaitTask->increment_ref_count();
1979  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1980 
1981  }
1982  eventLoopWaitTask->increment_ref_count();
1983  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1984  tbb::task::destroy(*eventLoopWaitTask);
1985 
1986  //One of the processing threads saw an exception
1988  std::rethrow_exception(deferredExceptionPtr_);
1989  }
1990  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1991 of file EventProcessor.cc.

References event(), and FDEBUG.

1991  {
1992  //TODO this will have to become per stream
1993  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1994  StreamContext streamContext(event.streamID(), &processContext_);
1995 
1996  SendSourceTerminationSignalIfException sentry(actReg_.get());
1997  input_->readEvent(event, streamContext);
1998  sentry.completedSuccessfully();
1999 
2000  FDEBUG(1) << "\treadEvent\n";
2001  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1476 of file EventProcessor.cc.

References FDEBUG, or, edm::preg, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1476  {
1477  FDEBUG(1) << " \treadFile\n";
1478  size_t size = preg_->size();
1479  SendSourceTerminationSignalIfException sentry(actReg_.get());
1480 
1481  fb_ = input_->readFile();
1482  if(size < preg_->size()) {
1484  }
1486  if((numberOfForkedChildren_ > 0) or
1489  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1490  }
1491  sentry.completedSuccessfully();
1492  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1802 of file EventProcessor.cc.

References Exception, edm::errors::LogicError, and edm::preg.

1802  {
1805  << "EventProcessor::readRun\n"
1806  << "Illegal attempt to insert lumi into cache\n"
1807  << "Contact a Framework Developer\n";
1808  }
1811  << "EventProcessor::readRun\n"
1812  << "Illegal attempt to insert lumi into cache\n"
1813  << "Run is invalid\n"
1814  << "Contact a Framework Developer\n";
1815  }
1816  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1817  {
1818  SendSourceTerminationSignalIfException sentry(actReg_.get());
1819  input_->readLuminosityBlock(*lbp, *historyAppender_);
1820  sentry.completedSuccessfully();
1821  }
1822  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1823  principalCache_.insert(lbp);
1824  return input_->luminosityBlock();
1825  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1772 of file EventProcessor.cc.

References assert(), Exception, edm::errors::LogicError, edm::preg, and PDRates::Run.

1772  {
1775  << "EventProcessor::readRun\n"
1776  << "Illegal attempt to insert run into cache\n"
1777  << "Contact a Framework Developer\n";
1778  }
1779  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1780  {
1781  SendSourceTerminationSignalIfException sentry(actReg_.get());
1782  input_->readRun(*rp, *historyAppender_);
1783  sentry.completedSuccessfully();
1784  }
1785  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1786  principalCache_.insert(rp);
1787  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1788  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(m_qm.get())
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1528 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1528  {
1529  if (fb_.get() != nullptr) {
1530  schedule_->respondToCloseInputFile(*fb_);
1531  for_all(subProcesses_, [this](auto& subProcess){ subProcess.respondToCloseInputFile(*fb_); });
1532  }
1533  FDEBUG(1) << "\trespondToCloseInputFile\n";
1534  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1519 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1519  {
1520  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); } );
1521  if (fb_.get() != nullptr) {
1522  schedule_->respondToOpenInputFile(*fb_);
1523  for_all(subProcesses_, [this](auto& subProcess) { subProcess.respondToOpenInputFile(*fb_); });
1524  }
1525  FDEBUG(1) << "\trespondToOpenInputFile\n";
1526  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1559 of file EventProcessor.cc.

References FDEBUG.

1559  {
1560  input_->repeat();
1561  input_->rewind();
1562  FDEBUG(1) << "\trewind\n";
1563  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 325 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

325  {
326  return runToCompletion();
327  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1288 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, Exception, FDEBUG, edm::errors::LogicError, eostools::move(), cmsPerfStripChart::operate(), edm::preg, runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1288  {
1289 
1292  std::unique_ptr<statemachine::Machine> machine;
1293  {
1294  beginJob(); //make sure this was called
1295 
1296  //StatusCode returnCode = epSuccess;
1298 
1299  // make the services available
1301 
1302  machine = createStateMachine();
1305  try {
1306  convertException::wrap([&]() {
1307 
1308  InputSource::ItemType itemType;
1309 
1310  while(true) {
1311 
1312  bool more = true;
1313  if(numberOfForkedChildren_ > 0) {
1314  size_t size = preg_->size();
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317  more = input_->skipForForking();
1318  sentry.completedSuccessfully();
1319  }
1320  if(more) {
1321  if(size < preg_->size()) {
1323  }
1325  }
1326  }
1327  {
1328  SendSourceTerminationSignalIfException sentry(actReg_.get());
1329  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1330  sentry.completedSuccessfully();
1331  }
1332 
1333  FDEBUG(1) << "itemType = " << itemType << "\n";
1334 
1335  if(checkForAsyncStopRequest(returnCode)) {
1336  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1337  forceLooperToEnd_ = true;
1338  machine->process_event(statemachine::Stop());
1339  forceLooperToEnd_ = false;
1340  break;
1341  }
1342 
1343  if(itemType == InputSource::IsEvent) {
1344  machine->process_event(statemachine::Event());
1346  forceLooperToEnd_ = true;
1347  machine->process_event(statemachine::Stop());
1348  forceLooperToEnd_ = false;
1350  break;
1351  }
1353  }
1354 
1355  if(itemType == InputSource::IsEvent) {
1356  }
1357  else if(itemType == InputSource::IsStop) {
1358  machine->process_event(statemachine::Stop());
1359  }
1360  else if(itemType == InputSource::IsFile) {
1361  machine->process_event(statemachine::File());
1362  }
1363  else if(itemType == InputSource::IsRun) {
1364  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1365  }
1366  else if(itemType == InputSource::IsLumi) {
1367  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1368  }
1369  else if(itemType == InputSource::IsSynchronize) {
1370  //For now, we don't have to do anything
1371  }
1372  // This should be impossible
1373  else {
1375  << "Unknown next item type passed to EventProcessor\n"
1376  << "Please report this error to the Framework group\n";
1377  }
1378  if(machine->terminated()) {
1379  break;
1380  }
1381  } // End of loop over state machine events
1382  }); // convertException::wrap
1383  } // Try block
1384  // Some comments on exception handling related to the boost state machine:
1385  //
1386  // Some states used in the machine are special because they
1387  // perform actions while the machine is being terminated, actions
1388  // such as close files, call endRun, call endLumi etc ... Each of these
1389  // states has two functions that perform these actions. The functions
1390  // are almost identical. The major difference is that one version
1391  // catches all exceptions and the other lets exceptions pass through.
1392  // The destructor catches them and the other function named "exit" lets
1393  // them pass through. On a normal termination, boost will always call
1394  // "exit" and then the state destructor. In our state classes, the
1395  // the destructors do nothing if the exit function already took
1396  // care of things. Here's the interesting part. When boost is
1397  // handling an exception the "exit" function is not called (a boost
1398  // feature).
1399  //
1400  // If an exception occurs while the boost machine is in control
1401  // (which usually means inside a process_event call), then
1402  // the boost state machine destroys its states and "terminates" itself.
1403  // This is already done before we hit the catch blocks below. In this case
1404  // the call to terminateMachine below only destroys an already
1405  // terminated state machine. Because exit is not called, the state destructors
1406  // handle cleaning up lumis, runs, and files. The destructors swallow
1407  // all exceptions and only pass through the exceptions messages, which
1408  // are tacked onto the original exception below.
1409  //
1410  // If an exception occurs when the boost state machine is not
1411  // in control (outside the process_event functions), then boost
1412  // cannot destroy its own states. The terminateMachine function
1413  // below takes care of that. The flag "alreadyHandlingException"
1414  // is set true so that the state exit functions do nothing (and
1415  // cannot throw more exceptions while handling the first). Then the
1416  // state destructors take care of this because exit did nothing.
1417  //
1418  // In both cases above, the EventProcessor::endOfLoop function is
1419  // not called because it can throw exceptions.
1420  //
1421  // One tricky aspect of the state machine is that things that can
1422  // throw should not be invoked by the state machine while another
1423  // exception is being handled.
1424  // Another tricky aspect is that it appears to be important to
1425  // terminate the state machine before invoking its destructor.
1426  // We've seen crashes that are not understood when that is not
1427  // done. Maintainers of this code should be careful about this.
1428 
1429  catch (cms::Exception & e) {
1431  terminateMachine(std::move(machine));
1432  alreadyHandlingException_ = false;
1433  if (!exceptionMessageLumis_.empty()) {
1435  if (e.alreadyPrinted()) {
1436  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1437  }
1438  }
1439  if (!exceptionMessageRuns_.empty()) {
1441  if (e.alreadyPrinted()) {
1442  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1443  }
1444  }
1445  if (!exceptionMessageFiles_.empty()) {
1447  if (e.alreadyPrinted()) {
1448  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1449  }
1450  }
1451  throw;
1452  }
1453 
1454  if(machine->terminated()) {
1455  FDEBUG(1) << "The state machine reports it has been terminated\n";
1456  machine.reset();
1457  }
1458 
1460  throw cms::Exception("BadState")
1461  << "The boost state machine in the EventProcessor exited after\n"
1462  << "entering the Error state.\n";
1463  }
1464 
1465  }
1466  if(machine.get() != nullptr) {
1467  terminateMachine(std::move(machine));
1469  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1470  << "Please report this error to the Framework group\n";
1471  }
1472 
1473  return returnCode;
1474  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
void terminateMachine(std::unique_ptr< statemachine::Machine >)
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
def move
Definition: eostools.py:510
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2069 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2069  {
2071  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2077 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2077  {
2079  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2073 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2073  {
2075  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1570 of file EventProcessor.cc.

References FDEBUG.

1570  {
1571  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1572  if(!subProcesses_.empty()) {
1573  for(auto const& subProcess : subProcesses_) {
1574  if(subProcess.shouldWeCloseOutput()) {
1575  return true;
1576  }
1577  }
1578  return false;
1579  }
1580  return schedule_->shouldWeCloseOutput();
1581  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2055 of file EventProcessor.cc.

References FDEBUG.

2055  {
2056  FDEBUG(1) << "\tshouldWeStop\n";
2057  if(shouldWeStop_) return true;
2058  if(!subProcesses_.empty()) {
2059  for(auto const& subProcess : subProcesses_) {
2060  if(subProcess.terminate()) {
2061  return true;
2062  }
2063  }
2064  return false;
2065  }
2066  return schedule_->terminate();
2067  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1536 of file EventProcessor.cc.

References FDEBUG.

1536  {
1537  shouldWeStop_ = false;
1538  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1539  // until after we've called beginOfJob
1540  if(looper_ && looperBeginJobRun_) {
1541  looper_->doStartingNewLoop();
1542  }
1543  FDEBUG(1) << "\tstartingNewLoop\n";
1544  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::unique_ptr< statemachine::Machine > iMachine)
private

Definition at line 2085 of file EventProcessor.cc.

References FDEBUG.

2085  {
2086  if(iMachine.get() != nullptr) {
2087  if(!iMachine->terminated()) {
2088  forceLooperToEnd_ = true;
2089  iMachine->process_event(statemachine::Stop());
2090  forceLooperToEnd_ = false;
2091  }
2092  else {
2093  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2094  }
2095  if(iMachine->terminated()) {
2096  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2097  }
2098  }
2099  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 254 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 255 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1207 of file EventProcessor.cc.

1207  {
1208  return schedule_->totalEvents();
1209  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1217 of file EventProcessor.cc.

1217  {
1218  return schedule_->totalEventsFailed();
1219  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1212 of file EventProcessor.cc.

1212  {
1213  return schedule_->totalEventsPassed();
1214  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1849 of file EventProcessor.cc.

References FDEBUG, and edm::for_all().

1849  {
1851  for_all(subProcesses_, [&phid, run, lumi](auto& subProcess){ subProcess.writeLumi(phid, run, lumi); });
1852  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1853  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1837 of file EventProcessor.cc.

References FDEBUG, edm::for_all(), statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1837  {
1838  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1839  for_all(subProcesses_, [&run](auto& subProcess){ subProcess.writeRun(run.processHistoryID(), run.runNumber()); });
1840  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1841  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< SubProcess > subProcesses_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 236 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 273 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 265 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 298 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 310 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 312 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 290 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 267 of file EventProcessor.h.

Referenced by branchIDListHelper(), and init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 306 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 286 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 285 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 294 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

Definition at line 272 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 271 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 316 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 295 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 297 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 296 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

Definition at line 281 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 301 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 299 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 279 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

Definition at line 270 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 300 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 311 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 288 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 303 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 276 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 308 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 266 of file EventProcessor.h.

Referenced by beginJob(), init(), and preg().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::printDependencies_ = false
private

Definition at line 318 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 274 of file EventProcessor.h.

Referenced by init(), and processConfiguration().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 275 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 269 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 305 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 291 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 292 of file EventProcessor.h.

std::vector<SubProcess> edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 268 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().