CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessoroperator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

bool checkForAsyncStopRequest (StatusCode &)
 
std::auto_ptr
< statemachine::Machine
createStateMachine ()
 
bool hasSubProcess () const
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistryactReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
std::shared_ptr
< BranchIDListHelper
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlockfb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppenderhistoryAppender_
 
std::unique_ptr< InputSourceinput_
 
boost::shared_ptr< EDLooperBaselooper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PreallocationConfiguration preallocations_
 
std::shared_ptr
< ProductRegistry const > 
preg_
 
PrincipalCache principalCache_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
std::auto_ptr< Scheduleschedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
std::auto_ptr< SubProcesssubProcess_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 58 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 302 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 303 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 216 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

220  :
221  actReg_(),
222  preg_(),
224  serviceToken_(),
225  input_(),
226  espController_(new eventsetup::EventSetupsController),
227  esp_(),
228  act_table_(),
230  schedule_(),
231  subProcess_(),
232  historyAppender_(new HistoryAppender),
233  fb_(),
234  looper_(),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
240  fileMode_(),
246  forceLooperToEnd_(false),
247  looperBeginJobRun_(false),
251  setCpuAffinity_(false),
253  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
254  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
255  processDesc->addServices(defaultServices, forcedServices);
256  init(processDesc, iToken, iLegacy);
257  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 259 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

261  :
262  actReg_(),
263  preg_(),
265  serviceToken_(),
266  input_(),
267  espController_(new eventsetup::EventSetupsController),
268  esp_(),
269  act_table_(),
271  schedule_(),
272  subProcess_(),
273  historyAppender_(new HistoryAppender),
274  fb_(),
275  looper_(),
277  principalCache_(),
278  beginJobCalled_(false),
279  shouldWeStop_(false),
281  fileMode_(),
287  forceLooperToEnd_(false),
288  looperBeginJobRun_(false),
292  setCpuAffinity_(false),
296  {
297  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
298  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
299  processDesc->addServices(defaultServices, forcedServices);
301  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 303 of file EventProcessor.cc.

References init().

305  :
306  actReg_(),
307  preg_(),
309  serviceToken_(),
310  input_(),
311  espController_(new eventsetup::EventSetupsController),
312  esp_(),
313  act_table_(),
315  schedule_(),
316  subProcess_(),
317  historyAppender_(new HistoryAppender),
318  fb_(),
319  looper_(),
321  principalCache_(),
322  beginJobCalled_(false),
323  shouldWeStop_(false),
325  fileMode_(),
331  forceLooperToEnd_(false),
332  looperBeginJobRun_(false),
336  setCpuAffinity_(false),
340  {
341  init(processDesc, token, legacy);
342  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 345 of file EventProcessor.cc.

References HDQMDatabaseProducer::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

345  :
346  actReg_(),
347  preg_(),
349  serviceToken_(),
350  input_(),
351  espController_(new eventsetup::EventSetupsController),
352  esp_(),
353  act_table_(),
355  schedule_(),
356  subProcess_(),
357  historyAppender_(new HistoryAppender),
358  fb_(),
359  looper_(),
361  principalCache_(),
362  beginJobCalled_(false),
363  shouldWeStop_(false),
365  fileMode_(),
371  forceLooperToEnd_(false),
372  looperBeginJobRun_(false),
376  setCpuAffinity_(false),
380 {
381  if(isPython) {
382  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
383  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
385  }
386  else {
387  auto processDesc = std::make_shared<ProcessDesc>(config);
389  }
390  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 557 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and subProcess_.

557  {
558  // Make the services available while everything is being deleted.
559  ServiceToken token = getToken();
560  ServiceRegistry::Operate op(token);
561 
562  // manually destroy all these thing that may need the services around
563  espController_.reset();
564  subProcess_.reset();
565  esp_.reset();
566  schedule_.reset();
567  input_.reset();
568  looper_.reset();
569  actReg_.reset();
570 
573  }
void clear()
Not thread safe.
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void clear()
Not thread safe.
Definition: Registry.cc:42
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:14
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2063 of file EventProcessor.cc.

2063  {
2065  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 576 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, hasSubProcess(), i, input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), preallocations_, preg_, schedule_, serviceToken_, subProcess_, and edm::convertException::wrap().

576  {
577  if(beginJobCalled_) return;
578  beginJobCalled_=true;
579  bk::beginJob();
580 
581  // StateSentry toerror(this); // should we add this ?
582  //make the services available
584 
585  service::SystemBounds bounds(preallocations_.numberOfStreams(),
589  actReg_->preallocateSignal_(bounds);
590  //NOTE: This implementation assumes 'Job' means one call
591  // the EventProcessor::run
592  // If it really means once per 'application' then this code will
593  // have to be changed.
594  // Also have to deal with case where have 'run' then new Module
595  // added and do 'run'
596  // again. In that case the newly added Module needs its 'beginJob'
597  // to be called.
598 
599  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
600  // For now we delay calling beginOfJob until first beginOfRun
601  //if(looper_) {
602  // looper_->beginOfJob(es);
603  //}
604  try {
605  convertException::wrap([&]() {
606  input_->doBeginJob();
607  });
608  }
609  catch(cms::Exception& ex) {
610  ex.addContext("Calling beginJob for the source");
611  throw;
612  }
613  schedule_->beginJob(*preg_);
614  // toerror.succeeded(); // should we add this?
615  if(hasSubProcess()) subProcess_->doBeginJob();
616  actReg_->postBeginJobSignal_();
617 
618  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
619  schedule_->beginStream(i);
620  if(hasSubProcess()) subProcess_->doBeginStream(i);
621  }
622  }
int i
Definition: DBlmapReader.cc:9
PreallocationConfiguration preallocations_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1665 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

1665  {
1666  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1667  {
1668  SendSourceTerminationSignalIfException sentry(actReg_.get());
1669 
1670  input_->doBeginLumi(lumiPrincipal, &processContext_);
1671  sentry.completedSuccessfully();
1672  }
1673 
1675  if(rng.isAvailable()) {
1676  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1677  rng->preBeginLumi(lb);
1678  }
1679 
1680  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1681  // lumi blocks know their start and end times why not also start and end events?
1682  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1683  {
1684  SendSourceTerminationSignalIfException sentry(actReg_.get());
1685  espController_->eventSetupForInstance(ts);
1686  sentry.completedSuccessfully();
1687  }
1688  EventSetup const& es = esp_->eventSetup();
1689  {
1690  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1691  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1692  if(hasSubProcess()) {
1693  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1694  }
1695  }
1696  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1697  if(looper_) {
1698  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1699  }
1700  {
1701  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1702  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1703  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1704  if(hasSubProcess()) {
1705  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1706  }
1707  }
1708  }
1709  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1710  if(looper_) {
1711  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1712  }
1713  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1570 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1570  {
1571  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1572  {
1573  SendSourceTerminationSignalIfException sentry(actReg_.get());
1574 
1575  input_->doBeginRun(runPrincipal, &processContext_);
1576  sentry.completedSuccessfully();
1577  }
1578 
1579  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1580  runPrincipal.beginTime());
1582  espController_->forceCacheClear();
1583  }
1584  {
1585  SendSourceTerminationSignalIfException sentry(actReg_.get());
1586  espController_->eventSetupForInstance(ts);
1587  sentry.completedSuccessfully();
1588  }
1589  EventSetup const& es = esp_->eventSetup();
1590  if(looper_ && looperBeginJobRun_== false) {
1591  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1592  looper_->beginOfJob(es);
1593  looperBeginJobRun_ = true;
1594  looper_->doStartingNewLoop();
1595  }
1596  {
1597  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1598  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1599  if(hasSubProcess()) {
1600  subProcess_->doBeginRun(runPrincipal, ts);
1601  }
1602  }
1603  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1604  if(looper_) {
1605  looper_->doBeginRun(runPrincipal, es, &processContext_);
1606  }
1607  {
1608  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1609  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1610  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1611  if(hasSubProcess()) {
1612  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1613  }
1614  }
1615  }
1616  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1617  if(looper_) {
1618  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1619  }
1620  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode returnCode)
private

Definition at line 1257 of file EventProcessor.cc.

References edm::shutdown_flag.

1257  {
1258  bool returnValue = false;
1259 
1260  // Look for a shutdown signal
1261  if(shutdown_flag.load(std::memory_order_acquire)) {
1262  returnValue = true;
1263  returnCode = epSignal;
1264  }
1265  return returnValue;
1266  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1220 of file EventProcessor.cc.

1220  {
1221  schedule_->clearCounters();
1222  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1476 of file EventProcessor.cc.

References FDEBUG.

1476  {
1477  if (fb_.get() != nullptr) {
1478  SendSourceTerminationSignalIfException sentry(actReg_.get());
1479  input_->closeFile(fb_.get(), cleaningUpAfterException);
1480  sentry.completedSuccessfully();
1481  }
1482  FDEBUG(1) << "\tcloseInputFile\n";
1483  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1493 of file EventProcessor.cc.

References FDEBUG.

1493  {
1494  if (fb_.get() != nullptr) {
1495  schedule_->closeOutputFiles();
1496  if(hasSubProcess()) subProcess_->closeOutputFiles();
1497  }
1498  FDEBUG(1) << "\tcloseOutputFiles\n";
1499  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1226 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1226  {
1228  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1229  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1230  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1231  else {
1232  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1233  << fileMode_ << ".\n"
1234  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1235  }
1236 
1237  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1238  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1239  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1240  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1241  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1242  else {
1243  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1244  << emptyRunLumiMode_ << ".\n"
1245  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1246  }
1247 
1248  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1249  fileMode,
1250  emptyRunLumiMode));
1251 
1252  machine->initiate();
1253  return machine;
1254  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1842 of file EventProcessor.cc.

References FDEBUG.

1842  {
1844  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1845  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1846  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1830 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1830  {
1831  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1832  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1833  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1834  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1559 of file EventProcessor.cc.

References FDEBUG.

1559  {
1560  FDEBUG(1) << "\tdoErrorStuff\n";
1561  LogError("StateMachine")
1562  << "The EventProcessor state machine encountered an unexpected event\n"
1563  << "and went to the error state\n"
1564  << "Will attempt to terminate processing normally\n"
1565  << "(IF using the looper the next loop will be attempted)\n"
1566  << "This likely indicates a bug in an input module or corrupted input or both\n";
1568  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1205 of file EventProcessor.cc.

1205  {
1206  schedule_->enableEndPaths(active);
1207  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 625 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), i, input_, looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

625  {
626  // Collects exceptions, so we don't throw before all operations are performed.
627  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
628 
629  //make the services available
631 
632  //NOTE: this really should go elsewhere in the future
633  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
634  c.call([this,i](){this->schedule_->endStream(i);});
635  if(hasSubProcess()) {
636  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
637  }
638  }
639  schedule_->endJob(c);
640  if(hasSubProcess()) {
641  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
642  }
643  c.call(std::bind(&InputSource::doEndJob, input_.get()));
644  if(looper_) {
645  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
646  }
647  auto actReg = actReg_.get();
648  c.call([actReg](){actReg->postEndJobSignal_();});
649  if(c.hasThrown()) {
650  c.rethrow();
651  }
652  }
int i
Definition: DBlmapReader.cc:9
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:253
boost::shared_ptr< EDLooperBase > looper_
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1715 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, i, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

1715  {
1716  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1717  {
1718  SendSourceTerminationSignalIfException sentry(actReg_.get());
1719 
1720  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1721  sentry.completedSuccessfully();
1722  }
1723  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1724  // lumi blocks know their start and end times why not also start and end events?
1725  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1726  lumiPrincipal.endTime());
1727  {
1728  SendSourceTerminationSignalIfException sentry(actReg_.get());
1729  espController_->eventSetupForInstance(ts);
1730  sentry.completedSuccessfully();
1731  }
1732  EventSetup const& es = esp_->eventSetup();
1733  {
1734  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1735  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1736  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1737  if(hasSubProcess()) {
1738  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1739  }
1740  }
1741  }
1742  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1743  if(looper_) {
1744  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1745  }
1746  {
1747  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1748  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1749  if(hasSubProcess()) {
1750  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1751  }
1752  }
1753  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1754  if(looper_) {
1755  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1756  }
1757  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:103
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1530 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1530  {
1531  if(looper_) {
1532  ModuleChanger changer(schedule_.get(),preg_.get());
1533  looper_->setModuleChanger(&changer);
1534  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1535  looper_->setModuleChanger(nullptr);
1536  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1537  else return false;
1538  }
1539  FDEBUG(1) << "\tendOfLoop\n";
1540  return true;
1541  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1210 of file EventProcessor.cc.

1210  {
1211  return schedule_->endPathsEnabled();
1212  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1622 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1622  {
1623  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1624  {
1625  SendSourceTerminationSignalIfException sentry(actReg_.get());
1626 
1627  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1628  sentry.completedSuccessfully();
1629  }
1630 
1632  runPrincipal.endTime());
1633  {
1634  SendSourceTerminationSignalIfException sentry(actReg_.get());
1635  espController_->eventSetupForInstance(ts);
1636  sentry.completedSuccessfully();
1637  }
1638  EventSetup const& es = esp_->eventSetup();
1639  {
1640  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1641  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1642  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1643  if(hasSubProcess()) {
1644  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1645  }
1646  }
1647  }
1648  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1649  if(looper_) {
1650  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1651  }
1652  {
1653  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1654  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1655  if(hasSubProcess()) {
1656  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1657  }
1658  }
1659  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1660  if(looper_) {
1661  looper_->doEndRun(runPrincipal, es, &processContext_);
1662  }
1663  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:103
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 884 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

884  {
885 
886  if(0 == numberOfForkedChildren_) {return true;}
887  assert(0<numberOfForkedChildren_);
888  //do what we want done in common
889  {
890  beginJob(); //make sure this was run
891  // make the services available
893 
894  InputSource::ItemType itemType;
895  itemType = input_->nextItemType();
896 
897  assert(itemType == InputSource::IsFile);
898  {
899  readFile();
900  }
901  itemType = input_->nextItemType();
902  assert(itemType == InputSource::IsRun);
903 
904  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
905  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
906  input_->runAuxiliary()->beginTime());
907  espController_->eventSetupForInstance(ts);
908  EventSetup const& es = esp_->eventSetup();
909 
910  //now get all the data available in the EventSetup
911  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
912  es.fillAvailableRecordKeys(recordKeys);
913  std::vector<eventsetup::DataKey> dataKeys;
914  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
915  itKey != itEnd;
916  ++itKey) {
917  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
918  //see if this is on our exclusion list
919  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
920  ExcludedData const* excludedData(nullptr);
921  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
922  excludedData = &(itExcludeRec->second);
923  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
924  //skip all items in this record
925  continue;
926  }
927  }
928  if(0 != recordPtr) {
929  dataKeys.clear();
930  recordPtr->fillRegisteredDataKeys(dataKeys);
931  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
932  itDataKey != itDataKeyEnd;
933  ++itDataKey) {
934  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
935  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
936  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
937  continue;
938  }
939  try {
940  recordPtr->doGet(*itDataKey);
941  } catch(cms::Exception& e) {
942  LogWarning("ForkingEventSetupPreFetching") << e.what();
943  }
944  }
945  }
946  }
947  }
948  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
949  {
950  // make the services available
952  Service<JobReport> jobReport;
953  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
954 
955  //Now actually do the forking
956  actReg_->preForkReleaseResourcesSignal_();
957  input_->doPreForkReleaseResources();
958  schedule_->preForkReleaseResources();
959  }
960  installCustomHandler(SIGCHLD, ep_sigchld);
961 
962 
963  unsigned int childIndex = 0;
964  unsigned int const kMaxChildren = numberOfForkedChildren_;
965  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
966  std::vector<pid_t> childrenIds;
967  childrenIds.reserve(kMaxChildren);
968  std::vector<int> childrenSockets;
969  childrenSockets.reserve(kMaxChildren);
970  std::vector<int> childrenPipes;
971  childrenPipes.reserve(kMaxChildren);
972  std::vector<int> childrenSocketsCopy;
973  childrenSocketsCopy.reserve(kMaxChildren);
974  std::vector<int> childrenPipesCopy;
975  childrenPipesCopy.reserve(kMaxChildren);
976  int pipes[] {0, 0};
977 
978  {
979  // make the services available
981  Service<JobReport> jobReport;
982  int sockets[2], fd_flags;
983  for(; childIndex < kMaxChildren; ++childIndex) {
984  // Create a UNIX_DGRAM socket pair
985  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
986  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
987  exit(EXIT_FAILURE);
988  }
989  if (pipe(pipes)) {
990  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
994  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
995  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
996  exit(EXIT_FAILURE);
997  }
998  // Mark socket as non-block. Child must be careful to do select prior
999  // to reading from socket.
1000  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1001  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1002  exit(EXIT_FAILURE);
1003  }
1004  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1005  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1006  exit(EXIT_FAILURE);
1007  }
1008  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1009  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1010  exit(EXIT_FAILURE);
1011  }
1012  // Linux man page notes there are some edge cases where reading from a
1013  // fd can block, even after a select.
1014  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1015  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1016  exit(EXIT_FAILURE);
1017  }
1018  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1019  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022 
1023  childrenPipesCopy = childrenPipes;
1024  childrenSocketsCopy = childrenSockets;
1025 
1026  pid_t value = fork();
1027  if(value == 0) {
1028  // Close the parent's side of the socket and pipe which will talk to us.
1029  close(pipes[0]);
1030  close(sockets[0]);
1031  // Close our copies of the parent's other communication pipes.
1032  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1033  close(*it);
1034  }
1035  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1036  close(*it);
1037  }
1038 
1039  // this is the child process, redirect stdout and stderr to a log file
1040  fflush(stdout);
1041  fflush(stderr);
1042  std::stringstream stout;
1043  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1044  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1045  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1046  }
1047  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1048  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1049  }
1050 
1051  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1052  if(setCpuAffinity_) {
1053  // CPU affinity is handled differently on macosx.
1054  // We disable it and print a message until someone reads:
1055  //
1056  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1057  //
1058  // and implements it.
1059 #ifdef __APPLE__
1060  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1061 #else
1062  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1063  cpu_set_t mask;
1064  CPU_ZERO(&mask);
1065  CPU_SET(childIndex, &mask);
1066  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1067  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1068  exit(-1);
1069  }
1070 #endif
1071  }
1072  break;
1073  } else {
1074  //this is the parent
1075  close(pipes[1]);
1076  close(sockets[1]);
1077  }
1078  if(value < 0) {
1079  LogError("ForkingChild") << "failed to create a child";
1080  exit(-1);
1081  }
1082  childrenIds.push_back(value);
1083  childrenSockets.push_back(sockets[0]);
1084  childrenPipes.push_back(pipes[0]);
1085  }
1086 
1087  if(childIndex < kMaxChildren) {
1088  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1089  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1090 
1091  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1092  input_->doPostForkReacquireResources(receiver);
1093  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1094  //NOTE: sources have to reset themselves by listening to the post fork message
1095  //rewindInput();
1096  return true;
1097  }
1098  jobReport->parentAfterFork(jobReportFile);
1099  }
1100 
1101  //this is the original, which is now the master for all the children
1102 
1103  //Need to wait for signals from the children or externally
1104  // To wait we must
1105  // 1) block the signals we want to wait on so we do not have a race condition
1106  // 2) check that we haven't already meet our ending criteria
1107  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1108  sigset_t blockingSigSet;
1109  sigset_t unblockingSigSet;
1110  sigset_t oldSigSet;
1111  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1112  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1113  sigaddset(&blockingSigSet, SIGCHLD);
1114  sigaddset(&blockingSigSet, SIGUSR2);
1115  sigaddset(&blockingSigSet, SIGINT);
1116  sigdelset(&unblockingSigSet, SIGCHLD);
1117  sigdelset(&unblockingSigSet, SIGUSR2);
1118  sigdelset(&unblockingSigSet, SIGINT);
1119  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1120 
1121  // If there are too many fd's (unlikely, but possible) for select, denote this
1122  // because the sender will fail.
1123  bool too_many_fds = false;
1124  if (pipes[1]+1 > FD_SETSIZE) {
1125  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1126  too_many_fds = true;
1127  }
1128 
1129  //create a thread that sends the units of work to workers
1130  // we create it after all signals were blocked so that this
1131  // thread is never interupted by a signal
1132  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1133  boost::thread senderThread(sender);
1134 
1135  if(not too_many_fds) {
1136  //NOTE: a child could have failed before we got here and even after this call
1137  // which is why the 'if' is conditional on continueAfterChildFailure_
1139  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1140  sigsuspend(&unblockingSigSet);
1142  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1143  }
1144  }
1145  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1146 
1147  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1148  if(child_failed) {
1149  LogError("ForkingStopping") << "child failed";
1150  }
1151  if(shutdown_flag) {
1152  LogSystem("ForkingStopping") << "asked to shutdown";
1153  }
1154 
1155  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1156  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1157  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1158  it != itEnd; ++it) {
1159  /* int result = */ kill(*it, SIGUSR2);
1160  }
1161  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1162  while(num_children_done != kMaxChildren) {
1163  sigsuspend(&unblockingSigSet);
1164  }
1165  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1166  }
1167  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1168  senderThread.join();
1169  if(child_failed && !continueAfterChildFailure_) {
1170  if (child_fail_signal) {
1171  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1172  } else if (child_fail_exit_status) {
1173  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1174  } else {
1175  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1176  }
1177  }
1178  if(too_many_fds) {
1179  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1180  }
1181  return false;
1182  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1185 of file EventProcessor.cc.

1185  {
1186  return schedule_->getAllModuleDescriptions();
1187  }
std::auto_ptr< Schedule > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 655 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

655  {
656  return serviceToken_;
657  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1215 of file EventProcessor.cc.

1215  {
1216  schedule_->getTriggerReport(rep);
1217  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 229 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

229  {
230  return subProcess_.get() != 0;
231  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 393 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessParameterSet(), preallocations_, edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, and subProcess_.

Referenced by EventProcessor().

395  {
396 
397  //std::cerr << processDesc->dump() << std::endl;
398 
399  ROOT::Cintex::Cintex::Enable();
400 
401  // register the empty parentage vector , once and for all
403 
404  // register the empty parameter set, once and for all.
405  ParameterSet().registerIt();
406 
407  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
408  //std::cerr << parameterSet->dump() << std::endl;
409 
410  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
411  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
412 
413  // Now set some parameters specific to the main process.
414  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
415  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
416  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
417  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
418  //threading
419  unsigned int nThreads=1;
420  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
421  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
422  if(nThreads == 0) {
423  nThreads = 1;
424  }
425  }
426  /* TODO: when we support having each stream run in a different thread use this default
427  unsigned int nStreams =nThreads;
428  */
429  unsigned int nStreams =1;
430  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
431  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
432  if(nStreams==0) {
433  nStreams = nThreads;
434  }
435  }
436  /*
437  bool nRunsSet = false;
438  */
439  unsigned int nConcurrentRuns =1;
440  /*
441  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
442  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
443  }
444  */
445  unsigned int nConcurrentLumis =1;
446  /*
447  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
448  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
449  } else {
450  nConcurrentLumis = nConcurrentRuns;
451  }
452  */
453  //Check that relationships between threading parameters makes sense
454  /*
455  if(nThreads<nStreams) {
456  //bad
457  }
458  if(nConcurrentRuns>nStreams) {
459  //bad
460  }
461  if(nConcurrentRuns>nConcurrentLumis) {
462  //bad
463  }
464  */
465  //forking
466  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
467  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
468  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
469  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
470  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
471  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
472  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
473  itPS != itPSEnd;
474  ++itPS) {
475  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
476  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
477  itPS->getUntrackedParameter<std::string>("label", "")));
478  }
479  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
480 
481  // Now do general initialization
482  ScheduleItems items;
483 
484  //initialize the services
485  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
486  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
487  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
488 
489  //make the services available
491 
492  if(nStreams>1) {
494  handler->willBeUsingThreads();
495  }
496 
497  // intialize miscellaneous items
498  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
499 
500  // intialize the event setup provider
501  esp_ = espController_->makeProvider(*parameterSet);
502 
503  // initialize the looper, if any
504  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
505  if(looper_) {
506  looper_->setActionTable(items.act_table_.get());
507  looper_->attachTo(*items.actReg_);
508 
509  //For now loopers make us run only 1 transition at a time
510  nStreams=1;
511  nConcurrentLumis=1;
512  nConcurrentRuns=1;
513  }
514 
515  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
516 
517  // initialize the input source
518  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_, preallocations_);
519 
520  // intialize the Schedule
521  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
522 
523  // set the data members
524  act_table_ = std::move(items.act_table_);
525  actReg_ = items.actReg_;
526  preg_.reset(items.preg_.release());
527  branchIDListHelper_ = items.branchIDListHelper_;
528  processConfiguration_ = items.processConfiguration_;
530  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
531 
532  FDEBUG(2) << parameterSet << std::endl;
533 
535  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
536  // Reusable event principal
537  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get(), index);
538  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
539  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
541  }
542  // initialize the subprocess, if there is one
543  if(subProcessParameterSet) {
544  subProcess_.reset(new SubProcess(*subProcessParameterSet,
545  *parameterSet,
546  preg_,
547  branchIDListHelper_,
549  *actReg_,
550  token,
553  &processContext_));
554  }
555  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:546
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::unique_ptr< InputSource > input_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1485 of file EventProcessor.cc.

References FDEBUG.

1485  {
1486  if (fb_.get() != nullptr) {
1487  schedule_->openOutputFiles(*fb_);
1488  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1489  }
1490  FDEBUG(1) << "\topenOutputFiles\n";
1491  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 868 of file EventProcessor.cc.

868  {
869  if(child_failed && continueAfterChildFailure_) {
870  if (child_fail_signal) {
871  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
872  child_fail_signal=0;
873  } else if (child_fail_exit_status) {
874  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
875  child_fail_exit_status=0;
876  } else {
877  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
878  }
879  child_failed =false;
880  }
881  }
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1549 of file EventProcessor.cc.

References FDEBUG.

1549  {
1550  looper_->prepareForNextLoop(esp_.get());
1551  FDEBUG(1) << "\tprepareForNextLoop\n";
1552  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 1989 of file EventProcessor.cc.

References FDEBUG, edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, and summarizeEdmComparisonLogfiles::succeeded.

1989  {
1990  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1991  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1993  if(rng.isAvailable()) {
1994  Event ev(*pep, ModuleDescription(), nullptr);
1995  rng->postEventRead(ev);
1996  }
1997  assert(pep->luminosityBlockPrincipalPtrValid());
1998  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
1999  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2000 
2001  //We can only update IOVs on Lumi boundaries
2002  //IOVSyncValue ts(pep->id(), pep->time());
2003  //espController_->eventSetupForInstance(ts);
2004  EventSetup const& es = esp_->eventSetup();
2005  {
2006  typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
2007  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2008  if(hasSubProcess()) {
2009  subProcess_->doEvent(*pep);
2010  }
2011  }
2012 
2013  //NOTE: If we have a looper we only have one Stream
2014  if(looper_) {
2015  bool randomAccess = input_->randomAccess();
2016  ProcessingController::ForwardState forwardState = input_->forwardState();
2017  ProcessingController::ReverseState reverseState = input_->reverseState();
2018  ProcessingController pc(forwardState, reverseState, randomAccess);
2019 
2021  do {
2022 
2023  StreamContext streamContext(pep->streamID(), &processContext_);
2024  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2025 
2026  bool succeeded = true;
2027  if(randomAccess) {
2028  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2029  input_->skipEvents(-2);
2030  }
2031  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2032  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2033  }
2034  }
2035  pc.setLastOperationSucceeded(succeeded);
2036  } while(!pc.lastOperationSucceeded());
2037  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2038 
2039  }
2040 
2041  FDEBUG(1) << "\tprocessEvent\n";
2042  pep->clearEventPrincipal();
2043  }
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1871 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1872  {
1873  try {
1874  // make the services available
1878  handler->initializeThisThreadForUse();
1879  }
1880 
1881  if(iStreamIndex==0) {
1882  processEvent(0);
1883  }
1884  do {
1885  if(shouldWeStop()) {
1886  break;
1887  }
1888  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1889  //another thread hit an exception
1890  //std::cerr<<"another thread saw an exception\n";
1891  break;
1892  }
1893  {
1894 
1895 
1896  {
1897  //nextItemType and readEvent need to be in same critical section
1898  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1899 
1900  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1901  //std::cerr<<"finishedProcessingEvents\n";
1902  break;
1903  }
1904 
1905  //If source and DelayedReader share a resource we must serialize them
1906  auto sr = input_->resourceSharedWithDelayedReader();
1907  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1908  if(sr) {
1909  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1910  }
1911  InputSource::ItemType itemType = input_->nextItemType();
1912  if (InputSource::IsEvent !=itemType) {
1914  finishedProcessingEvents->store(true,std::memory_order_release);
1915  //std::cerr<<"next item type "<<itemType<<"\n";
1916  break;
1917  }
1919  //std::cerr<<"task told to async stop\n";
1920  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1921  break;
1922  }
1923  readEvent(iStreamIndex);
1924  }
1925  }
1926  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1927  //another thread hit an exception
1928  //std::cerr<<"another thread saw an exception\n";
1929  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1930 
1931  break;
1932  }
1933  processEvent(iStreamIndex);
1934  }while(true);
1935  } catch (...) {
1936  bool expected =false;
1937  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1938  deferredExceptionPtr_ = std::current_exception();
1939  }
1940  //std::cerr<<"task caught exception\n";
1941  }
1942  }
void readEvent(unsigned int iStreamIndex)
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1814 of file EventProcessor.cc.

1814  {
1815  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1816  {
1817  SendSourceTerminationSignalIfException sentry(actReg_.get());
1818  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1819  sentry.completedSuccessfully();
1820  }
1821  return input_->luminosityBlock();
1822  }
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1777 of file EventProcessor.cc.

References PDRates::Run.

1777  {
1778  principalCache_.merge(input_->runAuxiliary(), preg_);
1779  auto runPrincipal =principalCache_.runPrincipalPtr();
1780  {
1781  SendSourceTerminationSignalIfException sentry(actReg_.get());
1782  input_->readAndMergeRun(*runPrincipal);
1783  sentry.completedSuccessfully();
1784  }
1785  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1786  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1787  }
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1944 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

1944  {
1945  if(numberOfForkedChildren_>0) {
1946  readEvent(0);
1947  processEvent(0);
1948  return;
1949  }
1952 
1953  std::atomic<bool> finishedProcessingEvents{false};
1954 
1955  //Task assumes Stream 0 has already read the event that caused us to go here
1956  readEvent(0);
1957 
1958  //To wait, the ref count has to b 1+#streams
1959  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1960  eventLoopWaitTask->increment_ref_count();
1961 
1962  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1963  unsigned int iStreamIndex = 0;
1964  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1965  eventLoopWaitTask->increment_ref_count();
1966  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1967 
1968  }
1969  eventLoopWaitTask->increment_ref_count();
1970  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1971  tbb::task::destroy(*eventLoopWaitTask);
1972 
1973  //One of the processing threads saw an exception
1975  std::rethrow_exception(deferredExceptionPtr_);
1976  }
1977  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1978 of file EventProcessor.cc.

References event(), and FDEBUG.

1978  {
1979  //TODO this will have to become per stream
1980  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1981  StreamContext streamContext(event.streamID(), &processContext_);
1982 
1983  SendSourceTerminationSignalIfException sentry(actReg_.get());
1984  input_->readEvent(event, streamContext);
1985  sentry.completedSuccessfully();
1986 
1987  FDEBUG(1) << "\treadEvent\n";
1988  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1458 of file EventProcessor.cc.

References FDEBUG, or, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1458  {
1459  FDEBUG(1) << " \treadFile\n";
1460  size_t size = preg_->size();
1461  SendSourceTerminationSignalIfException sentry(actReg_.get());
1462 
1463  fb_ = input_->readFile();
1464  if(size < preg_->size()) {
1466  }
1468  if((numberOfForkedChildren_ > 0) or
1471  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1472  }
1473  sentry.completedSuccessfully();
1474  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::unique_ptr< FileBlock > fb_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1789 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

1789  {
1792  << "EventProcessor::readRun\n"
1793  << "Illegal attempt to insert lumi into cache\n"
1794  << "Contact a Framework Developer\n";
1795  }
1798  << "EventProcessor::readRun\n"
1799  << "Illegal attempt to insert lumi into cache\n"
1800  << "Run is invalid\n"
1801  << "Contact a Framework Developer\n";
1802  }
1803  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1804  {
1805  SendSourceTerminationSignalIfException sentry(actReg_.get());
1806  input_->readLuminosityBlock(*lbp, *historyAppender_);
1807  sentry.completedSuccessfully();
1808  }
1809  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1810  principalCache_.insert(lbp);
1811  return input_->luminosityBlock();
1812  }
void insert(std::shared_ptr< RunPrincipal > rp)
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1759 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

1759  {
1762  << "EventProcessor::readRun\n"
1763  << "Illegal attempt to insert run into cache\n"
1764  << "Contact a Framework Developer\n";
1765  }
1766  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1767  {
1768  SendSourceTerminationSignalIfException sentry(actReg_.get());
1769  input_->readRun(*rp, *historyAppender_);
1770  sentry.completedSuccessfully();
1771  }
1772  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1773  principalCache_.insert(rp);
1774  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1775  }
void insert(std::shared_ptr< RunPrincipal > rp)
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1512 of file EventProcessor.cc.

References FDEBUG.

1512  {
1513  if (fb_.get() != nullptr) {
1514  schedule_->respondToCloseInputFile(*fb_);
1515  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1516  }
1517  FDEBUG(1) << "\trespondToCloseInputFile\n";
1518  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1501 of file EventProcessor.cc.

References FDEBUG.

1501  {
1502  if(hasSubProcess()) {
1503  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1504  }
1505  if (fb_.get() != nullptr) {
1506  schedule_->respondToOpenInputFile(*fb_);
1507  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1508  }
1509  FDEBUG(1) << "\trespondToOpenInputFile\n";
1510  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1543 of file EventProcessor.cc.

References FDEBUG.

1543  {
1544  input_->repeat();
1545  input_->rewind();
1546  FDEBUG(1) << "\trewind\n";
1547  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 311 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

311  {
312  return runToCompletion();
313  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1270 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, cmsPerfStripChart::operate(), runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1270  {
1271 
1274  std::auto_ptr<statemachine::Machine> machine;
1275  {
1276  beginJob(); //make sure this was called
1277 
1278  //StatusCode returnCode = epSuccess;
1280 
1281  // make the services available
1283 
1284  machine = createStateMachine();
1287  try {
1288  convertException::wrap([&]() {
1289 
1290  InputSource::ItemType itemType;
1291 
1292  while(true) {
1293 
1294  bool more = true;
1295  if(numberOfForkedChildren_ > 0) {
1296  size_t size = preg_->size();
1297  {
1298  SendSourceTerminationSignalIfException sentry(actReg_.get());
1299  more = input_->skipForForking();
1300  sentry.completedSuccessfully();
1301  }
1302  if(more) {
1303  if(size < preg_->size()) {
1305  }
1307  }
1308  }
1309  {
1310  SendSourceTerminationSignalIfException sentry(actReg_.get());
1311  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1312  sentry.completedSuccessfully();
1313  }
1314 
1315  FDEBUG(1) << "itemType = " << itemType << "\n";
1316 
1317  if(checkForAsyncStopRequest(returnCode)) {
1318  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1319  forceLooperToEnd_ = true;
1320  machine->process_event(statemachine::Stop());
1321  forceLooperToEnd_ = false;
1322  break;
1323  }
1324 
1325  if(itemType == InputSource::IsEvent) {
1326  machine->process_event(statemachine::Event());
1328  forceLooperToEnd_ = true;
1329  machine->process_event(statemachine::Stop());
1330  forceLooperToEnd_ = false;
1332  break;
1333  }
1335  }
1336 
1337  if(itemType == InputSource::IsEvent) {
1338  }
1339  else if(itemType == InputSource::IsStop) {
1340  machine->process_event(statemachine::Stop());
1341  }
1342  else if(itemType == InputSource::IsFile) {
1343  machine->process_event(statemachine::File());
1344  }
1345  else if(itemType == InputSource::IsRun) {
1346  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1347  }
1348  else if(itemType == InputSource::IsLumi) {
1349  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1350  }
1351  else if(itemType == InputSource::IsSynchronize) {
1352  //For now, we don't have to do anything
1353  }
1354  // This should be impossible
1355  else {
1357  << "Unknown next item type passed to EventProcessor\n"
1358  << "Please report this error to the Framework group\n";
1359  }
1360  if(machine->terminated()) {
1361  break;
1362  }
1363  } // End of loop over state machine events
1364  }); // convertException::wrap
1365  } // Try block
1366  // Some comments on exception handling related to the boost state machine:
1367  //
1368  // Some states used in the machine are special because they
1369  // perform actions while the machine is being terminated, actions
1370  // such as close files, call endRun, call endLumi etc ... Each of these
1371  // states has two functions that perform these actions. The functions
1372  // are almost identical. The major difference is that one version
1373  // catches all exceptions and the other lets exceptions pass through.
1374  // The destructor catches them and the other function named "exit" lets
1375  // them pass through. On a normal termination, boost will always call
1376  // "exit" and then the state destructor. In our state classes, the
1377  // the destructors do nothing if the exit function already took
1378  // care of things. Here's the interesting part. When boost is
1379  // handling an exception the "exit" function is not called (a boost
1380  // feature).
1381  //
1382  // If an exception occurs while the boost machine is in control
1383  // (which usually means inside a process_event call), then
1384  // the boost state machine destroys its states and "terminates" itself.
1385  // This already done before we hit the catch blocks below. In this case
1386  // the call to terminateMachine below only destroys an already
1387  // terminated state machine. Because exit is not called, the state destructors
1388  // handle cleaning up lumis, runs, and files. The destructors swallow
1389  // all exceptions and only pass through the exceptions messages, which
1390  // are tacked onto the original exception below.
1391  //
1392  // If an exception occurs when the boost state machine is not
1393  // in control (outside the process_event functions), then boost
1394  // cannot destroy its own states. The terminateMachine function
1395  // below takes care of that. The flag "alreadyHandlingException"
1396  // is set true so that the state exit functions do nothing (and
1397  // cannot throw more exceptions while handling the first). Then the
1398  // state destructors take care of this because exit did nothing.
1399  //
1400  // In both cases above, the EventProcessor::endOfLoop function is
1401  // not called because it can throw exceptions.
1402  //
1403  // One tricky aspect of the state machine is that things that can
1404  // throw should not be invoked by the state machine while another
1405  // exception is being handled.
1406  // Another tricky aspect is that it appears to be important to
1407  // terminate the state machine before invoking its destructor.
1408  // We've seen crashes that are not understood when that is not
1409  // done. Maintainers of this code should be careful about this.
1410 
1411  catch (cms::Exception & e) {
1413  terminateMachine(machine);
1414  alreadyHandlingException_ = false;
1415  if (!exceptionMessageLumis_.empty()) {
1417  if (e.alreadyPrinted()) {
1418  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1419  }
1420  }
1421  if (!exceptionMessageRuns_.empty()) {
1423  if (e.alreadyPrinted()) {
1424  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1425  }
1426  }
1427  if (!exceptionMessageFiles_.empty()) {
1429  if (e.alreadyPrinted()) {
1430  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1431  }
1432  }
1433  throw;
1434  }
1435 
1436  if(machine->terminated()) {
1437  FDEBUG(1) << "The state machine reports it has been terminated\n";
1438  machine.reset();
1439  }
1440 
1442  throw cms::Exception("BadState")
1443  << "The boost state machine in the EventProcessor exited after\n"
1444  << "entering the Error state.\n";
1445  }
1446 
1447  }
1448  if(machine.get() != 0) {
1449  terminateMachine(machine);
1451  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1452  << "Please report this error to the Framework group\n";
1453  }
1454 
1455  return returnCode;
1456  }
bool checkForAsyncStopRequest(StatusCode &)
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2051 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2051  {
2053  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2059 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2059  {
2061  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2055 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2055  {
2057  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1554 of file EventProcessor.cc.

References FDEBUG.

1554  {
1555  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1556  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1557  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2045 of file EventProcessor.cc.

References FDEBUG.

2045  {
2046  FDEBUG(1) << "\tshouldWeStop\n";
2047  if(shouldWeStop_) return true;
2048  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2049  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1520 of file EventProcessor.cc.

References FDEBUG.

1520  {
1521  shouldWeStop_ = false;
1522  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1523  // until after we've called beginOfJob
1524  if(looper_ && looperBeginJobRun_) {
1525  looper_->doStartingNewLoop();
1526  }
1527  FDEBUG(1) << "\tstartingNewLoop\n";
1528  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2067 of file EventProcessor.cc.

References FDEBUG.

2067  {
2068  if(iMachine.get() != 0) {
2069  if(!iMachine->terminated()) {
2070  forceLooperToEnd_ = true;
2071  iMachine->process_event(statemachine::Stop());
2072  forceLooperToEnd_ = false;
2073  }
2074  else {
2075  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2076  }
2077  if(iMachine->terminated()) {
2078  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2079  }
2080  iMachine.reset();
2081  }
2082  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1190 of file EventProcessor.cc.

1190  {
1191  return schedule_->totalEvents();
1192  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1200 of file EventProcessor.cc.

1200  {
1201  return schedule_->totalEventsFailed();
1202  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1195 of file EventProcessor.cc.

1195  {
1196  return schedule_->totalEventsPassed();
1197  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1836 of file EventProcessor.cc.

References FDEBUG.

1836  {
1838  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1839  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1840  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1824 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1824  {
1825  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1826  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1827  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1828  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 235 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 262 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 255 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 286 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 298 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 300 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 278 of file EventProcessor.h.

Referenced by beginJob().

std::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 257 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 294 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 274 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 273 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 282 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 261 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 260 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 304 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 283 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 285 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 284 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 269 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 281 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 289 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 287 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 267 of file EventProcessor.h.

Referenced by init().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 259 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 288 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 299 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 276 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 291 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 292 of file EventProcessor.h.

Referenced by init().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 296 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 256 of file EventProcessor.h.

Referenced by beginJob(), and init().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 277 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 263 of file EventProcessor.h.

Referenced by init().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 264 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 258 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 279 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 280 of file EventProcessor.h.

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private