test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
ProcessConfiguration const & processConfiguration () const
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
bool checkForAsyncStopRequest (StatusCode &)
 
std::auto_ptr
< statemachine::Machine
createStateMachine ()
 
bool hasSubProcesses () const
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
std::shared_ptr< EDLooperBase
const > 
looper () const
 
std::shared_ptr< EDLooperBase > & looper ()
 
void possiblyContinueAfterForkChildFailure ()
 
std::shared_ptr
< ProductRegistry const > 
preg () const
 
std::shared_ptr
< ProductRegistry > & 
preg ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
edm::propagate_const
< std::shared_ptr
< BranchIDListHelper > > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
edm::propagate_const
< std::shared_ptr
< eventsetup::EventSetupProvider > > 
esp_
 
edm::propagate_const
< std::unique_ptr
< eventsetup::EventSetupsController > > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
edm::propagate_const
< std::unique_ptr< FileBlock > > 
fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
edm::propagate_const
< std::unique_ptr
< HistoryAppender > > 
historyAppender_
 
edm::propagate_const
< std::unique_ptr< InputSource > > 
input_
 
edm::propagate_const
< std::shared_ptr
< EDLooperBase > > 
looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
edm::propagate_const
< std::shared_ptr
< ProductRegistry > > 
preg_
 
PrincipalCache principalCache_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
edm::propagate_const
< std::unique_ptr< Schedule > > 
schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
edm::propagate_const
< std::unique_ptr< std::vector
< SubProcess > > > 
subProcesses_
 
edm::propagate_const
< std::shared_ptr
< ThinnedAssociationsHelper > > 
thinnedAssociationsHelper_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 61 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 318 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 319 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 218 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

222  :
223  actReg_(),
224  preg_(),
226  serviceToken_(),
227  input_(),
228  espController_(new eventsetup::EventSetupsController),
229  esp_(),
230  act_table_(),
232  schedule_(),
233  subProcesses_(),
234  historyAppender_(new HistoryAppender),
235  fb_(),
236  looper_(),
238  principalCache_(),
239  beginJobCalled_(false),
240  shouldWeStop_(false),
242  fileMode_(),
248  forceLooperToEnd_(false),
249  looperBeginJobRun_(false),
253  setCpuAffinity_(false),
255  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
256  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
257  processDesc->addServices(defaultServices, forcedServices);
258  init(processDesc, iToken, iLegacy);
259  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 261 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

263  :
264  actReg_(),
265  preg_(),
267  serviceToken_(),
268  input_(),
269  espController_(new eventsetup::EventSetupsController),
270  esp_(),
271  act_table_(),
273  schedule_(),
274  subProcesses_(),
275  historyAppender_(new HistoryAppender),
276  fb_(),
277  looper_(),
279  principalCache_(),
280  beginJobCalled_(false),
281  shouldWeStop_(false),
283  fileMode_(),
289  forceLooperToEnd_(false),
290  looperBeginJobRun_(false),
294  setCpuAffinity_(false),
298  {
299  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
300  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
301  processDesc->addServices(defaultServices, forcedServices);
303  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 305 of file EventProcessor.cc.

References init().

307  :
308  actReg_(),
309  preg_(),
311  serviceToken_(),
312  input_(),
313  espController_(new eventsetup::EventSetupsController),
314  esp_(),
315  act_table_(),
317  schedule_(),
318  subProcesses_(),
319  historyAppender_(new HistoryAppender),
320  fb_(),
321  looper_(),
323  principalCache_(),
324  beginJobCalled_(false),
325  shouldWeStop_(false),
327  fileMode_(),
333  forceLooperToEnd_(false),
334  looperBeginJobRun_(false),
338  setCpuAffinity_(false),
342  {
343  init(processDesc, token, legacy);
344  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 347 of file EventProcessor.cc.

References mps_alisetup::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

347  :
348  actReg_(),
349  preg_(),
351  serviceToken_(),
352  input_(),
353  espController_(new eventsetup::EventSetupsController),
354  esp_(),
355  act_table_(),
357  schedule_(),
358  subProcesses_(),
359  historyAppender_(new HistoryAppender),
360  fb_(),
361  looper_(),
363  principalCache_(),
364  beginJobCalled_(false),
365  shouldWeStop_(false),
367  fileMode_(),
373  forceLooperToEnd_(false),
374  looperBeginJobRun_(false),
378  setCpuAffinity_(false),
382 {
383  if(isPython) {
384  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
385  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
387  }
388  else {
389  auto processDesc = std::make_shared<ProcessDesc>(config);
391  }
392  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::string exceptionMessageFiles_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 573 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, subProcesses_, and unpackBuffers-CaloStage2::token.

573  {
574  // Make the services available while everything is being deleted.
576  ServiceRegistry::Operate op(token);
577 
578  // manually destroy all these thing that may need the services around
579  // propagate_const<T> has no reset() function
580  espController_ = nullptr;
581  subProcesses_ = nullptr;
582  esp_ = nullptr;
583  schedule_ = nullptr;
584  input_ = nullptr;
585  looper_ = nullptr;
586  actReg_ = nullptr;
587 
590  }
void clear()
Not thread safe.
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void clear()
Not thread safe.
Definition: Registry.cc:40
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2164 of file EventProcessor.cc.

2164  {
2166  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 593 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, hasSubProcesses(), i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg(), preg_, processContext_, schedule_, serviceToken_, subProcesses_, and edm::convertException::wrap().

593  {
594  if(beginJobCalled_) return;
595  beginJobCalled_=true;
596  bk::beginJob();
597 
598  // StateSentry toerror(this); // should we add this ?
599  //make the services available
601 
602  service::SystemBounds bounds(preallocations_.numberOfStreams(),
606  actReg_->preallocateSignal_(bounds);
608  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
609 
610  //NOTE: This implementation assumes 'Job' means one call
611  // the EventProcessor::run
612  // If it really means once per 'application' then this code will
613  // have to be changed.
614  // Also have to deal with case where have 'run' then new Module
615  // added and do 'run'
616  // again. In that case the newly added Module needs its 'beginJob'
617  // to be called.
618 
619  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
620  // For now we delay calling beginOfJob until first beginOfRun
621  //if(looper_) {
622  // looper_->beginOfJob(es);
623  //}
624  try {
625  convertException::wrap([&]() {
626  input_->doBeginJob();
627  });
628  }
629  catch(cms::Exception& ex) {
630  ex.addContext("Calling beginJob for the source");
631  throw;
632  }
633  schedule_->beginJob(*preg_);
634  // toerror.succeeded(); // should we add this?
635  if(hasSubProcesses()) {
636  for(auto& subProcess : *subProcesses_) {
637  subProcess.doBeginJob();
638  }
639  }
640  actReg_->postBeginJobSignal_();
641 
642  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
643  schedule_->beginStream(i);
644  if(hasSubProcesses()) {
645  for(auto& subProcess : *subProcesses_) {
646  subProcess.doBeginStream(i);
647  }
648  }
649  }
650  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
PreallocationConfiguration preallocations_
void beginJob()
Definition: Breakpoints.cc:15
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::shared_ptr< ProductRegistry const > preg() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1732 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

1732  {
1733  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1734  {
1735  SendSourceTerminationSignalIfException sentry(actReg_.get());
1736 
1737  input_->doBeginLumi(lumiPrincipal, &processContext_);
1738  sentry.completedSuccessfully();
1739  }
1740 
1742  if(rng.isAvailable()) {
1743  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1744  rng->preBeginLumi(lb);
1745  }
1746 
1747  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1748  // lumi blocks know their start and end times why not also start and end events?
1749  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1750  {
1751  SendSourceTerminationSignalIfException sentry(actReg_.get());
1752  espController_->eventSetupForInstance(ts);
1753  sentry.completedSuccessfully();
1754  }
1755  EventSetup const& es = esp_->eventSetup();
1756  {
1757  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1758  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1759  if(hasSubProcesses()) {
1760  for(auto& subProcess : *subProcesses_) {
1761  subProcess.doBeginLuminosityBlock(lumiPrincipal, ts);
1762  }
1763  }
1764  }
1765  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1766  if(looper_) {
1767  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1768  }
1769  {
1770  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1771  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1772  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1773  if(hasSubProcesses()) {
1774  for(auto& subProcess : *subProcesses_) {
1775  subProcess.doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1776  }
1777  }
1778  }
1779  }
1780  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1781  if(looper_) {
1782  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1783  }
1784  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1629 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1629  {
1630  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1631  {
1632  SendSourceTerminationSignalIfException sentry(actReg_.get());
1633 
1634  input_->doBeginRun(runPrincipal, &processContext_);
1635  sentry.completedSuccessfully();
1636  }
1637 
1638  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1639  runPrincipal.beginTime());
1641  espController_->forceCacheClear();
1642  }
1643  {
1644  SendSourceTerminationSignalIfException sentry(actReg_.get());
1645  espController_->eventSetupForInstance(ts);
1646  sentry.completedSuccessfully();
1647  }
1648  EventSetup const& es = esp_->eventSetup();
1649  if(looper_ && looperBeginJobRun_== false) {
1650  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1651  looper_->beginOfJob(es);
1652  looperBeginJobRun_ = true;
1653  looper_->doStartingNewLoop();
1654  }
1655  {
1656  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1657  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1658  if(hasSubProcesses()) {
1659  for(auto& subProcess : *subProcesses_) {
1660  subProcess.doBeginRun(runPrincipal, ts);
1661  }
1662  }
1663  }
1664  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1665  if(looper_) {
1666  looper_->doBeginRun(runPrincipal, es, &processContext_);
1667  }
1668  {
1669  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1670  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1671  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1672  if(hasSubProcesses()) {
1673  for(auto& subProcess : *subProcesses_) {
1674  subProcess.doStreamBeginRun(i, runPrincipal, ts);
1675  }
1676  }
1677  }
1678  }
1679  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1680  if(looper_) {
1681  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1682  }
1683  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr<BranchIDListHelper const> edm::EventProcessor::branchIDListHelper ( ) const
inlineprivate

Definition at line 256 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr<BranchIDListHelper>& edm::EventProcessor::branchIDListHelper ( )
inlineprivate

Definition at line 257 of file EventProcessor.h.

References branchIDListHelper_, and edm::get_underlying_safe().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1290 of file EventProcessor.cc.

References edm::shutdown_flag.

1290  {
1291  bool returnValue = false;
1292 
1293  // Look for a shutdown signal
1294  if(shutdown_flag.load(std::memory_order_acquire)) {
1295  returnValue = true;
1296  returnCode = epSignal;
1297  }
1298  return returnValue;
1299  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1253 of file EventProcessor.cc.

1253  {
1254  schedule_->clearCounters();
1255  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1509 of file EventProcessor.cc.

References FDEBUG.

1509  {
1510  if (fb_.get() != nullptr) {
1511  SendSourceTerminationSignalIfException sentry(actReg_.get());
1512  input_->closeFile(fb_.get(), cleaningUpAfterException);
1513  sentry.completedSuccessfully();
1514  }
1515  FDEBUG(1) << "\tcloseInputFile\n";
1516  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1530 of file EventProcessor.cc.

References FDEBUG.

1530  {
1531  if (fb_.get() != nullptr) {
1532  schedule_->closeOutputFiles();
1533  if(hasSubProcesses()) {
1534  for(auto& subProcess : *subProcesses_) {
1535  subProcess.closeOutputFiles();
1536  }
1537  }
1538  }
1539  FDEBUG(1) << "\tcloseOutputFiles\n";
1540  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1259 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1259  {
1261  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1262  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1263  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1264  else {
1265  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1266  << fileMode_ << ".\n"
1267  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1268  }
1269 
1270  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1271  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1272  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1273  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1274  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1275  else {
1276  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1277  << emptyRunLumiMode_ << ".\n"
1278  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1279  }
1280 
1281  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1282  fileMode,
1283  emptyRunLumiMode));
1284 
1285  machine->initiate();
1286  return machine;
1287  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1929 of file EventProcessor.cc.

References FDEBUG.

1929  {
1931  if(hasSubProcesses()) {
1932  for(auto& subProcess : *subProcesses_) {
1933  subProcess.deleteLumiFromCache(phid, run, lumi);
1934  }
1935  }
1936  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1937  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
bool hasSubProcesses() const
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1909 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1909  {
1910  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1911  if(hasSubProcesses()) {
1912  for(auto& subProcess : *subProcesses_) {
1913  subProcess.deleteRunFromCache(run.processHistoryID(), run.runNumber());
1914  }
1915  }
1916  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1917  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
bool hasSubProcesses() const
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1618 of file EventProcessor.cc.

References FDEBUG.

1618  {
1619  FDEBUG(1) << "\tdoErrorStuff\n";
1620  LogError("StateMachine")
1621  << "The EventProcessor state machine encountered an unexpected event\n"
1622  << "and went to the error state\n"
1623  << "Will attempt to terminate processing normally\n"
1624  << "(IF using the looper the next loop will be attempted)\n"
1625  << "This likely indicates a bug in an input module or corrupted input or both\n";
1627  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1238 of file EventProcessor.cc.

1238  {
1239  schedule_->enableEndPaths(active);
1240  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 653 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcesses(), edm::ExceptionCollector::hasThrown(), i, input_, looper(), looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcesses_.

653  {
654  // Collects exceptions, so we don't throw before all operations are performed.
655  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
656 
657  //make the services available
659 
660  //NOTE: this really should go elsewhere in the future
661  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
662  c.call([this,i](){this->schedule_->endStream(i);});
663  if(hasSubProcesses()) {
664  for(auto& subProcess : *subProcesses_) {
665  c.call([&subProcess,i](){ subProcess.doEndStream(i); } );
666  }
667  }
668  }
669  auto actReg = actReg_.get();
670  c.call([actReg](){actReg->preEndJobSignal_();});
671  schedule_->endJob(c);
672  if(hasSubProcesses()) {
673  for(auto& subProcess : *subProcesses_) {
674  c.call(std::bind(&SubProcess::doEndJob, &subProcess));
675  }
676  }
677  c.call(std::bind(&InputSource::doEndJob, input_.get()));
678  if(looper_) {
679  c.call(std::bind(&EDLooperBase::endOfJob, looper()));
680  }
681  c.call([actReg](){actReg->postEndJobSignal_();});
682  if(c.hasThrown()) {
683  c.rethrow();
684  }
685  }
int i
Definition: DBlmapReader.cc:9
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:244
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
std::shared_ptr< ActivityRegistry > actReg_
std::shared_ptr< EDLooperBase const > looper() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1786 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, i, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

1786  {
1787  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1788  {
1789  SendSourceTerminationSignalIfException sentry(actReg_.get());
1790 
1791  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1792  sentry.completedSuccessfully();
1793  }
1794  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1795  // lumi blocks know their start and end times why not also start and end events?
1796  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1797  lumiPrincipal.endTime());
1798  {
1799  SendSourceTerminationSignalIfException sentry(actReg_.get());
1800  espController_->eventSetupForInstance(ts);
1801  sentry.completedSuccessfully();
1802  }
1803  EventSetup const& es = esp_->eventSetup();
1804  {
1805  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1806  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1807  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1808  if(hasSubProcesses()) {
1809  for(auto& subProcess : *subProcesses_) {
1810  subProcess.doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1811  }
1812  }
1813  }
1814  }
1815  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1816  if(looper_) {
1817  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1818  }
1819  {
1820  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1821  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1822  if(hasSubProcesses()) {
1823  for(auto& subProcess : *subProcesses_) {
1824  subProcess.doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1825  }
1826  }
1827  }
1828  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1829  if(looper_) {
1830  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1831  }
1832  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1581 of file EventProcessor.cc.

References FDEBUG, and mps_update::status.

1581  {
1582  if(looper_) {
1583  ModuleChanger changer(schedule_.get(),preg_.get());
1584  looper_->setModuleChanger(&changer);
1585  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1586  looper_->setModuleChanger(nullptr);
1587  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1588  else return false;
1589  }
1590  FDEBUG(1) << "\tendOfLoop\n";
1591  return true;
1592  }
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
tuple status
Definition: mps_update.py:57
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1243 of file EventProcessor.cc.

1243  {
1244  return schedule_->endPathsEnabled();
1245  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1685 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1685  {
1686  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689 
1690  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1691  sentry.completedSuccessfully();
1692  }
1693 
1695  runPrincipal.endTime());
1696  {
1697  SendSourceTerminationSignalIfException sentry(actReg_.get());
1698  espController_->eventSetupForInstance(ts);
1699  sentry.completedSuccessfully();
1700  }
1701  EventSetup const& es = esp_->eventSetup();
1702  {
1703  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1704  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1705  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1706  if(hasSubProcesses()) {
1707  for(auto& subProcess : *subProcesses_) {
1708  subProcess.doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1709  }
1710  }
1711  }
1712  }
1713  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1714  if(looper_) {
1715  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1716  }
1717  {
1718  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1719  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1720  if(hasSubProcesses()) {
1721  for(auto& subProcess : *subProcesses_) {
1722  subProcess.doEndRun(runPrincipal, ts, cleaningUpAfterException);
1723  }
1724  }
1725  }
1726  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1727  if(looper_) {
1728  looper_->doEndRun(runPrincipal, es, &processContext_);
1729  }
1730  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 917 of file EventProcessor.cc.

References assert(), bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

917  {
918 
919  if(0 == numberOfForkedChildren_) {return true;}
921  //do what we want done in common
922  {
923  beginJob(); //make sure this was run
924  // make the services available
926 
927  InputSource::ItemType itemType;
928  itemType = input_->nextItemType();
929 
930  assert(itemType == InputSource::IsFile);
931  {
932  readFile();
933  }
934  itemType = input_->nextItemType();
935  assert(itemType == InputSource::IsRun);
936 
937  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
938  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
939  input_->runAuxiliary()->beginTime());
940  espController_->eventSetupForInstance(ts);
941  EventSetup const& es = esp_->eventSetup();
942 
943  //now get all the data available in the EventSetup
944  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
945  es.fillAvailableRecordKeys(recordKeys);
946  std::vector<eventsetup::DataKey> dataKeys;
947  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
948  itKey != itEnd;
949  ++itKey) {
950  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
951  //see if this is on our exclusion list
952  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
953  ExcludedData const* excludedData(nullptr);
954  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
955  excludedData = &(itExcludeRec->second);
956  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
957  //skip all items in this record
958  continue;
959  }
960  }
961  if(0 != recordPtr) {
962  dataKeys.clear();
963  recordPtr->fillRegisteredDataKeys(dataKeys);
964  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
965  itDataKey != itDataKeyEnd;
966  ++itDataKey) {
967  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
968  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
969  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
970  continue;
971  }
972  try {
973  recordPtr->doGet(*itDataKey);
974  } catch(cms::Exception& e) {
975  LogWarning("ForkingEventSetupPreFetching") << e.what();
976  }
977  }
978  }
979  }
980  }
981  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
982  {
983  // make the services available
985  Service<JobReport> jobReport;
986  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
987 
988  //Now actually do the forking
989  actReg_->preForkReleaseResourcesSignal_();
990  input_->doPreForkReleaseResources();
991  schedule_->preForkReleaseResources();
992  }
993  installCustomHandler(SIGCHLD, ep_sigchld);
994 
995 
996  unsigned int childIndex = 0;
997  unsigned int const kMaxChildren = numberOfForkedChildren_;
998  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
999  std::vector<pid_t> childrenIds;
1000  childrenIds.reserve(kMaxChildren);
1001  std::vector<int> childrenSockets;
1002  childrenSockets.reserve(kMaxChildren);
1003  std::vector<int> childrenPipes;
1004  childrenPipes.reserve(kMaxChildren);
1005  std::vector<int> childrenSocketsCopy;
1006  childrenSocketsCopy.reserve(kMaxChildren);
1007  std::vector<int> childrenPipesCopy;
1008  childrenPipesCopy.reserve(kMaxChildren);
1009  int pipes[] {0, 0};
1010 
1011  {
1012  // make the services available
1014  Service<JobReport> jobReport;
1015  int sockets[2], fd_flags;
1016  for(; childIndex < kMaxChildren; ++childIndex) {
1017  // Create a UNIX_DGRAM socket pair
1018  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1019  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (pipe(pipes)) {
1023  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1027  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1028  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1029  exit(EXIT_FAILURE);
1030  }
1031  // Mark socket as non-block. Child must be careful to do select prior
1032  // to reading from socket.
1033  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1034  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1035  exit(EXIT_FAILURE);
1036  }
1037  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1038  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1039  exit(EXIT_FAILURE);
1040  }
1041  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1042  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1043  exit(EXIT_FAILURE);
1044  }
1045  // Linux man page notes there are some edge cases where reading from a
1046  // fd can block, even after a select.
1047  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1048  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1049  exit(EXIT_FAILURE);
1050  }
1051  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1052  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1053  exit(EXIT_FAILURE);
1054  }
1055 
1056  childrenPipesCopy = childrenPipes;
1057  childrenSocketsCopy = childrenSockets;
1058 
1059  pid_t value = fork();
1060  if(value == 0) {
1061  // Close the parent's side of the socket and pipe which will talk to us.
1062  close(pipes[0]);
1063  close(sockets[0]);
1064  // Close our copies of the parent's other communication pipes.
1065  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1066  close(*it);
1067  }
1068  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1069  close(*it);
1070  }
1071 
1072  // this is the child process, redirect stdout and stderr to a log file
1073  fflush(stdout);
1074  fflush(stderr);
1075  std::stringstream stout;
1076  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1077  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1078  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1079  }
1080  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1081  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1082  }
1083 
1084  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1085  if(setCpuAffinity_) {
1086  // CPU affinity is handled differently on macosx.
1087  // We disable it and print a message until someone reads:
1088  //
1089  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1090  //
1091  // and implements it.
1092 #ifdef __APPLE__
1093  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1094 #else
1095  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1096  cpu_set_t mask;
1097  CPU_ZERO(&mask);
1098  CPU_SET(childIndex, &mask);
1099  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1100  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1101  exit(-1);
1102  }
1103 #endif
1104  }
1105  break;
1106  } else {
1107  //this is the parent
1108  close(pipes[1]);
1109  close(sockets[1]);
1110  }
1111  if(value < 0) {
1112  LogError("ForkingChild") << "failed to create a child";
1113  exit(-1);
1114  }
1115  childrenIds.push_back(value);
1116  childrenSockets.push_back(sockets[0]);
1117  childrenPipes.push_back(pipes[0]);
1118  }
1119 
1120  if(childIndex < kMaxChildren) {
1121  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1122  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1123 
1124  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1125  input_->doPostForkReacquireResources(receiver);
1126  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1127  //NOTE: sources have to reset themselves by listening to the post fork message
1128  //rewindInput();
1129  return true;
1130  }
1131  jobReport->parentAfterFork(jobReportFile);
1132  }
1133 
1134  //this is the original, which is now the master for all the children
1135 
1136  //Need to wait for signals from the children or externally
1137  // To wait we must
1138  // 1) block the signals we want to wait on so we do not have a race condition
1139  // 2) check that we haven't already meet our ending criteria
1140  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1141  sigset_t blockingSigSet;
1142  sigset_t unblockingSigSet;
1143  sigset_t oldSigSet;
1144  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1145  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1146  sigaddset(&blockingSigSet, SIGCHLD);
1147  sigaddset(&blockingSigSet, SIGUSR2);
1148  sigaddset(&blockingSigSet, SIGINT);
1149  sigdelset(&unblockingSigSet, SIGCHLD);
1150  sigdelset(&unblockingSigSet, SIGUSR2);
1151  sigdelset(&unblockingSigSet, SIGINT);
1152  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1153 
1154  // If there are too many fd's (unlikely, but possible) for select, denote this
1155  // because the sender will fail.
1156  bool too_many_fds = false;
1157  if (pipes[1]+1 > FD_SETSIZE) {
1158  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1159  too_many_fds = true;
1160  }
1161 
1162  //create a thread that sends the units of work to workers
1163  // we create it after all signals were blocked so that this
1164  // thread is never interupted by a signal
1165  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1166  boost::thread senderThread(sender);
1167 
1168  if(not too_many_fds) {
1169  //NOTE: a child could have failed before we got here and even after this call
1170  // which is why the 'if' is conditional on continueAfterChildFailure_
1172  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1173  sigsuspend(&unblockingSigSet);
1175  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1176  }
1177  }
1178  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1179 
1180  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1181  if(child_failed) {
1182  LogError("ForkingStopping") << "child failed";
1183  }
1184  if(shutdown_flag) {
1185  LogSystem("ForkingStopping") << "asked to shutdown";
1186  }
1187 
1188  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1189  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1190  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1191  it != itEnd; ++it) {
1192  /* int result = */ kill(*it, SIGUSR2);
1193  }
1194  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1195  while(num_children_done != kMaxChildren) {
1196  sigsuspend(&unblockingSigSet);
1197  }
1198  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1199  }
1200  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1201  senderThread.join();
1202  if(child_failed && !continueAfterChildFailure_) {
1203  if (child_fail_signal) {
1204  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1205  } else if (child_fail_exit_status) {
1206  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1207  } else {
1208  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1209  }
1210  }
1211  if(too_many_fds) {
1212  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1213  }
1214  return false;
1215  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
void possiblyContinueAfterForkChildFailure()
assert(m_qm.get())
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1218 of file EventProcessor.cc.

1218  {
1219  return schedule_->getAllModuleDescriptions();
1220  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 688 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

688  {
689  return serviceToken_;
690  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1248 of file EventProcessor.cc.

1248  {
1249  schedule_->getTriggerReport(rep);
1250  }
string rep
Definition: cuy.py:1188
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool edm::EventProcessor::hasSubProcesses ( ) const
inlineprivate

Definition at line 234 of file EventProcessor.h.

References subProcesses_.

Referenced by beginJob(), endJob(), and init().

234  {
235  return subProcesses_.get() != nullptr && !subProcesses_->empty();
236  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 395 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper(), branchIDListHelper(), branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), hasSubProcesses(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), mps_monitormerge::items, edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessVParameterSet(), preallocations_, edm::ScheduleItems::preg(), preg(), preg_, principalCache_, edm::ScheduleItems::processConfiguration(), processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcesses_, edm::ScheduleItems::thinnedAssociationsHelper(), thinnedAssociationsHelper(), thinnedAssociationsHelper_, and unpackBuffers-CaloStage2::token.

Referenced by EventProcessor().

397  {
398 
399  //std::cerr << processDesc->dump() << std::endl;
400 
401  // register the empty parentage vector , once and for all
403 
404  // register the empty parameter set, once and for all.
405  ParameterSet().registerIt();
406 
407  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
408 
409  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
410  std::unique_ptr<std::vector<ParameterSet> > subProcessVParameterSet(popSubProcessVParameterSet(*parameterSet));
411  bool hasSubProcesses = subProcessVParameterSet.get() != nullptr;
412 
413  // Now set some parameters specific to the main process.
414  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
415  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
416  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
417  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
418  //threading
419  unsigned int nThreads=1;
420  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
421  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
422  if(nThreads == 0) {
423  nThreads = 1;
424  }
425  }
426  /* TODO: when we support having each stream run in a different thread use this default
427  unsigned int nStreams =nThreads;
428  */
429  unsigned int nStreams =1;
430  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
431  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
432  if(nStreams==0) {
433  nStreams = nThreads;
434  }
435  }
436  /*
437  bool nRunsSet = false;
438  */
439  unsigned int nConcurrentRuns =1;
440  /*
441  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
442  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
443  }
444  */
445  unsigned int nConcurrentLumis =1;
446  /*
447  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
448  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
449  } else {
450  nConcurrentLumis = nConcurrentRuns;
451  }
452  */
453  //Check that relationships between threading parameters makes sense
454  /*
455  if(nThreads<nStreams) {
456  //bad
457  }
458  if(nConcurrentRuns>nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nConcurrentLumis) {
462  //bad
463  }
464  */
465  //forking
466  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
467  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
468  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
469  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
470  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
471  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
473  itPS != itPSEnd;
474  ++itPS) {
475  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
476  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
477  itPS->getUntrackedParameter<std::string>("label", "")));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  // Now do general initialization
482  ScheduleItems items;
483 
484  //initialize the services
485  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
486  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
487  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
488 
489  //make the services available
491 
492  if(nStreams>1) {
494  handler->willBeUsingThreads();
495  }
496 
497  // intialize miscellaneous items
498  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
499 
500  // intialize the event setup provider
501  esp_ = espController_->makeProvider(*parameterSet);
502 
503  // initialize the looper, if any
504  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
505  if(looper_) {
506  looper_->setActionTable(items.act_table_.get());
507  looper_->attachTo(*items.actReg_);
508 
509  //For now loopers make us run only 1 transition at a time
510  nStreams=1;
511  nConcurrentLumis=1;
512  nConcurrentRuns=1;
513  }
514 
515  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
516 
517  // initialize the input source
518  input_ = makeInput(*parameterSet,
519  *common,
520  items.preg(),
521  items.branchIDListHelper(),
522  items.thinnedAssociationsHelper(),
523  items.actReg_,
524  items.processConfiguration(),
526 
527  // intialize the Schedule
528  schedule_ = items.initSchedule(*parameterSet,hasSubProcesses,preallocations_,&processContext_);
529 
530  // set the data members
531  act_table_ = std::move(items.act_table_);
532  actReg_ = items.actReg_;
533  preg_ = items.preg();
534  branchIDListHelper_ = items.branchIDListHelper();
535  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
536  processConfiguration_ = items.processConfiguration();
538  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
539 
540  FDEBUG(2) << parameterSet << std::endl;
541 
543  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
544  // Reusable event principal
545  auto ep = std::make_shared<EventPrincipal>(preg(), branchIDListHelper(),
547  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
548  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
550  }
551  // initialize the subprocess, if there is one
552  if(hasSubProcesses) {
553  if(subProcesses_ == nullptr) {
554  subProcesses_ = std::make_unique<std::vector<SubProcess> >();
555  }
556  subProcesses_->reserve(subProcessVParameterSet->size());
557  for(auto& subProcessPSet : *subProcessVParameterSet) {
558  subProcesses_->emplace_back(subProcessPSet,
559  *parameterSet,
560  preg(),
564  *actReg_,
565  token,
568  &processContext_);
569  }
570  }
571  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::unique_ptr< std::vector< ParameterSet > > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:640
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, std::shared_ptr< ProductRegistry > preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
ServiceToken serviceToken_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:510
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasSubProcesses() const
std::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< ActivityRegistry > actReg_
static ParentageRegistry * instance()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
std::shared_ptr<EDLooperBase const> edm::EventProcessor::looper ( ) const
inlineprivate

Definition at line 260 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

Referenced by endJob().

260 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<EDLooperBase>& edm::EventProcessor::looper ( )
inlineprivate

Definition at line 261 of file EventProcessor.h.

References edm::get_underlying_safe(), and looper_.

261 {return get_underlying_safe(looper_);}
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1518 of file EventProcessor.cc.

References FDEBUG.

1518  {
1519  if (fb_.get() != nullptr) {
1520  schedule_->openOutputFiles(*fb_);
1521  if(hasSubProcesses()) {
1522  for(auto& subProcess : *subProcesses_) {
1523  subProcess.openOutputFiles(*fb_);
1524  }
1525  }
1526  }
1527  FDEBUG(1) << "\topenOutputFiles\n";
1528  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 901 of file EventProcessor.cc.

901  {
902  if(child_failed && continueAfterChildFailure_) {
903  if (child_fail_signal) {
904  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
905  child_fail_signal=0;
906  } else if (child_fail_exit_status) {
907  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
908  child_fail_exit_status=0;
909  } else {
910  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
911  }
912  child_failed =false;
913  }
914  }
std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg ( ) const
inlineprivate

Definition at line 254 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

Referenced by beginJob(), and init().

254 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr<ProductRegistry>& edm::EventProcessor::preg ( )
inlineprivate

Definition at line 255 of file EventProcessor.h.

References edm::get_underlying_safe(), and preg_.

255 {return get_underlying_safe(preg_);}
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1600 of file EventProcessor.cc.

References FDEBUG.

1600  {
1601  looper_->prepareForNextLoop(esp_.get());
1602  FDEBUG(1) << "\tprepareForNextLoop\n";
1603  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
element_type const * get() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
ProcessConfiguration const& edm::EventProcessor::processConfiguration ( ) const
inline

Definition at line 120 of file EventProcessor.h.

References processConfiguration_.

120 { return *processConfiguration_; }
std::shared_ptr< ProcessConfiguration const > processConfiguration_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 2080 of file EventProcessor.cc.

References assert(), ev, FDEBUG, edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), mps_update::status, and summarizeEdmComparisonLogfiles::succeeded.

2080  {
2081  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
2082  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2084  if(rng.isAvailable()) {
2085  Event ev(*pep, ModuleDescription(), nullptr);
2086  rng->postEventRead(ev);
2087  }
2088  assert(pep->luminosityBlockPrincipalPtrValid());
2089  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2090  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2091 
2092  //We can only update IOVs on Lumi boundaries
2093  //IOVSyncValue ts(pep->id(), pep->time());
2094  //espController_->eventSetupForInstance(ts);
2095  EventSetup const& es = esp_->eventSetup();
2096  {
2097  typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
2098  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2099  if(hasSubProcesses()) {
2100  for(auto& subProcess : *subProcesses_) {
2101  subProcess.doEvent(*pep);
2102  }
2103  }
2104  }
2105 
2106  //NOTE: If we have a looper we only have one Stream
2107  if(looper_) {
2108  bool randomAccess = input_->randomAccess();
2109  ProcessingController::ForwardState forwardState = input_->forwardState();
2110  ProcessingController::ReverseState reverseState = input_->reverseState();
2111  ProcessingController pc(forwardState, reverseState, randomAccess);
2112 
2114  do {
2115 
2116  StreamContext streamContext(pep->streamID(), &processContext_);
2117  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2118 
2119  bool succeeded = true;
2120  if(randomAccess) {
2121  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2122  input_->skipEvents(-2);
2123  }
2124  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2125  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2126  }
2127  }
2128  pc.setLastOperationSucceeded(succeeded);
2129  } while(!pc.lastOperationSucceeded());
2130  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2131 
2132  }
2133 
2134  FDEBUG(1) << "\tprocessEvent\n";
2135  pep->clearEventPrincipal();
2136  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(m_qm.get())
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
Definition: Event.h:16
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
PrincipalCache principalCache_
tuple status
Definition: mps_update.py:57
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1962 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1963  {
1964  try {
1965  // make the services available
1969  handler->initializeThisThreadForUse();
1970  }
1971 
1972  if(iStreamIndex==0) {
1973  processEvent(0);
1974  }
1975  do {
1976  if(shouldWeStop()) {
1977  break;
1978  }
1979  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1980  //another thread hit an exception
1981  //std::cerr<<"another thread saw an exception\n";
1982  break;
1983  }
1984  {
1985 
1986 
1987  {
1988  //nextItemType and readEvent need to be in same critical section
1989  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1990 
1991  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1992  //std::cerr<<"finishedProcessingEvents\n";
1993  break;
1994  }
1995 
1996  //If source and DelayedReader share a resource we must serialize them
1997  auto sr = input_->resourceSharedWithDelayedReader();
1998  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1999  if(sr) {
2000  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
2001  }
2002  InputSource::ItemType itemType = input_->nextItemType();
2003  if (InputSource::IsEvent !=itemType) {
2005  finishedProcessingEvents->store(true,std::memory_order_release);
2006  //std::cerr<<"next item type "<<itemType<<"\n";
2007  break;
2008  }
2010  //std::cerr<<"task told to async stop\n";
2011  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
2012  break;
2013  }
2014  readEvent(iStreamIndex);
2015  }
2016  }
2017  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
2018  //another thread hit an exception
2019  //std::cerr<<"another thread saw an exception\n";
2020  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
2021 
2022  break;
2023  }
2024  processEvent(iStreamIndex);
2025  }while(true);
2026  } catch (...) {
2027  bool expected =false;
2028  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
2029  deferredExceptionPtr_ = std::current_exception();
2030  }
2031  //std::cerr<<"task caught exception\n";
2032  }
2033  }
void readEvent(unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1889 of file EventProcessor.cc.

References edm::preg.

1889  {
1890  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg());
1891  {
1892  SendSourceTerminationSignalIfException sentry(actReg_.get());
1893  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1894  sentry.completedSuccessfully();
1895  }
1896  return input_->luminosityBlock();
1897  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1852 of file EventProcessor.cc.

References assert(), edm::preg, and PDRates::Run.

1852  {
1853  principalCache_.merge(input_->runAuxiliary(), preg());
1854  auto runPrincipal =principalCache_.runPrincipalPtr();
1855  {
1856  SendSourceTerminationSignalIfException sentry(actReg_.get());
1857  input_->readAndMergeRun(*runPrincipal);
1858  sentry.completedSuccessfully();
1859  }
1860  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1861  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1862  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
assert(m_qm.get())
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2035 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

2035  {
2036  if(numberOfForkedChildren_>0) {
2037  readEvent(0);
2038  processEvent(0);
2039  return;
2040  }
2043 
2044  std::atomic<bool> finishedProcessingEvents{false};
2045 
2046  //Task assumes Stream 0 has already read the event that caused us to go here
2047  readEvent(0);
2048 
2049  //To wait, the ref count has to b 1+#streams
2050  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
2051  eventLoopWaitTask->increment_ref_count();
2052 
2053  const unsigned int kNumStreams = preallocations_.numberOfStreams();
2054  unsigned int iStreamIndex = 0;
2055  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
2056  eventLoopWaitTask->increment_ref_count();
2057  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
2058 
2059  }
2060  eventLoopWaitTask->increment_ref_count();
2061  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
2062  tbb::task::destroy(*eventLoopWaitTask);
2063 
2064  //One of the processing threads saw an exception
2066  std::rethrow_exception(deferredExceptionPtr_);
2067  }
2068  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 2069 of file EventProcessor.cc.

References event(), and FDEBUG.

2069  {
2070  //TODO this will have to become per stream
2071  auto& event = principalCache_.eventPrincipal(iStreamIndex);
2072  StreamContext streamContext(event.streamID(), &processContext_);
2073 
2074  SendSourceTerminationSignalIfException sentry(actReg_.get());
2075  input_->readEvent(event, streamContext);
2076  sentry.completedSuccessfully();
2077 
2078  FDEBUG(1) << "\treadEvent\n";
2079  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1491 of file EventProcessor.cc.

References FDEBUG, or, edm::preg, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1491  {
1492  FDEBUG(1) << " \treadFile\n";
1493  size_t size = preg_->size();
1494  SendSourceTerminationSignalIfException sentry(actReg_.get());
1495 
1496  fb_ = input_->readFile();
1497  if(size < preg_->size()) {
1499  }
1501  if((numberOfForkedChildren_ > 0) or
1504  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1505  }
1506  sentry.completedSuccessfully();
1507  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::shared_ptr< ProductRegistry const > preg() const
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1864 of file EventProcessor.cc.

References Exception, edm::errors::LogicError, and edm::preg.

1864  {
1867  << "EventProcessor::readRun\n"
1868  << "Illegal attempt to insert lumi into cache\n"
1869  << "Contact a Framework Developer\n";
1870  }
1873  << "EventProcessor::readRun\n"
1874  << "Illegal attempt to insert lumi into cache\n"
1875  << "Run is invalid\n"
1876  << "Contact a Framework Developer\n";
1877  }
1878  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1879  {
1880  SendSourceTerminationSignalIfException sentry(actReg_.get());
1881  input_->readLuminosityBlock(*lbp, *historyAppender_);
1882  sentry.completedSuccessfully();
1883  }
1884  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1885  principalCache_.insert(lbp);
1886  return input_->luminosityBlock();
1887  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasLumiPrincipal() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1834 of file EventProcessor.cc.

References assert(), Exception, edm::errors::LogicError, edm::preg, and PDRates::Run.

1834  {
1837  << "EventProcessor::readRun\n"
1838  << "Illegal attempt to insert run into cache\n"
1839  << "Contact a Framework Developer\n";
1840  }
1841  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg(), *processConfiguration_, historyAppender_.get(), 0);
1842  {
1843  SendSourceTerminationSignalIfException sentry(actReg_.get());
1844  input_->readRun(*rp, *historyAppender_);
1845  sentry.completedSuccessfully();
1846  }
1847  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1848  principalCache_.insert(rp);
1849  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1850  }
void insert(std::shared_ptr< RunPrincipal > rp)
edm::propagate_const< std::unique_ptr< InputSource > > input_
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
assert(m_qm.get())
bool hasRunPrincipal() const
std::shared_ptr< ProductRegistry const > preg() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1559 of file EventProcessor.cc.

References FDEBUG.

1559  {
1560  if (fb_.get() != nullptr) {
1561  schedule_->respondToCloseInputFile(*fb_);
1562  if(hasSubProcesses()) {
1563  for(auto& subProcess : *subProcesses_) {
1564  subProcess.respondToCloseInputFile(*fb_);
1565  }
1566  }
1567  }
1568  FDEBUG(1) << "\trespondToCloseInputFile\n";
1569  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1542 of file EventProcessor.cc.

References FDEBUG.

1542  {
1543  if(hasSubProcesses()) {
1544  for(auto& subProcess : *subProcesses_) {
1545  subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1546  }
1547  }
1548  if (fb_.get() != nullptr) {
1549  schedule_->respondToOpenInputFile(*fb_);
1550  if(hasSubProcesses()) {
1551  for(auto& subProcess : *subProcesses_) {
1552  subProcess.respondToOpenInputFile(*fb_);
1553  }
1554  }
1555  }
1556  FDEBUG(1) << "\trespondToOpenInputFile\n";
1557  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1594 of file EventProcessor.cc.

References FDEBUG.

1594  {
1595  input_->repeat();
1596  input_->rewind();
1597  FDEBUG(1) << "\trewind\n";
1598  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 327 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

327  {
328  return runToCompletion();
329  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1303 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, Exception, FDEBUG, edm::errors::LogicError, cmsPerfStripChart::operate(), edm::preg, runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1303  {
1304 
1307  std::auto_ptr<statemachine::Machine> machine;
1308  {
1309  beginJob(); //make sure this was called
1310 
1311  //StatusCode returnCode = epSuccess;
1313 
1314  // make the services available
1316 
1317  machine = createStateMachine();
1320  try {
1321  convertException::wrap([&]() {
1322 
1323  InputSource::ItemType itemType;
1324 
1325  while(true) {
1326 
1327  bool more = true;
1328  if(numberOfForkedChildren_ > 0) {
1329  size_t size = preg_->size();
1330  {
1331  SendSourceTerminationSignalIfException sentry(actReg_.get());
1332  more = input_->skipForForking();
1333  sentry.completedSuccessfully();
1334  }
1335  if(more) {
1336  if(size < preg_->size()) {
1338  }
1340  }
1341  }
1342  {
1343  SendSourceTerminationSignalIfException sentry(actReg_.get());
1344  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1345  sentry.completedSuccessfully();
1346  }
1347 
1348  FDEBUG(1) << "itemType = " << itemType << "\n";
1349 
1350  if(checkForAsyncStopRequest(returnCode)) {
1351  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1352  forceLooperToEnd_ = true;
1353  machine->process_event(statemachine::Stop());
1354  forceLooperToEnd_ = false;
1355  break;
1356  }
1357 
1358  if(itemType == InputSource::IsEvent) {
1359  machine->process_event(statemachine::Event());
1361  forceLooperToEnd_ = true;
1362  machine->process_event(statemachine::Stop());
1363  forceLooperToEnd_ = false;
1365  break;
1366  }
1368  }
1369 
1370  if(itemType == InputSource::IsEvent) {
1371  }
1372  else if(itemType == InputSource::IsStop) {
1373  machine->process_event(statemachine::Stop());
1374  }
1375  else if(itemType == InputSource::IsFile) {
1376  machine->process_event(statemachine::File());
1377  }
1378  else if(itemType == InputSource::IsRun) {
1379  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1380  }
1381  else if(itemType == InputSource::IsLumi) {
1382  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1383  }
1384  else if(itemType == InputSource::IsSynchronize) {
1385  //For now, we don't have to do anything
1386  }
1387  // This should be impossible
1388  else {
1390  << "Unknown next item type passed to EventProcessor\n"
1391  << "Please report this error to the Framework group\n";
1392  }
1393  if(machine->terminated()) {
1394  break;
1395  }
1396  } // End of loop over state machine events
1397  }); // convertException::wrap
1398  } // Try block
1399  // Some comments on exception handling related to the boost state machine:
1400  //
1401  // Some states used in the machine are special because they
1402  // perform actions while the machine is being terminated, actions
1403  // such as close files, call endRun, call endLumi etc ... Each of these
1404  // states has two functions that perform these actions. The functions
1405  // are almost identical. The major difference is that one version
1406  // catches all exceptions and the other lets exceptions pass through.
1407  // The destructor catches them and the other function named "exit" lets
1408  // them pass through. On a normal termination, boost will always call
1409  // "exit" and then the state destructor. In our state classes, the
1410  // the destructors do nothing if the exit function already took
1411  // care of things. Here's the interesting part. When boost is
1412  // handling an exception the "exit" function is not called (a boost
1413  // feature).
1414  //
1415  // If an exception occurs while the boost machine is in control
1416  // (which usually means inside a process_event call), then
1417  // the boost state machine destroys its states and "terminates" itself.
1418  // This already done before we hit the catch blocks below. In this case
1419  // the call to terminateMachine below only destroys an already
1420  // terminated state machine. Because exit is not called, the state destructors
1421  // handle cleaning up lumis, runs, and files. The destructors swallow
1422  // all exceptions and only pass through the exceptions messages, which
1423  // are tacked onto the original exception below.
1424  //
1425  // If an exception occurs when the boost state machine is not
1426  // in control (outside the process_event functions), then boost
1427  // cannot destroy its own states. The terminateMachine function
1428  // below takes care of that. The flag "alreadyHandlingException"
1429  // is set true so that the state exit functions do nothing (and
1430  // cannot throw more exceptions while handling the first). Then the
1431  // state destructors take care of this because exit did nothing.
1432  //
1433  // In both cases above, the EventProcessor::endOfLoop function is
1434  // not called because it can throw exceptions.
1435  //
1436  // One tricky aspect of the state machine is that things that can
1437  // throw should not be invoked by the state machine while another
1438  // exception is being handled.
1439  // Another tricky aspect is that it appears to be important to
1440  // terminate the state machine before invoking its destructor.
1441  // We've seen crashes that are not understood when that is not
1442  // done. Maintainers of this code should be careful about this.
1443 
1444  catch (cms::Exception & e) {
1446  terminateMachine(machine);
1447  alreadyHandlingException_ = false;
1448  if (!exceptionMessageLumis_.empty()) {
1450  if (e.alreadyPrinted()) {
1451  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1452  }
1453  }
1454  if (!exceptionMessageRuns_.empty()) {
1456  if (e.alreadyPrinted()) {
1457  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1458  }
1459  }
1460  if (!exceptionMessageFiles_.empty()) {
1462  if (e.alreadyPrinted()) {
1463  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1464  }
1465  }
1466  throw;
1467  }
1468 
1469  if(machine->terminated()) {
1470  FDEBUG(1) << "The state machine reports it has been terminated\n";
1471  machine.reset();
1472  }
1473 
1475  throw cms::Exception("BadState")
1476  << "The boost state machine in the EventProcessor exited after\n"
1477  << "entering the Error state.\n";
1478  }
1479 
1480  }
1481  if(machine.get() != 0) {
1482  terminateMachine(machine);
1484  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1485  << "Please report this error to the Framework group\n";
1486  }
1487 
1488  return returnCode;
1489  }
edm::propagate_const< std::unique_ptr< InputSource > > input_
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
StatusCode asyncStopStatusCodeFromProcessingEvents_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2152 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2152  {
2154  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2160 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2160  {
2162  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2156 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2156  {
2158  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1605 of file EventProcessor.cc.

References FDEBUG.

1605  {
1606  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1607  if(hasSubProcesses()) {
1608  for(auto const& subProcess : *subProcesses_) {
1609  if(subProcess.shouldWeCloseOutput()) {
1610  return true;
1611  }
1612  }
1613  return false;
1614  }
1615  return schedule_->shouldWeCloseOutput();
1616  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2138 of file EventProcessor.cc.

References FDEBUG.

2138  {
2139  FDEBUG(1) << "\tshouldWeStop\n";
2140  if(shouldWeStop_) return true;
2141  if(hasSubProcesses()) {
2142  for(auto const& subProcess : *subProcesses_) {
2143  if(subProcess.terminate()) {
2144  return true;
2145  }
2146  }
2147  return false;
2148  }
2149  return schedule_->terminate();
2150  }
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1571 of file EventProcessor.cc.

References FDEBUG.

1571  {
1572  shouldWeStop_ = false;
1573  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1574  // until after we've called beginOfJob
1575  if(looper_ && looperBeginJobRun_) {
1576  looper_->doStartingNewLoop();
1577  }
1578  FDEBUG(1) << "\tstartingNewLoop\n";
1579  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2168 of file EventProcessor.cc.

References FDEBUG.

2168  {
2169  if(iMachine.get() != 0) {
2170  if(!iMachine->terminated()) {
2171  forceLooperToEnd_ = true;
2172  iMachine->process_event(statemachine::Stop());
2173  forceLooperToEnd_ = false;
2174  }
2175  else {
2176  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2177  }
2178  if(iMachine->terminated()) {
2179  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2180  }
2181  iMachine.reset();
2182  }
2183  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr<ThinnedAssociationsHelper const> edm::EventProcessor::thinnedAssociationsHelper ( ) const
inlineprivate

Definition at line 258 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

Referenced by init().

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr<ThinnedAssociationsHelper>& edm::EventProcessor::thinnedAssociationsHelper ( )
inlineprivate

Definition at line 259 of file EventProcessor.h.

References edm::get_underlying_safe(), and thinnedAssociationsHelper_.

std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1223 of file EventProcessor.cc.

1223  {
1224  return schedule_->totalEvents();
1225  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1233 of file EventProcessor.cc.

1233  {
1234  return schedule_->totalEventsFailed();
1235  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1228 of file EventProcessor.cc.

1228  {
1229  return schedule_->totalEventsPassed();
1230  }
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1919 of file EventProcessor.cc.

References FDEBUG.

1919  {
1921  if(hasSubProcesses()) {
1922  for(auto& subProcess : *subProcesses_) {
1923  subProcess.writeLumi(phid, run, lumi);
1924  }
1925  }
1926  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1927  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1899 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1899  {
1900  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1901  if(hasSubProcesses()) {
1902  for(auto& subProcess : *subProcesses_) {
1903  subProcess.writeRun(run.processHistoryID(), run.runNumber());
1904  }
1905  }
1906  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1907  }
ProcessContext processContext_
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
bool hasSubProcesses() const
PrincipalCache principalCache_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 240 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 269 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 302 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 314 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 316 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 294 of file EventProcessor.h.

Referenced by beginJob().

edm::propagate_const<std::shared_ptr<BranchIDListHelper> > edm::EventProcessor::branchIDListHelper_
private

Definition at line 271 of file EventProcessor.h.

Referenced by branchIDListHelper(), and init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 310 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 290 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 289 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 298 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider> > edm::EventProcessor::esp_
private

Definition at line 276 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController> > edm::EventProcessor::espController_
private

Definition at line 275 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 320 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 299 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 301 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 300 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<FileBlock> > edm::EventProcessor::fb_
private

Definition at line 285 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 297 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 305 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 303 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<HistoryAppender> > edm::EventProcessor::historyAppender_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

edm::propagate_const<std::unique_ptr<InputSource> > edm::EventProcessor::input_
private

Definition at line 274 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

edm::propagate_const<std::shared_ptr<EDLooperBase> > edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 304 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 315 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 292 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 307 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 308 of file EventProcessor.h.

Referenced by init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 280 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 312 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

edm::propagate_const<std::shared_ptr<ProductRegistry> > edm::EventProcessor::preg_
private

Definition at line 270 of file EventProcessor.h.

Referenced by beginJob(), init(), and preg().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 278 of file EventProcessor.h.

Referenced by init(), and processConfiguration().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 279 of file EventProcessor.h.

Referenced by beginJob(), and init().

edm::propagate_const<std::unique_ptr<Schedule> > edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 273 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 309 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 295 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 296 of file EventProcessor.h.

edm::propagate_const<std::unique_ptr<std::vector<SubProcess> > > edm::EventProcessor::subProcesses_
private
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper> > edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 272 of file EventProcessor.h.

Referenced by init(), and thinnedAssociationsHelper().