CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
char const * currentStateName () const
 
void declareRunNumber (RunNumber_t runNumber)
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
event_processor::State getState () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
char const * msgName (event_processor::Msg m) const
 
virtual void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
ActivityRegistry::PostProcessEvent & postProcessEventSignal ()
 
virtual void prepareForNextLoop ()
 
ActivityRegistry::PreProcessEvent & preProcessEventSignal ()
 
virtual int readAndCacheLumi ()
 
virtual statemachine::Run readAndCacheRun ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToCloseOutputFiles ()
 
virtual void respondToOpenInputFile ()
 
virtual void respondToOpenOutputFiles ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
void runAsync ()
 
virtual StatusCode runToCompletion (bool onlineStateTransitions)
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
void setRunNumber (RunNumber_t runNumber)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
StatusCode shutdownAsync (unsigned int timeout_secs=60 *2)
 
virtual void startingNewLoop ()
 
char const * stateName (event_processor::State s) const
 
StatusCode statusAsync () const
 
StatusCode stopAsync (unsigned int timeout_secs=60 *2)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
StatusCode waitTillDoneAsync (unsigned int timeout_seconds=0)
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData >
ExcludedDataMap
 

Private Member Functions

void changeState (event_processor::Msg)
 
void connectSigs (EventProcessor *ep)
 
std::auto_ptr
< statemachine::Machine >
createStateMachine ()
 
StatusCode doneAsync (event_processor::Msg m)
 
void errorState ()
 
bool hasSubProcess () const
 
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 
StatusCode waitForAsyncCompletion (unsigned int timeout_seconds)
 

Static Private Member Functions

static void asyncRun (EventProcessor *)
 

Private Attributes

std::unique_ptr< ActionTable
const > 
act_table_
 
boost::shared_ptr
< ActivityRegistry >
actReg_
 
bool alreadyHandlingException_
 
boost::shared_ptr
< BranchIDListHelper >
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider >
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController >
espController_
 
boost::shared_ptr< boost::thread > event_loop_
 
volatile pthread_t event_loop_id_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlock > fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppender > historyAppender_
 
volatile bool id_set_
 
std::unique_ptr< InputSource > input_
 
std::string last_error_text_
 
volatile Status last_rc_
 
boost::shared_ptr< EDLooperBase > looper_
 
bool looperBeginJobRun_
 
int my_sig_num_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
ActivityRegistry::PostProcessEvent postProcessEventSignal_
 
boost::shared_ptr
< ProductRegistry const > 
preg_
 
ActivityRegistry::PreProcessEvent preProcessEventSignal_
 
PrincipalCache principalCache_
 
boost::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
std::auto_ptr< Schedule > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
boost::condition starter_
 
volatile event_processor::State state_
 
boost::mutex state_lock_
 
bool stateMachineWasInErrorState_
 
volatile int stop_count_
 
boost::mutex stop_lock_
 
boost::condition stopper_
 
std::auto_ptr< SubProcess > subProcess_
 

Friends

class event_processor::StateSentry
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 70 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 374 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 375 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 378 of file EventProcessor.cc.

References init(), and PythonProcessDesc::parameterSet().

382  :
385  actReg_(),
386  preg_(),
388  serviceToken_(),
389  input_(),
390  espController_(new eventsetup::EventSetupsController),
391  esp_(),
392  act_table_(),
394  schedule_(),
395  subProcess_(),
396  historyAppender_(new HistoryAppender),
397  state_(sInit),
398  event_loop_(),
399  state_lock_(),
400  stop_lock_(),
401  stopper_(),
402  starter_(),
403  stop_count_(-1),
406  id_set_(false),
407  event_loop_id_(),
409  fb_(),
410  looper_(),
411  principalCache_(),
412  shouldWeStop_(false),
414  fileMode_(),
420  forceLooperToEnd_(false),
421  looperBeginJobRun_(false),
425  setCpuAffinity_(false),
427  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
428  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
429  processDesc->addServices(defaultServices, forcedServices);
430  init(processDesc, iToken, iLegacy);
431  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 433 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

435  :
438  actReg_(),
439  preg_(),
441  serviceToken_(),
442  input_(),
443  espController_(new eventsetup::EventSetupsController),
444  esp_(),
445  act_table_(),
447  schedule_(),
448  subProcess_(),
449  historyAppender_(new HistoryAppender),
450  state_(sInit),
451  event_loop_(),
452  state_lock_(),
453  stop_lock_(),
454  stopper_(),
455  starter_(),
456  stop_count_(-1),
459  id_set_(false),
460  event_loop_id_(),
462  fb_(),
463  looper_(),
464  principalCache_(),
465  shouldWeStop_(false),
467  fileMode_(),
473  forceLooperToEnd_(false),
474  looperBeginJobRun_(false),
478  setCpuAffinity_(false),
480  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
481  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
482  processDesc->addServices(defaultServices, forcedServices);
484  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 486 of file EventProcessor.cc.

References init().

488  :
491  actReg_(),
492  preg_(),
494  serviceToken_(),
495  input_(),
496  espController_(new eventsetup::EventSetupsController),
497  esp_(),
498  act_table_(),
500  schedule_(),
501  subProcess_(),
502  historyAppender_(new HistoryAppender),
503  state_(sInit),
504  event_loop_(),
505  state_lock_(),
506  stop_lock_(),
507  stopper_(),
508  starter_(),
509  stop_count_(-1),
512  id_set_(false),
513  event_loop_id_(),
515  fb_(),
516  looper_(),
517  principalCache_(),
518  shouldWeStop_(false),
520  fileMode_(),
526  forceLooperToEnd_(false),
527  looperBeginJobRun_(false),
531  setCpuAffinity_(false),
533  init(processDesc, token, legacy);
534  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 537 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

537  :
540  actReg_(),
541  preg_(),
543  serviceToken_(),
544  input_(),
545  espController_(new eventsetup::EventSetupsController),
546  esp_(),
547  act_table_(),
549  schedule_(),
550  subProcess_(),
551  historyAppender_(new HistoryAppender),
552  state_(sInit),
553  event_loop_(),
554  state_lock_(),
555  stop_lock_(),
556  stopper_(),
557  starter_(),
558  stop_count_(-1),
561  id_set_(false),
562  event_loop_id_(),
564  fb_(),
565  looper_(),
566  principalCache_(),
567  shouldWeStop_(false),
569  fileMode_(),
575  forceLooperToEnd_(false),
576  looperBeginJobRun_(false),
580  setCpuAffinity_(false),
582  if(isPython) {
583  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
584  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
586  }
587  else {
588  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
590  }
591  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 678 of file EventProcessor.cc.

References actReg_, changeState(), edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), alignCSCRings::e, esp_, espController_, cms::Exception::explainSelf(), edm::detail::ThreadSafeRegistry< KEY, T, E >::extra(), getToken(), input_, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), looper_, edm::event_processor::mDtor, schedule_, and subProcess_.

678  {
679  // Make the services available while everything is being deleted.
680  ServiceToken token = getToken();
681  ServiceRegistry::Operate op(token);
682  try {
684  }
685  catch(cms::Exception& e) {
686  LogError("System")
687  << e.explainSelf() << "\n";
688  }
689 
690  // manually destroy all these thing that may need the services around
691  espController_.reset();
692  subProcess_.reset();
693  esp_.reset();
694  schedule_.reset();
695  input_.reset();
696  looper_.reset();
697  actReg_.reset();
698 
699  pset::Registry* psetRegistry = pset::Registry::instance();
700  psetRegistry->data().clear();
701  psetRegistry->extra().setID(ParameterSetID());
702 
704  ParentageRegistry::instance()->data().clear();
707  }
virtual std::string explainSelf() const
Definition: Exception.cc:146
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
collection_type & data()
Provide access to the contained collection.
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2220 of file EventProcessor.cc.

2220  {
2222  }
void edm::EventProcessor::asyncRun ( EventProcessor *  me)
static private

Definition at line 1559 of file EventProcessor.cc.

References alignCSCRings::e, event_loop_id_, cppFunctionSkipper::exception, cms::Exception::explainSelf(), FDEBUG, id_set_, last_error_text_, last_rc_, runToCompletion(), starter_, stop_count_, stop_lock_, and stopper_.

1559  {
1560  // set up signals to allow for interruptions
1561  // ignore all other signals
1562  // make sure no exceptions escape out
1563 
1564  // temporary hack until we modify the input source to allow
1565  // wakeup calls from other threads. This mimics the solution
1566  // in EventFilter/Processor, which I do not like.
1567  // allowing cancels means that the thread just disappears at
1568  // certain points. This is bad for C++ stack variables.
1569  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1570  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1571  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1572  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1573 
1574  {
1575  boost::mutex::scoped_lock sl(me->stop_lock_);
1576  me->event_loop_id_ = pthread_self();
1577  me->id_set_ = true;
1578  me->starter_.notify_all();
1579  }
1580 
1581  Status rc = epException;
1582  FDEBUG(2) << "asyncRun starting ......................\n";
1583 
1584  try {
1585  bool onlineStateTransitions = true;
1586  rc = me->runToCompletion(onlineStateTransitions);
1587  }
1588  catch (cms::Exception& e) {
1589  LogError("FwkJob") << "cms::Exception caught in "
1590  << "EventProcessor::asyncRun"
1591  << "\n"
1592  << e.explainSelf();
1593  me->last_error_text_ = e.explainSelf();
1594  }
1595  catch (std::exception& e) {
1596  LogError("FwkJob") << "Standard library exception caught in "
1597  << "EventProcessor::asyncRun"
1598  << "\n"
1599  << e.what();
1600  me->last_error_text_ = e.what();
1601  }
1602  catch (...) {
1603  LogError("FwkJob") << "Unknown exception caught in "
1604  << "EventProcessor::asyncRun"
1605  << "\n";
1606  me->last_error_text_ = "Unknown exception caught";
1607  rc = epOther;
1608  }
1609 
1610  me->last_rc_ = rc;
1611 
1612  {
1613  // notify anyone waiting for exit that we are doing so now
1614  boost::mutex::scoped_lock sl(me->stop_lock_);
1615  ++me->stop_count_;
1616  me->stopper_.notify_all();
1617  }
1618  FDEBUG(2) << "asyncRun ending ......................\n";
1619  }
virtual std::string explainSelf() const
Definition: Exception.cc:146
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If it is not called in time, it will be called automatically the first time 'run' is called.

Definition at line 710 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, changeState(), edm::convertException::charPtrToEDM(), alignCSCRings::e, cppFunctionSkipper::exception, hasSubProcess(), input_, edm::event_processor::mBeginJob, preg_, alignCSCRings::s, schedule_, serviceToken_, edm::event_processor::sInit, state_, edm::convertException::stdToEDM(), AlCaHLTBitMon_QueryRunRegistry::string, edm::convertException::stringToEDM(), subProcess_, and edm::convertException::unknownToEDM().

Referenced by evf::FUEventProcessor::configuring(), and evf::FUEventProcessor::enabling().

710  {
711  if(state_ != sInit) return;
712  bk::beginJob();
713  // can only be run if in the initial state
715 
716  // StateSentry toerror(this); // should we add this ?
717  //make the services available
719 
720  //NOTE: This implementation assumes 'Job' means one call
721  // the EventProcessor::run
722  // If it really means once per 'application' then this code will
723  // have to be changed.
724  // Also have to deal with case where have 'run' then new Module
725  // added and do 'run'
726  // again. In that case the newly added Module needs its 'beginJob'
727  // to be called.
728 
729  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
730  // For now we delay calling beginOfJob until first beginOfRun
731  //if(looper_) {
732  // looper_->beginOfJob(es);
733  //}
734  try {
735  try {
736  input_->doBeginJob();
737  }
738  catch (cms::Exception& e) { throw; }
739  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
740  catch (std::exception& e) { convertException::stdToEDM(e); }
742  catch(char const* c) { convertException::charPtrToEDM(c); }
743  catch (...) { convertException::unknownToEDM(); }
744  }
745  catch(cms::Exception& ex) {
746  ex.addContext("Calling beginJob for the source");
747  throw;
748  }
749  schedule_->beginJob(*preg_);
750  // toerror.succeeded(); // should we add this?
751  if(hasSubProcess()) subProcess_->doBeginJob();
752  actReg_->postBeginJobSignal_();
753  }
boost::shared_ptr< ActivityRegistry > actReg_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2031 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

2031  {
2032  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2033  input_->doBeginLumi(lumiPrincipal);
2034 
2036  if(rng.isAvailable()) {
2037  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2038  rng->preBeginLumi(lb);
2039  }
2040 
2041  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2042  // lumi blocks know their start and end times why not also start and end events?
2043  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2044  espController_->eventSetupForInstance(ts);
2045  EventSetup const& es = esp_->eventSetup();
2046  {
2047  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
2048  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2049  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2050  if(hasSubProcess()) {
2051  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2052  }
2053  sentry.allowThrow();
2054  }
2055  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2056  if(looper_) {
2057  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2058  }
2059  }
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1978 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1978  {
1979  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1980  input_->doBeginRun(runPrincipal);
1981  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1982  runPrincipal.beginTime());
1984  espController_->forceCacheClear();
1985  }
1986  espController_->eventSetupForInstance(ts);
1987  EventSetup const& es = esp_->eventSetup();
1988  if(looper_ && looperBeginJobRun_== false) {
1989  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1990  looper_->beginOfJob(es);
1991  looperBeginJobRun_ = true;
1992  looper_->doStartingNewLoop();
1993  }
1994  {
1995  typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
1996  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1997  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1998  if(hasSubProcess()) {
1999  subProcess_->doBeginRun(runPrincipal, ts);
2000  }
2001  sentry.allowThrow();
2002  }
2003  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
2004  if(looper_) {
2005  looper_->doBeginRun(runPrincipal, es);
2006  }
2007  }
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::changeState ( event_processor::Msg  msg)
private

Definition at line 1499 of file EventProcessor.cc.

References cond::rpcobimon::current, edm::hlt::Exception, FDEBUG, edm::event_processor::mAny, edm::event_processor::sInvalid, and asciidump::table.

Referenced by beginJob(), endJob(), ~EventProcessor(), and edm::event_processor::StateSentry::~StateSentry().

1499  {
1500  // most likely need to serialize access to this routine
1501 
1502  boost::mutex::scoped_lock sl(state_lock_);
1503  State curr = state_;
1504  int rc;
1505  // found if(not end of table) and
1506  // (state == table.state && (msg == table.message || msg == any))
1507  for(rc = 0;
1508  table[rc].current != sInvalid &&
1509  (curr != table[rc].current ||
1510  (curr == table[rc].current &&
1511  msg != table[rc].message && table[rc].message != mAny));
1512  ++rc);
1513 
1514  if(table[rc].current == sInvalid)
1515  throw cms::Exception("BadState")
1516  << "A member function of EventProcessor has been called in an"
1517  << " inappropriate order.\n"
1518  << "Bad transition from " << stateName(curr) << " "
1519  << "using message " << msgName(msg) << "\n"
1520  << "No where to go from here.\n";
1521 
1522  FDEBUG(1) << "changeState: current=" << stateName(curr)
1523  << ", message=" << msgName(msg)
1524  << " -> new=" << stateName(table[rc].final) << "\n";
1525 
1526  state_ = table[rc].final;
1527  }
boost::mutex state_lock_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * msgName(event_processor::Msg m) const
TransEntry table[]
volatile event_processor::State state_
char const * stateName(event_processor::State s) const
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1356 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1356  {
1357  schedule_->clearCounters();
1358  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1873 of file EventProcessor.cc.

References FDEBUG.

1873  {
1874  if (fb_.get() != nullptr) {
1875  input_->closeFile(fb_.get(), cleaningUpAfterException);
1876  }
1877  FDEBUG(1) << "\tcloseInputFile\n";
1878  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1888 of file EventProcessor.cc.

References FDEBUG.

1888  {
1889  if (fb_.get() != nullptr) {
1890  schedule_->closeOutputFiles();
1891  if(hasSubProcess()) subProcess_->closeOutputFiles();
1892  }
1893  FDEBUG(1) << "\tcloseOutputFiles\n";
1894  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::connectSigs ( EventProcessor ep)
private

Definition at line 1312 of file EventProcessor.cc.

References postProcessEventSignal_, and preProcessEventSignal_.

Referenced by init().

1312  {
1313  // When the FwkImpl signals are given, pass them to the
1314  // appropriate EventProcessor signals so that the outside world
1315  // can see the signal.
1316  actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
1317  actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
1318  }
boost::shared_ptr< ActivityRegistry > actReg_
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1622 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1622  {
1624  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1625  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1626  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1627  else {
1628  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1629  << fileMode_ << ".\n"
1630  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1631  }
1632 
1633  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1634  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1635  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1636  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1637  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1638  else {
1639  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1640  << emptyRunLumiMode_ << ".\n"
1641  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1642  }
1643 
1644  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1645  fileMode,
1646  emptyRunLumiMode));
1647 
1648  machine->initiate();
1649  return machine;
1650  }
std::string emptyRunLumiMode_
char const * edm::EventProcessor::currentStateName ( ) const

Member functions to support asynchronous interface.

Definition at line 1360 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().

1360  {
1361  return stateName(getState());
1362  }
event_processor::State getState() const
char const * stateName(event_processor::State s) const
void edm::EventProcessor::declareRunNumber ( RunNumber_t  runNumber)

Definition at line 1400 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1400  {
1401  // inside of beginJob there is a check to see if it has been called before
1402  beginJob();
1404 
1405  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1406  //input_->declareRunNumber(runNumber);
1407  }
void changeState(event_processor::Msg)
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2145 of file EventProcessor.cc.

References FDEBUG.

2145  {
2147  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2148  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2149  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2133 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2133  {
2134  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2135  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2136  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2137  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1967 of file EventProcessor.cc.

References FDEBUG.

1967  {
1968  FDEBUG(1) << "\tdoErrorStuff\n";
1969  LogError("StateMachine")
1970  << "The EventProcessor state machine encountered an unexpected event\n"
1971  << "and went to the error state\n"
1972  << "Will attempt to terminate processing normally\n"
1973  << "(IF using the looper the next loop will be attempted)\n"
1974  << "This likely indicates a bug in an input module or corrupted input or both\n";
1976  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::doneAsync ( event_processor::Msg  m)
private

Definition at line 1491 of file EventProcessor.cc.

1491  {
1492  // make sure to include a timeout here so we don't wait forever
1493  // I suspect there are still timing issues with thread startup
1494  // and the setting of the various control variables (stop_count, id_set)
1495  changeState(m);
1496  return waitForAsyncCompletion(60*2);
1497  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1341 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::actionPerformed().

1341  {
1342  schedule_->enableEndPaths(active);
1343  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 756 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), changeState(), edm::OutputModule::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), input_, looper_, edm::event_processor::mEndJob, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

Referenced by evf::FWEPWrapper::init(), and evf::FWEPWrapper::stopAndHalt().

756  {
757  // Collects exceptions, so we don't throw before all operations are performed.
758  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
759 
760  // only allowed to run if state is sIdle, sJobReady, sRunGiven
761  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
762 
763  //make the services available
765 
766  schedule_->endJob(c);
767  if(hasSubProcess()) {
768  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
769  }
770  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
771  if(looper_) {
772  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
773  }
774  auto actReg = actReg_.get();
775  c.call([actReg](){actReg->postEndJobSignal_();});
776  if(c.hasThrown()) {
777  c.rethrow();
778  }
779  }
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:74
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2061 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

2061  {
2062  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2063  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2064  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2065  // lumi blocks know their start and end times why not also start and end events?
2066  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2067  lumiPrincipal.endTime());
2068  espController_->eventSetupForInstance(ts);
2069  EventSetup const& es = esp_->eventSetup();
2070  {
2071  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
2072  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2073  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2074  if(hasSubProcess()) {
2075  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2076  }
2077  sentry.allowThrow();
2078  }
2079  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2080  if(looper_) {
2081  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2082  }
2083  }
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1938 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1938  {
1939  if(looper_) {
1940  ModuleChanger changer(schedule_.get());
1941  looper_->setModuleChanger(&changer);
1942  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1943  looper_->setModuleChanger(nullptr);
1944  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1945  else return false;
1946  }
1947  FDEBUG(1) << "\tendOfLoop\n";
1948  return true;
1949  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1346 of file EventProcessor.cc.

1346  {
1347  return schedule_->endPathsEnabled();
1348  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2009 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

2009  {
2010  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2011  input_->doEndRun(runPrincipal, cleaningUpAfterException);
2013  runPrincipal.endTime());
2014  espController_->eventSetupForInstance(ts);
2015  EventSetup const& es = esp_->eventSetup();
2016  {
2017  typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
2018  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2020  if(hasSubProcess()) {
2021  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2022  }
2023  sentry.allowThrow();
2024  }
2025  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2026  if(looper_) {
2027  looper_->doEndRun(runPrincipal, es);
2028  }
2029  }
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::errorState ( )
private

Definition at line 1486 of file EventProcessor.cc.

References edm::event_processor::sError.

1486  {
1487  state_ = sError;
1488  }
volatile event_processor::State state_
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 1011 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

1011  {
1012 
1013  if(0 == numberOfForkedChildren_) {return true;}
1014  assert(0<numberOfForkedChildren_);
1015  //do what we want done in common
1016  {
1017  beginJob(); //make sure this was run
1018  // make the services available
1020 
1021  InputSource::ItemType itemType;
1022  itemType = input_->nextItemType();
1023 
1024  assert(itemType == InputSource::IsFile);
1025  {
1026  readFile();
1027  }
1028  itemType = input_->nextItemType();
1029  assert(itemType == InputSource::IsRun);
1030 
1031  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1032  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1033  input_->runAuxiliary()->beginTime());
1034  espController_->eventSetupForInstance(ts);
1035  EventSetup const& es = esp_->eventSetup();
1036 
1037  //now get all the data available in the EventSetup
1038  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1039  es.fillAvailableRecordKeys(recordKeys);
1040  std::vector<eventsetup::DataKey> dataKeys;
1041  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1042  itKey != itEnd;
1043  ++itKey) {
1044  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1045  //see if this is on our exclusion list
1046  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1047  ExcludedData const* excludedData(nullptr);
1048  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1049  excludedData = &(itExcludeRec->second);
1050  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1051  //skip all items in this record
1052  continue;
1053  }
1054  }
1055  if(0 != recordPtr) {
1056  dataKeys.clear();
1057  recordPtr->fillRegisteredDataKeys(dataKeys);
1058  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1059  itDataKey != itDataKeyEnd;
1060  ++itDataKey) {
1061  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1062  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1063  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1064  continue;
1065  }
1066  try {
1067  recordPtr->doGet(*itDataKey);
1068  } catch(cms::Exception& e) {
1069  LogWarning("ForkingEventSetupPreFetching") << e.what();
1070  }
1071  }
1072  }
1073  }
1074  }
1075  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1076  {
1077  // make the services available
1079  Service<JobReport> jobReport;
1080  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1081 
1082  //Now actually do the forking
1083  actReg_->preForkReleaseResourcesSignal_();
1084  input_->doPreForkReleaseResources();
1085  schedule_->preForkReleaseResources();
1086  }
1087  installCustomHandler(SIGCHLD, ep_sigchld);
1088 
1089 
1090  unsigned int childIndex = 0;
1091  unsigned int const kMaxChildren = numberOfForkedChildren_;
1092  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1093  std::vector<pid_t> childrenIds;
1094  childrenIds.reserve(kMaxChildren);
1095  std::vector<int> childrenSockets;
1096  childrenSockets.reserve(kMaxChildren);
1097  std::vector<int> childrenPipes;
1098  childrenPipes.reserve(kMaxChildren);
1099  std::vector<int> childrenSocketsCopy;
1100  childrenSocketsCopy.reserve(kMaxChildren);
1101  std::vector<int> childrenPipesCopy;
1102  childrenPipesCopy.reserve(kMaxChildren);
1103  int pipes[] {0, 0};
1104 
1105  {
1106  // make the services available
1108  Service<JobReport> jobReport;
1109  int sockets[2], fd_flags;
1110  for(; childIndex < kMaxChildren; ++childIndex) {
1111  // Create a UNIX_DGRAM socket pair
1112  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1113  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1114  exit(EXIT_FAILURE);
1115  }
1116  if (pipe(pipes)) {
1117  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1118  exit(EXIT_FAILURE);
1119  }
1120  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1121  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1122  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1123  exit(EXIT_FAILURE);
1124  }
1125  // Mark socket as non-block. Child must be careful to do select prior
1126  // to reading from socket.
1127  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1128  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1129  exit(EXIT_FAILURE);
1130  }
1131  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1132  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1133  exit(EXIT_FAILURE);
1134  }
1135  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1136  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1137  exit(EXIT_FAILURE);
1138  }
1139  // Linux man page notes there are some edge cases where reading from a
1140  // fd can block, even after a select.
1141  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1142  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1143  exit(EXIT_FAILURE);
1144  }
1145  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1146  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1147  exit(EXIT_FAILURE);
1148  }
1149 
1150  childrenPipesCopy = childrenPipes;
1151  childrenSocketsCopy = childrenSockets;
1152 
1153  pid_t value = fork();
1154  if(value == 0) {
1155  // Close the parent's side of the socket and pipe which will talk to us.
1156  close(pipes[0]);
1157  close(sockets[0]);
1158  // Close our copies of the parent's other communication pipes.
1159  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1160  close(*it);
1161  }
1162  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1163  close(*it);
1164  }
1165 
1166  // this is the child process, redirect stdout and stderr to a log file
1167  fflush(stdout);
1168  fflush(stderr);
1169  std::stringstream stout;
1170  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1171  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1172  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1173  }
1174  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1175  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1176  }
1177 
1178  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1179  if(setCpuAffinity_) {
1180  // CPU affinity is handled differently on macosx.
1181  // We disable it and print a message until someone reads:
1182  //
1183  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1184  //
1185  // and implements it.
1186 #ifdef __APPLE__
1187  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1188 #else
1189  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1190  cpu_set_t mask;
1191  CPU_ZERO(&mask);
1192  CPU_SET(childIndex, &mask);
1193  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1194  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1195  exit(-1);
1196  }
1197 #endif
1198  }
1199  break;
1200  } else {
1201  //this is the parent
1202  close(pipes[1]);
1203  close(sockets[1]);
1204  }
1205  if(value < 0) {
1206  LogError("ForkingChild") << "failed to create a child";
1207  exit(-1);
1208  }
1209  childrenIds.push_back(value);
1210  childrenSockets.push_back(sockets[0]);
1211  childrenPipes.push_back(pipes[0]);
1212  }
1213 
1214  if(childIndex < kMaxChildren) {
1215  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1216  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1217 
1218  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1219  input_->doPostForkReacquireResources(receiver);
1220  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1221  //NOTE: sources have to reset themselves by listening to the post fork message
1222  //rewindInput();
1223  return true;
1224  }
1225  jobReport->parentAfterFork(jobReportFile);
1226  }
1227 
1228  //this is the original, which is now the master for all the children
1229 
1230  //Need to wait for signals from the children or externally
1231  // To wait we must
1232  // 1) block the signals we want to wait on so we do not have a race condition
1233  // 2) check that we haven't already meet our ending criteria
1234  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1235  sigset_t blockingSigSet;
1236  sigset_t unblockingSigSet;
1237  sigset_t oldSigSet;
1238  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1239  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1240  sigaddset(&blockingSigSet, SIGCHLD);
1241  sigaddset(&blockingSigSet, SIGUSR2);
1242  sigaddset(&blockingSigSet, SIGINT);
1243  sigdelset(&unblockingSigSet, SIGCHLD);
1244  sigdelset(&unblockingSigSet, SIGUSR2);
1245  sigdelset(&unblockingSigSet, SIGINT);
1246  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1247 
1248  // If there are too many fd's (unlikely, but possible) for select, denote this
1249  // because the sender will fail.
1250  bool too_many_fds = false;
1251  if (pipes[1]+1 > FD_SETSIZE) {
1252  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1253  too_many_fds = true;
1254  }
1255 
1256  //create a thread that sends the units of work to workers
1257  // we create it after all signals were blocked so that this
1258  // thread is never interupted by a signal
1259  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1260  boost::thread senderThread(sender);
1261 
1262  if(not too_many_fds) {
1263  //NOTE: a child could have failed before we got here and even after this call
1264  // which is why the 'if' is conditional on continueAfterChildFailure_
1266  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1267  sigsuspend(&unblockingSigSet);
1269  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1270  }
1271  }
1272  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1273 
1274  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1275  if(child_failed) {
1276  LogError("ForkingStopping") << "child failed";
1277  }
1278  if(shutdown_flag) {
1279  LogSystem("ForkingStopping") << "asked to shutdown";
1280  }
1281 
1282  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1283  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1284  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1285  it != itEnd; ++it) {
1286  /* int result = */ kill(*it, SIGUSR2);
1287  }
1288  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1289  while(num_children_done != kMaxChildren) {
1290  sigsuspend(&unblockingSigSet);
1291  }
1292  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1293  }
1294  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1295  senderThread.join();
1296  if(child_failed && !continueAfterChildFailure_) {
1297  if (child_fail_signal) {
1298  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1299  } else if (child_fail_exit_status) {
1300  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1301  } else {
1302  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1303  }
1304  }
1305  if(too_many_fds) {
1306  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1307  }
1308  return false;
1309  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
volatile bool shutdown_flag
#define O_NONBLOCK
Definition: SysFile.h:21
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1321 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::init().

1321  {
1322  return schedule_->getAllModuleDescriptions();
1323  }
std::auto_ptr< Schedule > schedule_
State edm::EventProcessor::getState ( ) const
ServiceToken edm::EventProcessor::getToken ( )
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1351 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().

1351  {
1352  schedule_->getTriggerReport(rep);
1353  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 313 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

313  {
314  return subProcess_.get() != 0;
315  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 594 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, connectSigs(), continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::popSubProcessParameterSet(), edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, schedule_, serviceToken_, setCpuAffinity_, edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, and subProcess_.

Referenced by EventProcessor().

596  {
597 
598  //std::cerr << processDesc->dump() << std::endl;
599 
600  ROOT::Cintex::Cintex::Enable();
601 
602  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
603  //std::cerr << parameterSet->dump() << std::endl;
604 
605  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
606  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
607 
608  // Now set some parameters specific to the main process.
609  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
610  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
611  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
612  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
613  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
614  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
615  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
616  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
617  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
618  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
619  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
620  itPS != itPSEnd;
621  ++itPS) {
622  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
623  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
624  itPS->getUntrackedParameter<std::string>("label", "")));
625  }
626  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
627 
628  // Now do general initialization
629  ScheduleItems items;
630 
631  //initialize the services
632  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
633  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
634  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
635 
636  //make the services available
638 
639  // initialize miscellaneous items
640  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
641 
642  // initialize the event setup provider
643  esp_ = espController_->makeProvider(*parameterSet);
644 
645  // initialize the looper, if any
646  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
647  if(looper_) {
648  looper_->setActionTable(items.act_table_.get());
649  looper_->attachTo(*items.actReg_);
650  }
651 
652  // initialize the input source
653  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
654 
655  // initialize the Schedule
656  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
657 
658  // set the data members
659  act_table_ = std::move(items.act_table_);
660  actReg_ = items.actReg_;
661  preg_.reset(items.preg_.release());
662  branchIDListHelper_ = items.branchIDListHelper_;
663  processConfiguration_ = items.processConfiguration_;
664 
665  FDEBUG(2) << parameterSet << std::endl;
666  connectSigs(this);
667 
668  // Reusable event principal
669  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
671 
672  // initialize the subprocess, if there is one
673  if(subProcessParameterSet) {
674  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
675  }
676  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:378
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
void insert(boost::shared_ptr< RunPrincipal > rp)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
char const * edm::EventProcessor::msgName ( event_processor::Msg  m) const

Definition at line 1368 of file EventProcessor.cc.

References m.

1368  {
1369  return msgNames[m];
1370  }
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1880 of file EventProcessor.cc.

References FDEBUG.

1880  {
1881  if (fb_.get() != nullptr) {
1882  schedule_->openOutputFiles(*fb_);
1883  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1884  }
1885  FDEBUG(1) << "\topenOutputFiles\n";
1886  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 995 of file EventProcessor.cc.

995  {
996  if(child_failed && continueAfterChildFailure_) {
997  if (child_fail_signal) {
998  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
999  child_fail_signal=0;
1000  } else if (child_fail_exit_status) {
1001  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1002  child_fail_exit_status=0;
1003  } else {
1004  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1005  }
1006  child_failed =false;
1007  }
1008  }
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal ( )
inline

signal is emitted after all modules have finished processing the Event

Definition at line 208 of file EventProcessor.h.

References postProcessEventSignal_.

208 {return postProcessEventSignal_;}
ActivityRegistry::PostProcessEvent postProcessEventSignal_
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1957 of file EventProcessor.cc.

References FDEBUG.

1957  {
1958  looper_->prepareForNextLoop(esp_.get());
1959  FDEBUG(1) << "\tprepareForNextLoop\n";
1960  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal ( )
inline

signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event

Definition at line 203 of file EventProcessor.h.

References preProcessEventSignal_.

203 {return preProcessEventSignal_;}
ActivityRegistry::PreProcessEvent preProcessEventSignal_
int edm::EventProcessor::readAndCacheLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2102 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

2102  {
2105  << "EventProcessor::readAndCacheRun\n"
2106  << "Illegal attempt to insert lumi into cache\n"
2107  << "Contact a Framework Developer\n";
2108  }
2111  << "EventProcessor::readAndCacheRun\n"
2112  << "Illegal attempt to insert lumi into cache\n"
2113  << "Run is invalid\n"
2114  << "Contact a Framework Developer\n";
2115  }
2116  principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
2118  return input_->luminosityBlock();
2119  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::unique_ptr< HistoryAppender > historyAppender_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndCacheRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2085 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

2085  {
2088  << "EventProcessor::readAndCacheRun\n"
2089  << "Illegal attempt to insert run into cache\n"
2090  << "Contact a Framework Developer\n";
2091  }
2092  principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2093  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2094  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2121 of file EventProcessor.cc.

2121  {
2122  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2123  input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2124  return input_->luminosityBlock();
2125  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2096 of file EventProcessor.cc.

References PDRates::Run.

2096  {
2097  principalCache_.merge(input_->runAuxiliary(), preg_);
2098  input_->readAndMergeRun(principalCache_.runPrincipalPtr());
2099  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2100  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2151 of file EventProcessor.cc.

References edm::EventPrincipal::clearEventPrincipal(), FDEBUG, edm::EventPrincipal::id(), edm::ProcessingController::lastOperationSucceeded(), edm::EventPrincipal::luminosityBlock(), edm::EventPrincipal::luminosityBlockPrincipalPtrValid(), edm::ProcessingController::requestedTransition(), edm::EventPrincipal::run(), edm::ProcessingController::setLastOperationSucceeded(), edm::EventPrincipal::setLuminosityBlockPrincipal(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, summarizeEdmComparisonLogfiles::succeeded, and edm::EventPrincipal::time().

2151  {
2152  EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2153  FDEBUG(1) << "\treadEvent\n";
2154  assert(pep != 0);
2155  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2156  assert(pep->luminosityBlockPrincipalPtrValid());
2157  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2158  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2159 
2160  IOVSyncValue ts(pep->id(), pep->time());
2161  espController_->eventSetupForInstance(ts);
2162  EventSetup const& es = esp_->eventSetup();
2163  {
2164  typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
2165  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2166  schedule_->processOneOccurrence<Traits>(*pep, es);
2167  if(hasSubProcess()) {
2168  subProcess_->doEvent(*pep, ts);
2169  }
2170  sentry.allowThrow();
2171  }
2172 
2173  if(looper_) {
2174  bool randomAccess = input_->randomAccess();
2175  ProcessingController::ForwardState forwardState = input_->forwardState();
2176  ProcessingController::ReverseState reverseState = input_->reverseState();
2177  ProcessingController pc(forwardState, reverseState, randomAccess);
2178 
2180  do {
2181  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2182 
2183  bool succeeded = true;
2184  if(randomAccess) {
2185  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2186  input_->skipEvents(-2);
2187  }
2188  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2189  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2190  }
2191  }
2192  pc.setLastOperationSucceeded(succeeded);
2193  } while(!pc.lastOperationSucceeded());
2194  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2195 
2196  }
2197 
2198  FDEBUG(1) << "\tprocessEvent\n";
2199  pep->clearEventPrincipal();
2200  }
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
EventPrincipal & eventPrincipal() const
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1860 of file EventProcessor.cc.

References FDEBUG, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1860  {
1861  FDEBUG(1) << " \treadFile\n";
1862  size_t size = preg_->size();
1863  fb_ = input_->readFile();
1864  if(size < preg_->size()) {
1866  }
1868  if(numberOfForkedChildren_ > 0) {
1869  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1870  }
1871  }
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
std::unique_ptr< FileBlock > fb_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1904 of file EventProcessor.cc.

References FDEBUG.

1904  {
1905  if (fb_.get() != nullptr) {
1906  schedule_->respondToCloseInputFile(*fb_);
1907  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1908  }
1909  FDEBUG(1) << "\trespondToCloseInputFile\n";
1910  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToCloseOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1920 of file EventProcessor.cc.

References FDEBUG.

1920  {
1921  if (fb_.get() != nullptr) {
1922  schedule_->respondToCloseOutputFiles(*fb_);
1923  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1924  }
1925  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1926  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1896 of file EventProcessor.cc.

References FDEBUG.

1896  {
1897  if (fb_.get() != nullptr) {
1898  schedule_->respondToOpenInputFile(*fb_);
1899  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1900  }
1901  FDEBUG(1) << "\trespondToOpenInputFile\n";
1902  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1912 of file EventProcessor.cc.

References FDEBUG.

1912  {
1913  if (fb_.get() != nullptr) {
1914  schedule_->respondToOpenOutputFiles(*fb_);
1915  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1916  }
1917  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1918  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1951 of file EventProcessor.cc.

References FDEBUG.

1951  {
1952  input_->repeat();
1953  input_->rewind();
1954  FDEBUG(1) << "\trewind\n";
1955  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 384 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.LuminosityBlockID::cppID().

384  {
385  return runToCompletion(false);
386  }
virtual StatusCode runToCompletion(bool onlineStateTransitions)
void edm::EventProcessor::runAsync ( )

Definition at line 1529 of file EventProcessor.cc.

References bk::beginJob(), edm::hlt::Exception, edm::event_processor::mRunAsync, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1529  {
1530  beginJob();
1531  {
1532  boost::mutex::scoped_lock sl(stop_lock_);
1533  if(id_set_ == true) {
1534  std::string err("runAsync called while async event loop already running\n");
1535  LogError("FwkJob") << err;
1536  throw cms::Exception("BadState") << err;
1537  }
1538 
1540 
1541  stop_count_ = 0;
1542  last_rc_ = epSuccess; // forget the last value!
1543  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1544  boost::xtime timeout;
1545 #if BOOST_VERSION >= 105000
1546  boost::xtime_get(&timeout, boost::TIME_UTC_);
1547 #else
1548  boost::xtime_get(&timeout, boost::TIME_UTC);
1549 #endif
1550  timeout.sec += 60; // 60 seconds to start!!!!
1551  if(starter_.timed_wait(sl, timeout) == false) {
1552  // yikes - the thread did not start
1553  throw cms::Exception("BadState")
1554  << "Async run thread did not start in 60 seconds\n";
1555  }
1556  }
1557  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::mutex stop_lock_
static void asyncRun(EventProcessor *)
volatile Status last_rc_
void changeState(event_processor::Msg)
boost::condition starter_
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( bool  onlineStateTransitions)
virtual

Implements edm::IEventProcessor.

Definition at line 1654 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, edm::convertException::charPtrToEDM(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, edm::event_processor::mFinished, edm::event_processor::mInputExhausted, edm::event_processor::mRunCount, edm::event_processor::mShutdownSignal, runEdmFileComparison::returnCode, alignCSCRings::s, edm::shutdown_flag, findQualityFiles::size, edm::event_processor::sShuttingDown, edm::event_processor::sStopping, edm::convertException::stdToEDM(), AlCaHLTBitMon_QueryRunRegistry::string, edm::convertException::stringToEDM(), edm::event_processor::StateSentry::succeeded(), edm::convertException::unknownToEDM(), and edm::usr2_lock.

Referenced by asyncRun(), and run().

1654  {
1655 
1656  StateSentry toerror(this);
1657 
1659  std::auto_ptr<statemachine::Machine> machine;
1660  {
1661  beginJob(); //make sure this was called
1662 
1663  if(!onlineStateTransitions) changeState(mRunCount);
1664 
1665  //StatusCode returnCode = epSuccess;
1667 
1668  // make the services available
1670 
1671  machine = createStateMachine();
1672  try {
1673  try {
1674 
1675  InputSource::ItemType itemType;
1676 
1677  while(true) {
1678 
1679  bool more = true;
1680  if(numberOfForkedChildren_ > 0) {
1681  size_t size = preg_->size();
1682  more = input_->skipForForking();
1683  if(more) {
1684  if(size < preg_->size()) {
1686  }
1688  }
1689  }
1690  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1691 
1692  FDEBUG(1) << "itemType = " << itemType << "\n";
1693 
1694  // These are used for asynchronous running only and
1695  // are checking to see if stopAsync or shutdownAsync
1696  // were called from another thread. In the future, we
1697  // may need to do something better than polling the state.
1698  // With the current code this is the simplest thing and
1699  // it should always work. If the interaction between
1700  // threads becomes more complex this may cause problems.
1701  if(state_ == sStopping) {
1702  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1703  forceLooperToEnd_ = true;
1704  machine->process_event(statemachine::Stop());
1705  forceLooperToEnd_ = false;
1706  break;
1707  }
1708  else if(state_ == sShuttingDown) {
1709  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1710  forceLooperToEnd_ = true;
1711  machine->process_event(statemachine::Stop());
1712  forceLooperToEnd_ = false;
1713  break;
1714  }
1715 
1716  // Look for a shutdown signal
1717  {
1718  boost::mutex::scoped_lock sl(usr2_lock);
1719  if(shutdown_flag) {
1721  returnCode = epSignal;
1722  forceLooperToEnd_ = true;
1723  machine->process_event(statemachine::Stop());
1724  forceLooperToEnd_ = false;
1725  break;
1726  }
1727  }
1728 
1729  if(itemType == InputSource::IsStop) {
1730  machine->process_event(statemachine::Stop());
1731  }
1732  else if(itemType == InputSource::IsFile) {
1733  machine->process_event(statemachine::File());
1734  }
1735  else if(itemType == InputSource::IsRun) {
1736  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1737  }
1738  else if(itemType == InputSource::IsLumi) {
1739  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1740  }
1741  else if(itemType == InputSource::IsEvent) {
1742  machine->process_event(statemachine::Event());
1743  }
1744  // This should be impossible
1745  else {
1747  << "Unknown next item type passed to EventProcessor\n"
1748  << "Please report this error to the Framework group\n";
1749  }
1750 
1751  if(machine->terminated()) {
1753  break;
1754  }
1755  } // End of loop over state machine events
1756  } // Try block
1757  catch (cms::Exception& e) { throw; }
1758  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1759  catch (std::exception& e) { convertException::stdToEDM(e); }
1761  catch(char const* c) { convertException::charPtrToEDM(c); }
1762  catch (...) { convertException::unknownToEDM(); }
1763  } // Try block
1764  // Some comments on exception handling related to the boost state machine:
1765  //
1766  // Some states used in the machine are special because they
1767  // perform actions while the machine is being terminated, actions
1768  // such as close files, call endRun, call endLumi etc ... Each of these
1769  // states has two functions that perform these actions. The functions
1770  // are almost identical. The major difference is that one version
1771  // catches all exceptions and the other lets exceptions pass through.
1772  // The destructor catches them and the other function named "exit" lets
1773  // them pass through. On a normal termination, boost will always call
1774  // "exit" and then the state destructor. In our state classes, the
1775  // the destructors do nothing if the exit function already took
1776  // care of things. Here's the interesting part. When boost is
1777  // handling an exception the "exit" function is not called (a boost
1778  // feature).
1779  //
1780  // If an exception occurs while the boost machine is in control
1781  // (which usually means inside a process_event call), then
1782  // the boost state machine destroys its states and "terminates" itself.
1783  // This is already done before we hit the catch blocks below. In this case
1784  // the call to terminateMachine below only destroys an already
1785  // terminated state machine. Because exit is not called, the state destructors
1786  // handle cleaning up lumis, runs, and files. The destructors swallow
1787  // all exceptions and only pass through the exception messages, which
1788  // are tacked onto the original exception below.
1789  //
1790  // If an exception occurs when the boost state machine is not
1791  // in control (outside the process_event functions), then boost
1792  // cannot destroy its own states. The terminateMachine function
1793  // below takes care of that. The flag "alreadyHandlingException"
1794  // is set true so that the state exit functions do nothing (and
1795  // cannot throw more exceptions while handling the first). Then the
1796  // state destructors take care of this because exit did nothing.
1797  //
1798  // In both cases above, the EventProcessor::endOfLoop function is
1799  // not called because it can throw exceptions.
1800  //
1801  // One tricky aspect of the state machine is that things that can
1802  // throw should not be invoked by the state machine while another
1803  // exception is being handled.
1804  // Another tricky aspect is that it appears to be important to
1805  // terminate the state machine before invoking its destructor.
1806  // We've seen crashes that are not understood when that is not
1807  // done. Maintainers of this code should be careful about this.
1808 
1809  catch (cms::Exception & e) {
1811  terminateMachine(machine);
1812  alreadyHandlingException_ = false;
1813  if (!exceptionMessageLumis_.empty()) {
1815  if (e.alreadyPrinted()) {
1816  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1817  }
1818  }
1819  if (!exceptionMessageRuns_.empty()) {
1821  if (e.alreadyPrinted()) {
1822  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1823  }
1824  }
1825  if (!exceptionMessageFiles_.empty()) {
1827  if (e.alreadyPrinted()) {
1828  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1829  }
1830  }
1831  throw;
1832  }
1833 
1834  if(machine->terminated()) {
1835  FDEBUG(1) << "The state machine reports it has been terminated\n";
1836  machine.reset();
1837  }
1838 
1839  if(!onlineStateTransitions) changeState(mFinished);
1840 
1842  throw cms::Exception("BadState")
1843  << "The boost state machine in the EventProcessor exited after\n"
1844  << "entering the Error state.\n";
1845  }
1846 
1847  }
1848  if(machine.get() != 0) {
1849  terminateMachine(machine);
1851  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1852  << "Please report this error to the Framework group\n";
1853  }
1854 
1855  toerror.succeeded();
1856 
1857  return returnCode;
1858  }
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
std::unique_ptr< InputSource > input_
volatile bool shutdown_flag
tuple size
Write out results.
boost::mutex usr2_lock
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2208 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2208  {
2210  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2216 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2216  {
2218  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2212 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2212  {
2214  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setRunNumber ( RunNumber_t  runNumber)

Definition at line 1383 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1383  {
1384  if(runNumber == 0) {
1385  runNumber = 1;
1386  LogWarning("Invalid Run")
1387  << "EventProcessor::setRunNumber was called with an invalid run number (nullptr)\n"
1388  << "Run number was set to 1 instead\n";
1389  }
1390 
1391  // inside of beginJob there is a check to see if it has been called before
1392  beginJob();
1394 
1395  // interface not correct yet
1396  input_->setRunNumber(runNumber);
1397  }
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1962 of file EventProcessor.cc.

References FDEBUG.

1962  {
1963  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1964  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1965  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2202 of file EventProcessor.cc.

References FDEBUG.

2202  {
2203  FDEBUG(1) << "\tshouldWeStop\n";
2204  if(shouldWeStop_) return true;
2205  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2206  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1478 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mShutdownAsync.

1478  {
1481  if(rc != epTimedOut) changeState(mFinished);
1482  else errorState();
1483  return rc;
1484  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1928 of file EventProcessor.cc.

References FDEBUG.

1928  {
1929  shouldWeStop_ = false;
1930  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1931  // until after we've called beginOfJob
1932  if(looper_ && looperBeginJobRun_) {
1933  looper_->doStartingNewLoop();
1934  }
1935  FDEBUG(1) << "\tstartingNewLoop\n";
1936  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * edm::EventProcessor::stateName ( event_processor::State  s) const
EventProcessor::StatusCode edm::EventProcessor::statusAsync ( ) const

Definition at line 1376 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1376  {
1377  // the thread will record exception/error status in the event processor
1378  // for us to look at and report here
1379  return last_rc_;
1380  }
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::stopAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1470 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mStopAsync.

1470  {
1473  if(rc != epTimedOut) changeState(mFinished);
1474  else errorState();
1475  return rc;
1476  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2224 of file EventProcessor.cc.

References FDEBUG.

2224  {
2225  if(iMachine.get() != 0) {
2226  if(!iMachine->terminated()) {
2227  forceLooperToEnd_ = true;
2228  iMachine->process_event(statemachine::Stop());
2229  forceLooperToEnd_ = false;
2230  }
2231  else {
2232  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2233  }
2234  if(iMachine->terminated()) {
2235  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2236  }
2237  iMachine.reset();
2238  }
2239  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1326 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1326  {
1327  return schedule_->totalEvents();
1328  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 1336 of file EventProcessor.cc.

1336  {
1337  return schedule_->totalEventsFailed();
1338  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1331 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1331  {
1332  return schedule_->totalEventsPassed();
1333  }
std::auto_ptr< Schedule > schedule_
EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion ( unsigned int  timeout_seconds)
private

Definition at line 1410 of file EventProcessor.cc.

1410  {
1411  bool rc = true;
1412  boost::xtime timeout;
1413 
1414 #if BOOST_VERSION >= 105000
1415  boost::xtime_get(&timeout, boost::TIME_UTC_);
1416 #else
1417  boost::xtime_get(&timeout, boost::TIME_UTC);
1418 #endif
1419  timeout.sec += timeout_seconds;
1420 
1421  // make sure to include a timeout here so we don't wait forever
1422  // I suspect there are still timing issues with thread startup
1423  // and the setting of the various control variables (stop_count, id_set)
1424  {
1425  boost::mutex::scoped_lock sl(stop_lock_);
1426 
1427  // look here - if runAsync not active, just return the last return code
1428  if(stop_count_ < 0) return last_rc_;
1429 
1430  if(timeout_seconds == 0) {
1431  while(stop_count_ == 0) stopper_.wait(sl);
1432  } else {
1433  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1434  }
1435 
1436  if(rc == false) {
1437  // timeout occurred
1438  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1439  // this is a temporary hack until we get the input source
1440  // upgraded to allow blocking input sources to be unblocked
1441 
1442  // the next line is dangerous and causes all sorts of trouble
1443  if(id_set_) pthread_cancel(event_loop_id_);
1444 
1445  // we will not do anything yet
1446  LogWarning("timeout")
1447  << "An asynchronous request was made to shut down "
1448  << "the event loop "
1449  << "and the event loop did not shutdown after "
1450  << timeout_seconds << " seconds\n";
1451  } else {
1452  event_loop_->join();
1453  event_loop_.reset();
1454  id_set_ = false;
1455  stop_count_ = -1;
1456  }
1457  }
1458  return rc == false ? epTimedOut : last_rc_;
1459  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::condition stopper_
boost::mutex stop_lock_
volatile pthread_t event_loop_id_
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync ( unsigned int  timeout_seconds = 0)

Definition at line 1462 of file EventProcessor.cc.

References edm::event_processor::mCountComplete.

Referenced by evf::FWEPWrapper::stop().

1462  {
1463  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1465  else errorState();
1466  return rc;
1467  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2139 of file EventProcessor.cc.

References FDEBUG.

2139  {
2140  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2141  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2142  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2143  }
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2127 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2127  {
2128  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2129  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2130  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2131  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class event_processor::StateSentry
friend

Definition at line 377 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ActionTable const> edm::EventProcessor::act_table_
private

Definition at line 334 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 364 of file EventProcessor.h.

boost::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 329 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 372 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 360 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 333 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 332 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_
private

Definition at line 341 of file EventProcessor.h.

volatile pthread_t edm::EventProcessor::event_loop_id_
private

Definition at line 351 of file EventProcessor.h.

Referenced by asyncRun().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 376 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 361 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 363 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 362 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 353 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 359 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 367 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 365 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 338 of file EventProcessor.h.

Referenced by init().

volatile bool edm::EventProcessor::id_set_
private

Definition at line 350 of file EventProcessor.h.

Referenced by asyncRun().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 331 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

std::string edm::EventProcessor::last_error_text_
private

Definition at line 349 of file EventProcessor.h.

Referenced by asyncRun().

volatile Status edm::EventProcessor::last_rc_
private

Definition at line 348 of file EventProcessor.h.

Referenced by asyncRun().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 366 of file EventProcessor.h.

int edm::EventProcessor::my_sig_num_
private

Definition at line 352 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 369 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 370 of file EventProcessor.h.

Referenced by init().

ActivityRegistry::PostProcessEvent edm::EventProcessor::postProcessEventSignal_
private

Definition at line 326 of file EventProcessor.h.

Referenced by connectSigs(), and postProcessEventSignal().

boost::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginJob(), and init().

ActivityRegistry::PreProcessEvent edm::EventProcessor::preProcessEventSignal_
private

Definition at line 325 of file EventProcessor.h.

Referenced by connectSigs(), and preProcessEventSignal().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 356 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 335 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 371 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 357 of file EventProcessor.h.

boost::condition edm::EventProcessor::starter_
private

Definition at line 346 of file EventProcessor.h.

Referenced by asyncRun().

volatile event_processor::State edm::EventProcessor::state_
private

Definition at line 340 of file EventProcessor.h.

Referenced by beginJob().

boost::mutex edm::EventProcessor::state_lock_
private

Definition at line 343 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 358 of file EventProcessor.h.

volatile int edm::EventProcessor::stop_count_
private

Definition at line 347 of file EventProcessor.h.

Referenced by asyncRun().

boost::mutex edm::EventProcessor::stop_lock_
private

Definition at line 344 of file EventProcessor.h.

Referenced by asyncRun().

boost::condition edm::EventProcessor::stopper_
private

Definition at line 345 of file EventProcessor.h.

Referenced by asyncRun().

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private