CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
virtual void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
virtual void prepareForNextLoop ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual int readLuminosityBlock ()
 
virtual statemachine::Run readRun ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToOpenInputFile ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
virtual StatusCode runToCompletion ()
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
virtual void startingNewLoop ()
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap
 

Private Member Functions

bool checkForAsyncStopRequest (StatusCode &)
 
std::auto_ptr
< statemachine::Machine > 
createStateMachine ()
 
bool hasSubProcess () const
 
void init (std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void processEvent (unsigned int iStreamIndex)
 
void processEventsForStreamAsync (unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
 
void readEvent (unsigned int iStreamIndex)
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 

Private Attributes

std::unique_ptr
< ExceptionToActionTable const > 
act_table_
 
std::shared_ptr< ActivityRegistry > actReg_
 
bool alreadyHandlingException_
 
bool asyncStopRequestedWhileProcessingEvents_
 
StatusCode asyncStopStatusCodeFromProcessingEvents_
 
bool beginJobCalled_
 
std::shared_ptr
< BranchIDListHelper > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::exception_ptr deferredExceptionPtr_
 
std::atomic< bool > deferredExceptionPtrIsSet_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider > 
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController > 
espController_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlock > fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppender > historyAppender_
 
std::unique_ptr< InputSource > input_
 
boost::shared_ptr< EDLooperBase > looper_
 
bool looperBeginJobRun_
 
InputSource::ItemType nextItemTypeFromProcessingEvents_
 
std::mutex nextTransitionMutex_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
PathsAndConsumesOfModules pathsAndConsumesOfModules_
 
PreallocationConfiguration preallocations_
 
std::shared_ptr
< ProductRegistry const > 
preg_
 
PrincipalCache principalCache_
 
std::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
ProcessContext processContext_
 
std::auto_ptr< Schedule > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
bool stateMachineWasInErrorState_
 
std::auto_ptr< SubProcess > subProcess_
 
std::shared_ptr
< ThinnedAssociationsHelper > 
thinnedAssociationsHelper_
 

Friends

class StreamProcessingTask
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 60 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 306 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 307 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 216 of file EventProcessor.cc.

References init(), edm::parameterSet(), and PythonProcessDesc::parameterSet().

220  :
221  actReg_(),
222  preg_(),
224  serviceToken_(),
225  input_(),
226  espController_(new eventsetup::EventSetupsController),
227  esp_(),
228  act_table_(),
230  schedule_(),
231  subProcess_(),
232  historyAppender_(new HistoryAppender),
233  fb_(),
234  looper_(),
236  principalCache_(),
237  beginJobCalled_(false),
238  shouldWeStop_(false),
240  fileMode_(),
246  forceLooperToEnd_(false),
247  looperBeginJobRun_(false),
251  setCpuAffinity_(false),
253  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
254  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
255  processDesc->addServices(defaultServices, forcedServices);
256  init(processDesc, iToken, iLegacy);
257  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 259 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

261  :
262  actReg_(),
263  preg_(),
265  serviceToken_(),
266  input_(),
267  espController_(new eventsetup::EventSetupsController),
268  esp_(),
269  act_table_(),
271  schedule_(),
272  subProcess_(),
273  historyAppender_(new HistoryAppender),
274  fb_(),
275  looper_(),
277  principalCache_(),
278  beginJobCalled_(false),
279  shouldWeStop_(false),
281  fileMode_(),
287  forceLooperToEnd_(false),
288  looperBeginJobRun_(false),
292  setCpuAffinity_(false),
296  {
297  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
298  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
299  processDesc->addServices(defaultServices, forcedServices);
301  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 303 of file EventProcessor.cc.

References init().

305  :
306  actReg_(),
307  preg_(),
309  serviceToken_(),
310  input_(),
311  espController_(new eventsetup::EventSetupsController),
312  esp_(),
313  act_table_(),
315  schedule_(),
316  subProcess_(),
317  historyAppender_(new HistoryAppender),
318  fb_(),
319  looper_(),
321  principalCache_(),
322  beginJobCalled_(false),
323  shouldWeStop_(false),
325  fileMode_(),
331  forceLooperToEnd_(false),
332  looperBeginJobRun_(false),
336  setCpuAffinity_(false),
340  {
341  init(processDesc, token, legacy);
342  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 345 of file EventProcessor.cc.

References HDQMDatabaseProducer::config, init(), edm::serviceregistry::kOverlapIsError, edm::parameterSet(), and PythonProcessDesc::parameterSet().

345  :
346  actReg_(),
347  preg_(),
349  serviceToken_(),
350  input_(),
351  espController_(new eventsetup::EventSetupsController),
352  esp_(),
353  act_table_(),
355  schedule_(),
356  subProcess_(),
357  historyAppender_(new HistoryAppender),
358  fb_(),
359  looper_(),
361  principalCache_(),
362  beginJobCalled_(false),
363  shouldWeStop_(false),
365  fileMode_(),
371  forceLooperToEnd_(false),
372  looperBeginJobRun_(false),
376  setCpuAffinity_(false),
380 {
381  if(isPython) {
382  std::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
383  auto processDesc = std::make_shared<ProcessDesc>(parameterSet);
385  }
386  else {
387  auto processDesc = std::make_shared<ProcessDesc>(config);
389  }
390  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
std::unique_ptr< ExceptionToActionTable const > act_table_
boost::shared_ptr< EDLooperBase > looper_
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::string exceptionMessageLumis_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::shared_ptr< edm::ParameterSet > parameterSet()
InputSource::ItemType nextItemTypeFromProcessingEvents_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 557 of file EventProcessor.cc.

References actReg_, edm::ParentageRegistry::clear(), edm::pset::Registry::clear(), esp_, espController_, getToken(), input_, edm::ParentageRegistry::instance(), edm::pset::Registry::instance(), looper_, schedule_, and subProcess_.

557  {
558  // Make the services available while everything is being deleted.
559  ServiceToken token = getToken();
560  ServiceRegistry::Operate op(token);
561 
562  // manually destroy all these thing that may need the services around
563  espController_.reset();
564  subProcess_.reset();
565  esp_.reset();
566  schedule_.reset();
567  input_.reset();
568  looper_.reset();
569  actReg_.reset();
570 
573  }
void clear()
Not thread safe.
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void clear()
Not thread safe.
Definition: Registry.cc:44
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:16
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2067 of file EventProcessor.cc.

2067  {
2069  }
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 576 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), bk::beginJob(), beginJobCalled_, hasSubProcess(), i, edm::PathsAndConsumesOfModules::initialize(), input_, edm::PreallocationConfiguration::numberOfLuminosityBlocks(), edm::PreallocationConfiguration::numberOfRuns(), edm::PreallocationConfiguration::numberOfStreams(), edm::PreallocationConfiguration::numberOfThreads(), cmsPerfStripChart::operate(), pathsAndConsumesOfModules_, preallocations_, preg_, processContext_, schedule_, serviceToken_, subProcess_, and edm::convertException::wrap().

576  {
577  if(beginJobCalled_) return;
578  beginJobCalled_=true;
579  bk::beginJob();
580 
581  // StateSentry toerror(this); // should we add this ?
582  //make the services available
584 
585  service::SystemBounds bounds(preallocations_.numberOfStreams(),
589  actReg_->preallocateSignal_(bounds);
591  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
592 
593  //NOTE: This implementation assumes 'Job' means one call
594  // the EventProcessor::run
595  // If it really means once per 'application' then this code will
596  // have to be changed.
597  // Also have to deal with case where have 'run' then new Module
598  // added and do 'run'
599  // again. In that case the newly added Module needs its 'beginJob'
600  // to be called.
601 
602  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
603  // For now we delay calling beginOfJob until first beginOfRun
604  //if(looper_) {
605  // looper_->beginOfJob(es);
606  //}
607  try {
608  convertException::wrap([&]() {
609  input_->doBeginJob();
610  });
611  }
612  catch(cms::Exception& ex) {
613  ex.addContext("Calling beginJob for the source");
614  throw;
615  }
616  schedule_->beginJob(*preg_);
617  // toerror.succeeded(); // should we add this?
618  if(hasSubProcess()) subProcess_->doBeginJob();
619  actReg_->postBeginJobSignal_();
620 
621  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
622  schedule_->beginStream(i);
623  if(hasSubProcess()) subProcess_->doBeginStream(i);
624  }
625  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
PreallocationConfiguration preallocations_
void beginJob()
Definition: Breakpoints.cc:15
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
PathsAndConsumesOfModules pathsAndConsumesOfModules_
std::auto_ptr< SubProcess > subProcess_
auto wrap(F iFunc) -> decltype(iFunc())
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1669 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, i, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

1669  {
1670  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1671  {
1672  SendSourceTerminationSignalIfException sentry(actReg_.get());
1673 
1674  input_->doBeginLumi(lumiPrincipal, &processContext_);
1675  sentry.completedSuccessfully();
1676  }
1677 
1679  if(rng.isAvailable()) {
1680  LuminosityBlock lb(lumiPrincipal, ModuleDescription(), nullptr);
1681  rng->preBeginLumi(lb);
1682  }
1683 
1684  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1685  // lumi blocks know their start and end times why not also start and end events?
1686  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1687  {
1688  SendSourceTerminationSignalIfException sentry(actReg_.get());
1689  espController_->eventSetupForInstance(ts);
1690  sentry.completedSuccessfully();
1691  }
1692  EventSetup const& es = esp_->eventSetup();
1693  {
1694  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> Traits;
1695  schedule_->processOneGlobal<Traits>(lumiPrincipal, es);
1696  if(hasSubProcess()) {
1697  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
1698  }
1699  }
1700  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1701  if(looper_) {
1702  looper_->doBeginLuminosityBlock(lumiPrincipal, es, &processContext_);
1703  }
1704  {
1705  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1706  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> Traits;
1707  schedule_->processOneStream<Traits>(i,lumiPrincipal, es);
1708  if(hasSubProcess()) {
1709  subProcess_->doStreamBeginLuminosityBlock(i,lumiPrincipal, ts);
1710  }
1711  }
1712  }
1713  FDEBUG(1) << "\tstreamBeginLumi " << run << "/" << lumi << "\n";
1714  if(looper_) {
1715  //looper_->doStreamBeginLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1716  }
1717  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1574 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1574  {
1575  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1576  {
1577  SendSourceTerminationSignalIfException sentry(actReg_.get());
1578 
1579  input_->doBeginRun(runPrincipal, &processContext_);
1580  sentry.completedSuccessfully();
1581  }
1582 
1583  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1584  runPrincipal.beginTime());
1586  espController_->forceCacheClear();
1587  }
1588  {
1589  SendSourceTerminationSignalIfException sentry(actReg_.get());
1590  espController_->eventSetupForInstance(ts);
1591  sentry.completedSuccessfully();
1592  }
1593  EventSetup const& es = esp_->eventSetup();
1594  if(looper_ && looperBeginJobRun_== false) {
1595  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1596  looper_->beginOfJob(es);
1597  looperBeginJobRun_ = true;
1598  looper_->doStartingNewLoop();
1599  }
1600  {
1601  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Traits;
1602  schedule_->processOneGlobal<Traits>(runPrincipal, es);
1603  if(hasSubProcess()) {
1604  subProcess_->doBeginRun(runPrincipal, ts);
1605  }
1606  }
1607  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1608  if(looper_) {
1609  looper_->doBeginRun(runPrincipal, es, &processContext_);
1610  }
1611  {
1612  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Traits;
1613  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1614  schedule_->processOneStream<Traits>(i,runPrincipal, es);
1615  if(hasSubProcess()) {
1616  subProcess_->doStreamBeginRun(i, runPrincipal, ts);
1617  }
1618  }
1619  }
1620  FDEBUG(1) << "\tstreamBeginRun " << run.runNumber() << "\n";
1621  if(looper_) {
1622  //looper_->doStreamBeginRun(schedule_->streamID(),runPrincipal, es);
1623  }
1624  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::checkForAsyncStopRequest ( StatusCode &  returnCode)
private

Definition at line 1261 of file EventProcessor.cc.

References edm::shutdown_flag.

1261  {
1262  bool returnValue = false;
1263 
1264  // Look for a shutdown signal
1265  if(shutdown_flag.load(std::memory_order_acquire)) {
1266  returnValue = true;
1267  returnCode = epSignal;
1268  }
1269  return returnValue;
1270  }
volatile std::atomic< bool > shutdown_flag
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1224 of file EventProcessor.cc.

1224  {
1225  schedule_->clearCounters();
1226  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1480 of file EventProcessor.cc.

References FDEBUG.

1480  {
1481  if (fb_.get() != nullptr) {
1482  SendSourceTerminationSignalIfException sentry(actReg_.get());
1483  input_->closeFile(fb_.get(), cleaningUpAfterException);
1484  sentry.completedSuccessfully();
1485  }
1486  FDEBUG(1) << "\tcloseInputFile\n";
1487  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1497 of file EventProcessor.cc.

References FDEBUG.

1497  {
1498  if (fb_.get() != nullptr) {
1499  schedule_->closeOutputFiles();
1500  if(hasSubProcess()) subProcess_->closeOutputFiles();
1501  }
1502  FDEBUG(1) << "\tcloseOutputFiles\n";
1503  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1230 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1230  {
1232  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1233  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1234  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1235  else {
1236  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1237  << fileMode_ << ".\n"
1238  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1239  }
1240 
1241  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1242  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1243  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1244  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1245  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1246  else {
1247  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1248  << emptyRunLumiMode_ << ".\n"
1249  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1250  }
1251 
1252  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1253  fileMode,
1254  emptyRunLumiMode));
1255 
1256  machine->initiate();
1257  return machine;
1258  }
std::string emptyRunLumiMode_
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1846 of file EventProcessor.cc.

References FDEBUG.

1846  {
1848  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
1849  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1850  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1834 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1834  {
1835  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1836  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
1837  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1838  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1563 of file EventProcessor.cc.

References FDEBUG.

1563  {
1564  FDEBUG(1) << "\tdoErrorStuff\n";
1565  LogError("StateMachine")
1566  << "The EventProcessor state machine encountered an unexpected event\n"
1567  << "and went to the error state\n"
1568  << "Will attempt to terminate processing normally\n"
1569  << "(IF using the looper the next loop will be attempted)\n"
1570  << "This likely indicates a bug in an input module or corrupted input or both\n";
1572  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1209 of file EventProcessor.cc.

1209  {
1210  schedule_->enableEndPaths(active);
1211  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 628 of file EventProcessor.cc.

References actReg_, EnergyCorrector::c, edm::ExceptionCollector::call(), edm::SubProcess::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), i, input_, looper_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), preallocations_, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

628  {
629  // Collects exceptions, so we don't throw before all operations are performed.
630  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
631 
632  //make the services available
634 
635  //NOTE: this really should go elsewhere in the future
636  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
637  c.call([this,i](){this->schedule_->endStream(i);});
638  if(hasSubProcess()) {
639  c.call([this,i](){ this->subProcess_->doEndStream(i); } );
640  }
641  }
642  auto actReg = actReg_.get();
643  c.call([actReg](){actReg->preEndJobSignal_();});
644  schedule_->endJob(c);
645  if(hasSubProcess()) {
646  c.call(std::bind(&SubProcess::doEndJob, subProcess_.get()));
647  }
648  c.call(std::bind(&InputSource::doEndJob, input_.get()));
649  if(looper_) {
650  c.call(std::bind(&EDLooperBase::endOfJob, looper_));
651  }
652  c.call([actReg](){actReg->postEndJobSignal_();});
653  if(c.hasThrown()) {
654  c.rethrow();
655  }
656  }
int i
Definition: DBlmapReader.cc:9
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:254
boost::shared_ptr< EDLooperBase > looper_
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:90
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1719 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, i, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

1719  {
1720  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1721  {
1722  SendSourceTerminationSignalIfException sentry(actReg_.get());
1723 
1724  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException, &processContext_);
1725  sentry.completedSuccessfully();
1726  }
1727  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1728  // lumi blocks know their start and end times why not also start and end events?
1729  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1730  lumiPrincipal.endTime());
1731  {
1732  SendSourceTerminationSignalIfException sentry(actReg_.get());
1733  espController_->eventSetupForInstance(ts);
1734  sentry.completedSuccessfully();
1735  }
1736  EventSetup const& es = esp_->eventSetup();
1737  {
1738  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1739  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> Traits;
1740  schedule_->processOneStream<Traits>(i,lumiPrincipal, es, cleaningUpAfterException);
1741  if(hasSubProcess()) {
1742  subProcess_->doStreamEndLuminosityBlock(i,lumiPrincipal, ts, cleaningUpAfterException);
1743  }
1744  }
1745  }
1746  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1747  if(looper_) {
1748  //looper_->doStreamEndLuminosityBlock(schedule_->streamID(),lumiPrincipal, es);
1749  }
1750  {
1751  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> Traits;
1752  schedule_->processOneGlobal<Traits>(lumiPrincipal, es, cleaningUpAfterException);
1753  if(hasSubProcess()) {
1754  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
1755  }
1756  }
1757  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1758  if(looper_) {
1759  looper_->doEndLuminosityBlock(lumiPrincipal, es, &processContext_);
1760  }
1761  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1534 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1534  {
1535  if(looper_) {
1536  ModuleChanger changer(schedule_.get(),preg_.get());
1537  looper_->setModuleChanger(&changer);
1538  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1539  looper_->setModuleChanger(nullptr);
1540  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1541  else return false;
1542  }
1543  FDEBUG(1) << "\tendOfLoop\n";
1544  return true;
1545  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::shared_ptr< ProductRegistry const > preg_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1214 of file EventProcessor.cc.

1214  {
1215  return schedule_->endPathsEnabled();
1216  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1626 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, i, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1626  {
1627  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1628  {
1629  SendSourceTerminationSignalIfException sentry(actReg_.get());
1630 
1631  input_->doEndRun(runPrincipal, cleaningUpAfterException, &processContext_);
1632  sentry.completedSuccessfully();
1633  }
1634 
1636  runPrincipal.endTime());
1637  {
1638  SendSourceTerminationSignalIfException sentry(actReg_.get());
1639  espController_->eventSetupForInstance(ts);
1640  sentry.completedSuccessfully();
1641  }
1642  EventSetup const& es = esp_->eventSetup();
1643  {
1644  for(unsigned int i=0; i<preallocations_.numberOfStreams();++i) {
1645  typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Traits;
1646  schedule_->processOneStream<Traits>(i,runPrincipal, es, cleaningUpAfterException);
1647  if(hasSubProcess()) {
1648  subProcess_->doStreamEndRun(i,runPrincipal, ts, cleaningUpAfterException);
1649  }
1650  }
1651  }
1652  FDEBUG(1) << "\tstreamEndRun " << run.runNumber() << "\n";
1653  if(looper_) {
1654  //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es);
1655  }
1656  {
1657  typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Traits;
1658  schedule_->processOneGlobal<Traits>(runPrincipal, es, cleaningUpAfterException);
1659  if(hasSubProcess()) {
1660  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
1661  }
1662  }
1663  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1664  if(looper_) {
1665  looper_->doEndRun(runPrincipal, es, &processContext_);
1666  }
1667  }
int i
Definition: DBlmapReader.cc:9
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:111
std::auto_ptr< SubProcess > subProcess_
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 888 of file EventProcessor.cc.

References assert(), bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

888  {
889 
890  if(0 == numberOfForkedChildren_) {return true;}
892  //do what we want done in common
893  {
894  beginJob(); //make sure this was run
895  // make the services available
897 
898  InputSource::ItemType itemType;
899  itemType = input_->nextItemType();
900 
901  assert(itemType == InputSource::IsFile);
902  {
903  readFile();
904  }
905  itemType = input_->nextItemType();
906  assert(itemType == InputSource::IsRun);
907 
908  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
909  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
910  input_->runAuxiliary()->beginTime());
911  espController_->eventSetupForInstance(ts);
912  EventSetup const& es = esp_->eventSetup();
913 
914  //now get all the data available in the EventSetup
915  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
916  es.fillAvailableRecordKeys(recordKeys);
917  std::vector<eventsetup::DataKey> dataKeys;
918  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
919  itKey != itEnd;
920  ++itKey) {
921  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
922  //see if this is on our exclusion list
923  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
924  ExcludedData const* excludedData(nullptr);
925  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
926  excludedData = &(itExcludeRec->second);
927  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
928  //skip all items in this record
929  continue;
930  }
931  }
932  if(0 != recordPtr) {
933  dataKeys.clear();
934  recordPtr->fillRegisteredDataKeys(dataKeys);
935  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
936  itDataKey != itDataKeyEnd;
937  ++itDataKey) {
938  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
939  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
940  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
941  continue;
942  }
943  try {
944  recordPtr->doGet(*itDataKey);
945  } catch(cms::Exception& e) {
946  LogWarning("ForkingEventSetupPreFetching") << e.what();
947  }
948  }
949  }
950  }
951  }
952  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
953  {
954  // make the services available
956  Service<JobReport> jobReport;
957  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
958 
959  //Now actually do the forking
960  actReg_->preForkReleaseResourcesSignal_();
961  input_->doPreForkReleaseResources();
962  schedule_->preForkReleaseResources();
963  }
964  installCustomHandler(SIGCHLD, ep_sigchld);
965 
966 
967  unsigned int childIndex = 0;
968  unsigned int const kMaxChildren = numberOfForkedChildren_;
969  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
970  std::vector<pid_t> childrenIds;
971  childrenIds.reserve(kMaxChildren);
972  std::vector<int> childrenSockets;
973  childrenSockets.reserve(kMaxChildren);
974  std::vector<int> childrenPipes;
975  childrenPipes.reserve(kMaxChildren);
976  std::vector<int> childrenSocketsCopy;
977  childrenSocketsCopy.reserve(kMaxChildren);
978  std::vector<int> childrenPipesCopy;
979  childrenPipesCopy.reserve(kMaxChildren);
980  int pipes[] {0, 0};
981 
982  {
983  // make the services available
985  Service<JobReport> jobReport;
986  int sockets[2], fd_flags;
987  for(; childIndex < kMaxChildren; ++childIndex) {
988  // Create a UNIX_DGRAM socket pair
989  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
990  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
991  exit(EXIT_FAILURE);
992  }
993  if (pipe(pipes)) {
994  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
995  exit(EXIT_FAILURE);
996  }
997  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
998  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
999  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1000  exit(EXIT_FAILURE);
1001  }
1002  // Mark socket as non-block. Child must be careful to do select prior
1003  // to reading from socket.
1004  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1005  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1006  exit(EXIT_FAILURE);
1007  }
1008  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1009  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1010  exit(EXIT_FAILURE);
1011  }
1012  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1013  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1014  exit(EXIT_FAILURE);
1015  }
1016  // Linux man page notes there are some edge cases where reading from a
1017  // fd can block, even after a select.
1018  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1019  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1020  exit(EXIT_FAILURE);
1021  }
1022  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1023  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1024  exit(EXIT_FAILURE);
1025  }
1026 
1027  childrenPipesCopy = childrenPipes;
1028  childrenSocketsCopy = childrenSockets;
1029 
1030  pid_t value = fork();
1031  if(value == 0) {
1032  // Close the parent's side of the socket and pipe which will talk to us.
1033  close(pipes[0]);
1034  close(sockets[0]);
1035  // Close our copies of the parent's other communication pipes.
1036  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1037  close(*it);
1038  }
1039  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1040  close(*it);
1041  }
1042 
1043  // this is the child process, redirect stdout and stderr to a log file
1044  fflush(stdout);
1045  fflush(stderr);
1046  std::stringstream stout;
1047  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1048  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1049  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1050  }
1051  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1052  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1053  }
1054 
1055  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1056  if(setCpuAffinity_) {
1057  // CPU affinity is handled differently on macosx.
1058  // We disable it and print a message until someone reads:
1059  //
1060  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1061  //
1062  // and implements it.
1063 #ifdef __APPLE__
1064  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1065 #else
1066  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1067  cpu_set_t mask;
1068  CPU_ZERO(&mask);
1069  CPU_SET(childIndex, &mask);
1070  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1071  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1072  exit(-1);
1073  }
1074 #endif
1075  }
1076  break;
1077  } else {
1078  //this is the parent
1079  close(pipes[1]);
1080  close(sockets[1]);
1081  }
1082  if(value < 0) {
1083  LogError("ForkingChild") << "failed to create a child";
1084  exit(-1);
1085  }
1086  childrenIds.push_back(value);
1087  childrenSockets.push_back(sockets[0]);
1088  childrenPipes.push_back(pipes[0]);
1089  }
1090 
1091  if(childIndex < kMaxChildren) {
1092  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1093  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1094 
1095  auto receiver = std::make_shared<multicore::MessageReceiverForSource>(sockets[1], pipes[1]);
1096  input_->doPostForkReacquireResources(receiver);
1097  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1098  //NOTE: sources have to reset themselves by listening to the post fork message
1099  //rewindInput();
1100  return true;
1101  }
1102  jobReport->parentAfterFork(jobReportFile);
1103  }
1104 
1105  //this is the original, which is now the master for all the children
1106 
1107  //Need to wait for signals from the children or externally
1108  // To wait we must
1109  // 1) block the signals we want to wait on so we do not have a race condition
1110  // 2) check that we haven't already met our ending criteria
1111  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1112  sigset_t blockingSigSet;
1113  sigset_t unblockingSigSet;
1114  sigset_t oldSigSet;
1115  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1116  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1117  sigaddset(&blockingSigSet, SIGCHLD);
1118  sigaddset(&blockingSigSet, SIGUSR2);
1119  sigaddset(&blockingSigSet, SIGINT);
1120  sigdelset(&unblockingSigSet, SIGCHLD);
1121  sigdelset(&unblockingSigSet, SIGUSR2);
1122  sigdelset(&unblockingSigSet, SIGINT);
1123  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1124 
1125  // If there are too many fd's (unlikely, but possible) for select, denote this
1126  // because the sender will fail.
1127  bool too_many_fds = false;
1128  if (pipes[1]+1 > FD_SETSIZE) {
1129  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1130  too_many_fds = true;
1131  }
1132 
1133  //create a thread that sends the units of work to workers
1134  // we create it after all signals were blocked so that this
1135  // thread is never interrupted by a signal
1136  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1137  boost::thread senderThread(sender);
1138 
1139  if(not too_many_fds) {
1140  //NOTE: a child could have failed before we got here and even after this call
1141  // which is why the 'if' is conditional on continueAfterChildFailure_
1143  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1144  sigsuspend(&unblockingSigSet);
1146  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1147  }
1148  }
1149  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1150 
1151  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1152  if(child_failed) {
1153  LogError("ForkingStopping") << "child failed";
1154  }
1155  if(shutdown_flag) {
1156  LogSystem("ForkingStopping") << "asked to shutdown";
1157  }
1158 
1159  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1160  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1161  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1162  it != itEnd; ++it) {
1163  /* int result = */ kill(*it, SIGUSR2);
1164  }
1165  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1166  while(num_children_done != kMaxChildren) {
1167  sigsuspend(&unblockingSigSet);
1168  }
1169  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1170  }
1171  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1172  senderThread.join();
1173  if(child_failed && !continueAfterChildFailure_) {
1174  if (child_fail_signal) {
1175  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1176  } else if (child_fail_exit_status) {
1177  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1178  } else {
1179  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1180  }
1181  }
1182  if(too_many_fds) {
1183  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1184  }
1185  return false;
1186  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
assert(m_qm.get())
def pipe
Definition: pipe.py:5
#define NULL
Definition: scimark2.h:8
volatile std::atomic< bool > shutdown_flag
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
#define O_NONBLOCK
Definition: SysFile.h:21
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1189 of file EventProcessor.cc.

1189  {
1190  return schedule_->getAllModuleDescriptions();
1191  }
std::auto_ptr< Schedule > schedule_
ServiceToken edm::EventProcessor::getToken ( )

Definition at line 659 of file EventProcessor.cc.

References serviceToken_.

Referenced by ~EventProcessor().

659  {
660  return serviceToken_;
661  }
ServiceToken serviceToken_
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1219 of file EventProcessor.cc.

1219  {
1220  schedule_->getTriggerReport(rep);
1221  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 231 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

231  {
232  return subProcess_.get() != 0;
233  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( std::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 393 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), edm::ParameterSet::getUntrackedParameterSetVector(), historyAppender_, cmsHarvester::index, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::ParentageRegistry::insertMapped(), edm::ParentageRegistry::instance(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), eostools::move(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::PreallocationConfiguration::numberOfStreams(), cmsPerfStripChart::operate(), edm::parameterSet(), edm::popSubProcessParameterSet(), preallocations_, edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, processContext_, edm::ParameterSet::registerIt(), schedule_, serviceToken_, setCpuAffinity_, edm::PrincipalCache::setNumberOfConcurrentPrincipals(), edm::ProcessContext::setProcessConfiguration(), edm::PrincipalCache::setProcessHistoryRegistry(), edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, subProcess_, edm::ScheduleItems::thinnedAssociationsHelper_, and thinnedAssociationsHelper_.

Referenced by EventProcessor().

395  {
396 
397  //std::cerr << processDesc->dump() << std::endl;
398 
399  // register the empty parentage vector , once and for all
401 
402  // register the empty parameter set, once and for all.
403  ParameterSet().registerIt();
404 
405  std::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
406  //std::cerr << parameterSet->dump() << std::endl;
407 
408  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
409  std::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
410 
411  // Now set some parameters specific to the main process.
412  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
413  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
414  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
415  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
416  //threading
417  unsigned int nThreads=1;
418  if(optionsPset.existsAs<unsigned int>("numberOfThreads",false)) {
419  nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
420  if(nThreads == 0) {
421  nThreads = 1;
422  }
423  }
424  /* TODO: when we support having each stream run in a different thread use this default
425  unsigned int nStreams =nThreads;
426  */
427  unsigned int nStreams =1;
428  if(optionsPset.existsAs<unsigned int>("numberOfStreams",false)) {
429  nStreams = optionsPset.getUntrackedParameter<unsigned int>("numberOfStreams");
430  if(nStreams==0) {
431  nStreams = nThreads;
432  }
433  }
434  /*
435  bool nRunsSet = false;
436  */
437  unsigned int nConcurrentRuns =1;
438  /*
439  if(nRunsSet = optionsPset.existsAs<unsigned int>("numberOfConcurrentRuns",false)) {
440  nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
441  }
442  */
443  unsigned int nConcurrentLumis =1;
444  /*
445  if(optionsPset.existsAs<unsigned int>("numberOfConcurrentLuminosityBlocks",false)) {
446  nConcurrentLumis = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
447  } else {
448  nConcurrentLumis = nConcurrentRuns;
449  }
450  */
451  //Check that relationships between threading parameters makes sense
452  /*
453  if(nThreads<nStreams) {
454  //bad
455  }
456  if(nConcurrentRuns>nStreams) {
457  //bad
458  }
459  if(nConcurrentRuns>nConcurrentLumis) {
460  //bad
461  }
462  */
463  //forking
464  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
465  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
466  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
467  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
468  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
469  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
470  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
471  itPS != itPSEnd;
472  ++itPS) {
473  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
474  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
475  itPS->getUntrackedParameter<std::string>("label", "")));
476  }
477  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
478 
479  // Now do general initialization
480  ScheduleItems items;
481 
482  //initialize the services
483  std::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
484  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
485  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
486 
487  //make the services available
489 
490  if(nStreams>1) {
492  handler->willBeUsingThreads();
493  }
494 
495  // initialize miscellaneous items
496  std::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
497 
498  // initialize the event setup provider
499  esp_ = espController_->makeProvider(*parameterSet);
500 
501  // initialize the looper, if any
502  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
503  if(looper_) {
504  looper_->setActionTable(items.act_table_.get());
505  looper_->attachTo(*items.actReg_);
506 
507  //For now loopers make us run only 1 transition at a time
508  nStreams=1;
509  nConcurrentLumis=1;
510  nConcurrentRuns=1;
511  }
512 
513  preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns};
514 
515  // initialize the input source
516  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.thinnedAssociationsHelper_, items.actReg_, items.processConfiguration_, preallocations_);
517 
518  // initialize the Schedule
519  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get(),preallocations_,&processContext_);
520 
521  // set the data members
522  act_table_ = std::move(items.act_table_);
523  actReg_ = items.actReg_;
524  preg_.reset(items.preg_.release());
525  branchIDListHelper_ = items.branchIDListHelper_;
526  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper_;
527  processConfiguration_ = items.processConfiguration_;
529  principalCache_.setProcessHistoryRegistry(input_->processHistoryRegistry());
530 
531  FDEBUG(2) << parameterSet << std::endl;
532 
534  for(unsigned int index = 0; index<preallocations_.numberOfStreams(); ++index ) {
535  // Reusable event principal
536  auto ep = std::make_shared<EventPrincipal>(preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
537  ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
538  ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
540  }
541  // initialize the subprocess, if there is one
542  if(subProcessParameterSet) {
543  subProcess_.reset(new SubProcess(*subProcessParameterSet,
544  *parameterSet,
545  preg_,
546  branchIDListHelper_,
547  *thinnedAssociationsHelper_,
549  *actReg_,
550  token,
553  &processContext_));
554  }
555  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void insert(std::shared_ptr< RunPrincipal > rp)
ProcessContext processContext_
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:564
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
def move
Definition: eostools.py:508
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::auto_ptr< Schedule > schedule_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::shared_ptr< ProductRegistry const > preg_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, std::shared_ptr< BranchIDListHelper > branchIDListHelper, std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, PreallocationConfiguration const &allocations)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::shared_ptr< ActivityRegistry > actReg_
std::unique_ptr< eventsetup::EventSetupsController > espController_
static ParentageRegistry * instance()
bool insertMapped(value_type const &v)
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
PrincipalCache principalCache_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1489 of file EventProcessor.cc.

References FDEBUG.

1489  {
1490  if (fb_.get() != nullptr) {
1491  schedule_->openOutputFiles(*fb_);
1492  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1493  }
1494  FDEBUG(1) << "\topenOutputFiles\n";
1495  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 872 of file EventProcessor.cc.

872  {
873  if(child_failed && continueAfterChildFailure_) {
874  if (child_fail_signal) {
875  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
876  child_fail_signal=0;
877  } else if (child_fail_exit_status) {
878  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
879  child_fail_exit_status=0;
880  } else {
881  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
882  }
883  child_failed =false;
884  }
885  }
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1553 of file EventProcessor.cc.

References FDEBUG.

1553  {
1554  looper_->prepareForNextLoop(esp_.get());
1555  FDEBUG(1) << "\tprepareForNextLoop\n";
1556  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
void edm::EventProcessor::processEvent ( unsigned int  iStreamIndex)
private

Definition at line 1993 of file EventProcessor.cc.

References assert(), ev, FDEBUG, edm::Service< T >::isAvailable(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, and summarizeEdmComparisonLogfiles::succeeded.

1993  {
1994  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
1995  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
1997  if(rng.isAvailable()) {
1998  Event ev(*pep, ModuleDescription(), nullptr);
1999  rng->postEventRead(ev);
2000  }
2001  assert(pep->luminosityBlockPrincipalPtrValid());
2002  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2003  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2004 
2005  //We can only update IOVs on Lumi boundaries
2006  //IOVSyncValue ts(pep->id(), pep->time());
2007  //espController_->eventSetupForInstance(ts);
2008  EventSetup const& es = esp_->eventSetup();
2009  {
2010  typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Traits;
2011  schedule_->processOneEvent<Traits>(iStreamIndex,*pep, es);
2012  if(hasSubProcess()) {
2013  subProcess_->doEvent(*pep);
2014  }
2015  }
2016 
2017  //NOTE: If we have a looper we only have one Stream
2018  if(looper_) {
2019  bool randomAccess = input_->randomAccess();
2020  ProcessingController::ForwardState forwardState = input_->forwardState();
2021  ProcessingController::ReverseState reverseState = input_->reverseState();
2022  ProcessingController pc(forwardState, reverseState, randomAccess);
2023 
2025  do {
2026 
2027  StreamContext streamContext(pep->streamID(), &processContext_);
2028  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext);
2029 
2030  bool succeeded = true;
2031  if(randomAccess) {
2032  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2033  input_->skipEvents(-2);
2034  }
2035  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2036  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2037  }
2038  }
2039  pc.setLastOperationSucceeded(succeeded);
2040  } while(!pc.lastOperationSucceeded());
2041  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2042 
2043  }
2044 
2045  FDEBUG(1) << "\tprocessEvent\n";
2046  pep->clearEventPrincipal();
2047  }
ProcessContext processContext_
boost::shared_ptr< EDLooperBase > looper_
assert(m_qm.get())
bool ev
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::processEventsForStreamAsync ( unsigned int  iStreamIndex,
std::atomic< bool > *  finishedProcessingEvents 
)
private

Definition at line 1875 of file EventProcessor.cc.

References cmsPerfStripChart::operate(), and processEvent().

1876  {
1877  try {
1878  // make the services available
1882  handler->initializeThisThreadForUse();
1883  }
1884 
1885  if(iStreamIndex==0) {
1886  processEvent(0);
1887  }
1888  do {
1889  if(shouldWeStop()) {
1890  break;
1891  }
1892  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1893  //another thread hit an exception
1894  //std::cerr<<"another thread saw an exception\n";
1895  break;
1896  }
1897  {
1898 
1899 
1900  {
1901  //nextItemType and readEvent need to be in same critical section
1902  std::lock_guard<std::mutex> sourceGuard(nextTransitionMutex_);
1903 
1904  if(finishedProcessingEvents->load(std::memory_order_acquire)) {
1905  //std::cerr<<"finishedProcessingEvents\n";
1906  break;
1907  }
1908 
1909  //If source and DelayedReader share a resource we must serialize them
1910  auto sr = input_->resourceSharedWithDelayedReader();
1911  std::unique_lock<SharedResourcesAcquirer> delayedReaderGuard;
1912  if(sr) {
1913  delayedReaderGuard = std::unique_lock<SharedResourcesAcquirer>(*sr);
1914  }
1915  InputSource::ItemType itemType = input_->nextItemType();
1916  if (InputSource::IsEvent !=itemType) {
1918  finishedProcessingEvents->store(true,std::memory_order_release);
1919  //std::cerr<<"next item type "<<itemType<<"\n";
1920  break;
1921  }
1923  //std::cerr<<"task told to async stop\n";
1924  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1925  break;
1926  }
1927  readEvent(iStreamIndex);
1928  }
1929  }
1930  if(deferredExceptionPtrIsSet_.load(std::memory_order_acquire)) {
1931  //another thread hit an exception
1932  //std::cerr<<"another thread saw an exception\n";
1933  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExceptionFromAnotherContext);
1934 
1935  break;
1936  }
1937  processEvent(iStreamIndex);
1938  }while(true);
1939  } catch (...) {
1940  bool expected =false;
1941  if(deferredExceptionPtrIsSet_.compare_exchange_strong(expected,true)) {
1942  deferredExceptionPtr_ = std::current_exception();
1943  }
1944  //std::cerr<<"task caught exception\n";
1945  }
1946  }
void readEvent(unsigned int iStreamIndex)
bool checkForAsyncStopRequest(StatusCode &)
PreallocationConfiguration preallocations_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
virtual bool shouldWeStop() const
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1818 of file EventProcessor.cc.

1818  {
1819  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
1820  {
1821  SendSourceTerminationSignalIfException sentry(actReg_.get());
1822  input_->readAndMergeLumi(*principalCache_.lumiPrincipalPtr());
1823  sentry.completedSuccessfully();
1824  }
1825  return input_->luminosityBlock();
1826  }
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1781 of file EventProcessor.cc.

References assert(), and PDRates::Run.

1781  {
1782  principalCache_.merge(input_->runAuxiliary(), preg_);
1783  auto runPrincipal =principalCache_.runPrincipalPtr();
1784  {
1785  SendSourceTerminationSignalIfException sentry(actReg_.get());
1786  input_->readAndMergeRun(*runPrincipal);
1787  sentry.completedSuccessfully();
1788  }
1789  assert(input_->reducedProcessHistoryID() == runPrincipal->reducedProcessHistoryID());
1790  return statemachine::Run(runPrincipal->reducedProcessHistoryID(), input_->run());
1791  }
assert(m_qm.get())
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
void merge(std::shared_ptr< RunAuxiliary > aux, std::shared_ptr< ProductRegistry const > reg)
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1948 of file EventProcessor.cc.

References pyrootRender::destroy(), and processEvent().

1948  {
1949  if(numberOfForkedChildren_>0) {
1950  readEvent(0);
1951  processEvent(0);
1952  return;
1953  }
1956 
1957  std::atomic<bool> finishedProcessingEvents{false};
1958 
1959  //Task assumes Stream 0 has already read the event that caused us to go here
1960  readEvent(0);
1961 
1962  //To wait, the ref count has to be 1+#streams
1963  tbb::task* eventLoopWaitTask{new (tbb::task::allocate_root()) tbb::empty_task{}};
1964  eventLoopWaitTask->increment_ref_count();
1965 
1966  const unsigned int kNumStreams = preallocations_.numberOfStreams();
1967  unsigned int iStreamIndex = 0;
1968  for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
1969  eventLoopWaitTask->increment_ref_count();
1970  tbb::task::enqueue( *(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex, &finishedProcessingEvents, eventLoopWaitTask}));
1971 
1972  }
1973  eventLoopWaitTask->increment_ref_count();
1974  eventLoopWaitTask->spawn_and_wait_for_all(*(new (tbb::task::allocate_root()) StreamProcessingTask{this,iStreamIndex,&finishedProcessingEvents,eventLoopWaitTask}));
1975  tbb::task::destroy(*eventLoopWaitTask);
1976 
1977  //One of the processing threads saw an exception
1979  std::rethrow_exception(deferredExceptionPtr_);
1980  }
1981  }
void readEvent(unsigned int iStreamIndex)
PreallocationConfiguration preallocations_
std::atomic< bool > deferredExceptionPtrIsSet_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void processEvent(unsigned int iStreamIndex)
std::exception_ptr deferredExceptionPtr_
bool asyncStopRequestedWhileProcessingEvents_
friend class StreamProcessingTask
void edm::EventProcessor::readEvent ( unsigned int  iStreamIndex)
private

Definition at line 1982 of file EventProcessor.cc.

References event(), and FDEBUG.

1982  {
1983  //TODO this will have to become per stream
1984  auto& event = principalCache_.eventPrincipal(iStreamIndex);
1985  StreamContext streamContext(event.streamID(), &processContext_);
1986 
1987  SendSourceTerminationSignalIfException sentry(actReg_.get());
1988  input_->readEvent(event, streamContext);
1989  sentry.completedSuccessfully();
1990 
1991  FDEBUG(1) << "\treadEvent\n";
1992  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1462 of file EventProcessor.cc.

References FDEBUG, or, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1462  {
1463  FDEBUG(1) << " \treadFile\n";
1464  size_t size = preg_->size();
1465  SendSourceTerminationSignalIfException sentry(actReg_.get());
1466 
1467  fb_ = input_->readFile();
1468  if(size < preg_->size()) {
1470  }
1472  if((numberOfForkedChildren_ > 0) or
1475  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1476  }
1477  sentry.completedSuccessfully();
1478  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
PreallocationConfiguration preallocations_
std::unique_ptr< FileBlock > fb_
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
int edm::EventProcessor::readLuminosityBlock ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1793 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

1793  {
1796  << "EventProcessor::readRun\n"
1797  << "Illegal attempt to insert lumi into cache\n"
1798  << "Contact a Framework Developer\n";
1799  }
1802  << "EventProcessor::readRun\n"
1803  << "Illegal attempt to insert lumi into cache\n"
1804  << "Run is invalid\n"
1805  << "Contact a Framework Developer\n";
1806  }
1807  auto lbp = std::make_shared<LuminosityBlockPrincipal>(input_->luminosityBlockAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1808  {
1809  SendSourceTerminationSignalIfException sentry(actReg_.get());
1810  input_->readLuminosityBlock(*lbp, *historyAppender_);
1811  sentry.completedSuccessfully();
1812  }
1813  lbp->setRunPrincipal(principalCache_.runPrincipalPtr());
1814  principalCache_.insert(lbp);
1815  return input_->luminosityBlock();
1816  }
void insert(std::shared_ptr< RunPrincipal > rp)
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1763 of file EventProcessor.cc.

References assert(), edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

1763  {
1766  << "EventProcessor::readRun\n"
1767  << "Illegal attempt to insert run into cache\n"
1768  << "Contact a Framework Developer\n";
1769  }
1770  auto rp = std::make_shared<RunPrincipal>(input_->runAuxiliary(), preg_, *processConfiguration_, historyAppender_.get(), 0);
1771  {
1772  SendSourceTerminationSignalIfException sentry(actReg_.get());
1773  input_->readRun(*rp, *historyAppender_);
1774  sentry.completedSuccessfully();
1775  }
1776  assert(input_->reducedProcessHistoryID() == rp->reducedProcessHistoryID());
1777  principalCache_.insert(rp);
1778  return statemachine::Run(rp->reducedProcessHistoryID(), input_->run());
1779  }
void insert(std::shared_ptr< RunPrincipal > rp)
assert(m_qm.get())
bool hasRunPrincipal() const
std::unique_ptr< HistoryAppender > historyAppender_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::shared_ptr< ActivityRegistry > actReg_
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1516 of file EventProcessor.cc.

References FDEBUG.

1516  {
1517  if (fb_.get() != nullptr) {
1518  schedule_->respondToCloseInputFile(*fb_);
1519  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1520  }
1521  FDEBUG(1) << "\trespondToCloseInputFile\n";
1522  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1505 of file EventProcessor.cc.

References FDEBUG.

1505  {
1506  if(hasSubProcess()) {
1507  subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
1508  }
1509  if (fb_.get() != nullptr) {
1510  schedule_->respondToOpenInputFile(*fb_);
1511  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1512  }
1513  FDEBUG(1) << "\trespondToOpenInputFile\n";
1514  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1547 of file EventProcessor.cc.

References FDEBUG.

1547  {
1548  input_->repeat();
1549  input_->rewind();
1550  FDEBUG(1) << "\trewind\n";
1551  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 315 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.EventID::cppID(), and Types.LuminosityBlockID::cppID().

315  {
316  return runToCompletion();
317  }
virtual StatusCode runToCompletion()
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1274 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), bk::beginJob(), alignCSCRings::e, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, cmsPerfStripChart::operate(), runEdmFileComparison::returnCode, findQualityFiles::size, and edm::convertException::wrap().

Referenced by run().

1274  {
1275 
1278  std::auto_ptr<statemachine::Machine> machine;
1279  {
1280  beginJob(); //make sure this was called
1281 
1282  //StatusCode returnCode = epSuccess;
1284 
1285  // make the services available
1287 
1288  machine = createStateMachine();
1291  try {
1292  convertException::wrap([&]() {
1293 
1294  InputSource::ItemType itemType;
1295 
1296  while(true) {
1297 
1298  bool more = true;
1299  if(numberOfForkedChildren_ > 0) {
1300  size_t size = preg_->size();
1301  {
1302  SendSourceTerminationSignalIfException sentry(actReg_.get());
1303  more = input_->skipForForking();
1304  sentry.completedSuccessfully();
1305  }
1306  if(more) {
1307  if(size < preg_->size()) {
1309  }
1311  }
1312  }
1313  {
1314  SendSourceTerminationSignalIfException sentry(actReg_.get());
1315  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1316  sentry.completedSuccessfully();
1317  }
1318 
1319  FDEBUG(1) << "itemType = " << itemType << "\n";
1320 
1321  if(checkForAsyncStopRequest(returnCode)) {
1322  actReg_->preSourceEarlyTerminationSignal_(TerminationOrigin::ExternalSignal);
1323  forceLooperToEnd_ = true;
1324  machine->process_event(statemachine::Stop());
1325  forceLooperToEnd_ = false;
1326  break;
1327  }
1328 
1329  if(itemType == InputSource::IsEvent) {
1330  machine->process_event(statemachine::Event());
1332  forceLooperToEnd_ = true;
1333  machine->process_event(statemachine::Stop());
1334  forceLooperToEnd_ = false;
1336  break;
1337  }
1339  }
1340 
1341  if(itemType == InputSource::IsEvent) {
1342  }
1343  else if(itemType == InputSource::IsStop) {
1344  machine->process_event(statemachine::Stop());
1345  }
1346  else if(itemType == InputSource::IsFile) {
1347  machine->process_event(statemachine::File());
1348  }
1349  else if(itemType == InputSource::IsRun) {
1350  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1351  }
1352  else if(itemType == InputSource::IsLumi) {
1353  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1354  }
1355  else if(itemType == InputSource::IsSynchronize) {
1356  //For now, we don't have to do anything
1357  }
1358  // This should be impossible
1359  else {
1361  << "Unknown next item type passed to EventProcessor\n"
1362  << "Please report this error to the Framework group\n";
1363  }
1364  if(machine->terminated()) {
1365  break;
1366  }
1367  } // End of loop over state machine events
1368  }); // convertException::wrap
1369  } // Try block
1370  // Some comments on exception handling related to the boost state machine:
1371  //
1372  // Some states used in the machine are special because they
1373  // perform actions while the machine is being terminated, actions
1374  // such as close files, call endRun, call endLumi etc ... Each of these
1375  // states has two functions that perform these actions. The functions
1376  // are almost identical. The major difference is that one version
1377  // catches all exceptions and the other lets exceptions pass through.
1378  // The destructor catches them and the other function named "exit" lets
1379  // them pass through. On a normal termination, boost will always call
1380  // "exit" and then the state destructor. In our state classes, the
1381  // destructors do nothing if the exit function already took
1382  // care of things. Here's the interesting part. When boost is
1383  // handling an exception the "exit" function is not called (a boost
1384  // feature).
1385  //
1386  // If an exception occurs while the boost machine is in control
1387  // (which usually means inside a process_event call), then
1388  // the boost state machine destroys its states and "terminates" itself.
1389  // This is already done before we hit the catch blocks below. In this case
1390  // the call to terminateMachine below only destroys an already
1391  // terminated state machine. Because exit is not called, the state destructors
1392  // handle cleaning up lumis, runs, and files. The destructors swallow
1393  // all exceptions and only pass through the exception messages, which
1394  // are tacked onto the original exception below.
1395  //
1396  // If an exception occurs when the boost state machine is not
1397  // in control (outside the process_event functions), then boost
1398  // cannot destroy its own states. The terminateMachine function
1399  // below takes care of that. The flag "alreadyHandlingException"
1400  // is set true so that the state exit functions do nothing (and
1401  // cannot throw more exceptions while handling the first). Then the
1402  // state destructors take care of this because exit did nothing.
1403  //
1404  // In both cases above, the EventProcessor::endOfLoop function is
1405  // not called because it can throw exceptions.
1406  //
1407  // One tricky aspect of the state machine is that things that can
1408  // throw should not be invoked by the state machine while another
1409  // exception is being handled.
1410  // Another tricky aspect is that it appears to be important to
1411  // terminate the state machine before invoking its destructor.
1412  // We've seen crashes that are not understood when that is not
1413  // done. Maintainers of this code should be careful about this.
1414 
1415  catch (cms::Exception & e) {
1417  terminateMachine(machine);
1418  alreadyHandlingException_ = false;
1419  if (!exceptionMessageLumis_.empty()) {
1421  if (e.alreadyPrinted()) {
1422  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1423  }
1424  }
1425  if (!exceptionMessageRuns_.empty()) {
1427  if (e.alreadyPrinted()) {
1428  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1429  }
1430  }
1431  if (!exceptionMessageFiles_.empty()) {
1433  if (e.alreadyPrinted()) {
1434  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1435  }
1436  }
1437  throw;
1438  }
1439 
1440  if(machine->terminated()) {
1441  FDEBUG(1) << "The state machine reports it has been terminated\n";
1442  machine.reset();
1443  }
1444 
1446  throw cms::Exception("BadState")
1447  << "The boost state machine in the EventProcessor exited after\n"
1448  << "entering the Error state.\n";
1449  }
1450 
1451  }
1452  if(machine.get() != 0) {
1453  terminateMachine(machine);
1455  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1456  << "Please report this error to the Framework group\n";
1457  }
1458 
1459  return returnCode;
1460  }
bool checkForAsyncStopRequest(StatusCode &)
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
InputSource::ItemType nextItemTypeFromProcessingEvents_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void adjustEventsToNewProductRegistry(std::shared_ptr< ProductRegistry const > reg)
std::string exceptionMessageFiles_
std::shared_ptr< ProductRegistry const > preg_
StatusCode asyncStopStatusCodeFromProcessingEvents_
std::unique_ptr< InputSource > input_
auto wrap(F iFunc) -> decltype(iFunc())
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2055 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2055  {
2057  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2063 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2063  {
2065  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2059 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2059  {
2061  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1558 of file EventProcessor.cc.

References FDEBUG.

1558  {
1559  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1560  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1561  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2049 of file EventProcessor.cc.

References FDEBUG.

2049  {
2050  FDEBUG(1) << "\tshouldWeStop\n";
2051  if(shouldWeStop_) return true;
2052  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2053  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1524 of file EventProcessor.cc.

References FDEBUG.

1524  {
1525  shouldWeStop_ = false;
1526  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1527  // until after we've called beginOfJob
1528  if(looper_ && looperBeginJobRun_) {
1529  looper_->doStartingNewLoop();
1530  }
1531  FDEBUG(1) << "\tstartingNewLoop\n";
1532  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2071 of file EventProcessor.cc.

References FDEBUG.

2071  {
2072  if(iMachine.get() != 0) {
2073  if(!iMachine->terminated()) {
2074  forceLooperToEnd_ = true;
2075  iMachine->process_event(statemachine::Stop());
2076  forceLooperToEnd_ = false;
2077  }
2078  else {
2079  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2080  }
2081  if(iMachine->terminated()) {
2082  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2083  }
2084  iMachine.reset();
2085  }
2086  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1194 of file EventProcessor.cc.

1194  {
1195  return schedule_->totalEvents();
1196  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1204 of file EventProcessor.cc.

1204  {
1205  return schedule_->totalEventsFailed();
1206  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1199 of file EventProcessor.cc.

1199  {
1200  return schedule_->totalEventsPassed();
1201  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1840 of file EventProcessor.cc.

References FDEBUG.

1840  {
1842  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
1843  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1844  }
ProcessContext processContext_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1828 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1828  {
1829  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()), &processContext_);
1830  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
1831  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1832  }
ProcessContext processContext_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class StreamProcessingTask
friend

Definition at line 237 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ExceptionToActionTable const> edm::EventProcessor::act_table_
private

Definition at line 265 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 257 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 290 of file EventProcessor.h.

bool edm::EventProcessor::asyncStopRequestedWhileProcessingEvents_
private

Definition at line 302 of file EventProcessor.h.

StatusCode edm::EventProcessor::asyncStopStatusCodeFromProcessingEvents_
private

Definition at line 304 of file EventProcessor.h.

bool edm::EventProcessor::beginJobCalled_
private

Definition at line 282 of file EventProcessor.h.

Referenced by beginJob().

std::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 259 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 298 of file EventProcessor.h.

Referenced by init().

std::exception_ptr edm::EventProcessor::deferredExceptionPtr_
private

Definition at line 278 of file EventProcessor.h.

std::atomic<bool> edm::EventProcessor::deferredExceptionPtrIsSet_
private

Definition at line 277 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 286 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 264 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 263 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 308 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 287 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 289 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 288 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 273 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 285 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 293 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 291 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 271 of file EventProcessor.h.

Referenced by init().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 262 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 292 of file EventProcessor.h.

InputSource::ItemType edm::EventProcessor::nextItemTypeFromProcessingEvents_
private

Definition at line 303 of file EventProcessor.h.

std::mutex edm::EventProcessor::nextTransitionMutex_
private

Definition at line 280 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 295 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 296 of file EventProcessor.h.

Referenced by init().

PathsAndConsumesOfModules edm::EventProcessor::pathsAndConsumesOfModules_
private

Definition at line 268 of file EventProcessor.h.

Referenced by beginJob().

PreallocationConfiguration edm::EventProcessor::preallocations_
private

Definition at line 300 of file EventProcessor.h.

Referenced by beginJob(), endJob(), and init().

std::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 258 of file EventProcessor.h.

Referenced by beginJob(), and init().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 281 of file EventProcessor.h.

Referenced by init().

std::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 266 of file EventProcessor.h.

Referenced by init().

ProcessContext edm::EventProcessor::processContext_
private

Definition at line 267 of file EventProcessor.h.

Referenced by beginJob(), and init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 261 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 297 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 283 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 284 of file EventProcessor.h.

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private
std::shared_ptr<ThinnedAssociationsHelper> edm::EventProcessor::thinnedAssociationsHelper_
private

Definition at line 260 of file EventProcessor.h.

Referenced by init().