CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap
 

Private Member Functions

bool checkForAsyncStopRequest (StatusCode &)
 
std::auto_ptr
< statemachine::Machine > 
createStateMachine ()
 
bool hasSubProcess () const
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
std::shared_ptr
< BranchIDListHelper > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider > 
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlock > fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppender > historyAppender_
 
std::unique_ptr< InputSource > input_
 
boost::shared_ptr< EDLooperBase > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PreallocationConfiguration preallocations_
 
std::shared_ptr
< ProductRegistry const > 
preg_
 
PrincipalCache principalCache_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
std::auto_ptr< Schedule > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
std::auto_ptr< SubProcess > subProcess_
 
std::shared_ptr
< ThinnedAssociationsHelper > 
thinnedAssociationsHelper_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 59 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 304 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 305 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 220 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

224  :
225  actReg_(),
226  preg_(),
228  serviceToken_(),
229  input_(),
230  espController_(new eventsetup::EventSetupsController),
231  esp_(),
232  act_table_(),
234  schedule_(),
235  subProcess_(),
236  historyAppender_(new HistoryAppender),
237  fb_(),
238  looper_(),
240  principalCache_(),
241  beginJobCalled_(false),
242  shouldWeStop_(false),
244  fileMode_(),
250  forceLooperToEnd_(false),
251  looperBeginJobRun_(false),
255  setCpuAffinity_(false),
257  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
258  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
259  processDesc->addServices(defaultServices, forcedServices);
260  init(processDesc, iToken, iLegacy);
261  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 263 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

265  :
266  actReg_(),
267  preg_(),
269  serviceToken_(),
270  input_(),
271  espController_(new eventsetup::EventSetupsController),
272  esp_(),
273  act_table_(),
275  schedule_(),
276  subProcess_(),
277  historyAppender_(new HistoryAppender),
278  fb_(),
279  looper_(),
281  principalCache_(),
282  beginJobCalled_(false),
283  shouldWeStop_(false),
285  fileMode_(),
291  forceLooperToEnd_(false),
292  looperBeginJobRun_(false),
296  setCpuAffinity_(false),
300  {
301  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
302  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
303  processDesc->addServices(defaultServices, forcedServices);
305  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 307 of file EventProcessor.cc.

References init().

309  :
310  actReg_(),
311  preg_(),
313  serviceToken_(),
314  input_(),
315  espController_(new eventsetup::EventSetupsController),
316  esp_(),
317  act_table_(),
319  schedule_(),
320  subProcess_(),
321  historyAppender_(new HistoryAppender),
322  fb_(),
323  looper_(),
325  principalCache_(),
326  beginJobCalled_(false),
327  shouldWeStop_(false),
329  fileMode_(),
335  forceLooperToEnd_(false),
336  looperBeginJobRun_(false),
340  setCpuAffinity_(false),
344  {
345  init(processDesc, token, legacy);
346  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 349 of file EventProcessor.cc.

References HDQMDatabaseProducer::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

349  :
350  actReg_(),
351  preg_(),
353  serviceToken_(),
354  input_(),
355  espController_(new eventsetup::EventSetupsController),
356  esp_(),
357  act_table_(),
359  schedule_(),
360  subProcess_(),
361  historyAppender_(new HistoryAppender),
362  fb_(),
363  looper_(),
365  principalCache_(),
366  beginJobCalled_(false),
367  shouldWeStop_(false),
369  fileMode_(),
375  forceLooperToEnd_(false),
376  looperBeginJobRun_(false),
380  setCpuAffinity_(false),
384 {
385  if(isPython) {
386  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
387  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
389  }
390  else {
391  auto processDesc = std::make_shared<ProcessDesc>(config);
393  }
394  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 563 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and subProcess_.

563  {
564  // Make the services available while everything is being deleted.
565  ServiceToken token = getToken();
566  ServiceRegistry::Operate op(token);
567 
568  // manually destroy all these thing that may need the services around
569  espController_.reset();
570  subProcess_.reset();
571  esp_.reset();
572  schedule_.reset();
573  input_.reset();
574  looper_.reset();
575  actReg_.reset();
576 
579  }
void clear()
Not thread safe.
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void clear()
Not thread safe.
Definition: Registry.cc:44
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:16
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2069 of file EventProcessor.cc.

2069  {
2071  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 582 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, hasSubProcess(), i, input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), preallocations_, preg_, schedule_, serviceToken_, subProcess_, and edm::convertException::wrap().

582  {
583  if(beginJobCalled_) return;
584  beginJobCalled_=true;
585  bk::beginJob();
586 
587  // StateSentry toerror(this); // should we add this ?
588  //make the services available
590 
591  service::SystemBounds bounds(preallocations_.numberOfStreams(),
595  actReg_->preallocateSignal_(bounds);
596  //NOTE: This implementation assumes 'Job' means one call
597  // the EventProcessor::run
598  // If it really means once per 'application' then this code will
599  // have to be changed.
600  // Also have to deal with case where have 'run' then new Module
601  // added and do 'run'
602  // again. In that case the newly added Module needs its 'beginJob'
603  // to be called.
604 
605  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
606  // For now we delay calling beginOfJob until first beginOfRun
607  //if(looper_) {
608  // looper_->beginOfJob(es);
609  //}
610  try {
611  convertException::wrap([&]() {
612  input_->doBeginJob();
613  });
614  }
615  catch(cms::Exception& ex) {
616  ex.addContext("Calling beginJob for the source");
617  throw;
618  }
619  schedule_->beginJob(*preg_);
620  // toerror.succeeded(); // should we add this?
621  if(hasSubProcess()) subProcess_->doBeginJob();
622  actReg_->postBeginJobSignal_();
623 
624  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
625  schedule_->beginStream(i);
626  if(hasSubProcess()) subProcess_->doBeginStream(i);
627  }
628  }
int i
Definition: DBlmapReader.cc:9
PreallocationConfiguration preallocations_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1671 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

1671  {
1672  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1673  {
1674  SendSourceTerminationSignalIfException sentry(actReg_.get());
1675 
1676  input_->doBeginLumi(lumiPrincipal, &processContext_);
1677  sentry.completedSuccessfully();
1678  }
1679 
1681  if(rng.isAvailable()) {
1682  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1683  rng->preBeginLumi(lb);
1684  }
1685 
1686  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1687  // lumi blocks know their start and end times why not also start and end events?
1688  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1689  {
1690  SendSourceTerminationSignalIfException sentry(actReg_.get());
1691  espController_->eventSetupForInstance(ts);
1692  sentry.completedSuccessfully();
1693  }
1694  EventSetup const& es = esp_->eventSetup();
1695  {
1696  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1697  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1698  if(hasSubProcess()) {
1699  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1700  }
1701  }
1702  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1703  if(looper_) {
1704  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1705  }
1706  {
1707  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1708  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1709  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1710  if(hasSubProcess()) {
1711  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1712  }
1713  }
1714  }
1715  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1716  if(looper_) {
1717  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1718  }
1719  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1576 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1576  {
1577  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1578  {
1579  SendSourceTerminationSignalIfException sentry(actReg_.get());
1580 
1581  input_->doBeginRun(runPrincipal, &processContext_);
1582  sentry.completedSuccessfully();
1583  }
1584 
1585  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1586  runPrincipal.beginTime());
1588  espController_->forceCacheClear();
1589  }
1590  {
1591  SendSourceTerminationSignalIfException sentry(actReg_.get());
1592  espController_->eventSetupForInstance(ts);
1593  sentry.completedSuccessfully();
1594  }
1595  EventSetup const& es = esp_->eventSetup();
1596  if(looper_ && looperBeginJobRun_== false) {
1597  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1598  looper_->beginOfJob(es);
1599  looperBeginJobRun_ = true;
1600  looper_->doStartingNewLoop();
1601  }
1602  {
1603  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1604  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1605  if(hasSubProcess()) {
1606  subProcess_->doBeginRun(runPrincipal, ts);
1607  }
1608  }
1609  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1610  if(looper_) {
1611  looper_->doBeginRun(runPrincipal, es, &processContext_);
1612  }
1613  {
1614  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1615  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1616  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1617  if(hasSubProcess()) {
1618  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1619  }
1620  }
1621  }
1622  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1623  if(looper_) {
1624  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1625  }
1626  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode &  returnCode)
private

Definition at line 1263 of file EventProcessor.cc.

References edm::shutdown_flag.

1263  {
1264  bool returnValue = false;
1265 
1266  // Look for a shutdown signal
1267  if(shutdown_flag.load(std::memory_order_acquire)) {
1268  returnValue = true;
1269  returnCode = epSignal;
1270  }
1271  return returnValue;
1272  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1226 of file EventProcessor.cc.

1226  {
1227  schedule_->clearCounters();
1228  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1482 of file EventProcessor.cc.

References FDEBUG.

1482  {
1483  if (fb_.get() != nullptr) {
1484  SendSourceTerminationSignalIfException sentry(actReg_.get());
1485  input_->closeFile(fb_.get(), cleaningUpAfterException);
1486  sentry.completedSuccessfully();
1487  }
1488  FDEBUG(1) << "\tcloseInputFile\n";
1489  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1499 of file EventProcessor.cc.

References FDEBUG.

1499  {
1500  if (fb_.get() != nullptr) {
1501  schedule_->closeOutputFiles();
1502  if(hasSubProcess()) subProcess_->closeOutputFiles();
1503  }
1504  FDEBUG(1) << "\tcloseOutputFiles\n";
1505  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1232 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1232  {
1234  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1235  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1236  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1237  else {
1238  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1239  << fileMode_ << ".\n"
1240  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1241  }
1242 
1243  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1244  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1245  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1246  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1247  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1248  else {
1249  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1250  << emptyRunLumiMode_ << ".\n"
1251  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1252  }
1253 
1254  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1255  fileMode,
1256  emptyRunLumiMode));
1257 
1258  machine->initiate();
1259  return machine;
1260  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1848 of file EventProcessor.cc.

References FDEBUG.

1848  {
1850  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1851  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1852  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1836 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1836  {
1837  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1838  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1839  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1840  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1565 of file EventProcessor.cc.

References FDEBUG.

1565  {
1566  FDEBUG(1) << "\tdoErrorStuff\n";
1567  LogError("StateMachine")
1568  << "The EventProcessor state machine encountered an unexpected event\n"
1569  << "and went to the error state\n"
1570  << "Will attempt to terminate processing normally\n"
1571  << "(IF using the looper the next loop will be attempted)\n"
1572  << "This likely indicates a bug in an input module or corrupted input or both\n";
1574  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1211 of file EventProcessor.cc.

1211  {
1212  schedule_->enableEndPaths(active);
1213  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 631 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), i, input_, looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

631  {
632  // Collects exceptions, so we don't throw before all operations are performed.
633  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
634 
635  //make the services available
637 
638  //NOTE: this really should go elsewhere in the future
639  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
640  c.call([this,i](){this->schedule_->endStream(i);});
641  if(hasSubProcess()) {
642  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
643  }
644  }
645  schedule_->endJob(c);
646  if(hasSubProcess()) {
647  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
648  }
649  c.call(std::bind(&InputSource::doEndJob, input_.get()));
650  if(looper_) {
651  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
652  }
653  auto actReg = actReg_.get();
654  c.call([actReg](){actReg->postEndJobSignal_();});
655  if(c.hasThrown()) {
656  c.rethrow();
657  }
658  }
int i
Definition: DBlmapReader.cc:9
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:254
boost::shared_ptr< EDLooperBase > looper_
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1721 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, i, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

1721  {
1722  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1723  {
1724  SendSourceTerminationSignalIfException sentry(actReg_.get());
1725 
1726  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1727  sentry.completedSuccessfully();
1728  }
1729  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1730  // lumi blocks know their start and end times why not also start and end events?
1731  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1732  lumiPrincipal.endTime());
1733  {
1734  SendSourceTerminationSignalIfException sentry(actReg_.get());
1735  espController_->eventSetupForInstance(ts);
1736  sentry.completedSuccessfully();
1737  }
1738  EventSetup const& es = esp_->eventSetup();
1739  {
1740  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1741  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1742  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1743  if(hasSubProcess()) {
1744  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1745  }
1746  }
1747  }
1748  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1749  if(looper_) {
1750  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1751  }
1752  {
1753  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1754  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1755  if(hasSubProcess()) {
1756  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1757  }
1758  }
1759  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1760  if(looper_) {
1761  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1762  }
1763  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1536 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1536  {
1537  if(looper_) {
1538  ModuleChanger changer(schedule_.get(),preg_.get());
1539  looper_->setModuleChanger(&changer);
1540  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1541  looper_->setModuleChanger(nullptr);
1542  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1543  else return false;
1544  }
1545  FDEBUG(1) << "\tendOfLoop\n";
1546  return true;
1547  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1216 of file EventProcessor.cc.

1216  {
1217  return schedule_->endPathsEnabled();
1218  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1628 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1628  {
1629  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1630  {
1631  SendSourceTerminationSignalIfException sentry(actReg_.get());
1632 
1633  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1634  sentry.completedSuccessfully();
1635  }
1636 
1638  runPrincipal.endTime());
1639  {
1640  SendSourceTerminationSignalIfException sentry(actReg_.get());
1641  espController_->eventSetupForInstance(ts);
1642  sentry.completedSuccessfully();
1643  }
1644  EventSetup const& es = esp_->eventSetup();
1645  {
1646  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1647  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1648  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1649  if(hasSubProcess()) {
1650  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1651  }
1652  }
1653  }
1654  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1655  if(looper_) {
1656  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1657  }
1658  {
1659  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1660  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1661  if(hasSubProcess()) {
1662  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1663  }
1664  }
1665  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1666  if(looper_) {
1667  looper_->doEndRun(runPrincipal, es, &processContext_);
1668  }
1669  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 890 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

890  {
891 
892  if(0 == numberOfForkedChildren_) {return true;}
893  assert(0<numberOfForkedChildren_);
894  //do what we want done in common
895  {
896  beginJob(); //make sure this was run
897  // make the services available
899 
900  InputSource::ItemType itemType;
901  itemType = input_->nextItemType();
902 
903  assert(itemType == InputSource::IsFile);
904  {
905  readFile();
906  }
907  itemType = input_->nextItemType();
908  assert(itemType == InputSource::IsRun);
909 
910  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
911  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
912  input_->runAuxiliary()->beginTime());
913  espController_->eventSetupForInstance(ts);
914  EventSetup const& es = esp_->eventSetup();
915 
916  //now get all the data available in the EventSetup
917  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
918  es.fillAvailableRecordKeys(recordKeys);
919  std::vector<eventsetup::DataKey> dataKeys;
920  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
921  itKey != itEnd;
922  ++itKey) {
923  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
924  //see if this is on our exclusion list
925  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
926  ExcludedData const* excludedData(nullptr);
927  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
928  excludedData = &(itExcludeRec->second);
929  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
930  //skip all items in this record
931  continue;
932  }
933  }
934  if(0 != recordPtr) {
935  dataKeys.clear();
936  recordPtr->fillRegisteredDataKeys(dataKeys);
937  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
938  itDataKey != itDataKeyEnd;
939  ++itDataKey) {
940  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
941  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
942  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
943  continue;
944  }
945  try {
946  recordPtr->doGet(*itDataKey);
947  } catch(cms::Exception& e) {
948  LogWarning("ForkingEventSetupPreFetching") << e.what();
949  }
950  }
951  }
952  }
953  }
954  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
955  {
956  // make the services available
958  Service<JobReport> jobReport;
959  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
960 
961  //Now actually do the forking
962  actReg_->preForkReleaseResourcesSignal_();
963  input_->doPreForkReleaseResources();
964  schedule_->preForkReleaseResources();
965  }
966  installCustomHandler(SIGCHLD, ep_sigchld);
967 
968 
969  unsigned int childIndex = 0;
970  unsigned int const kMaxChildren = numberOfForkedChildren_;
971  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
972  std::vector<pid_t> childrenIds;
973  childrenIds.reserve(kMaxChildren);
974  std::vector<int> childrenSockets;
975  childrenSockets.reserve(kMaxChildren);
976  std::vector<int> childrenPipes;
977  childrenPipes.reserve(kMaxChildren);
978  std::vector<int> childrenSocketsCopy;
979  childrenSocketsCopy.reserve(kMaxChildren);
980  std::vector<int> childrenPipesCopy;
981  childrenPipesCopy.reserve(kMaxChildren);
982  int pipes[] {0, 0};
983 
984  {
985  // make the services available
987  Service<JobReport> jobReport;
988  int sockets[2], fd_flags;
989  for(; childIndex < kMaxChildren; ++childIndex) {
990  // Create a UNIX_DGRAM socket pair
991  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
992  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
993  exit(EXIT_FAILURE);
994  }
995  if (pipe(pipes)) {
996  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
997  exit(EXIT_FAILURE);
998  }
999  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1000  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1001  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1002  exit(EXIT_FAILURE);
1003  }
1004  // Mark socket as non-block. Child must be careful to do select prior
1005  // to reading from socket.
1006  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1007  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1008  exit(EXIT_FAILURE);
1009  }
1010  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1011  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1012  exit(EXIT_FAILURE);
1013  }
1014  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1015  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1016  exit(EXIT_FAILURE);
1017  }
1018  // Linux man page notes there are some edge cases where reading from a
1019  // fd can block, even after a select.
1020  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1021  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1022  exit(EXIT_FAILURE);
1023  }
1024  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1025  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1026  exit(EXIT_FAILURE);
1027  }
1028 
1029  childrenPipesCopy = childrenPipes;
1030  childrenSocketsCopy = childrenSockets;
1031 
1032  pid_t value = fork();
1033  if(value == 0) {
1034  // Close the parent's side of the socket and pipe which will talk to us.
1035  close(pipes[0]);
1036  close(sockets[0]);
1037  // Close our copies of the parent's other communication pipes.
1038  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1039  close(*it);
1040  }
1041  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1042  close(*it);
1043  }
1044 
1045  // this is the child process, redirect stdout and stderr to a log file
1046  fflush(stdout);
1047  fflush(stderr);
1048  std::stringstream stout;
1049  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1050  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1051  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1052  }
1053  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1054  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1055  }
1056 
1057  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1058  if(setCpuAffinity_) {
1059  // CPU affinity is handled differently on macosx.
1060  // We disable it and print a message until someone reads:
1061  //
1062  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1063  //
1064  // and implements it.
1065 #ifdef __APPLE__
1066  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1067 #else
1068  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1069  cpu_set_t mask;
1070  CPU_ZERO(&mask);
1071  CPU_SET(childIndex, &mask);
1072  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1073  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1074  exit(-1);
1075  }
1076 #endif
1077  }
1078  break;
1079  } else {
1080  //this is the parent
1081  close(pipes[1]);
1082  close(sockets[1]);
1083  }
1084  if(value < 0) {
1085  LogError("ForkingChild") << "failed to create a child";
1086  exit(-1);
1087  }
1088  childrenIds.push_back(value);
1089  childrenSockets.push_back(sockets[0]);
1090  childrenPipes.push_back(pipes[0]);
1091  }
1092 
1093  if(childIndex < kMaxChildren) {
1094  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1095  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1096 
1097  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1098  input_->doPostForkReacquireResources(receiver);
1099  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1100  //NOTE: sources have to reset themselves by listening to the post fork message
1101  //rewindInput();
1102  return true;
1103  }
1104  jobReport->parentAfterFork(jobReportFile);
1105  }
1106 
1107  //this is the original, which is now the master for all the children
1108 
1109  //Need to wait for signals from the children or externally
1110  // To wait we must
1111  // 1) block the signals we want to wait on so we do not have a race condition
1112  // 2) check that we haven't already met our ending criteria
1113  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1114  sigset_t blockingSigSet;
1115  sigset_t unblockingSigSet;
1116  sigset_t oldSigSet;
1117  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1118  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1119  sigaddset(&blockingSigSet, SIGCHLD);
1120  sigaddset(&blockingSigSet, SIGUSR2);
1121  sigaddset(&blockingSigSet, SIGINT);
1122  sigdelset(&unblockingSigSet, SIGCHLD);
1123  sigdelset(&unblockingSigSet, SIGUSR2);
1124  sigdelset(&unblockingSigSet, SIGINT);
1125  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1126 
1127  // If there are too many fd's (unlikely, but possible) for select, denote this
1128  // because the sender will fail.
1129  bool too_many_fds = false;
1130  if (pipes[1]+1 > FD_SETSIZE) {
1131  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1132  too_many_fds = true;
1133  }
1134 
1135  //create a thread that sends the units of work to workers
1136  // we create it after all signals were blocked so that this
1137  // thread is never interrupted by a signal
1138  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1139  boost::thread senderThread(sender);
1140 
1141  if(not too_many_fds) {
1142  //NOTE: a child could have failed before we got here and even after this call
1143  // which is why the 'if' is conditional on continueAfterChildFailure_
1145  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1146  sigsuspend(&unblockingSigSet);
1148  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1149  }
1150  }
1151  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1152 
1153  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1154  if(child_failed) {
1155  LogError("ForkingStopping") << "child failed";
1156  }
1157  if(shutdown_flag) {
1158  LogSystem("ForkingStopping") << "asked to shutdown";
1159  }
1160 
1161  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1162  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1163  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1164  it != itEnd; ++it) {
1165  /* int result = */ kill(*it, SIGUSR2);
1166  }
1167  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1168  while(num_children_done != kMaxChildren) {
1169  sigsuspend(&unblockingSigSet);
1170  }
1171  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1172  }
1173  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1174  senderThread.join();
1175  if(child_failed && !continueAfterChildFailure_) {
1176  if (child_fail_signal) {
1177  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1178  } else if (child_fail_exit_status) {
1179  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1180  } else {
1181  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1182  }
1183  }
1184  if(too_many_fds) {
1185  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1186  }
1187  return false;
1188  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1191 of file EventProcessor.cc.

1191  {
1192  return schedule_->getAllModuleDescriptions();
1193  }
std::auto_ptr< Schedule > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 661 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

661  {
662  return serviceToken_;
663  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1221 of file EventProcessor.cc.

1221  {
1222  schedule_->getTriggerReport(rep);
1223  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 230 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

230  {
231  return subProcess_.get() != 0;
232  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 397 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessParameterSet(), preallocations_, edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcess_, edm::ScheduleItems::thinnedAssociationsHelper_, and thinnedAssociationsHelper_.

Referenced by EventProcessor().

399  {
400 
401  //std::cerr << processDesc->dump() << std::endl;
402 
403  ROOT::Cintex::Cintex::Enable();
404 
405  // register the empty parentage vector, once and for all
407 
408  // register the empty parameter set, once and for all.
409  ParameterSet().registerIt();
410 
411  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
412  //std::cerr << parameterSet->dump() << std::endl;
413 
414  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
415  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
416 
417  // Now set some parameters specific to the main process.
418  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
419  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
420  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
421  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
422  //threading
423  unsigned int nThreads=1;
424  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
425  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
426  if(nThreads == 0) {
427  nThreads = 1;
428  }
429  }
430  /* TODO: when we support having each stream run in a different thread use this default
431  unsigned int nStreams =nThreads;
432  */
433  unsigned int nStreams =1;
434  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
435  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
436  if(nStreams==0) {
437  nStreams = nThreads;
438  }
439  }
440  /*
441  bool nRunsSet = false;
442  */
443  unsigned int nConcurrentRuns =1;
444  /*
445  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
446  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
447  }
448  */
449  unsigned int nConcurrentLumis =1;
450  /*
451  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
452  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
453  } else {
454  nConcurrentLumis = nConcurrentRuns;
455  }
456  */
457  //Check that relationships between threading parameters make sense
458  /*
459  if(nThreads<nStreams) {
460  //bad
461  }
462  if(nConcurrentRuns>nStreams) {
463  //bad
464  }
465  if(nConcurrentRuns>nConcurrentLumis) {
466  //bad
467  }
468  */
469  //forking
470  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
471  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
472  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
473  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
474  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
475  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
476  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
477  itPS != itPSEnd;
478  ++itPS) {
479  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
480  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
481  itPS->getUntrackedParameter<std::string>("label", "")));
482  }
483  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
484 
485  // Now do general initialization
486  ScheduleItems items;
487 
488  //initialize the services
489  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
490  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
491  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
492 
493  //make the services available
495 
496  if(nStreams>1) {
498  handler->willBeUsingThreads();
499  }
500 
501  // initialize miscellaneous items
502  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
503 
504  // initialize the event setup provider
505  esp_ = espController_->makeProvider(*parameterSet);
506 
507  // initialize the looper, if any
508  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
509  if(looper_) {
510  looper_->setActionTable(items.act_table_.get());
511  looper_->attachTo(*items.actReg_);
512 
513  //For now loopers make us run only 1 transition at a time
514  nStreams=1;
515  nConcurrentLumis=1;
516  nConcurrentRuns=1;
517  }
518 
519  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
520 
521  // initialize the input source
522  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.thinnedAssociationsHelper_, items.actReg_, items.processConfiguration_, preallocations_);
523 
524  // initialize the Schedule
525  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
526 
527  // set the data members
528  act_table_ = std::move(items.act_table_);
529  actReg_ = items.actReg_;
530  preg_.reset(items.preg_.release());
531  branchIDListHelper_ = items.branchIDListHelper_;
532  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper_;
533  processConfiguration_ = items.processConfiguration_;
535  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
536 
537  FDEBUG(2) << parameterSet << std::endl;
538 
540  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
541  // Reusable event principal
542  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
543  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
544  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
546  }
547  // initialize the subprocess, if there is one
548  if(subProcessParameterSet) {
549  subProcess_.reset(new SubProcess(*subProcessParameterSet,
550  *parameterSet,
551  preg_,
552  branchIDListHelper_,
553  *thinnedAssociationsHelper_,
555  *actReg_,
556  token,
559  &processContext_));
560  }
561  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:560
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1491 of file EventProcessor.cc.

References FDEBUG.

1491  {
1492  if (fb_.get() != nullptr) {
1493  schedule_->openOutputFiles(*fb_);
1494  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1495  }
1496  FDEBUG(1) << "\topenOutputFiles\n";
1497  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 874 of file EventProcessor.cc.

874  {
875  if(child_failed && continueAfterChildFailure_) {
876  if (child_fail_signal) {
877  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
878  child_fail_signal=0;
879  } else if (child_fail_exit_status) {
880  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
881  child_fail_exit_status=0;
882  } else {
883  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
884  }
885  child_failed =false;
886  }
887  }
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1555 of file EventProcessor.cc.

References FDEBUG.

1555  {
1556  looper_->prepareForNextLoop(esp_.get());
1557  FDEBUG(1) << "\tprepareForNextLoop\n";
1558  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 1995 of file EventProcessor.cc.

References ev, FDEBUG, edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, and summarizeEdmComparisonLogfiles::succeeded.

1995  {
1996  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1997  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1999  if(rng.isAvailable()) {
2000  Event ev(*pep, ModuleDescription(), nullptr);
2001  rng->postEventRead(ev);
2002  }
2003  assert(pep->luminosityBlockPrincipalPtrValid());
2004  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2005  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2006 
2007  //We can only update IOVs on Lumi boundaries
2008  //IOVSyncValue ts(pep->id(), pep->time());
2009  //espController_->eventSetupForInstance(ts);
2010  EventSetup const& es = esp_->eventSetup();
2011  {
2012  typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
2013  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2014  if(hasSubProcess()) {
2015  subProcess_->doEvent(*pep);
2016  }
2017  }
2018 
2019  //NOTE: If we have a looper we only have one Stream
2020  if(looper_) {
2021  bool randomAccess = input_->randomAccess();
2022  ProcessingController::ForwardState forwardState = input_->forwardState();
2023  ProcessingController::ReverseState reverseState = input_->reverseState();
2024  ProcessingController pc(forwardState, reverseState, randomAccess);
2025 
2027  do {
2028 
2029  StreamContext streamContext(pep->streamID(), &processContext_);
2030  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2031 
2032  bool succeeded = true;
2033  if(randomAccess) {
2034  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2035  input_->skipEvents(-2);
2036  }
2037  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2038  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2039  }
2040  }
2041  pc.setLastOperationSucceeded(succeeded);
2042  } while(!pc.lastOperationSucceeded());
2043  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2044 
2045  }
2046 
2047  FDEBUG(1) << "\tprocessEvent\n";
2048  pep->clearEventPrincipal();
2049  }
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1877 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1878  {
1879  try {
1880  // make the services available
1884  handler->initializeThisThreadForUse();
1885  }
1886 
1887  if(iStreamIndex==0) {
1888  processEvent(0);
1889  }
1890  do {
1891  if(shouldWeStop()) {
1892  break;
1893  }
1894  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1895  //another thread hit an exception
1896  //std::cerr<<"another thread saw an exception\n";
1897  break;
1898  }
1899  {
1900 
1901 
1902  {
1903  //nextItemType and readEvent need to be in same critical section
1904  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1905 
1906  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1907  //std::cerr<<"finishedProcessingEvents\n";
1908  break;
1909  }
1910 
1911  //If source and DelayedReader share a resource we must serialize them
1912  auto sr = input_->resourceSharedWithDelayedReader();
1913  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1914  if(sr) {
1915  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1916  }
1917  InputSource::ItemType itemType = input_->nextItemType();
1918  if (InputSource::IsEvent !=itemType) {
1920  finishedProcessingEvents->store(true,std::memory_order_release);
1921  //std::cerr<<"next item type "<<itemType<<"\n";
1922  break;
1923  }
1925  //std::cerr<<"task told to async stop\n";
1926  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1927  break;
1928  }
1929  readEvent(iStreamIndex);
1930  }
1931  }
1932  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1933  //another thread hit an exception
1934  //std::cerr<<"another thread saw an exception\n";
1935  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1936 
1937  break;
1938  }
1939  processEvent(iStreamIndex);
1940  }while(true);
1941  } catch (...) {
1942  bool expected =false;
1943  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1944  deferredExceptionPtr_ = std::current_exception();
1945  }
1946  //std::cerr<<"task caught exception\n";
1947  }
1948  }
void readEvent(unsigned int iStreamIndex)
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1820 of file EventProcessor.cc.

1820  {
1821  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1822  {
1823  SendSourceTerminationSignalIfException sentry(actReg_.get());
1824  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1825  sentry.completedSuccessfully();
1826  }
1827  return input_->luminosityBlock();
1828  }
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1783 of file EventProcessor.cc.

References PDRates::Run.

1783  {
1784  principalCache_.merge(input_->runAuxiliary(), preg_);
1785  auto runPrincipal =principalCache_.runPrincipalPtr();
1786  {
1787  SendSourceTerminationSignalIfException sentry(actReg_.get());
1788  input_->readAndMergeRun(*runPrincipal);
1789  sentry.completedSuccessfully();
1790  }
1791  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1792  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1793  }
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1950 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

1950  {
1951  if(numberOfForkedChildren_>0) {
1952  readEvent(0);
1953  processEvent(0);
1954  return;
1955  }
1958 
1959  std::atomic<bool> finishedProcessingEvents{false};
1960 
1961  //Task assumes Stream 0 has already read the event that caused us to go here
1962  readEvent(0);
1963 
1964  //To wait, the ref count has to be 1+#streams
1965  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1966  eventLoopWaitTask->increment_ref_count();
1967 
1968  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1969  unsigned int iStreamIndex = 0;
1970  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1971  eventLoopWaitTask->increment_ref_count();
1972  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1973 
1974  }
1975  eventLoopWaitTask->increment_ref_count();
1976  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1977  tbb::task::destroy(*eventLoopWaitTask);
1978 
1979  //One of the processing threads saw an exception
1981  std::rethrow_exception(deferredExceptionPtr_);
1982  }
1983  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1984 of file EventProcessor.cc.

References event(), and FDEBUG.

1984  {
1985  //TODO this will have to become per stream
1986  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1987  StreamContext streamContext(event.streamID(), &processContext_);
1988 
1989  SendSourceTerminationSignalIfException sentry(actReg_.get());
1990  input_->readEvent(event, streamContext);
1991  sentry.completedSuccessfully();
1992 
1993  FDEBUG(1) << "\treadEvent\n";
1994  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1464 of file EventProcessor.cc.

References FDEBUG, or, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1464  {
1465  FDEBUG(1) << " \treadFile\n";
1466  size_t size = preg_->size();
1467  SendSourceTerminationSignalIfException sentry(actReg_.get());
1468 
1469  fb_ = input_->readFile();
1470  if(size < preg_->size()) {
1472  }
1474  if((numberOfForkedChildren_ > 0) or
1477  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1478  }
1479  sentry.completedSuccessfully();
1480  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::unique_ptr< FileBlock > fb_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1795 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

1795  {
1798  << "EventProcessor::readRun\n"
1799  << "Illegal attempt to insert lumi into cache\n"
1800  << "Contact a Framework Developer\n";
1801  }
1804  << "EventProcessor::readRun\n"
1805  << "Illegal attempt to insert lumi into cache\n"
1806  << "Run is invalid\n"
1807  << "Contact a Framework Developer\n";
1808  }
1809  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1810  {
1811  SendSourceTerminationSignalIfException sentry(actReg_.get());
1812  input_->readLuminosityBlock(*lbp, *historyAppender_);
1813  sentry.completedSuccessfully();
1814  }
1815  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1816  principalCache_.insert(lbp);
1817  return input_->luminosityBlock();
1818  }
void insert(std::shared_ptr< RunPrincipal > rp)
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1765 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

1765  {
1768  << "EventProcessor::readRun\n"
1769  << "Illegal attempt to insert run into cache\n"
1770  << "Contact a Framework Developer\n";
1771  }
1772  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1773  {
1774  SendSourceTerminationSignalIfException sentry(actReg_.get());
1775  input_->readRun(*rp, *historyAppender_);
1776  sentry.completedSuccessfully();
1777  }
1778  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1779  principalCache_.insert(rp);
1780  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1781  }
void insert(std::shared_ptr< RunPrincipal > rp)
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1518 of file EventProcessor.cc.

References FDEBUG.

1518  {
1519  if (fb_.get() != nullptr) {
1520  schedule_->respondToCloseInputFile(*fb_);
1521  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1522  }
1523  FDEBUG(1) << "\trespondToCloseInputFile\n";
1524  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1507 of file EventProcessor.cc.

References FDEBUG.

1507  {
1508  if(hasSubProcess()) {
1509  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1510  }
1511  if (fb_.get() != nullptr) {
1512  schedule_->respondToOpenInputFile(*fb_);
1513  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1514  }
1515  FDEBUG(1) << "\trespondToOpenInputFile\n";
1516  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1549 of file EventProcessor.cc.

References FDEBUG.

1549  {
1550  input_->repeat();
1551  input_->rewind();
1552  FDEBUG(1) << "\trewind\n";
1553  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 313 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

313  {
314  return runToCompletion();
315  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1276 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, cmsPerfStripChart::operate(), runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1276  {
1277 
1280  std::auto_ptr<statemachine::Machine> machine;
1281  {
1282  beginJob(); //make sure this was called
1283 
1284  //StatusCode returnCode = epSuccess;
1286 
1287  // make the services available
1289 
1290  machine = createStateMachine();
1293  try {
1294  convertException::wrap([&]() {
1295 
1296  InputSource::ItemType itemType;
1297 
1298  while(true) {
1299 
1300  bool more = true;
1301  if(numberOfForkedChildren_ > 0) {
1302  size_t size = preg_->size();
1303  {
1304  SendSourceTerminationSignalIfException sentry(actReg_.get());
1305  more = input_->skipForForking();
1306  sentry.completedSuccessfully();
1307  }
1308  if(more) {
1309  if(size < preg_->size()) {
1311  }
1313  }
1314  }
1315  {
1316  SendSourceTerminationSignalIfException sentry(actReg_.get());
1317  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1318  sentry.completedSuccessfully();
1319  }
1320 
1321  FDEBUG(1) << "itemType = " << itemType << "\n";
1322 
1323  if(checkForAsyncStopRequest(returnCode)) {
1324  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1325  forceLooperToEnd_ = true;
1326  machine->process_event(statemachine::Stop());
1327  forceLooperToEnd_ = false;
1328  break;
1329  }
1330 
1331  if(itemType == InputSource::IsEvent) {
1332  machine->process_event(statemachine::Event());
1334  forceLooperToEnd_ = true;
1335  machine->process_event(statemachine::Stop());
1336  forceLooperToEnd_ = false;
1338  break;
1339  }
1341  }
1342 
1343  if(itemType == InputSource::IsEvent) {
1344  }
1345  else if(itemType == InputSource::IsStop) {
1346  machine->process_event(statemachine::Stop());
1347  }
1348  else if(itemType == InputSource::IsFile) {
1349  machine->process_event(statemachine::File());
1350  }
1351  else if(itemType == InputSource::IsRun) {
1352  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1353  }
1354  else if(itemType == InputSource::IsLumi) {
1355  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1356  }
1357  else if(itemType == InputSource::IsSynchronize) {
1358  //For now, we don't have to do anything
1359  }
1360  // This should be impossible
1361  else {
1363  << "Unknown next item type passed to EventProcessor\n"
1364  << "Please report this error to the Framework group\n";
1365  }
1366  if(machine->terminated()) {
1367  break;
1368  }
1369  } // End of loop over state machine events
1370  }); // convertException::wrap
1371  } // Try block
1372  // Some comments on exception handling related to the boost state machine:
1373  //
1374  // Some states used in the machine are special because they
1375  // perform actions while the machine is being terminated, actions
1376  // such as close files, call endRun, call endLumi etc ... Each of these
1377  // states has two functions that perform these actions. The functions
1378  // are almost identical. The major difference is that one version
1379  // catches all exceptions and the other lets exceptions pass through.
1380  // The destructor catches them and the other function named "exit" lets
1381  // them pass through. On a normal termination, boost will always call
1382  // "exit" and then the state destructor. In our state classes, the
1383  // destructors do nothing if the exit function already took
1384  // care of things. Here's the interesting part. When boost is
1385  // handling an exception the "exit" function is not called (a boost
1386  // feature).
1387  //
1388  // If an exception occurs while the boost machine is in control
1389  // (which usually means inside a process_event call), then
1390  // the boost state machine destroys its states and "terminates" itself.
1391  // This is already done before we hit the catch blocks below. In this case
1392  // the call to terminateMachine below only destroys an already
1393  // terminated state machine. Because exit is not called, the state destructors
1394  // handle cleaning up lumis, runs, and files. The destructors swallow
1395  // all exceptions and only pass through the exceptions messages, which
1396  // are tacked onto the original exception below.
1397  //
1398  // If an exception occurs when the boost state machine is not
1399  // in control (outside the process_event functions), then boost
1400  // cannot destroy its own states. The terminateMachine function
1401  // below takes care of that. The flag "alreadyHandlingException"
1402  // is set true so that the state exit functions do nothing (and
1403  // cannot throw more exceptions while handling the first). Then the
1404  // state destructors take care of this because exit did nothing.
1405  //
1406  // In both cases above, the EventProcessor::endOfLoop function is
1407  // not called because it can throw exceptions.
1408  //
1409  // One tricky aspect of the state machine is that things that can
1410  // throw should not be invoked by the state machine while another
1411  // exception is being handled.
1412  // Another tricky aspect is that it appears to be important to
1413  // terminate the state machine before invoking its destructor.
1414  // We've seen crashes that are not understood when that is not
1415  // done. Maintainers of this code should be careful about this.
1416 
1417  catch (cms::Exception & e) {
1419  terminateMachine(machine);
1420  alreadyHandlingException_ = false;
1421  if (!exceptionMessageLumis_.empty()) {
1423  if (e.alreadyPrinted()) {
1424  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1425  }
1426  }
1427  if (!exceptionMessageRuns_.empty()) {
1429  if (e.alreadyPrinted()) {
1430  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1431  }
1432  }
1433  if (!exceptionMessageFiles_.empty()) {
1435  if (e.alreadyPrinted()) {
1436  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1437  }
1438  }
1439  throw;
1440  }
1441 
1442  if(machine->terminated()) {
1443  FDEBUG(1) << "The state machine reports it has been terminated\n";
1444  machine.reset();
1445  }
1446 
1448  throw cms::Exception("BadState")
1449  << "The boost state machine in the EventProcessor exited after\n"
1450  << "entering the Error state.\n";
1451  }
1452 
1453  }
1454  if(machine.get() != 0) {
1455  terminateMachine(machine);
1457  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1458  << "Please report this error to the Framework group\n";
1459  }
1460 
1461  return returnCode;
1462  }
bool checkForAsyncStopRequest(StatusCode &)
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2057 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2057  {
2059  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2065 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2065  {
2067  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2061 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2061  {
2063  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1560 of file EventProcessor.cc.

References FDEBUG.

1560  {
1561  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1562  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1563  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2051 of file EventProcessor.cc.

References FDEBUG.

2051  {
2052  FDEBUG(1) << "\tshouldWeStop\n";
2053  if(shouldWeStop_) return true;
2054  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2055  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1526 of file EventProcessor.cc.

References FDEBUG.

1526  {
1527  shouldWeStop_ = false;
1528  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1529  // until after we've called beginOfJob
1530  if(looper_ && looperBeginJobRun_) {
1531  looper_->doStartingNewLoop();
1532  }
1533  FDEBUG(1) << "\tstartingNewLoop\n";
1534  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2073 of file EventProcessor.cc.

References FDEBUG.

2073  {
2074  if(iMachine.get() != 0) {
2075  if(!iMachine->terminated()) {
2076  forceLooperToEnd_ = true;
2077  iMachine->process_event(statemachine::Stop());
2078  forceLooperToEnd_ = false;
2079  }
2080  else {
2081  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2082  }
2083  if(iMachine->terminated()) {
2084  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2085  }
2086  iMachine.reset();
2087  }
2088  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1196 of file EventProcessor.cc.

1196  {
1197  return schedule_->totalEvents();
1198  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 1206 of file EventProcessor.cc.

1206  {
1207  return schedule_->totalEventsFailed();
1208  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1201 of file EventProcessor.cc.

1201  {
1202  return schedule_->totalEventsPassed();
1203  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1842 of file EventProcessor.cc.

References FDEBUG.

1842  {
1844  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1845  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1846  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1830 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1830  {
1831  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1832  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1833  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1834  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 236 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 264 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 256 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 288 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 300 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 302 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 280 of file EventProcessor.h.

Referenced by beginJob().

std::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 258 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 296 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 276 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 275 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 284 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 263 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 262 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 306 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 285 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 287 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 286 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 271 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 283 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 291 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 289 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 269 of file EventProcessor.h.

Referenced by init().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 261 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 290 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 301 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 278 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 294 of file EventProcessor.h.

Referenced by init().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 298 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 257 of file EventProcessor.h.

Referenced by beginJob(), and init().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 279 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 265 of file EventProcessor.h.

Referenced by init().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 266 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 260 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 295 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 281 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 282 of file EventProcessor.h.

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private
std::shared_ptr<ThinnedAssociationsHelper> edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 259 of file EventProcessor.h.

Referenced by init().