CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

bool checkForAsyncStopRequest (StatusCode &)
 
std::auto_ptr
< statemachine::Machine
createStateMachine ()
 
bool hasSubProcess () const
 
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
boost::shared_ptr
< ActivityRegistry
actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
boost::shared_ptr
< BranchIDListHelper
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlockfb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppenderhistoryAppender_
 
std::unique_ptr< InputSourceinput_
 
boost::shared_ptr< EDLooperBaselooper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PreallocationConfiguration preallocations_
 
boost::shared_ptr
< ProductRegistry const > 
preg_
 
PrincipalCache principalCache_
 
boost::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
std::auto_ptr< Scheduleschedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
std::auto_ptr< SubProcesssubProcess_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 58 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 302 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 303 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 193 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

197  :
198  actReg_(),
199  preg_(),
201  serviceToken_(),
202  input_(),
203  espController_(new eventsetup::EventSetupsController),
204  esp_(),
205  act_table_(),
207  schedule_(),
208  subProcess_(),
209  historyAppender_(new HistoryAppender),
210  fb_(),
211  looper_(),
213  principalCache_(),
214  beginJobCalled_(false),
215  shouldWeStop_(false),
217  fileMode_(),
223  forceLooperToEnd_(false),
224  looperBeginJobRun_(false),
228  setCpuAffinity_(false),
230  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
231  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
232  processDesc->addServices(defaultServices, forcedServices);
233  init(processDesc, iToken, iLegacy);
234  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< edm::ParameterSet > parameterSet()
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 236 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

238  :
239  actReg_(),
240  preg_(),
242  serviceToken_(),
243  input_(),
244  espController_(new eventsetup::EventSetupsController),
245  esp_(),
246  act_table_(),
248  schedule_(),
249  subProcess_(),
250  historyAppender_(new HistoryAppender),
251  fb_(),
252  looper_(),
254  principalCache_(),
255  beginJobCalled_(false),
256  shouldWeStop_(false),
258  fileMode_(),
264  forceLooperToEnd_(false),
265  looperBeginJobRun_(false),
269  setCpuAffinity_(false),
273  {
274  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
275  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
276  processDesc->addServices(defaultServices, forcedServices);
278  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< edm::ParameterSet > parameterSet()
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 280 of file EventProcessor.cc.

References init().

282  :
283  actReg_(),
284  preg_(),
286  serviceToken_(),
287  input_(),
288  espController_(new eventsetup::EventSetupsController),
289  esp_(),
290  act_table_(),
292  schedule_(),
293  subProcess_(),
294  historyAppender_(new HistoryAppender),
295  fb_(),
296  looper_(),
298  principalCache_(),
299  beginJobCalled_(false),
300  shouldWeStop_(false),
302  fileMode_(),
308  forceLooperToEnd_(false),
309  looperBeginJobRun_(false),
313  setCpuAffinity_(false),
317  {
318  init(processDesc, token, legacy);
319  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 322 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

322  :
323  actReg_(),
324  preg_(),
326  serviceToken_(),
327  input_(),
328  espController_(new eventsetup::EventSetupsController),
329  esp_(),
330  act_table_(),
332  schedule_(),
333  subProcess_(),
334  historyAppender_(new HistoryAppender),
335  fb_(),
336  looper_(),
338  principalCache_(),
339  beginJobCalled_(false),
340  shouldWeStop_(false),
342  fileMode_(),
348  forceLooperToEnd_(false),
349  looperBeginJobRun_(false),
353  setCpuAffinity_(false),
357 {
358  if(isPython) {
359  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
360  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
362  }
363  else {
364  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
366  }
367  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< edm::ParameterSet > parameterSet()
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 538 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and subProcess_.

538  {
539  // Make the services available while everything is being deleted.
540  ServiceToken token = getToken();
541  ServiceRegistry::Operate op(token);
542 
543  // manually destroy all these thing that may need the services around
544  espController_.reset();
545  subProcess_.reset();
546  esp_.reset();
547  schedule_.reset();
548  input_.reset();
549  looper_.reset();
550  actReg_.reset();
551 
554  }
void clear()
Not thread safe.
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void clear()
Not thread safe.
Definition: Registry.cc:42
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:14
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1978 of file EventProcessor.cc.

1978  {
1980  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 557 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, hasSubProcess(), i, input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), preallocations_, preg_, schedule_, serviceToken_, subProcess_, and edm::convertException::wrap().

557  {
558  if(beginJobCalled_) return;
559  beginJobCalled_=true;
560  bk::beginJob();
561 
562  // StateSentry toerror(this); // should we add this ?
563  //make the services available
565 
566  service::SystemBounds bounds(preallocations_.numberOfStreams(),
570  actReg_->preallocateSignal_(bounds);
571  //NOTE: This implementation assumes 'Job' means one call
572  // the EventProcessor::run
573  // If it really means once per 'application' then this code will
574  // have to be changed.
575  // Also have to deal with case where have 'run' then new Module
576  // added and do 'run'
577  // again. In that case the newly added Module needs its 'beginJob'
578  // to be called.
579 
580  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
581  // For now we delay calling beginOfJob until first beginOfRun
582  //if(looper_) {
583  // looper_->beginOfJob(es);
584  //}
585  try {
586  convertException::wrap([&]() {
587  input_->doBeginJob();
588  });
589  }
590  catch(cms::Exception& ex) {
591  ex.addContext("Calling beginJob for the source");
592  throw;
593  }
594  schedule_->beginJob(*preg_);
595  // toerror.succeeded(); // should we add this?
596  if(hasSubProcess()) subProcess_->doBeginJob();
597  actReg_->postBeginJobSignal_();
598 
599  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
600  schedule_->beginStream(i);
601  if(hasSubProcess()) subProcess_->doBeginStream(i);
602  }
603  }
int i
Definition: DBlmapReader.cc:9
boost::shared_ptr< ActivityRegistry > actReg_
PreallocationConfiguration preallocations_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
auto wrap(F iFunc) -> decltype(iFunc())
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1612 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

1612  {
1613  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1614  input_->doBeginLumi(lumiPrincipal, &processContext_);
1615 
1617  if(rng.isAvailable()) {
1618  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1619  rng->preBeginLumi(lb);
1620  }
1621 
1622  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1623  // lumi blocks know their start and end times why not also start and end events?
1624  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1625  espController_->eventSetupForInstance(ts);
1626  EventSetup const& es = esp_->eventSetup();
1627  {
1628  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1629  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1630  if(hasSubProcess()) {
1631  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1632  }
1633  }
1634  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1635  if(looper_) {
1636  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1637  }
1638  {
1639  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1640  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1641  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1642  if(hasSubProcess()) {
1643  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1644  }
1645  }
1646  }
1647  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1648  if(looper_) {
1649  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1650  }
1651  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1537 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1537  {
1538  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1539  input_->doBeginRun(runPrincipal, &processContext_);
1540  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1541  runPrincipal.beginTime());
1543  espController_->forceCacheClear();
1544  }
1545  espController_->eventSetupForInstance(ts);
1546  EventSetup const& es = esp_->eventSetup();
1547  if(looper_ && looperBeginJobRun_== false) {
1548  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1549  looper_->beginOfJob(es);
1550  looperBeginJobRun_ = true;
1551  looper_->doStartingNewLoop();
1552  }
1553  {
1554  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1555  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1556  if(hasSubProcess()) {
1557  subProcess_->doBeginRun(runPrincipal, ts);
1558  }
1559  }
1560  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1561  if(looper_) {
1562  looper_->doBeginRun(runPrincipal, es, &processContext_);
1563  }
1564  {
1565  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1566  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1567  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1568  if(hasSubProcess()) {
1569  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1570  }
1571  }
1572  }
1573  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1574  if(looper_) {
1575  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1576  }
1577  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1238 of file EventProcessor.cc.

References edm::shutdown_flag.

1238  {
1239  bool returnValue = false;
1240 
1241  // Look for a shutdown signal
1242  if(shutdown_flag.load(std::memory_order_acquire)) {
1243  returnValue = true;
1244  returnCode = epSignal;
1245  }
1246  return returnValue;
1247  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1201 of file EventProcessor.cc.

1201  {
1202  schedule_->clearCounters();
1203  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1445 of file EventProcessor.cc.

References FDEBUG.

1445  {
1446  if (fb_.get() != nullptr) {
1447  input_->closeFile(fb_.get(), cleaningUpAfterException);
1448  }
1449  FDEBUG(1) << "\tcloseInputFile\n";
1450  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1460 of file EventProcessor.cc.

References FDEBUG.

1460  {
1461  if (fb_.get() != nullptr) {
1462  schedule_->closeOutputFiles();
1463  if(hasSubProcess()) subProcess_->closeOutputFiles();
1464  }
1465  FDEBUG(1) << "\tcloseOutputFiles\n";
1466  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1207 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1207  {
1209  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1210  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1211  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1212  else {
1213  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1214  << fileMode_ << ".\n"
1215  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1216  }
1217 
1218  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1219  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1220  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1221  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1222  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1223  else {
1224  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1225  << emptyRunLumiMode_ << ".\n"
1226  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1227  }
1228 
1229  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1230  fileMode,
1231  emptyRunLumiMode));
1232 
1233  machine->initiate();
1234  return machine;
1235  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1763 of file EventProcessor.cc.

References FDEBUG.

1763  {
1765  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1766  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1767  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1751 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1751  {
1752  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1753  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1754  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1755  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1526 of file EventProcessor.cc.

References FDEBUG.

1526  {
1527  FDEBUG(1) << "\tdoErrorStuff\n";
1528  LogError("StateMachine")
1529  << "The EventProcessor state machine encountered an unexpected event\n"
1530  << "and went to the error state\n"
1531  << "Will attempt to terminate processing normally\n"
1532  << "(IF using the looper the next loop will be attempted)\n"
1533  << "This likely indicates a bug in an input module or corrupted input or both\n";
1535  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1186 of file EventProcessor.cc.

1186  {
1187  schedule_->enableEndPaths(active);
1188  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 606 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), i, input_, looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

606  {
607  // Collects exceptions, so we don't throw before all operations are performed.
608  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
609 
610  //make the services available
612 
613  //NOTE: this really should go elsewhere in the future
614  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
615  c.call([this,i](){this->schedule_->endStream(i);});
616  if(hasSubProcess()) {
617  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
618  }
619  }
620  schedule_->endJob(c);
621  if(hasSubProcess()) {
622  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
623  }
624  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
625  if(looper_) {
626  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
627  }
628  auto actReg = actReg_.get();
629  c.call([actReg](){actReg->postEndJobSignal_();});
630  if(c.hasThrown()) {
631  c.rethrow();
632  }
633  }
int i
Definition: DBlmapReader.cc:9
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:93
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1653 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, i, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

1653  {
1654  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1655  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1656  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1657  // lumi blocks know their start and end times why not also start and end events?
1658  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1659  lumiPrincipal.endTime());
1660  espController_->eventSetupForInstance(ts);
1661  EventSetup const& es = esp_->eventSetup();
1662  {
1663  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1664  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1665  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1666  if(hasSubProcess()) {
1667  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1668  }
1669  }
1670  }
1671  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1672  if(looper_) {
1673  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1674  }
1675  {
1676  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1677  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1678  if(hasSubProcess()) {
1679  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1680  }
1681  }
1682  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1683  if(looper_) {
1684  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1685  }
1686  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1497 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1497  {
1498  if(looper_) {
1499  ModuleChanger changer(schedule_.get(),preg_.get());
1500  looper_->setModuleChanger(&changer);
1501  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1502  looper_->setModuleChanger(nullptr);
1503  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1504  else return false;
1505  }
1506  FDEBUG(1) << "\tendOfLoop\n";
1507  return true;
1508  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1191 of file EventProcessor.cc.

1191  {
1192  return schedule_->endPathsEnabled();
1193  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1579 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1579  {
1580  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1581  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1583  runPrincipal.endTime());
1584  espController_->eventSetupForInstance(ts);
1585  EventSetup const& es = esp_->eventSetup();
1586  {
1587  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1588  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1589  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1590  if(hasSubProcess()) {
1591  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1592  }
1593  }
1594  }
1595  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1596  if(looper_) {
1597  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1598  }
1599  {
1600  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1601  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1602  if(hasSubProcess()) {
1603  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1604  }
1605  }
1606  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1607  if(looper_) {
1608  looper_->doEndRun(runPrincipal, es, &processContext_);
1609  }
1610  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 865 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

865  {
866 
867  if(0 == numberOfForkedChildren_) {return true;}
868  assert(0<numberOfForkedChildren_);
869  //do what we want done in common
870  {
871  beginJob(); //make sure this was run
872  // make the services available
874 
875  InputSource::ItemType itemType;
876  itemType = input_->nextItemType();
877 
878  assert(itemType == InputSource::IsFile);
879  {
880  readFile();
881  }
882  itemType = input_->nextItemType();
883  assert(itemType == InputSource::IsRun);
884 
885  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
886  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
887  input_->runAuxiliary()->beginTime());
888  espController_->eventSetupForInstance(ts);
889  EventSetup const& es = esp_->eventSetup();
890 
891  //now get all the data available in the EventSetup
892  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
893  es.fillAvailableRecordKeys(recordKeys);
894  std::vector<eventsetup::DataKey> dataKeys;
895  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
896  itKey != itEnd;
897  ++itKey) {
898  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
899  //see if this is on our exclusion list
900  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
901  ExcludedData const* excludedData(nullptr);
902  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
903  excludedData = &(itExcludeRec->second);
904  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
905  //skip all items in this record
906  continue;
907  }
908  }
909  if(0 != recordPtr) {
910  dataKeys.clear();
911  recordPtr->fillRegisteredDataKeys(dataKeys);
912  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
913  itDataKey != itDataKeyEnd;
914  ++itDataKey) {
915  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
916  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
917  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
918  continue;
919  }
920  try {
921  recordPtr->doGet(*itDataKey);
922  } catch(cms::Exception& e) {
923  LogWarning("ForkingEventSetupPreFetching") << e.what();
924  }
925  }
926  }
927  }
928  }
929  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
930  {
931  // make the services available
933  Service<JobReport> jobReport;
934  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
935 
936  //Now actually do the forking
937  actReg_->preForkReleaseResourcesSignal_();
938  input_->doPreForkReleaseResources();
939  schedule_->preForkReleaseResources();
940  }
941  installCustomHandler(SIGCHLD, ep_sigchld);
942 
943 
944  unsigned int childIndex = 0;
945  unsigned int const kMaxChildren = numberOfForkedChildren_;
946  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
947  std::vector<pid_t> childrenIds;
948  childrenIds.reserve(kMaxChildren);
949  std::vector<int> childrenSockets;
950  childrenSockets.reserve(kMaxChildren);
951  std::vector<int> childrenPipes;
952  childrenPipes.reserve(kMaxChildren);
953  std::vector<int> childrenSocketsCopy;
954  childrenSocketsCopy.reserve(kMaxChildren);
955  std::vector<int> childrenPipesCopy;
956  childrenPipesCopy.reserve(kMaxChildren);
957  int pipes[] {0, 0};
958 
959  {
960  // make the services available
962  Service<JobReport> jobReport;
963  int sockets[2], fd_flags;
964  for(; childIndex < kMaxChildren; ++childIndex) {
965  // Create a UNIX_DGRAM socket pair
966  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
967  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
968  exit(EXIT_FAILURE);
969  }
970  if (pipe(pipes)) {
971  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
972  exit(EXIT_FAILURE);
973  }
974  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
975  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
976  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
977  exit(EXIT_FAILURE);
978  }
979  // Mark socket as non-block. Child must be careful to do select prior
980  // to reading from socket.
981  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
982  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
983  exit(EXIT_FAILURE);
984  }
985  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
986  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
987  exit(EXIT_FAILURE);
988  }
989  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
990  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  // Linux man page notes there are some edge cases where reading from a
994  // fd can block, even after a select.
995  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
996  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
997  exit(EXIT_FAILURE);
998  }
999  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1000  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1001  exit(EXIT_FAILURE);
1002  }
1003 
1004  childrenPipesCopy = childrenPipes;
1005  childrenSocketsCopy = childrenSockets;
1006 
1007  pid_t value = fork();
1008  if(value == 0) {
1009  // Close the parent's side of the socket and pipe which will talk to us.
1010  close(pipes[0]);
1011  close(sockets[0]);
1012  // Close our copies of the parent's other communication pipes.
1013  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1014  close(*it);
1015  }
1016  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1017  close(*it);
1018  }
1019 
1020  // this is the child process, redirect stdout and stderr to a log file
1021  fflush(stdout);
1022  fflush(stderr);
1023  std::stringstream stout;
1024  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1025  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1026  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1027  }
1028  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1029  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1030  }
1031 
1032  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1033  if(setCpuAffinity_) {
1034  // CPU affinity is handled differently on macosx.
1035  // We disable it and print a message until someone reads:
1036  //
1037  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1038  //
1039  // and implements it.
1040 #ifdef __APPLE__
1041  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1042 #else
1043  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1044  cpu_set_t mask;
1045  CPU_ZERO(&mask);
1046  CPU_SET(childIndex, &mask);
1047  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1048  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1049  exit(-1);
1050  }
1051 #endif
1052  }
1053  break;
1054  } else {
1055  //this is the parent
1056  close(pipes[1]);
1057  close(sockets[1]);
1058  }
1059  if(value < 0) {
1060  LogError("ForkingChild") << "failed to create a child";
1061  exit(-1);
1062  }
1063  childrenIds.push_back(value);
1064  childrenSockets.push_back(sockets[0]);
1065  childrenPipes.push_back(pipes[0]);
1066  }
1067 
1068  if(childIndex < kMaxChildren) {
1069  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1070  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1071 
1072  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1073  input_->doPostForkReacquireResources(receiver);
1074  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1075  //NOTE: sources have to reset themselves by listening to the post fork message
1076  //rewindInput();
1077  return true;
1078  }
1079  jobReport->parentAfterFork(jobReportFile);
1080  }
1081 
1082  //this is the original, which is now the master for all the children
1083 
1084  //Need to wait for signals from the children or externally
1085  // To wait we must
1086  // 1) block the signals we want to wait on so we do not have a race condition
1087  // 2) check that we haven't already meet our ending criteria
1088  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1089  sigset_t blockingSigSet;
1090  sigset_t unblockingSigSet;
1091  sigset_t oldSigSet;
1092  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1093  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1094  sigaddset(&blockingSigSet, SIGCHLD);
1095  sigaddset(&blockingSigSet, SIGUSR2);
1096  sigaddset(&blockingSigSet, SIGINT);
1097  sigdelset(&unblockingSigSet, SIGCHLD);
1098  sigdelset(&unblockingSigSet, SIGUSR2);
1099  sigdelset(&unblockingSigSet, SIGINT);
1100  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1101 
1102  // If there are too many fd's (unlikely, but possible) for select, denote this
1103  // because the sender will fail.
1104  bool too_many_fds = false;
1105  if (pipes[1]+1 > FD_SETSIZE) {
1106  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1107  too_many_fds = true;
1108  }
1109 
1110  //create a thread that sends the units of work to workers
1111  // we create it after all signals were blocked so that this
1112  // thread is never interupted by a signal
1113  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1114  boost::thread senderThread(sender);
1115 
1116  if(not too_many_fds) {
1117  //NOTE: a child could have failed before we got here and even after this call
1118  // which is why the 'if' is conditional on continueAfterChildFailure_
1120  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1121  sigsuspend(&unblockingSigSet);
1123  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1124  }
1125  }
1126  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1127 
1128  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1129  if(child_failed) {
1130  LogError("ForkingStopping") << "child failed";
1131  }
1132  if(shutdown_flag) {
1133  LogSystem("ForkingStopping") << "asked to shutdown";
1134  }
1135 
1136  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1137  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1138  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1139  it != itEnd; ++it) {
1140  /* int result = */ kill(*it, SIGUSR2);
1141  }
1142  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1143  while(num_children_done != kMaxChildren) {
1144  sigsuspend(&unblockingSigSet);
1145  }
1146  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1147  }
1148  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1149  senderThread.join();
1150  if(child_failed && !continueAfterChildFailure_) {
1151  if (child_fail_signal) {
1152  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1153  } else if (child_fail_exit_status) {
1154  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1155  } else {
1156  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1157  }
1158  }
1159  if(too_many_fds) {
1160  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1161  }
1162  return false;
1163  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
#define O_NONBLOCK
Definition: SysFile.h:21
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1166 of file EventProcessor.cc.

1166  {
1167  return schedule_->getAllModuleDescriptions();
1168  }
std::auto_ptr< Schedule > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 636 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

636  {
637  return serviceToken_;
638  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1196 of file EventProcessor.cc.

1196  {
1197  schedule_->getTriggerReport(rep);
1198  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 229 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

229  {
230  return subProcess_.get() != 0;
231  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 370 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessParameterSet(), preallocations_, edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, and subProcess_.

Referenced by EventProcessor().

372  {
373 
374  //std::cerr << processDesc->dump() << std::endl;
375 
376  ROOT::Cintex::Cintex::Enable();
377 
378  // register the empty parentage vector , once and for all
380 
381  // register the empty parameter set, once and for all.
382  ParameterSet().registerIt();
383 
384  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
385  //std::cerr << parameterSet->dump() << std::endl;
386 
387  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
388  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
389 
390  // Now set some parameters specific to the main process.
391  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
392  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
393  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
394  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
395  //threading
396  unsigned int nThreads=1;
397  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
398  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
399  if(nThreads == 0) {
400  nThreads = 1;
401  }
402  }
403  /* TODO: when we support having each stream run in a different thread use this default
404  unsigned int nStreams =nThreads;
405  */
406  unsigned int nStreams =1;
407  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
408  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
409  if(nStreams==0) {
410  nStreams = nThreads;
411  }
412  }
413  /*
414  bool nRunsSet = false;
415  */
416  unsigned int nConcurrentRuns =1;
417  /*
418  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
419  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
420  }
421  */
422  unsigned int nConcurrentLumis =1;
423  /*
424  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
425  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
426  } else {
427  nConcurrentLumis = nConcurrentRuns;
428  }
429  */
430  //Check that relationships between threading parameters makes sense
431  /*
432  if(nThreads<nStreams) {
433  //bad
434  }
435  if(nConcurrentRuns>nStreams) {
436  //bad
437  }
438  if(nConcurrentRuns>nConcurrentLumis) {
439  //bad
440  }
441  */
442  //forking
443  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
444  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
445  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
446  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
447  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
448  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
449  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
450  itPS != itPSEnd;
451  ++itPS) {
452  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
453  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
454  itPS->getUntrackedParameter<std::string>("label", "")));
455  }
456  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
457 
458  // Now do general initialization
459  ScheduleItems items;
460 
461  //initialize the services
462  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
463  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
464  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
465 
466  //make the services available
468 
469  if(nStreams>1) {
471  handler->willBeUsingThreads();
472  }
473 
474  // intialize miscellaneous items
475  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
476 
477  // intialize the event setup provider
478  esp_ = espController_->makeProvider(*parameterSet);
479 
480  // initialize the looper, if any
481  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
482  if(looper_) {
483  looper_->setActionTable(items.act_table_.get());
484  looper_->attachTo(*items.actReg_);
485 
486  //For now loopers make us run only 1 transition at a time
487  nStreams=1;
488  nConcurrentLumis=1;
489  nConcurrentRuns=1;
490  }
491 
492  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
493 
494  // initialize the input source
495  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_, preallocations_);
496 
497  // intialize the Schedule
498  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
499 
500  // set the data members
501  act_table_ = std::move(items.act_table_);
502  actReg_ = items.actReg_;
503  preg_.reset(items.preg_.release());
504  branchIDListHelper_ = items.branchIDListHelper_;
505  processConfiguration_ = items.processConfiguration_;
507  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
508 
509  FDEBUG(2) << parameterSet << std::endl;
510 
512  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
513  // Reusable event principal
514  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_,
517  historyAppender_.get(),
518  index));
519  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
520  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
522  }
523  // initialize the subprocess, if there is one
524  if(subProcessParameterSet) {
525  subProcess_.reset(new SubProcess(*subProcessParameterSet,
526  *parameterSet,
527  preg_,
530  *actReg_,
531  token,
534  &processContext_));
535  }
536  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:550
boost::shared_ptr< EDLooperBase > looper_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< ActivityRegistry > actReg_
void insert(boost::shared_ptr< RunPrincipal > rp)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
boost::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1452 of file EventProcessor.cc.

References FDEBUG.

1452  {
1453  if (fb_.get() != nullptr) {
1454  schedule_->openOutputFiles(*fb_);
1455  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1456  }
1457  FDEBUG(1) << "\topenOutputFiles\n";
1458  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 849 of file EventProcessor.cc.

849  {
850  if(child_failed && continueAfterChildFailure_) {
851  if (child_fail_signal) {
852  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
853  child_fail_signal=0;
854  } else if (child_fail_exit_status) {
855  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
856  child_fail_exit_status=0;
857  } else {
858  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
859  }
860  child_failed =false;
861  }
862  }
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1516 of file EventProcessor.cc.

References FDEBUG.

1516  {
1517  looper_->prepareForNextLoop(esp_.get());
1518  FDEBUG(1) << "\tprepareForNextLoop\n";
1519  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 1904 of file EventProcessor.cc.

References ev, FDEBUG, edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, and summarizeEdmComparisonLogfiles::succeeded.

1904  {
1905  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1906  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1908  if(rng.isAvailable()) {
1909  Event ev(*pep, ModuleDescription(), nullptr);
1910  rng->postEventRead(ev);
1911  }
1912  assert(pep->luminosityBlockPrincipalPtrValid());
1913  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1914  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
1915 
1916  //We can only update IOVs on Lumi boundaries
1917  //IOVSyncValue ts(pep->id(), pep->time());
1918  //espController_->eventSetupForInstance(ts);
1919  EventSetup const& es = esp_->eventSetup();
1920  {
1921  typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
1922  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
1923  if(hasSubProcess()) {
1924  subProcess_->doEvent(*pep);
1925  }
1926  }
1927 
1928  //NOTE: If we have a looper we only have one Stream
1929  if(looper_) {
1930  bool randomAccess = input_->randomAccess();
1931  ProcessingController::ForwardState forwardState = input_->forwardState();
1932  ProcessingController::ReverseState reverseState = input_->reverseState();
1933  ProcessingController pc(forwardState, reverseState, randomAccess);
1934 
1936  do {
1937 
1938  StreamContext streamContext(pep->streamID(), &processContext_);
1939  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
1940 
1941  bool succeeded = true;
1942  if(randomAccess) {
1943  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1944  input_->skipEvents(-2);
1945  }
1946  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1947  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1948  }
1949  }
1950  pc.setLastOperationSucceeded(succeeded);
1951  } while(!pc.lastOperationSucceeded());
1952  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1953 
1954  }
1955 
1956  FDEBUG(1) << "\tprocessEvent\n";
1957  pep->clearEventPrincipal();
1958  }
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< Schedule > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1792 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1793  {
1794  try {
1795  // make the services available
1799  handler->initializeThisThreadForUse();
1800  }
1801 
1802  if(iStreamIndex==0) {
1803  processEvent(0);
1804  }
1805  do {
1806  if(shouldWeStop()) {
1807  break;
1808  }
1809  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1810  //another thread hit an exception
1811  //std::cerr<<"another thread saw an exception\n";
1812  break;
1813  }
1814  {
1815 
1816 
1817  {
1818  //nextItemType and readEvent need to be in same critical section
1819  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1820 
1821  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1822  //std::cerr<<"finishedProcessingEvents\n";
1823  break;
1824  }
1825 
1826  //If source and DelayedReader share a resource we must serialize them
1827  auto sr = input_->resourceSharedWithDelayedReader();
1828  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1829  if(sr) {
1830  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1831  }
1832 
1833  InputSource::ItemType itemType = input_->nextItemType();
1834  if (InputSource::IsEvent !=itemType) {
1836  finishedProcessingEvents->store(true,std::memory_order_release);
1837  //std::cerr<<"next item type "<<itemType<<"\n";
1838  break;
1839  }
1841  //std::cerr<<"task told to async stop\n";
1842  break;
1843  }
1844  readEvent(iStreamIndex);
1845  }
1846  }
1847  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1848  //another thread hit an exception
1849  //std::cerr<<"another thread saw an exception\n";
1850  break;
1851  }
1852  processEvent(iStreamIndex);
1853  }while(true);
1854  } catch (...) {
1855  bool expected =false;
1856  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1857  deferredExceptionPtr_ = std::current_exception();
1858  }
1859  //std::cerr<<"task caught exception\n";
1860  }
1861  }
void readEvent(unsigned int iStreamIndex)
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1739 of file EventProcessor.cc.

1739  {
1740  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1741  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1742  return input_->luminosityBlock();
1743  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1706 of file EventProcessor.cc.

References PDRates::Run.

1706  {
1707  principalCache_.merge(input_->runAuxiliary(), preg_);
1708  auto runPrincipal =principalCache_.runPrincipalPtr();
1709  input_->readAndMergeRun(*runPrincipal);
1710  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1711  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1712  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1863 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

1863  {
1864  if(numberOfForkedChildren_>0) {
1865  readEvent(0);
1866  processEvent(0);
1867  return;
1868  }
1871 
1872  std::atomic<bool> finishedProcessingEvents{false};
1873 
1874  //Task assumes Stream 0 has already read the event that caused us to go here
1875  readEvent(0);
1876 
1877  //To wait, the ref count has to b 1+#streams
1878  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1879  eventLoopWaitTask->increment_ref_count();
1880 
1881  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1882  unsigned int iStreamIndex = 0;
1883  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1884  eventLoopWaitTask->increment_ref_count();
1885  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1886 
1887  }
1888  eventLoopWaitTask->increment_ref_count();
1889  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1890  tbb::task::destroy(*eventLoopWaitTask);
1891 
1892  //One of the processing threads saw an exception
1894  std::rethrow_exception(deferredExceptionPtr_);
1895  }
1896  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1897 of file EventProcessor.cc.

References event(), and FDEBUG.

1897  {
1898  //TODO this will have to become per stream
1899  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1900  StreamContext streamContext(event.streamID(), &processContext_);
1901  input_->readEvent(event, streamContext);
1902  FDEBUG(1) << "\treadEvent\n";
1903  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1430 of file EventProcessor.cc.

References FDEBUG, or, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1430  {
1431  FDEBUG(1) << " \treadFile\n";
1432  size_t size = preg_->size();
1433  fb_ = input_->readFile();
1434  if(size < preg_->size()) {
1436  }
1438  if((numberOfForkedChildren_ > 0) or
1441  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1442  }
1443  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
void adjustEventsToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
std::unique_ptr< FileBlock > fb_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1714 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

1714  {
1717  << "EventProcessor::readRun\n"
1718  << "Illegal attempt to insert lumi into cache\n"
1719  << "Contact a Framework Developer\n";
1720  }
1723  << "EventProcessor::readRun\n"
1724  << "Illegal attempt to insert lumi into cache\n"
1725  << "Run is invalid\n"
1726  << "Contact a Framework Developer\n";
1727  }
1728  boost::shared_ptr<LuminosityBlockPrincipal> lbp(new LuminosityBlockPrincipal(input_->luminosityBlockAuxiliary(),
1729  preg_,
1731  historyAppender_.get(),
1732  0));
1733  input_->readLuminosityBlock(*lbp, *historyAppender_);
1734  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1735  principalCache_.insert(lbp);
1736  return input_->luminosityBlock();
1737  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
boost::shared_ptr< ProductRegistry const > preg_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1688 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

1688  {
1691  << "EventProcessor::readRun\n"
1692  << "Illegal attempt to insert run into cache\n"
1693  << "Contact a Framework Developer\n";
1694  }
1695  boost::shared_ptr<RunPrincipal> rp(new RunPrincipal(input_->runAuxiliary(),
1696  preg_,
1698  historyAppender_.get(),
1699  0));
1700  input_->readRun(*rp, *historyAppender_);
1701  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1702  principalCache_.insert(rp);
1703  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1704  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1479 of file EventProcessor.cc.

References FDEBUG.

1479  {
1480  if (fb_.get() != nullptr) {
1481  schedule_->respondToCloseInputFile(*fb_);
1482  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1483  }
1484  FDEBUG(1) << "\trespondToCloseInputFile\n";
1485  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1468 of file EventProcessor.cc.

References FDEBUG.

1468  {
1469  if(hasSubProcess()) {
1470  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1471  }
1472  if (fb_.get() != nullptr) {
1473  schedule_->respondToOpenInputFile(*fb_);
1474  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1475  }
1476  FDEBUG(1) << "\trespondToOpenInputFile\n";
1477  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1510 of file EventProcessor.cc.

References FDEBUG.

1510  {
1511  input_->repeat();
1512  input_->rewind();
1513  FDEBUG(1) << "\trewind\n";
1514  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 311 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

311  {
312  return runToCompletion();
313  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1251 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, cmsPerfStripChart::operate(), runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1251  {
1252 
1255  std::auto_ptr<statemachine::Machine> machine;
1256  {
1257  beginJob(); //make sure this was called
1258 
1259  //StatusCode returnCode = epSuccess;
1261 
1262  // make the services available
1264 
1265  machine = createStateMachine();
1268  try {
1269  convertException::wrap([&]() {
1270 
1271  InputSource::ItemType itemType;
1272 
1273  while(true) {
1274 
1275  bool more = true;
1276  if(numberOfForkedChildren_ > 0) {
1277  size_t size = preg_->size();
1278  more = input_->skipForForking();
1279  if(more) {
1280  if(size < preg_->size()) {
1282  }
1284  }
1285  }
1286  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1287 
1288  FDEBUG(1) << "itemType = " << itemType << "\n";
1289 
1290  if(checkForAsyncStopRequest(returnCode)) {
1291  forceLooperToEnd_ = true;
1292  machine->process_event(statemachine::Stop());
1293  forceLooperToEnd_ = false;
1294  break;
1295  }
1296 
1297  if(itemType == InputSource::IsEvent) {
1298  machine->process_event(statemachine::Event());
1300  forceLooperToEnd_ = true;
1301  machine->process_event(statemachine::Stop());
1302  forceLooperToEnd_ = false;
1304  break;
1305  }
1307  }
1308 
1309  if(itemType == InputSource::IsEvent) {
1310  }
1311  else if(itemType == InputSource::IsStop) {
1312  machine->process_event(statemachine::Stop());
1313  }
1314  else if(itemType == InputSource::IsFile) {
1315  machine->process_event(statemachine::File());
1316  }
1317  else if(itemType == InputSource::IsRun) {
1318  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1319  }
1320  else if(itemType == InputSource::IsLumi) {
1321  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1322  }
1323  else if(itemType == InputSource::IsSynchronize) {
1324  //For now, we don't have to do anything
1325  }
1326  // This should be impossible
1327  else {
1329  << "Unknown next item type passed to EventProcessor\n"
1330  << "Please report this error to the Framework group\n";
1331  }
1332  if(machine->terminated()) {
1333  break;
1334  }
1335  } // End of loop over state machine events
1336  }); // convertException::wrap
1337  } // Try block
1338  // Some comments on exception handling related to the boost state machine:
1339  //
1340  // Some states used in the machine are special because they
1341  // perform actions while the machine is being terminated, actions
1342  // such as close files, call endRun, call endLumi etc ... Each of these
1343  // states has two functions that perform these actions. The functions
1344  // are almost identical. The major difference is that one version
1345  // catches all exceptions and the other lets exceptions pass through.
1346  // The destructor catches them and the other function named "exit" lets
1347  // them pass through. On a normal termination, boost will always call
1348  // "exit" and then the state destructor. In our state classes, the
1349  // the destructors do nothing if the exit function already took
1350  // care of things. Here's the interesting part. When boost is
1351  // handling an exception the "exit" function is not called (a boost
1352  // feature).
1353  //
1354  // If an exception occurs while the boost machine is in control
1355  // (which usually means inside a process_event call), then
1356  // the boost state machine destroys its states and "terminates" itself.
1357  // This already done before we hit the catch blocks below. In this case
1358  // the call to terminateMachine below only destroys an already
1359  // terminated state machine. Because exit is not called, the state destructors
1360  // handle cleaning up lumis, runs, and files. The destructors swallow
1361  // all exceptions and only pass through the exceptions messages, which
1362  // are tacked onto the original exception below.
1363  //
1364  // If an exception occurs when the boost state machine is not
1365  // in control (outside the process_event functions), then boost
1366  // cannot destroy its own states. The terminateMachine function
1367  // below takes care of that. The flag "alreadyHandlingException"
1368  // is set true so that the state exit functions do nothing (and
1369  // cannot throw more exceptions while handling the first). Then the
1370  // state destructors take care of this because exit did nothing.
1371  //
1372  // In both cases above, the EventProcessor::endOfLoop function is
1373  // not called because it can throw exceptions.
1374  //
1375  // One tricky aspect of the state machine is that things that can
1376  // throw should not be invoked by the state machine while another
1377  // exception is being handled.
1378  // Another tricky aspect is that it appears to be important to
1379  // terminate the state machine before invoking its destructor.
1380  // We've seen crashes that are not understood when that is not
1381  // done. Maintainers of this code should be careful about this.
1382 
1383  catch (cms::Exception & e) {
1385  terminateMachine(machine);
1386  alreadyHandlingException_ = false;
1387  if (!exceptionMessageLumis_.empty()) {
1389  if (e.alreadyPrinted()) {
1390  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1391  }
1392  }
1393  if (!exceptionMessageRuns_.empty()) {
1395  if (e.alreadyPrinted()) {
1396  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1397  }
1398  }
1399  if (!exceptionMessageFiles_.empty()) {
1401  if (e.alreadyPrinted()) {
1402  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1403  }
1404  }
1405  throw;
1406  }
1407 
1408  if(machine->terminated()) {
1409  FDEBUG(1) << "The state machine reports it has been terminated\n";
1410  machine.reset();
1411  }
1412 
1414  throw cms::Exception("BadState")
1415  << "The boost state machine in the EventProcessor exited after\n"
1416  << "entering the Error state.\n";
1417  }
1418 
1419  }
1420  if(machine.get() != 0) {
1421  terminateMachine(machine);
1423  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1424  << "Please report this error to the Framework group\n";
1425  }
1426 
1427  return returnCode;
1428  }
bool checkForAsyncStopRequest(StatusCode &)
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void adjustEventsToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1966 of file EventProcessor.cc.

References python.rootplot.argparse::message.

1966  {
1968  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1974 of file EventProcessor.cc.

References python.rootplot.argparse::message.

1974  {
1976  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1970 of file EventProcessor.cc.

References python.rootplot.argparse::message.

1970  {
1972  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1521 of file EventProcessor.cc.

References FDEBUG.

1521  {
1522  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1523  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1524  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1960 of file EventProcessor.cc.

References FDEBUG.

1960  {
1961  FDEBUG(1) << "\tshouldWeStop\n";
1962  if(shouldWeStop_) return true;
1963  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
1964  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1487 of file EventProcessor.cc.

References FDEBUG.

1487  {
1488  shouldWeStop_ = false;
1489  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1490  // until after we've called beginOfJob
1491  if(looper_ && looperBeginJobRun_) {
1492  looper_->doStartingNewLoop();
1493  }
1494  FDEBUG(1) << "\tstartingNewLoop\n";
1495  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 1982 of file EventProcessor.cc.

References FDEBUG.

1982  {
1983  if(iMachine.get() != 0) {
1984  if(!iMachine->terminated()) {
1985  forceLooperToEnd_ = true;
1986  iMachine->process_event(statemachine::Stop());
1987  forceLooperToEnd_ = false;
1988  }
1989  else {
1990  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
1991  }
1992  if(iMachine->terminated()) {
1993  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
1994  }
1995  iMachine.reset();
1996  }
1997  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1171 of file EventProcessor.cc.

1171  {
1172  return schedule_->totalEvents();
1173  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1181 of file EventProcessor.cc.

1181  {
1182  return schedule_->totalEventsFailed();
1183  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1176 of file EventProcessor.cc.

1176  {
1177  return schedule_->totalEventsPassed();
1178  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1757 of file EventProcessor.cc.

References FDEBUG.

1757  {
1759  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1760  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1761  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1745 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1745  {
1746  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1747  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1748  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1749  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 235 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 262 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 255 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 286 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 298 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 300 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 278 of file EventProcessor.h.

Referenced by beginJob().

boost::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 257 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 294 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 274 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 273 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 282 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 261 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 260 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 283 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 285 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 284 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 269 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 281 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 287 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 267 of file EventProcessor.h.

Referenced by init().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 259 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 288 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 299 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 276 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 291 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 292 of file EventProcessor.h.

Referenced by init().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 296 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

boost::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 256 of file EventProcessor.h.

Referenced by beginJob(), and init().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 263 of file EventProcessor.h.

Referenced by init().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 264 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 258 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 279 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 280 of file EventProcessor.h.

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private