CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
char const * currentStateName () const
 
void declareRunNumber (RunNumber_t runNumber)
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
 EventProcessor (EventProcessor const &)=delete
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
event_processor::State getState () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
char const * msgName (event_processor::Msg m) const
 
virtual void openOutputFiles ()
 
EventProcessor & operator= (EventProcessor const &)=delete
 
ActivityRegistry::PostProcessEvent & postProcessEventSignal ()
 
virtual void prepareForNextLoop ()
 
ActivityRegistry::PreProcessEvent & preProcessEventSignal ()
 
virtual int readAndCacheLumi ()
 
virtual statemachine::Run readAndCacheRun ()
 
virtual int readAndMergeLumi ()
 
virtual statemachine::Run readAndMergeRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToCloseOutputFiles ()
 
virtual void respondToOpenInputFile ()
 
virtual void respondToOpenOutputFiles ()
 
virtual void rewindInput ()
 
StatusCode run ()
 
void runAsync ()
 
virtual StatusCode runToCompletion (bool onlineStateTransitions)
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
void setRunNumber (RunNumber_t runNumber)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
StatusCode shutdownAsync (unsigned int timeout_secs=60 *2)
 
virtual void startingNewLoop ()
 
char const * stateName (event_processor::State s) const
 
StatusCode statusAsync () const
 
StatusCode stopAsync (unsigned int timeout_secs=60 *2)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
StatusCode waitTillDoneAsync (unsigned int timeout_seconds=0)
 
virtual void writeLumi (ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData > 
ExcludedDataMap
 

Private Member Functions

void changeState (event_processor::Msg)
 
void connectSigs (EventProcessor *ep)
 
std::auto_ptr
< statemachine::Machine > 
createStateMachine ()
 
StatusCode doneAsync (event_processor::Msg m)
 
void errorState ()
 
bool hasSubProcess () const
 
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
void possiblyContinueAfterForkChildFailure ()
 
void setupSignal ()
 
void terminateMachine (std::auto_ptr< statemachine::Machine > &)
 
StatusCode waitForAsyncCompletion (unsigned int timeout_seconds)
 

Static Private Member Functions

static void asyncRun (EventProcessor *)
 

Private Attributes

std::unique_ptr< ActionTable
const > 
act_table_
 
boost::shared_ptr
< ActivityRegistry > 
actReg_
 
bool alreadyHandlingException_
 
boost::shared_ptr
< BranchIDListHelper > 
branchIDListHelper_
 
bool continueAfterChildFailure_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider > 
esp_
 
std::unique_ptr
< eventsetup::EventSetupsController > 
espController_
 
boost::shared_ptr< boost::thread > event_loop_
 
volatile pthread_t event_loop_id_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
std::unique_ptr< FileBlock > fb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
std::unique_ptr< HistoryAppender > historyAppender_
 
volatile bool id_set_
 
std::unique_ptr< InputSource > input_
 
std::string last_error_text_
 
volatile Status last_rc_
 
boost::shared_ptr< EDLooperBase > looper_
 
bool looperBeginJobRun_
 
int my_sig_num_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
ActivityRegistry::PostProcessEvent postProcessEventSignal_
 
boost::shared_ptr
< ProductRegistry const > 
preg_
 
ActivityRegistry::PreProcessEvent preProcessEventSignal_
 
PrincipalCache principalCache_
 
boost::shared_ptr
< ProcessConfiguration const > 
processConfiguration_
 
std::auto_ptr< Schedule > schedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
boost::condition starter_
 
volatile event_processor::State state_
 
boost::mutex state_lock_
 
bool stateMachineWasInErrorState_
 
volatile int stop_count_
 
boost::mutex stop_lock_
 
boost::condition stopper_
 
std::auto_ptr< SubProcess > subProcess_
 

Friends

class event_processor::StateSentry
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 70 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 374 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 375 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 370 of file EventProcessor.cc.

References init(), and PythonProcessDesc::parameterSet().

374  :
377  actReg_(),
378  preg_(),
380  serviceToken_(),
381  input_(),
382  espController_(new eventsetup::EventSetupsController),
383  esp_(),
384  act_table_(),
386  schedule_(),
387  subProcess_(),
388  historyAppender_(new HistoryAppender),
389  state_(sInit),
390  event_loop_(),
391  state_lock_(),
392  stop_lock_(),
393  stopper_(),
394  starter_(),
395  stop_count_(-1),
398  id_set_(false),
399  event_loop_id_(),
401  fb_(),
402  looper_(),
403  principalCache_(),
404  shouldWeStop_(false),
406  fileMode_(),
412  forceLooperToEnd_(false),
413  looperBeginJobRun_(false),
417  setCpuAffinity_(false),
419  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
420  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
421  processDesc->addServices(defaultServices, forcedServices);
422  init(processDesc, iToken, iLegacy);
423  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 425 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

427  :
430  actReg_(),
431  preg_(),
433  serviceToken_(),
434  input_(),
435  espController_(new eventsetup::EventSetupsController),
436  esp_(),
437  act_table_(),
439  schedule_(),
440  subProcess_(),
441  historyAppender_(new HistoryAppender),
442  state_(sInit),
443  event_loop_(),
444  state_lock_(),
445  stop_lock_(),
446  stopper_(),
447  starter_(),
448  stop_count_(-1),
451  id_set_(false),
452  event_loop_id_(),
454  fb_(),
455  looper_(),
456  principalCache_(),
457  shouldWeStop_(false),
459  fileMode_(),
465  forceLooperToEnd_(false),
466  looperBeginJobRun_(false),
470  setCpuAffinity_(false),
472  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
473  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
474  processDesc->addServices(defaultServices, forcedServices);
476  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 478 of file EventProcessor.cc.

References init().

480  :
483  actReg_(),
484  preg_(),
486  serviceToken_(),
487  input_(),
488  espController_(new eventsetup::EventSetupsController),
489  esp_(),
490  act_table_(),
492  schedule_(),
493  subProcess_(),
494  historyAppender_(new HistoryAppender),
495  state_(sInit),
496  event_loop_(),
497  state_lock_(),
498  stop_lock_(),
499  stopper_(),
500  starter_(),
501  stop_count_(-1),
504  id_set_(false),
505  event_loop_id_(),
507  fb_(),
508  looper_(),
509  principalCache_(),
510  shouldWeStop_(false),
512  fileMode_(),
518  forceLooperToEnd_(false),
519  looperBeginJobRun_(false),
523  setCpuAffinity_(false),
525  init(processDesc, token, legacy);
526  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 529 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

529  :
532  actReg_(),
533  preg_(),
535  serviceToken_(),
536  input_(),
537  espController_(new eventsetup::EventSetupsController),
538  esp_(),
539  act_table_(),
541  schedule_(),
542  subProcess_(),
543  historyAppender_(new HistoryAppender),
544  state_(sInit),
545  event_loop_(),
546  state_lock_(),
547  stop_lock_(),
548  stopper_(),
549  starter_(),
550  stop_count_(-1),
553  id_set_(false),
554  event_loop_id_(),
556  fb_(),
557  looper_(),
558  principalCache_(),
559  shouldWeStop_(false),
561  fileMode_(),
567  forceLooperToEnd_(false),
568  looperBeginJobRun_(false),
572  setCpuAffinity_(false),
574  if(isPython) {
575  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
576  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
578  }
579  else {
580  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
582  }
583  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
volatile pthread_t event_loop_id_
volatile Status last_rc_
std::unique_ptr< InputSource > input_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
edm::EventProcessor::~EventProcessor ( )

Definition at line 670 of file EventProcessor.cc.

References actReg_, changeState(), edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), alignCSCRings::e, esp_, espController_, cms::Exception::explainSelf(), edm::detail::ThreadSafeRegistry< KEY, T, E >::extra(), getToken(), input_, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), looper_, edm::event_processor::mDtor, schedule_, and subProcess_.

670  {
671  // Make the services available while everything is being deleted.
672  ServiceToken token = getToken();
673  ServiceRegistry::Operate op(token);
674  try {
676  }
677  catch(cms::Exception& e) {
678  LogError("System")
679  << e.explainSelf() << "\n";
680  }
681 
682  // manually destroy all these things that may need the services around
683  espController_.reset();
684  subProcess_.reset();
685  esp_.reset();
686  schedule_.reset();
687  input_.reset();
688  looper_.reset();
689  actReg_.reset();
690 
691  pset::Registry* psetRegistry = pset::Registry::instance();
692  psetRegistry->data().clear();
693  psetRegistry->extra().setID(ParameterSetID());
694 
696  ParentageRegistry::instance()->data().clear();
699  }
virtual std::string explainSelf() const
Definition: Exception.cc:146
static ThreadSafeRegistry * instance()
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
collection_type & data()
Provide access to the contained collection.
std::unique_ptr< eventsetup::EventSetupsController > espController_
edm::EventProcessor::EventProcessor ( EventProcessor const &  )
delete

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2207 of file EventProcessor.cc.

2207  {
2209  }
void edm::EventProcessor::asyncRun ( EventProcessor * me)
static private

Definition at line 1551 of file EventProcessor.cc.

References alignCSCRings::e, event_loop_id_, cppFunctionSkipper::exception, cms::Exception::explainSelf(), FDEBUG, id_set_, last_error_text_, last_rc_, runToCompletion(), starter_, stop_count_, stop_lock_, and stopper_.

1551  {
1552  // set up signals to allow for interruptions
1553  // ignore all other signals
1554  // make sure no exceptions escape out
1555 
1556  // temporary hack until we modify the input source to allow
1557  // wakeup calls from other threads. This mimics the solution
1558  // in EventFilter/Processor, which I do not like.
1559  // allowing cancels means that the thread just disappears at
1560  // certain points. This is bad for C++ stack variables.
1561  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1562  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1563  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1564  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1565 
1566  {
1567  boost::mutex::scoped_lock sl(me->stop_lock_);
1568  me->event_loop_id_ = pthread_self();
1569  me->id_set_ = true;
1570  me->starter_.notify_all();
1571  }
1572 
1573  Status rc = epException;
1574  FDEBUG(2) << "asyncRun starting ......................\n";
1575 
1576  try {
1577  bool onlineStateTransitions = true;
1578  rc = me->runToCompletion(onlineStateTransitions);
1579  }
1580  catch (cms::Exception& e) {
1581  LogError("FwkJob") << "cms::Exception caught in "
1582  << "EventProcessor::asyncRun"
1583  << "\n"
1584  << e.explainSelf();
1585  me->last_error_text_ = e.explainSelf();
1586  }
1587  catch (std::exception& e) {
1588  LogError("FwkJob") << "Standard library exception caught in "
1589  << "EventProcessor::asyncRun"
1590  << "\n"
1591  << e.what();
1592  me->last_error_text_ = e.what();
1593  }
1594  catch (...) {
1595  LogError("FwkJob") << "Unknown exception caught in "
1596  << "EventProcessor::asyncRun"
1597  << "\n";
1598  me->last_error_text_ = "Unknown exception caught";
1599  rc = epOther;
1600  }
1601 
1602  me->last_rc_ = rc;
1603 
1604  {
1605  // notify anyone waiting for exit that we are doing so now
1606  boost::mutex::scoped_lock sl(me->stop_lock_);
1607  ++me->stop_count_;
1608  me->stopper_.notify_all();
1609  }
1610  FDEBUG(2) << "asyncRun ending ......................\n";
1611  }
virtual std::string explainSelf() const
Definition: Exception.cc:146
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.

Definition at line 702 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, changeState(), edm::convertException::charPtrToEDM(), alignCSCRings::e, cppFunctionSkipper::exception, hasSubProcess(), input_, edm::event_processor::mBeginJob, preg_, alignCSCRings::s, schedule_, serviceToken_, edm::event_processor::sInit, state_, edm::convertException::stdToEDM(), AlCaHLTBitMon_QueryRunRegistry::string, edm::convertException::stringToEDM(), subProcess_, and edm::convertException::unknownToEDM().

Referenced by evf::FUEventProcessor::configuring(), and evf::FUEventProcessor::enabling().

702  {
703  if(state_ != sInit) return;
704  bk::beginJob();
705  // can only be run if in the initial state
707 
708  // StateSentry toerror(this); // should we add this ?
709  //make the services available
711 
712  //NOTE: This implementation assumes 'Job' means one call to
713  // the EventProcessor::run
714  // If it really means once per 'application' then this code will
715  // have to be changed.
716  // Also have to deal with case where have 'run' then new Module
717  // added and do 'run'
718  // again. In that case the newly added Module needs its 'beginJob'
719  // to be called.
720 
721  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
722  // For now we delay calling beginOfJob until first beginOfRun
723  //if(looper_) {
724  // looper_->beginOfJob(es);
725  //}
726  try {
727  try {
728  input_->doBeginJob();
729  }
730  catch (cms::Exception& e) { throw; }
731  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
732  catch (std::exception& e) { convertException::stdToEDM(e); }
734  catch(char const* c) { convertException::charPtrToEDM(c); }
735  catch (...) { convertException::unknownToEDM(); }
736  }
737  catch(cms::Exception& ex) {
738  ex.addContext("Calling beginJob for the source");
739  throw;
740  }
741  schedule_->beginJob(*preg_);
742  // toerror.succeeded(); // should we add this?
743  if(hasSubProcess()) subProcess_->doBeginJob();
744  actReg_->postBeginJobSignal_();
745  }
boost::shared_ptr< ActivityRegistry > actReg_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
std::unique_ptr< InputSource > input_
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2021 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

2021  {
2022  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2023  input_->doBeginLumi(lumiPrincipal);
2024 
2026  if(rng.isAvailable()) {
2027  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2028  rng->preBeginLumi(lb);
2029  }
2030 
2031  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2032  // lumi blocks know their start and end times why not also start and end events?
2033  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2034  espController_->eventSetupForInstance(ts);
2035  EventSetup const& es = esp_->eventSetup();
2036  {
2037  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
2038  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2039  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2040  if(hasSubProcess()) {
2041  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2042  }
2043  }
2044  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2045  if(looper_) {
2046  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2047  }
2048  }
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1970 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

1970  {
1971  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1972  input_->doBeginRun(runPrincipal);
1973  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1974  runPrincipal.beginTime());
1976  espController_->forceCacheClear();
1977  }
1978  espController_->eventSetupForInstance(ts);
1979  EventSetup const& es = esp_->eventSetup();
1980  if(looper_ && looperBeginJobRun_== false) {
1981  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1982  looper_->beginOfJob(es);
1983  looperBeginJobRun_ = true;
1984  looper_->doStartingNewLoop();
1985  }
1986  {
1987  typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
1988  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1989  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1990  if(hasSubProcess()) {
1991  subProcess_->doBeginRun(runPrincipal, ts);
1992  }
1993  }
1994  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1995  if(looper_) {
1996  looper_->doBeginRun(runPrincipal, es);
1997  }
1998  }
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::changeState ( event_processor::Msg  msg)
private

Definition at line 1491 of file EventProcessor.cc.

References cond::rpcobimon::current, edm::hlt::Exception, FDEBUG, edm::event_processor::mAny, edm::event_processor::sInvalid, and asciidump::table.

Referenced by beginJob(), endJob(), ~EventProcessor(), and edm::event_processor::StateSentry::~StateSentry().

1491  {
1492  // most likely need to serialize access to this routine
1493 
1494  boost::mutex::scoped_lock sl(state_lock_);
1495  State curr = state_;
1496  int rc;
1497  // found if(not end of table) and
1498  // (state == table.state && (msg == table.message || msg == any))
1499  for(rc = 0;
1500  table[rc].current != sInvalid &&
1501  (curr != table[rc].current ||
1502  (curr == table[rc].current &&
1503  msg != table[rc].message && table[rc].message != mAny));
1504  ++rc);
1505 
1506  if(table[rc].current == sInvalid)
1507  throw cms::Exception("BadState")
1508  << "A member function of EventProcessor has been called in an"
1509  << " inappropriate order.\n"
1510  << "Bad transition from " << stateName(curr) << " "
1511  << "using message " << msgName(msg) << "\n"
1512  << "No where to go from here.\n";
1513 
1514  FDEBUG(1) << "changeState: current=" << stateName(curr)
1515  << ", message=" << msgName(msg)
1516  << " -> new=" << stateName(table[rc].final) << "\n";
1517 
1518  state_ = table[rc].final;
1519  }
boost::mutex state_lock_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * msgName(event_processor::Msg m) const
TransEntry table[]
volatile event_processor::State state_
char const * stateName(event_processor::State s) const
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1348 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1348  {
1349  schedule_->clearCounters();
1350  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1865 of file EventProcessor.cc.

References FDEBUG.

1865  {
1866  if (fb_.get() != nullptr) {
1867  input_->closeFile(fb_.get(), cleaningUpAfterException);
1868  }
1869  FDEBUG(1) << "\tcloseInputFile\n";
1870  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::unique_ptr< InputSource > input_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1880 of file EventProcessor.cc.

References FDEBUG.

1880  {
1881  if (fb_.get() != nullptr) {
1882  schedule_->closeOutputFiles();
1883  if(hasSubProcess()) subProcess_->closeOutputFiles();
1884  }
1885  FDEBUG(1) << "\tcloseOutputFiles\n";
1886  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::connectSigs ( EventProcessor ep)
private

Definition at line 1304 of file EventProcessor.cc.

References postProcessEventSignal_, and preProcessEventSignal_.

Referenced by init().

1304  {
1305  // When the FwkImpl signals are given, pass them to the
1306  // appropriate EventProcessor signals so that the outside world
1307  // can see the signal.
1308  actReg_->preProcessEventSignal_.connect(std::cref(ep->preProcessEventSignal_));
1309  actReg_->postProcessEventSignal_.connect(std::cref(ep->postProcessEventSignal_));
1310  }
boost::shared_ptr< ActivityRegistry > actReg_
std::auto_ptr< statemachine::Machine > edm::EventProcessor::createStateMachine ( )
private

Definition at line 1614 of file EventProcessor.cc.

References edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, edm::hlt::Exception, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, statemachine::NOMERGE, and AlCaHLTBitMon_QueryRunRegistry::string.

1614  {
1616  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1617  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1618  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1619  else {
1620  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1621  << fileMode_ << ".\n"
1622  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1623  }
1624 
1625  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1626  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1627  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1628  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1629  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1630  else {
1631  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1632  << emptyRunLumiMode_ << ".\n"
1633  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1634  }
1635 
1636  std::auto_ptr<statemachine::Machine> machine(new statemachine::Machine(this,
1637  fileMode,
1638  emptyRunLumiMode));
1639 
1640  machine->initiate();
1641  return machine;
1642  }
std::string emptyRunLumiMode_
char const * edm::EventProcessor::currentStateName ( ) const

Member functions to support asynchronous interface.

Definition at line 1352 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().

1352  {
1353  return stateName(getState());
1354  }
event_processor::State getState() const
char const * stateName(event_processor::State s) const
void edm::EventProcessor::declareRunNumber ( RunNumber_t  runNumber)

Definition at line 1392 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1392  {
1393  // inside of beginJob there is a check to see if it has been called before
1394  beginJob();
1396 
1397  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1398  //input_->declareRunNumber(runNumber);
1399  }
void changeState(event_processor::Msg)
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2133 of file EventProcessor.cc.

References FDEBUG.

2133  {
2135  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2137  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2121 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2121  {
2122  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2123  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2124  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2125  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1959 of file EventProcessor.cc.

References FDEBUG.

1959  {
1960  FDEBUG(1) << "\tdoErrorStuff\n";
1961  LogError("StateMachine")
1962  << "The EventProcessor state machine encountered an unexpected event\n"
1963  << "and went to the error state\n"
1964  << "Will attempt to terminate processing normally\n"
1965  << "(IF using the looper the next loop will be attempted)\n"
1966  << "This likely indicates a bug in an input module or corrupted input or both\n";
1968  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::doneAsync ( event_processor::Msg  m)
private

Definition at line 1483 of file EventProcessor.cc.

1483  {
1484  // make sure to include a timeout here so we don't wait forever
1485  // I suspect there are still timing issues with thread startup
1486  // and the setting of the various control variables (stop_count, id_set)
1487  changeState(m);
1488  return waitForAsyncCompletion(60*2);
1489  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1333 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::actionPerformed().

1333  {
1334  schedule_->enableEndPaths(active);
1335  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.

Definition at line 748 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), changeState(), edm::OutputModule::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), input_, looper_, edm::event_processor::mEndJob, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and subProcess_.

Referenced by evf::FWEPWrapper::init(), and evf::FWEPWrapper::stopAndHalt().

748  {
749  // Collects exceptions, so we don't throw before all operations are performed.
750  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
751 
752  // only allowed to run if state is sIdle, sJobReady, sRunGiven
753  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
754 
755  //make the services available
757 
758  schedule_->endJob(c);
759  if(hasSubProcess()) {
760  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
761  }
762  c.call(boost::bind(&InputSource::doEndJob, input_.get()));
763  if(looper_) {
764  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
765  }
766  auto actReg = actReg_.get();
767  c.call([actReg](){actReg->postEndJobSignal_();});
768  if(c.hasThrown()) {
769  c.rethrow();
770  }
771  }
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:249
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:74
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2050 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

2050  {
2051  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2052  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2053  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2054  // lumi blocks know their start and end times why not also start and end events?
2055  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2056  lumiPrincipal.endTime());
2057  espController_->eventSetupForInstance(ts);
2058  EventSetup const& es = esp_->eventSetup();
2059  {
2060  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
2061  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2062  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2063  if(hasSubProcess()) {
2064  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2065  }
2066  }
2067  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2068  if(looper_) {
2069  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2070  }
2071  }
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1930 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1930  {
1931  if(looper_) {
1932  ModuleChanger changer(schedule_.get());
1933  looper_->setModuleChanger(&changer);
1934  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1935  looper_->setModuleChanger(nullptr);
1936  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1937  else return false;
1938  }
1939  FDEBUG(1) << "\tendOfLoop\n";
1940  return true;
1941  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1338 of file EventProcessor.cc.

1338  {
1339  return schedule_->endPathsEnabled();
1340  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2000 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

2000  {
2001  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2002  input_->doEndRun(runPrincipal, cleaningUpAfterException);
2004  runPrincipal.endTime());
2005  espController_->eventSetupForInstance(ts);
2006  EventSetup const& es = esp_->eventSetup();
2007  {
2008  typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
2009  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2010  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2011  if(hasSubProcess()) {
2012  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2013  }
2014  }
2015  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2016  if(looper_) {
2017  looper_->doEndRun(runPrincipal, es);
2018  }
2019  }
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::errorState ( )
private

Definition at line 1478 of file EventProcessor.cc.

References edm::event_processor::sError.

1478  {
1479  state_ = sError;
1480  }
volatile event_processor::State state_
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 1003 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, or, pipe::pipe(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

1003  {
1004 
1005  if(0 == numberOfForkedChildren_) {return true;}
1006  assert(0<numberOfForkedChildren_);
1007  //do what we want done in common
1008  {
1009  beginJob(); //make sure this was run
1010  // make the services available
1012 
1013  InputSource::ItemType itemType;
1014  itemType = input_->nextItemType();
1015 
1016  assert(itemType == InputSource::IsFile);
1017  {
1018  readFile();
1019  }
1020  itemType = input_->nextItemType();
1021  assert(itemType == InputSource::IsRun);
1022 
1023  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1024  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1025  input_->runAuxiliary()->beginTime());
1026  espController_->eventSetupForInstance(ts);
1027  EventSetup const& es = esp_->eventSetup();
1028 
1029  //now get all the data available in the EventSetup
1030  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1031  es.fillAvailableRecordKeys(recordKeys);
1032  std::vector<eventsetup::DataKey> dataKeys;
1033  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1034  itKey != itEnd;
1035  ++itKey) {
1036  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1037  //see if this is on our exclusion list
1038  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1039  ExcludedData const* excludedData(nullptr);
1040  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1041  excludedData = &(itExcludeRec->second);
1042  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1043  //skip all items in this record
1044  continue;
1045  }
1046  }
1047  if(0 != recordPtr) {
1048  dataKeys.clear();
1049  recordPtr->fillRegisteredDataKeys(dataKeys);
1050  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1051  itDataKey != itDataKeyEnd;
1052  ++itDataKey) {
1053  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1054  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1055  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1056  continue;
1057  }
1058  try {
1059  recordPtr->doGet(*itDataKey);
1060  } catch(cms::Exception& e) {
1061  LogWarning("ForkingEventSetupPreFetching") << e.what();
1062  }
1063  }
1064  }
1065  }
1066  }
1067  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1068  {
1069  // make the services available
1071  Service<JobReport> jobReport;
1072  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1073 
1074  //Now actually do the forking
1075  actReg_->preForkReleaseResourcesSignal_();
1076  input_->doPreForkReleaseResources();
1077  schedule_->preForkReleaseResources();
1078  }
1079  installCustomHandler(SIGCHLD, ep_sigchld);
1080 
1081 
1082  unsigned int childIndex = 0;
1083  unsigned int const kMaxChildren = numberOfForkedChildren_;
1084  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1085  std::vector<pid_t> childrenIds;
1086  childrenIds.reserve(kMaxChildren);
1087  std::vector<int> childrenSockets;
1088  childrenSockets.reserve(kMaxChildren);
1089  std::vector<int> childrenPipes;
1090  childrenPipes.reserve(kMaxChildren);
1091  std::vector<int> childrenSocketsCopy;
1092  childrenSocketsCopy.reserve(kMaxChildren);
1093  std::vector<int> childrenPipesCopy;
1094  childrenPipesCopy.reserve(kMaxChildren);
1095  int pipes[] {0, 0};
1096 
1097  {
1098  // make the services available
1100  Service<JobReport> jobReport;
1101  int sockets[2], fd_flags;
1102  for(; childIndex < kMaxChildren; ++childIndex) {
1103  // Create a UNIX_DGRAM socket pair
1104  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1105  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1106  exit(EXIT_FAILURE);
1107  }
1108  if (pipe(pipes)) {
1109  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1110  exit(EXIT_FAILURE);
1111  }
1112  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1113  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1114  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1115  exit(EXIT_FAILURE);
1116  }
1117  // Mark socket as non-block. Child must be careful to do select prior
1118  // to reading from socket.
1119  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1120  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1121  exit(EXIT_FAILURE);
1122  }
1123  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1124  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1125  exit(EXIT_FAILURE);
1126  }
1127  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1128  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1129  exit(EXIT_FAILURE);
1130  }
1131  // Linux man page notes there are some edge cases where reading from a
1132  // fd can block, even after a select.
1133  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1134  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1135  exit(EXIT_FAILURE);
1136  }
1137  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1138  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1139  exit(EXIT_FAILURE);
1140  }
1141 
1142  childrenPipesCopy = childrenPipes;
1143  childrenSocketsCopy = childrenSockets;
1144 
1145  pid_t value = fork();
1146  if(value == 0) {
1147  // Close the parent's side of the socket and pipe which will talk to us.
1148  close(pipes[0]);
1149  close(sockets[0]);
1150  // Close our copies of the parent's other communication pipes.
1151  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1152  close(*it);
1153  }
1154  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1155  close(*it);
1156  }
1157 
1158  // this is the child process, redirect stdout and stderr to a log file
1159  fflush(stdout);
1160  fflush(stderr);
1161  std::stringstream stout;
1162  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1163  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1164  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1165  }
1166  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1167  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1168  }
1169 
1170  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1171  if(setCpuAffinity_) {
1172  // CPU affinity is handled differently on macosx.
1173  // We disable it and print a message until someone reads:
1174  //
1175  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1176  //
1177  // and implements it.
1178 #ifdef __APPLE__
1179  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1180 #else
1181  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1182  cpu_set_t mask;
1183  CPU_ZERO(&mask);
1184  CPU_SET(childIndex, &mask);
1185  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1186  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1187  exit(-1);
1188  }
1189 #endif
1190  }
1191  break;
1192  } else {
1193  //this is the parent
1194  close(pipes[1]);
1195  close(sockets[1]);
1196  }
1197  if(value < 0) {
1198  LogError("ForkingChild") << "failed to create a child";
1199  exit(-1);
1200  }
1201  childrenIds.push_back(value);
1202  childrenSockets.push_back(sockets[0]);
1203  childrenPipes.push_back(pipes[0]);
1204  }
1205 
1206  if(childIndex < kMaxChildren) {
1207  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1208  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1209 
1210  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1211  input_->doPostForkReacquireResources(receiver);
1212  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1213  //NOTE: sources have to reset themselves by listening to the post fork message
1214  //rewindInput();
1215  return true;
1216  }
1217  jobReport->parentAfterFork(jobReportFile);
1218  }
1219 
1220  //this is the original, which is now the master for all the children
1221 
1222  //Need to wait for signals from the children or externally
1223  // To wait we must
1224  // 1) block the signals we want to wait on so we do not have a race condition
1225  // 2) check that we haven't already meet our ending criteria
1226  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1227  sigset_t blockingSigSet;
1228  sigset_t unblockingSigSet;
1229  sigset_t oldSigSet;
1230  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1231  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1232  sigaddset(&blockingSigSet, SIGCHLD);
1233  sigaddset(&blockingSigSet, SIGUSR2);
1234  sigaddset(&blockingSigSet, SIGINT);
1235  sigdelset(&unblockingSigSet, SIGCHLD);
1236  sigdelset(&unblockingSigSet, SIGUSR2);
1237  sigdelset(&unblockingSigSet, SIGINT);
1238  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1239 
1240  // If there are too many fd's (unlikely, but possible) for select, denote this
1241  // because the sender will fail.
1242  bool too_many_fds = false;
1243  if (pipes[1]+1 > FD_SETSIZE) {
1244  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1245  too_many_fds = true;
1246  }
1247 
1248  //create a thread that sends the units of work to workers
1249  // we create it after all signals were blocked so that this
1250  // thread is never interupted by a signal
1251  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1252  boost::thread senderThread(sender);
1253 
1254  if(not too_many_fds) {
1255  //NOTE: a child could have failed before we got here and even after this call
1256  // which is why the 'if' is conditional on continueAfterChildFailure_
1258  while(!shutdown_flag && (!child_failed or continueAfterChildFailure_) && (childrenIds.size() != num_children_done)) {
1259  sigsuspend(&unblockingSigSet);
1261  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1262  }
1263  }
1264  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1265 
1266  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1267  if(child_failed) {
1268  LogError("ForkingStopping") << "child failed";
1269  }
1270  if(shutdown_flag) {
1271  LogSystem("ForkingStopping") << "asked to shutdown";
1272  }
1273 
1274  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1275  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1276  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1277  it != itEnd; ++it) {
1278  /* int result = */ kill(*it, SIGUSR2);
1279  }
1280  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1281  while(num_children_done != kMaxChildren) {
1282  sigsuspend(&unblockingSigSet);
1283  }
1284  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1285  }
1286  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1287  senderThread.join();
1288  if(child_failed && !continueAfterChildFailure_) {
1289  if (child_fail_signal) {
1290  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1291  } else if (child_fail_exit_status) {
1292  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1293  } else {
1294  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1295  }
1296  }
1297  if(too_many_fds) {
1298  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1299  }
1300  return false;
1301  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void possiblyContinueAfterForkChildFailure()
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
volatile bool shutdown_flag
#define O_NONBLOCK
Definition: SysFile.h:21
std::unique_ptr< eventsetup::EventSetupsController > espController_
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1313 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::init().

1313  {
1314  return schedule_->getAllModuleDescriptions();
1315  }
std::auto_ptr< Schedule > schedule_
State edm::EventProcessor::getState ( ) const
ServiceToken edm::EventProcessor::getToken ( )
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1343 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().

1343  {
1344  schedule_->getTriggerReport(rep);
1345  }
string rep
Definition: cuy.py:1188
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inline private

Definition at line 313 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

313  {
314  return subProcess_.get() != 0;
315  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 586 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::ScheduleItems::branchIDListHelper_, branchIDListHelper_, connectSigs(), continueAfterChildFailure_, emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), historyAppender_, edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::PrincipalCache::insert(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, edm::popSubProcessParameterSet(), edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, schedule_, serviceToken_, setCpuAffinity_, edm::IllegalParameters::setThrowAnException(), AlCaHLTBitMon_QueryRunRegistry::string, and subProcess_.

Referenced by EventProcessor().

588  {
589 
590  //std::cerr << processDesc->dump() << std::endl;
591 
592  ROOT::Cintex::Cintex::Enable();
593 
594  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
595  //std::cerr << parameterSet->dump() << std::endl;
596 
597  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
598  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
599 
600  // Now set some parameters specific to the main process.
601  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
602  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
603  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
604  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
605  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
606  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
607  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
608  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
609  continueAfterChildFailure_ = forking.getUntrackedParameter<bool>("continueAfterChildFailure",false);
610  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
611  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
612  itPS != itPSEnd;
613  ++itPS) {
614  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
615  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
616  itPS->getUntrackedParameter<std::string>("label", "")));
617  }
618  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
619 
620  // Now do general initialization
621  ScheduleItems items;
622 
623  //initialize the services
624  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
625  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
626  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
627 
628  //make the services available
630 
631  // initialize miscellaneous items
632  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
633 
634  // initialize the event setup provider
635  esp_ = espController_->makeProvider(*parameterSet);
636 
637  // initialize the looper, if any
638  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
639  if(looper_) {
640  looper_->setActionTable(items.act_table_.get());
641  looper_->attachTo(*items.actReg_);
642  }
643 
644  // initialize the input source
645  input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_);
646 
647  // initialize the Schedule
648  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
649 
650  // set the data members
651  act_table_ = std::move(items.act_table_);
652  actReg_ = items.actReg_;
653  preg_.reset(items.preg_.release());
654  branchIDListHelper_ = items.branchIDListHelper_;
655  processConfiguration_ = items.processConfiguration_;
656 
657  FDEBUG(2) << parameterSet << std::endl;
658  connectSigs(this);
659 
660  // Reusable event principal
661  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get()));
663 
664  // initialize the subprocess, if there is one
665  if(subProcessParameterSet) {
666  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
667  }
668  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:378
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
void insert(boost::shared_ptr< RunPrincipal > rp)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
std::unique_ptr< HistoryAppender > historyAppender_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
void connectSigs(EventProcessor *ep)
std::unique_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, boost::shared_ptr< BranchIDListHelper > branchIDListHelper, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration const > processConfiguration)
bool insert(Storage &iStorage, ItemType *iItem, const IdTag &iIdTag)
Definition: HCMethods.h:49
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
std::unique_ptr< ActionTable const > act_table_
std::unique_ptr< eventsetup::EventSetupsController > espController_
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
char const * edm::EventProcessor::msgName ( event_processor::Msg  m) const

Definition at line 1360 of file EventProcessor.cc.

References m.

1360  {
1361  return msgNames[m];
1362  }
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1872 of file EventProcessor.cc.

References FDEBUG.

1872  {
1873  if (fb_.get() != nullptr) {
1874  schedule_->openOutputFiles(*fb_);
1875  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1876  }
1877  FDEBUG(1) << "\topenOutputFiles\n";
1878  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor& edm::EventProcessor::operator= ( EventProcessor const &  )
delete
void edm::EventProcessor::possiblyContinueAfterForkChildFailure ( )
private

Definition at line 987 of file EventProcessor.cc.

987  {
988  if(child_failed && continueAfterChildFailure_) {
989  if (child_fail_signal) {
990  LogSystem("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
991  child_fail_signal=0;
992  } else if (child_fail_exit_status) {
993  LogSystem("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
994  child_fail_exit_status=0;
995  } else {
996  LogSystem("ForkedChildFailed") << "child process ended abnormally for unknown reason";
997  }
998  child_failed =false;
999  }
1000  }
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal ( )
inline

signal is emitted after all modules have finished processing the Event

Definition at line 208 of file EventProcessor.h.

References postProcessEventSignal_.

208 {return postProcessEventSignal_;}
ActivityRegistry::PostProcessEvent postProcessEventSignal_
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1949 of file EventProcessor.cc.

References FDEBUG.

1949  {
1950  looper_->prepareForNextLoop(esp_.get());
1951  FDEBUG(1) << "\tprepareForNextLoop\n";
1952  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal ( )
inline

signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event

Definition at line 203 of file EventProcessor.h.

References preProcessEventSignal_.

203 {return preProcessEventSignal_;}
ActivityRegistry::PreProcessEvent preProcessEventSignal_
int edm::EventProcessor::readAndCacheLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2090 of file EventProcessor.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

2090  {
2093  << "EventProcessor::readAndCacheRun\n"
2094  << "Illegal attempt to insert lumi into cache\n"
2095  << "Contact a Framework Developer\n";
2096  }
2099  << "EventProcessor::readAndCacheRun\n"
2100  << "Illegal attempt to insert lumi into cache\n"
2101  << "Run is invalid\n"
2102  << "Contact a Framework Developer\n";
2103  }
2104  principalCache_.insert(input_->readAndCacheLumi(*historyAppender_));
2106  return input_->luminosityBlock();
2107  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::unique_ptr< HistoryAppender > historyAppender_
bool hasLumiPrincipal() const
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndCacheRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2073 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, and PDRates::Run.

2073  {
2076  << "EventProcessor::readAndCacheRun\n"
2077  << "Illegal attempt to insert run into cache\n"
2078  << "Contact a Framework Developer\n";
2079  }
2080  principalCache_.insert(input_->readAndCacheRun(*historyAppender_));
2081  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2082  }
bool hasRunPrincipal() const
void insert(boost::shared_ptr< RunPrincipal > rp)
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
int edm::EventProcessor::readAndMergeLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2109 of file EventProcessor.cc.

2109  {
2110  principalCache_.merge(input_->luminosityBlockAuxiliary(), preg_);
2111  input_->readAndMergeLumi(principalCache_.lumiPrincipalPtr());
2112  return input_->luminosityBlock();
2113  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
statemachine::Run edm::EventProcessor::readAndMergeRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2084 of file EventProcessor.cc.

References PDRates::Run.

2084  {
2085  principalCache_.merge(input_->runAuxiliary(), preg_);
2086  input_->readAndMergeRun(principalCache_.runPrincipalPtr());
2087  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2088  }
void merge(boost::shared_ptr< RunAuxiliary > aux, boost::shared_ptr< ProductRegistry const > reg)
boost::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
PrincipalCache principalCache_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2139 of file EventProcessor.cc.

References edm::EventPrincipal::clearEventPrincipal(), FDEBUG, edm::EventPrincipal::id(), edm::ProcessingController::lastOperationSucceeded(), edm::EventPrincipal::luminosityBlock(), edm::EventPrincipal::luminosityBlockPrincipalPtrValid(), edm::ProcessingController::requestedTransition(), edm::EventPrincipal::run(), edm::ProcessingController::setLastOperationSucceeded(), edm::EventPrincipal::setLuminosityBlockPrincipal(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, summarizeEdmComparisonLogfiles::succeeded, and edm::EventPrincipal::time().

2139  {
2140  EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal());
2141  FDEBUG(1) << "\treadEvent\n";
2142  assert(pep != 0);
2143  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
2144  assert(pep->luminosityBlockPrincipalPtrValid());
2145  assert(principalCache_.lumiPrincipalPtr()->run() == pep->run());
2146  assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock());
2147 
2148  IOVSyncValue ts(pep->id(), pep->time());
2149  espController_->eventSetupForInstance(ts);
2150  EventSetup const& es = esp_->eventSetup();
2151  {
2152  typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
2153  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2154  schedule_->processOneOccurrence<Traits>(*pep, es);
2155  if(hasSubProcess()) {
2156  subProcess_->doEvent(*pep, ts);
2157  }
2158  }
2159 
2160  if(looper_) {
2161  bool randomAccess = input_->randomAccess();
2162  ProcessingController::ForwardState forwardState = input_->forwardState();
2163  ProcessingController::ReverseState reverseState = input_->reverseState();
2164  ProcessingController pc(forwardState, reverseState, randomAccess);
2165 
2167  do {
2168  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2169 
2170  bool succeeded = true;
2171  if(randomAccess) {
2172  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2173  input_->skipEvents(-2);
2174  }
2175  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2176  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2177  }
2178  }
2179  pc.setLastOperationSucceeded(succeeded);
2180  } while(!pc.lastOperationSucceeded());
2181  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2182 
2183  }
2184 
2185  FDEBUG(1) << "\tprocessEvent\n";
2186  pep->clearEventPrincipal();
2187  }
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
std::auto_ptr< Schedule > schedule_
std::unique_ptr< InputSource > input_
EventPrincipal & eventPrincipal() const
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
std::unique_ptr< eventsetup::EventSetupsController > espController_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1852 of file EventProcessor.cc.

References FDEBUG, and findQualityFiles::size.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1852  {
1853  FDEBUG(1) << " \treadFile\n";
1854  size_t size = preg_->size();
1855  fb_ = input_->readFile();
1856  if(size < preg_->size()) {
1858  }
1860  if(numberOfForkedChildren_ > 0) {
1861  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1862  }
1863  }
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
std::unique_ptr< FileBlock > fb_
boost::shared_ptr< ProductRegistry const > preg_
std::unique_ptr< InputSource > input_
tuple size
Write out results.
PrincipalCache principalCache_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1896 of file EventProcessor.cc.

References FDEBUG.

1896  {
1897  if (fb_.get() != nullptr) {
1898  schedule_->respondToCloseInputFile(*fb_);
1899  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1900  }
1901  FDEBUG(1) << "\trespondToCloseInputFile\n";
1902  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToCloseOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1912 of file EventProcessor.cc.

References FDEBUG.

1912  {
1913  if (fb_.get() != nullptr) {
1914  schedule_->respondToCloseOutputFiles(*fb_);
1915  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1916  }
1917  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1918  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1888 of file EventProcessor.cc.

References FDEBUG.

1888  {
1889  if (fb_.get() != nullptr) {
1890  schedule_->respondToOpenInputFile(*fb_);
1891  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1892  }
1893  FDEBUG(1) << "\trespondToOpenInputFile\n";
1894  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1904 of file EventProcessor.cc.

References FDEBUG.

1904  {
1905  if (fb_.get() != nullptr) {
1906  schedule_->respondToOpenOutputFiles(*fb_);
1907  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1908  }
1909  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1910  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< FileBlock > fb_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1943 of file EventProcessor.cc.

References FDEBUG.

1943  {
1944  input_->repeat();
1945  input_->rewind();
1946  FDEBUG(1) << "\trewind\n";
1947  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::unique_ptr< InputSource > input_
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 384 of file EventProcessor.h.

References runToCompletion().

Referenced by Types.LuminosityBlockID::cppID().

384  {
385  return runToCompletion(false);
386  }
virtual StatusCode runToCompletion(bool onlineStateTransitions)
void edm::EventProcessor::runAsync ( )

Definition at line 1521 of file EventProcessor.cc.

References bk::beginJob(), edm::hlt::Exception, edm::event_processor::mRunAsync, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1521  {
1522  beginJob();
1523  {
1524  boost::mutex::scoped_lock sl(stop_lock_);
1525  if(id_set_ == true) {
1526  std::string err("runAsync called while async event loop already running\n");
1527  LogError("FwkJob") << err;
1528  throw cms::Exception("BadState") << err;
1529  }
1530 
1532 
1533  stop_count_ = 0;
1534  last_rc_ = epSuccess; // forget the last value!
1535  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1536  boost::xtime timeout;
1537 #if BOOST_VERSION >= 105000
1538  boost::xtime_get(&timeout, boost::TIME_UTC_);
1539 #else
1540  boost::xtime_get(&timeout, boost::TIME_UTC);
1541 #endif
1542  timeout.sec += 60; // 60 seconds to start!!!!
1543  if(starter_.timed_wait(sl, timeout) == false) {
1544  // yikes - the thread did not start
1545  throw cms::Exception("BadState")
1546  << "Async run thread did not start in 60 seconds\n";
1547  }
1548  }
1549  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::mutex stop_lock_
static void asyncRun(EventProcessor *)
volatile Status last_rc_
void changeState(event_processor::Msg)
boost::condition starter_
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( bool  onlineStateTransitions)
virtual

Implements edm::IEventProcessor.

Definition at line 1646 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, edm::convertException::charPtrToEDM(), alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, FDEBUG, edm::errors::LogicError, edm::event_processor::mFinished, edm::event_processor::mInputExhausted, cfg-viewer::more, edm::event_processor::mRunCount, edm::event_processor::mShutdownSignal, runEdmFileComparison::returnCode, alignCSCRings::s, edm::shutdown_flag, findQualityFiles::size, edm::event_processor::sShuttingDown, edm::event_processor::sStopping, edm::convertException::stdToEDM(), AlCaHLTBitMon_QueryRunRegistry::string, edm::convertException::stringToEDM(), edm::event_processor::StateSentry::succeeded(), edm::convertException::unknownToEDM(), and edm::usr2_lock.

Referenced by asyncRun(), and run().

1646  {
1647 
1648  StateSentry toerror(this);
1649 
1651  std::auto_ptr<statemachine::Machine> machine;
1652  {
1653  beginJob(); //make sure this was called
1654 
1655  if(!onlineStateTransitions) changeState(mRunCount);
1656 
1657  //StatusCode returnCode = epSuccess;
1659 
1660  // make the services available
1662 
1663  machine = createStateMachine();
1664  try {
1665  try {
1666 
1667  InputSource::ItemType itemType;
1668 
1669  while(true) {
1670 
1671  bool more = true;
1672  if(numberOfForkedChildren_ > 0) {
1673  size_t size = preg_->size();
1674  more = input_->skipForForking();
1675  if(more) {
1676  if(size < preg_->size()) {
1678  }
1680  }
1681  }
1682  itemType = (more ? input_->nextItemType() : InputSource::IsStop);
1683 
1684  FDEBUG(1) << "itemType = " << itemType << "\n";
1685 
1686  // These are used for asynchronous running only and
1687  // are checking to see if stopAsync or shutdownAsync
1688  // were called from another thread. In the future, we
1689  // may need to do something better than polling the state.
1690  // With the current code this is the simplest thing and
1691  // it should always work. If the interaction between
1692  // threads becomes more complex this may cause problems.
1693  if(state_ == sStopping) {
1694  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1695  forceLooperToEnd_ = true;
1696  machine->process_event(statemachine::Stop());
1697  forceLooperToEnd_ = false;
1698  break;
1699  }
1700  else if(state_ == sShuttingDown) {
1701  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1702  forceLooperToEnd_ = true;
1703  machine->process_event(statemachine::Stop());
1704  forceLooperToEnd_ = false;
1705  break;
1706  }
1707 
1708  // Look for a shutdown signal
1709  {
1710  boost::mutex::scoped_lock sl(usr2_lock);
1711  if(shutdown_flag) {
1713  returnCode = epSignal;
1714  forceLooperToEnd_ = true;
1715  machine->process_event(statemachine::Stop());
1716  forceLooperToEnd_ = false;
1717  break;
1718  }
1719  }
1720 
1721  if(itemType == InputSource::IsStop) {
1722  machine->process_event(statemachine::Stop());
1723  }
1724  else if(itemType == InputSource::IsFile) {
1725  machine->process_event(statemachine::File());
1726  }
1727  else if(itemType == InputSource::IsRun) {
1728  machine->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1729  }
1730  else if(itemType == InputSource::IsLumi) {
1731  machine->process_event(statemachine::Lumi(input_->luminosityBlock()));
1732  }
1733  else if(itemType == InputSource::IsEvent) {
1734  machine->process_event(statemachine::Event());
1735  }
1736  // This should be impossible
1737  else {
1739  << "Unknown next item type passed to EventProcessor\n"
1740  << "Please report this error to the Framework group\n";
1741  }
1742 
1743  if(machine->terminated()) {
1745  break;
1746  }
1747  } // End of loop over state machine events
1748  } // Try block
1749  catch (cms::Exception& e) { throw; }
1750  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1751  catch (std::exception& e) { convertException::stdToEDM(e); }
1753  catch(char const* c) { convertException::charPtrToEDM(c); }
1754  catch (...) { convertException::unknownToEDM(); }
1755  } // Try block
1756  // Some comments on exception handling related to the boost state machine:
1757  //
1758  // Some states used in the machine are special because they
1759  // perform actions while the machine is being terminated, actions
1760  // such as close files, call endRun, call endLumi etc ... Each of these
1761  // states has two functions that perform these actions. The functions
1762  // are almost identical. The major difference is that one version
1763  // catches all exceptions and the other lets exceptions pass through.
1764  // The destructor catches them and the other function named "exit" lets
1765  // them pass through. On a normal termination, boost will always call
1766  // "exit" and then the state destructor. In our state classes, the
1767  // the destructors do nothing if the exit function already took
1768  // care of things. Here's the interesting part. When boost is
1769  // handling an exception the "exit" function is not called (a boost
1770  // feature).
1771  //
1772  // If an exception occurs while the boost machine is in control
1773  // (which usually means inside a process_event call), then
1774  // the boost state machine destroys its states and "terminates" itself.
1775  // This is already done before we hit the catch blocks below. In this case
1776  // the call to terminateMachine below only destroys an already
1777  // terminated state machine. Because exit is not called, the state destructors
1778  // handle cleaning up lumis, runs, and files. The destructors swallow
1779  // all exceptions and only pass through the exception messages, which
1780  // are tacked onto the original exception below.
1781  //
1782  // If an exception occurs when the boost state machine is not
1783  // in control (outside the process_event functions), then boost
1784  // cannot destroy its own states. The terminateMachine function
1785  // below takes care of that. The flag "alreadyHandlingException"
1786  // is set true so that the state exit functions do nothing (and
1787  // cannot throw more exceptions while handling the first). Then the
1788  // state destructors take care of this because exit did nothing.
1789  //
1790  // In both cases above, the EventProcessor::endOfLoop function is
1791  // not called because it can throw exceptions.
1792  //
1793  // One tricky aspect of the state machine is that things that can
1794  // throw should not be invoked by the state machine while another
1795  // exception is being handled.
1796  // Another tricky aspect is that it appears to be important to
1797  // terminate the state machine before invoking its destructor.
1798  // We've seen crashes that are not understood when that is not
1799  // done. Maintainers of this code should be careful about this.
1800 
1801  catch (cms::Exception & e) {
1803  terminateMachine(machine);
1804  alreadyHandlingException_ = false;
1805  if (!exceptionMessageLumis_.empty()) {
1807  if (e.alreadyPrinted()) {
1808  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1809  }
1810  }
1811  if (!exceptionMessageRuns_.empty()) {
1813  if (e.alreadyPrinted()) {
1814  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1815  }
1816  }
1817  if (!exceptionMessageFiles_.empty()) {
1819  if (e.alreadyPrinted()) {
1820  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1821  }
1822  }
1823  throw;
1824  }
1825 
1826  if(machine->terminated()) {
1827  FDEBUG(1) << "The state machine reports it has been terminated\n";
1828  machine.reset();
1829  }
1830 
1831  if(!onlineStateTransitions) changeState(mFinished);
1832 
1834  throw cms::Exception("BadState")
1835  << "The boost state machine in the EventProcessor exited after\n"
1836  << "entering the Error state.\n";
1837  }
1838 
1839  }
1840  if(machine.get() != 0) {
1841  terminateMachine(machine);
1843  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1844  << "Please report this error to the Framework group\n";
1845  }
1846 
1847  toerror.succeeded();
1848 
1849  return returnCode;
1850  }
std::auto_ptr< statemachine::Machine > createStateMachine()
void adjustIndexesAfterProductRegistryAddition()
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
ServiceToken serviceToken_
std::string exceptionMessageLumis_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
std::unique_ptr< InputSource > input_
volatile bool shutdown_flag
tuple size
Write out results.
boost::mutex usr2_lock
PrincipalCache principalCache_
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2195 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2195  {
2197  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2203 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2203  {
2205  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2199 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2199  {
2201  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setRunNumber ( RunNumber_t  runNumber)

Definition at line 1375 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1375  {
1376  if(runNumber == 0) {
1377  runNumber = 1;
1378  LogWarning("Invalid Run")
1379  << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
1380  << "Run number was set to 1 instead\n";
1381  }
1382 
1383  // inside of beginJob there is a check to see if it has been called before
1384  beginJob();
1386 
1387  // interface not correct yet
1388  input_->setRunNumber(runNumber);
1389  }
void changeState(event_processor::Msg)
std::unique_ptr< InputSource > input_
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1954 of file EventProcessor.cc.

References FDEBUG.

1954  {
1955  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1956  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1957  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2189 of file EventProcessor.cc.

References FDEBUG.

2189  {
2190  FDEBUG(1) << "\tshouldWeStop\n";
2191  if(shouldWeStop_) return true;
2192  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2193  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1470 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mShutdownAsync.

1470  {
1473  if(rc != epTimedOut) changeState(mFinished);
1474  else errorState();
1475  return rc;
1476  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1920 of file EventProcessor.cc.

References FDEBUG.

1920  {
1921  shouldWeStop_ = false;
1922  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1923  // until after we've called beginOfJob
1924  if(looper_ && looperBeginJobRun_) {
1925  looper_->doStartingNewLoop();
1926  }
1927  FDEBUG(1) << "\tstartingNewLoop\n";
1928  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * edm::EventProcessor::stateName ( event_processor::State  s) const
EventProcessor::StatusCode edm::EventProcessor::statusAsync ( ) const

Definition at line 1368 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1368  {
1369  // the thread will record exception/error status in the event processor
1370  // for us to look at and report here
1371  return last_rc_;
1372  }
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::stopAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1462 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mStopAsync.

1462  {
1465  if(rc != epTimedOut) changeState(mFinished);
1466  else errorState();
1467  return rc;
1468  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::terminateMachine ( std::auto_ptr< statemachine::Machine > &  iMachine)
private

Definition at line 2211 of file EventProcessor.cc.

References FDEBUG.

2211  {
2212  if(iMachine.get() != 0) {
2213  if(!iMachine->terminated()) {
2214  forceLooperToEnd_ = true;
2215  iMachine->process_event(statemachine::Stop());
2216  forceLooperToEnd_ = false;
2217  }
2218  else {
2219  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2220  }
2221  if(iMachine->terminated()) {
2222  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2223  }
2224  iMachine.reset();
2225  }
2226  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 1318 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1318  {
1319  return schedule_->totalEvents();
1320  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 1328 of file EventProcessor.cc.

1328  {
1329  return schedule_->totalEventsFailed();
1330  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1323 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1323  {
1324  return schedule_->totalEventsPassed();
1325  }
std::auto_ptr< Schedule > schedule_
EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion ( unsigned int  timeout_seconds)
private

Definition at line 1402 of file EventProcessor.cc.

1402  {
1403  bool rc = true;
1404  boost::xtime timeout;
1405 
1406 #if BOOST_VERSION >= 105000
1407  boost::xtime_get(&timeout, boost::TIME_UTC_);
1408 #else
1409  boost::xtime_get(&timeout, boost::TIME_UTC);
1410 #endif
1411  timeout.sec += timeout_seconds;
1412 
1413  // make sure to include a timeout here so we don't wait forever
1414  // I suspect there are still timing issues with thread startup
1415  // and the setting of the various control variables (stop_count, id_set)
1416  {
1417  boost::mutex::scoped_lock sl(stop_lock_);
1418 
1419  // look here - if runAsync not active, just return the last return code
1420  if(stop_count_ < 0) return last_rc_;
1421 
1422  if(timeout_seconds == 0) {
1423  while(stop_count_ == 0) stopper_.wait(sl);
1424  } else {
1425  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1426  }
1427 
1428  if(rc == false) {
1429  // timeout occurred
1430  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1431  // this is a temporary hack until we get the input source
1432  // upgraded to allow blocking input sources to be unblocked
1433 
1434  // the next line is dangerous and causes all sorts of trouble
1435  if(id_set_) pthread_cancel(event_loop_id_);
1436 
1437  // we will not do anything yet
1438  LogWarning("timeout")
1439  << "An asynchronous request was made to shut down "
1440  << "the event loop "
1441  << "and the event loop did not shutdown after "
1442  << timeout_seconds << " seconds\n";
1443  } else {
1444  event_loop_->join();
1445  event_loop_.reset();
1446  id_set_ = false;
1447  stop_count_ = -1;
1448  }
1449  }
1450  return rc == false ? epTimedOut : last_rc_;
1451  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::condition stopper_
boost::mutex stop_lock_
volatile pthread_t event_loop_id_
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync ( unsigned int  timeout_seconds = 0)

Definition at line 1454 of file EventProcessor.cc.

References edm::event_processor::mCountComplete.

Referenced by evf::FWEPWrapper::stop().

1454  {
1455  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1457  else errorState();
1458  return rc;
1459  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
RunNumber_t  run,
LuminosityBlockNumber_t  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2127 of file EventProcessor.cc.

References FDEBUG.

2127  {
2128  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2131  }
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2115 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2115  {
2116  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2117  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2118  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2119  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class event_processor::StateSentry
friend

Definition at line 377 of file EventProcessor.h.

Member Data Documentation

std::unique_ptr<ActionTable const> edm::EventProcessor::act_table_
private

Definition at line 334 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 327 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 364 of file EventProcessor.h.

boost::shared_ptr<BranchIDListHelper> edm::EventProcessor::branchIDListHelper_
private

Definition at line 329 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::continueAfterChildFailure_
private

Definition at line 372 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 360 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 333 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

std::unique_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 332 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_
private

Definition at line 341 of file EventProcessor.h.

volatile pthread_t edm::EventProcessor::event_loop_id_
private

Definition at line 351 of file EventProcessor.h.

Referenced by asyncRun().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 376 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 361 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 363 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 362 of file EventProcessor.h.

std::unique_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 353 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 359 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 367 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 365 of file EventProcessor.h.

std::unique_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 338 of file EventProcessor.h.

Referenced by init().

volatile bool edm::EventProcessor::id_set_
private

Definition at line 350 of file EventProcessor.h.

Referenced by asyncRun().

std::unique_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 331 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

std::string edm::EventProcessor::last_error_text_
private

Definition at line 349 of file EventProcessor.h.

Referenced by asyncRun().

volatile Status edm::EventProcessor::last_rc_
private

Definition at line 348 of file EventProcessor.h.

Referenced by asyncRun().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 366 of file EventProcessor.h.

int edm::EventProcessor::my_sig_num_
private

Definition at line 352 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 369 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 370 of file EventProcessor.h.

Referenced by init().

ActivityRegistry::PostProcessEvent edm::EventProcessor::postProcessEventSignal_
private

Definition at line 326 of file EventProcessor.h.

Referenced by connectSigs(), and postProcessEventSignal().

boost::shared_ptr<ProductRegistry const> edm::EventProcessor::preg_
private

Definition at line 328 of file EventProcessor.h.

Referenced by beginJob(), and init().

ActivityRegistry::PreProcessEvent edm::EventProcessor::preProcessEventSignal_
private

Definition at line 325 of file EventProcessor.h.

Referenced by connectSigs(), and preProcessEventSignal().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 356 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ProcessConfiguration const> edm::EventProcessor::processConfiguration_
private

Definition at line 335 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 330 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), and init().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 371 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 357 of file EventProcessor.h.

boost::condition edm::EventProcessor::starter_
private

Definition at line 346 of file EventProcessor.h.

Referenced by asyncRun().

volatile event_processor::State edm::EventProcessor::state_
private

Definition at line 340 of file EventProcessor.h.

Referenced by beginJob().

boost::mutex edm::EventProcessor::state_lock_
private

Definition at line 343 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 358 of file EventProcessor.h.

volatile int edm::EventProcessor::stop_count_
private

Definition at line 347 of file EventProcessor.h.

Referenced by asyncRun().

boost::mutex edm::EventProcessor::stop_lock_
private

Definition at line 344 of file EventProcessor.h.

Referenced by asyncRun().

boost::condition edm::EventProcessor::stopper_
private

Definition at line 345 of file EventProcessor.h.

Referenced by asyncRun().

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private