CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile (bool cleaningUpAfterException)
 
virtual void closeOutputFiles ()
 
char const * currentStateName () const
 
void declareRunNumber (RunNumber_t runNumber)
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, int run, int lumi, bool cleaningUpAfterException)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run, bool cleaningUpAfterException)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
event_processor::State getState () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
char const * msgName (event_processor::Msg m) const
 
virtual void openOutputFiles ()
 
ActivityRegistry::PostProcessEventpostProcessEventSignal ()
 
virtual void prepareForNextLoop ()
 
ActivityRegistry::PreProcessEventpreProcessEventSignal ()
 
virtual int readAndCacheLumi (bool merge)
 
virtual statemachine::Run readAndCacheRun (bool merge)
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToCloseOutputFiles ()
 
virtual void respondToOpenInputFile ()
 
virtual void respondToOpenOutputFiles ()
 
void rewind ()
 
virtual void rewindInput ()
 
StatusCode run (int numberEventsToProcess, bool repeatable=true)
 
StatusCode run ()
 
void runAsync ()
 
virtual StatusCode runEventCount (int numberOfEventsToProcess)
 
virtual StatusCode runToCompletion (bool onlineStateTransitions)
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
void setRunNumber (RunNumber_t runNumber)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
StatusCode shutdownAsync (unsigned int timeout_secs=60 *2)
 
StatusCode skip (int numberToSkip)
 
virtual void startingNewLoop ()
 
char const * stateName (event_processor::State s) const
 
StatusCode statusAsync () const
 
StatusCode stopAsync (unsigned int timeout_secs=60 *2)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
StatusCode waitTillDoneAsync (unsigned int timeout_seconds=0)
 
virtual void writeLumi (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

void changeState (event_processor::Msg)
 
void connectSigs (EventProcessor *ep)
 
StatusCode doneAsync (event_processor::Msg m)
 
void errorState ()
 
bool hasSubProcess () const
 
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
StatusCode runCommon (bool onlineStateTransitions, int numberOfEventsToProcess)
 
void setupSignal ()
 
void terminateMachine ()
 
StatusCode waitForAsyncCompletion (unsigned int timeout_seconds)
 

Static Private Member Functions

static void asyncRun (EventProcessor *)
 

Private Attributes

boost::shared_ptr< ActionTable
const > 
act_table_
 
boost::shared_ptr
< ActivityRegistry
actReg_
 
bool alreadyHandlingException_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider
esp_
 
boost::scoped_ptr
< eventsetup::EventSetupsController
espController_
 
boost::shared_ptr< boost::thread > event_loop_
 
volatile pthread_t event_loop_id_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
boost::shared_ptr< FileBlockfb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
boost::scoped_ptr
< HistoryAppender
historyAppender_
 
volatile bool id_set_
 
boost::shared_ptr< InputSourceinput_
 
std::string last_error_text_
 
volatile Status last_rc_
 
boost::shared_ptr< EDLooperBaselooper_
 
bool looperBeginJobRun_
 
std::auto_ptr
< statemachine::Machine
machine_
 
int my_sig_num_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
ActivityRegistry::PostProcessEvent postProcessEventSignal_
 
boost::shared_ptr
< SignallingProductRegistry
preg_
 
ActivityRegistry::PreProcessEvent preProcessEventSignal_
 
PrincipalCache principalCache_
 
boost::shared_ptr
< ProcessConfiguration
processConfiguration_
 
std::auto_ptr< Scheduleschedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
boost::condition starter_
 
volatile event_processor::State state_
 
boost::mutex state_lock_
 
bool stateMachineWasInErrorState_
 
volatile int stop_count_
 
boost::mutex stop_lock_
 
boost::condition stopper_
 
std::auto_ptr< SubProcesssubProcess_
 

Friends

class event_processor::StateSentry
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 69 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 393 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 394 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 367 of file EventProcessor.cc.

References init(), and PythonProcessDesc::parameterSet().

371  :
374  actReg_(),
375  preg_(),
376  serviceToken_(),
377  input_(),
378  espController_(new eventsetup::EventSetupsController),
379  esp_(),
380  act_table_(),
382  schedule_(),
383  subProcess_(),
384  historyAppender_(new HistoryAppender),
385  state_(sInit),
386  event_loop_(),
387  state_lock_(),
388  stop_lock_(),
389  stopper_(),
390  starter_(),
391  stop_count_(-1),
394  id_set_(false),
395  event_loop_id_(),
397  fb_(),
398  looper_(),
399  machine_(),
400  principalCache_(),
401  shouldWeStop_(false),
403  fileMode_(),
409  forceLooperToEnd_(false),
410  looperBeginJobRun_(false),
414  setCpuAffinity_(false),
416  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
417  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
418  processDesc->addServices(defaultServices, forcedServices);
419  init(processDesc, iToken, iLegacy);
420  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< statemachine::Machine > machine_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
boost::shared_ptr< ActionTable const > act_table_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
PrincipalCache principalCache_
boost::scoped_ptr< HistoryAppender > historyAppender_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 422 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

424  :
427  actReg_(),
428  preg_(),
429  serviceToken_(),
430  input_(),
431  espController_(new eventsetup::EventSetupsController),
432  esp_(),
433  act_table_(),
435  schedule_(),
436  subProcess_(),
437  historyAppender_(new HistoryAppender),
438  state_(sInit),
439  event_loop_(),
440  state_lock_(),
441  stop_lock_(),
442  stopper_(),
443  starter_(),
444  stop_count_(-1),
447  id_set_(false),
448  event_loop_id_(),
450  fb_(),
451  looper_(),
452  machine_(),
453  principalCache_(),
454  shouldWeStop_(false),
456  fileMode_(),
462  forceLooperToEnd_(false),
463  looperBeginJobRun_(false),
467  setCpuAffinity_(false),
469  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
470  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
471  processDesc->addServices(defaultServices, forcedServices);
473  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< statemachine::Machine > machine_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
boost::shared_ptr< ActionTable const > act_table_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
PrincipalCache principalCache_
boost::scoped_ptr< HistoryAppender > historyAppender_
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 475 of file EventProcessor.cc.

References init().

477  :
480  actReg_(),
481  preg_(),
482  serviceToken_(),
483  input_(),
484  espController_(new eventsetup::EventSetupsController),
485  esp_(),
486  act_table_(),
488  schedule_(),
489  subProcess_(),
490  historyAppender_(new HistoryAppender),
491  state_(sInit),
492  event_loop_(),
493  state_lock_(),
494  stop_lock_(),
495  stopper_(),
496  starter_(),
497  stop_count_(-1),
500  id_set_(false),
501  event_loop_id_(),
503  fb_(),
504  looper_(),
505  machine_(),
506  principalCache_(),
507  shouldWeStop_(false),
509  fileMode_(),
515  forceLooperToEnd_(false),
516  looperBeginJobRun_(false),
520  setCpuAffinity_(false),
522  init(processDesc, token, legacy);
523  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< statemachine::Machine > machine_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
boost::shared_ptr< ActionTable const > act_table_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
PrincipalCache principalCache_
boost::scoped_ptr< HistoryAppender > historyAppender_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 526 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::parameterSet().

526  :
529  actReg_(),
530  preg_(),
531  serviceToken_(),
532  input_(),
533  espController_(new eventsetup::EventSetupsController),
534  esp_(),
535  act_table_(),
537  schedule_(),
538  subProcess_(),
539  historyAppender_(new HistoryAppender),
540  state_(sInit),
541  event_loop_(),
542  state_lock_(),
543  stop_lock_(),
544  stopper_(),
545  starter_(),
546  stop_count_(-1),
549  id_set_(false),
550  event_loop_id_(),
552  fb_(),
553  looper_(),
554  machine_(),
555  principalCache_(),
556  shouldWeStop_(false),
558  fileMode_(),
564  forceLooperToEnd_(false),
565  looperBeginJobRun_(false),
569  setCpuAffinity_(false),
571  if(isPython) {
572  boost::shared_ptr<ParameterSet> parameterSet = PythonProcessDesc(config).parameterSet();
573  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(parameterSet));
575  }
576  else {
577  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
579  }
580  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
std::string exceptionMessageRuns_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
boost::shared_ptr< edm::ParameterSet > parameterSet()
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
std::string exceptionMessageLumis_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< statemachine::Machine > machine_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
boost::shared_ptr< ActionTable const > act_table_
std::auto_ptr< SubProcess > subProcess_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
PrincipalCache principalCache_
boost::scoped_ptr< HistoryAppender > historyAppender_
edm::EventProcessor::~EventProcessor ( )

Definition at line 663 of file EventProcessor.cc.

References actReg_, changeState(), edm::BranchIDListHelper::clearRegistries(), edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), alignCSCRings::e, esp_, espController_, cms::Exception::explainSelf(), edm::detail::ThreadSafeRegistry< KEY, T, E >::extra(), getToken(), input_, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), looper_, edm::event_processor::mDtor, schedule_, subProcess_, and terminateMachine().

663  {
664  // Make the services available while everything is being deleted.
665  ServiceToken token = getToken();
666  ServiceRegistry::Operate op(token);
667 
668  // The state machine should have already been cleaned up
669  // and destroyed at this point by a call to EndJob or
670  // earlier when it completed processing events, but if it
671  // has not been we'll take care of it here at the last moment.
672  // This could cause problems if we are already handling an
673  // exception and another one is thrown here ... For a critical
674  // executable the solution to this problem is for the code using
675  // the EventProcessor to explicitly call EndJob or use runToCompletion,
676  // then the next line of code is never executed.
678 
679  try {
681  }
682  catch(cms::Exception& e) {
683  LogError("System")
684  << e.explainSelf() << "\n";
685  }
686 
687  // manually destroy all these thing that may need the services around
688  espController_.reset();
689  subProcess_.reset();
690  esp_.reset();
691  schedule_.reset();
692  input_.reset();
693  looper_.reset();
694  actReg_.reset();
695 
696  pset::Registry* psetRegistry = pset::Registry::instance();
697  psetRegistry->data().clear();
698  psetRegistry->extra().setID(ParameterSetID());
699 
701  ParentageRegistry::instance()->data().clear();
705  }
boost::shared_ptr< InputSource > input_
virtual std::string explainSelf() const
Definition: Exception.cc:146
boost::shared_ptr< EDLooperBase > looper_
static ThreadSafeRegistry * instance()
boost::shared_ptr< ActivityRegistry > actReg_
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
ServiceToken getToken()
std::auto_ptr< SubProcess > subProcess_
collection_type & data()
Provide access to the contained collection.

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2203 of file EventProcessor.cc.

2203  {
2205  }
void edm::EventProcessor::asyncRun ( EventProcessor me)
staticprivate

Definition at line 1569 of file EventProcessor.cc.

References alignCSCRings::e, event_loop_id_, cppFunctionSkipper::exception, cms::Exception::explainSelf(), FDEBUG, id_set_, last_error_text_, last_rc_, runToCompletion(), starter_, stop_count_, stop_lock_, and stopper_.

1569  {
1570  // set up signals to allow for interruptions
1571  // ignore all other signals
1572  // make sure no exceptions escape out
1573 
1574  // temporary hack until we modify the input source to allow
1575  // wakeup calls from other threads. This mimics the solution
1576  // in EventFilter/Processor, which I do not like.
1577  // allowing cancels means that the thread just disappears at
1578  // certain points. This is bad for C++ stack variables.
1579  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1580  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1581  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1582  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1583 
1584  {
1585  boost::mutex::scoped_lock(me->stop_lock_);
1586  me->event_loop_id_ = pthread_self();
1587  me->id_set_ = true;
1588  me->starter_.notify_all();
1589  }
1590 
1591  Status rc = epException;
1592  FDEBUG(2) << "asyncRun starting ......................\n";
1593 
1594  try {
1595  bool onlineStateTransitions = true;
1596  rc = me->runToCompletion(onlineStateTransitions);
1597  }
1598  catch (cms::Exception& e) {
1599  LogError("FwkJob") << "cms::Exception caught in "
1600  << "EventProcessor::asyncRun"
1601  << "\n"
1602  << e.explainSelf();
1603  me->last_error_text_ = e.explainSelf();
1604  }
1605  catch (std::exception& e) {
1606  LogError("FwkJob") << "Standard library exception caught in "
1607  << "EventProcessor::asyncRun"
1608  << "\n"
1609  << e.what();
1610  me->last_error_text_ = e.what();
1611  }
1612  catch (...) {
1613  LogError("FwkJob") << "Unknown exception caught in "
1614  << "EventProcessor::asyncRun"
1615  << "\n";
1616  me->last_error_text_ = "Unknown exception caught";
1617  rc = epOther;
1618  }
1619 
1620  me->last_rc_ = rc;
1621 
1622  {
1623  // notify anyone waiting for exit that we are doing so now
1624  boost::mutex::scoped_lock sl(me->stop_lock_);
1625  ++me->stop_count_;
1626  me->stopper_.notify_all();
1627  }
1628  FDEBUG(2) << "asyncRun ending ......................\n";
1629  }
virtual std::string explainSelf() const
Definition: Exception.cc:146
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 754 of file EventProcessor.cc.

References actReg_, cms::Exception::addContext(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, changeState(), edm::convertException::charPtrToEDM(), alignCSCRings::e, cppFunctionSkipper::exception, hasSubProcess(), input_, edm::event_processor::mBeginJob, cmsPerfStripChart::operate(), alignCSCRings::s, schedule_, serviceToken_, edm::event_processor::sInit, state_, edm::convertException::stdToEDM(), edm::convertException::stringToEDM(), subProcess_, and edm::convertException::unknownToEDM().

Referenced by evf::FUEventProcessor::configuring(), evf::FUEventProcessor::enabling(), rewind(), and skip().

754  {
755  if(state_ != sInit) return;
756  bk::beginJob();
757  // can only be run if in the initial state
759 
760  // StateSentry toerror(this); // should we add this ?
761  //make the services available
763 
764  //NOTE: This implementation assumes 'Job' means one call
765  // the EventProcessor::run
766  // If it really means once per 'application' then this code will
767  // have to be changed.
768  // Also have to deal with case where have 'run' then new Module
769  // added and do 'run'
770  // again. In that case the newly added Module needs its 'beginJob'
771  // to be called.
772 
773  //NOTE: in future we should have a beginOfJob for looper that takes no arguments
774  // For now we delay calling beginOfJob until first beginOfRun
775  //if(looper_) {
776  // looper_->beginOfJob(es);
777  //}
778  try {
779  try {
780  input_->doBeginJob();
781  }
782  catch (cms::Exception& e) { throw; }
783  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
784  catch (std::exception& e) { convertException::stdToEDM(e); }
785  catch(std::string& s) { convertException::stringToEDM(s); }
786  catch(char const* c) { convertException::charPtrToEDM(c); }
787  catch (...) { convertException::unknownToEDM(); }
788  }
789  catch(cms::Exception& ex) {
790  ex.addContext("Calling beginJob for the source");
791  throw;
792  }
793  schedule_->beginJob();
794  // toerror.succeeded(); // should we add this?
795  if(hasSubProcess()) subProcess_->doBeginJob();
796  actReg_->postBeginJobSignal_();
797  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< ActivityRegistry > actReg_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
void addContext(std::string const &context)
Definition: Exception.cc:227
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2051 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::beginTime(), FDEBUG, edm::Service< T >::isAvailable(), edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

2051  {
2052  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2053  input_->doBeginLumi(lumiPrincipal);
2054 
2056  if(rng.isAvailable()) {
2057  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
2058  rng->preBeginLumi(lb);
2059  }
2060 
2061  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
2062  // lumi blocks know their start and end times why not also start and end events?
2063  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
2064  espController_->eventSetupForInstance(ts);
2065  EventSetup const& es = esp_->eventSetup();
2066  {
2067  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
2068  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2069  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
2070  if(hasSubProcess()) {
2071  subProcess_->doBeginLuminosityBlock(lumiPrincipal, ts);
2072  }
2073  }
2074  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
2075  if(looper_) {
2076  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
2077  }
2078  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2000 of file EventProcessor.cc.

References edm::RunPrincipal::beginTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

2000  {
2001  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2002  input_->doBeginRun(runPrincipal);
2003  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
2004  runPrincipal.beginTime());
2006  espController_->forceCacheClear();
2007  }
2008  espController_->eventSetupForInstance(ts);
2009  EventSetup const& es = esp_->eventSetup();
2010  if(looper_ && looperBeginJobRun_== false) {
2011  looper_->copyInfo(ScheduleInfo(schedule_.get()));
2012  looper_->beginOfJob(es);
2013  looperBeginJobRun_ = true;
2014  looper_->doStartingNewLoop();
2015  }
2016  {
2017  typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
2018  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2019  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
2020  if(hasSubProcess()) {
2021  subProcess_->doBeginRun(runPrincipal, ts);
2022  }
2023  }
2024  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
2025  if(looper_) {
2026  looper_->doBeginRun(runPrincipal, es);
2027  }
2028  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::changeState ( event_processor::Msg  msg)
private

Definition at line 1513 of file EventProcessor.cc.

References cond::rpcobimon::current, edm::hlt::Exception, FDEBUG, edm::event_processor::mAny, edm::event_processor::sInvalid, and asciidump::table.

Referenced by beginJob(), endJob(), rewind(), skip(), ~EventProcessor(), and edm::event_processor::StateSentry::~StateSentry().

1513  {
1514  // most likely need to serialize access to this routine
1515 
1516  boost::mutex::scoped_lock sl(state_lock_);
1517  State curr = state_;
1518  int rc;
1519  // found if(not end of table) and
1520  // (state == table.state && (msg == table.message || msg == any))
1521  for(rc = 0;
1522  table[rc].current != sInvalid &&
1523  (curr != table[rc].current ||
1524  (curr == table[rc].current &&
1525  msg != table[rc].message && table[rc].message != mAny));
1526  ++rc);
1527 
1528  if(table[rc].current == sInvalid)
1529  throw cms::Exception("BadState")
1530  << "A member function of EventProcessor has been called in an"
1531  << " inappropriate order.\n"
1532  << "Bad transition from " << stateName(curr) << " "
1533  << "using message " << msgName(msg) << "\n"
1534  << "No where to go from here.\n";
1535 
1536  FDEBUG(1) << "changeState: current=" << stateName(curr)
1537  << ", message=" << msgName(msg)
1538  << " -> new=" << stateName(table[rc].final) << "\n";
1539 
1540  state_ = table[rc].final;
1541  }
boost::mutex state_lock_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * msgName(event_processor::Msg m) const
TransEntry table[]
volatile event_processor::State state_
char const * stateName(event_processor::State s) const
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1375 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1375  {
1376  schedule_->clearCounters();
1377  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( bool  cleaningUpAfterException)
virtual

Implements edm::IEventProcessor.

Definition at line 1895 of file EventProcessor.cc.

References FDEBUG.

1895  {
1896  if (fb_.get() != 0) {
1897  input_->closeFile(fb_, cleaningUpAfterException);
1898  }
1899  FDEBUG(1) << "\tcloseInputFile\n";
1900  }
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1910 of file EventProcessor.cc.

References FDEBUG.

1910  {
1911  if (fb_.get() != 0) {
1912  schedule_->closeOutputFiles();
1913  if(hasSubProcess()) subProcess_->closeOutputFiles();
1914  }
1915  FDEBUG(1) << "\tcloseOutputFiles\n";
1916  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
void edm::EventProcessor::connectSigs ( EventProcessor ep)
private

Definition at line 1331 of file EventProcessor.cc.

References postProcessEventSignal_, and preProcessEventSignal_.

Referenced by init().

1331  {
1332  // When the FwkImpl signals are given, pass them to the
1333  // appropriate EventProcessor signals so that the outside world
1334  // can see the signal.
1335  actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
1336  actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
1337  }
boost::shared_ptr< ActivityRegistry > actReg_
char const * edm::EventProcessor::currentStateName ( ) const

Member functions to support asynchronous interface.

Definition at line 1379 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().

1379  {
1380  return stateName(getState());
1381  }
event_processor::State getState() const
char const * stateName(event_processor::State s) const
void edm::EventProcessor::declareRunNumber ( RunNumber_t  runNumber)

Definition at line 1419 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1419  {
1420  // inside of beginJob there is a check to see if it has been called before
1421  beginJob();
1423 
1424  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1425  //input_->declareRunNumber(runNumber);
1426  }
void changeState(event_processor::Msg)
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2133 of file EventProcessor.cc.

References FDEBUG.

2133  {
2135  if(hasSubProcess()) subProcess_->deleteLumiFromCache(phid, run, lumi);
2136  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
2137  }
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
tuple lumi
Definition: fjr2json.py:35
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2121 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2121  {
2122  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
2123  if(hasSubProcess()) subProcess_->deleteRunFromCache(run.processHistoryID(), run.runNumber());
2124  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
2125  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1989 of file EventProcessor.cc.

References FDEBUG.

1989  {
1990  FDEBUG(1) << "\tdoErrorStuff\n";
1991  LogError("StateMachine")
1992  << "The EventProcessor state machine encountered an unexpected event\n"
1993  << "and went to the error state\n"
1994  << "Will attempt to terminate processing normally\n"
1995  << "(IF using the looper the next loop will be attempted)\n"
1996  << "This likely indicates a bug in an input module or corrupted input or both\n";
1998  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::doneAsync ( event_processor::Msg  m)
private

Definition at line 1505 of file EventProcessor.cc.

1505  {
1506  // make sure to include a timeout here so we don't wait forever
1507  // I suspect there are still timing issues with thread startup
1508  // and the setting of the various control variables (stop_count, id_set)
1509  changeState(m);
1510  return waitForAsyncCompletion(60*2);
1511  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1360 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::actionPerformed().

1360  {
1361  schedule_->enableEndPaths(active);
1362  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 800 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), changeState(), edm::OutputModule::doEndJob(), edm::InputSource::doEndJob(), edm::EDLooperBase::endOfJob(), hasSubProcess(), edm::ExceptionCollector::hasThrown(), input_, looper_, edm::event_processor::mEndJob, cmsPerfStripChart::operate(), cppFunctionSkipper::operator, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, subProcess_, and terminateMachine().

Referenced by evf::FWEPWrapper::init(), and evf::FWEPWrapper::stopAndHalt().

800  {
801  // Collects exceptions, so we don't throw before all operations are performed.
802  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
803 
804  // only allowed to run if state is sIdle, sJobReady, sRunGiven
805  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
806 
807  //make the services available
809 
810  c.call(boost::bind(&EventProcessor::terminateMachine, this));
811  schedule_->endJob(c);
812  if(hasSubProcess()) {
813  c.call(boost::bind(&SubProcess::doEndJob, subProcess_.get()));
814  }
815  c.call(boost::bind(&InputSource::doEndJob, input_));
816  if(looper_) {
817  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
818  }
819  c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
820  if(c.hasThrown()) {
821  c.rethrow();
822  }
823  }
boost::shared_ptr< InputSource > input_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:235
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:74
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2080 of file EventProcessor.cc.

References edm::LuminosityBlockPrincipal::endTime(), FDEBUG, edm::LuminosityBlockPrincipal::luminosityBlock(), and edm::LuminosityBlockPrincipal::run().

Referenced by Types.EventRange::cppID().

2080  {
2081  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
2082  input_->doEndLumi(lumiPrincipal, cleaningUpAfterException);
2083  //NOTE: Using the max event number for the end of a lumi block is a bad idea
2084  // lumi blocks know their start and end times why not also start and end events?
2085  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
2086  lumiPrincipal.endTime());
2087  espController_->eventSetupForInstance(ts);
2088  EventSetup const& es = esp_->eventSetup();
2089  {
2090  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
2091  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
2092  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es, cleaningUpAfterException);
2093  if(hasSubProcess()) {
2094  subProcess_->doEndLuminosityBlock(lumiPrincipal, ts, cleaningUpAfterException);
2095  }
2096  }
2097  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
2098  if(looper_) {
2099  looper_->doEndLuminosityBlock(lumiPrincipal, es);
2100  }
2101  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1960 of file EventProcessor.cc.

References FDEBUG, and ntuplemaker::status.

1960  {
1961  if(looper_) {
1962  ModuleChanger changer(schedule_.get());
1963  looper_->setModuleChanger(&changer);
1964  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1965  looper_->setModuleChanger(0);
1966  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1967  else return false;
1968  }
1969  FDEBUG(1) << "\tendOfLoop\n";
1970  return true;
1971  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1365 of file EventProcessor.cc.

1365  {
1366  return schedule_->endPathsEnabled();
1367  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run,
bool  cleaningUpAfterException 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2030 of file EventProcessor.cc.

References edm::RunPrincipal::endTime(), FDEBUG, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), and statemachine::Run::runNumber().

2030  {
2031  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
2032  input_->doEndRun(runPrincipal, cleaningUpAfterException);
2034  runPrincipal.endTime());
2035  espController_->eventSetupForInstance(ts);
2036  EventSetup const& es = esp_->eventSetup();
2037  {
2038  typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
2039  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
2040  schedule_->processOneOccurrence<Traits>(runPrincipal, es, cleaningUpAfterException);
2041  if(hasSubProcess()) {
2042  subProcess_->doEndRun(runPrincipal, ts, cleaningUpAfterException);
2043  }
2044  }
2045  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
2046  if(looper_) {
2047  looper_->doEndRun(runPrincipal, es);
2048  }
2049  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
void edm::EventProcessor::errorState ( )
private

Definition at line 1500 of file EventProcessor.cc.

References edm::event_processor::sError.

1500  {
1501  state_ = sError;
1502  }
volatile event_processor::State state_
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 1036 of file EventProcessor.cc.

References bk::beginJob(), edm::eventsetup::EventSetupRecord::doGet(), alignCSCRings::e, edm::hlt::Exception, cmsRelvalreport::exit, edm::EventSetup::fillAvailableRecordKeys(), edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::EventSetup::find(), edm::eventsetup::EventSetupRecord::find(), edm::installCustomHandler(), NULL, O_NONBLOCK, cmsPerfStripChart::operate(), pipe::pipe(), RooFit::readFile(), edm::shutdown_flag, relativeConstraints::value, and cms::Exception::what().

1036  {
1037 
1038  if(0 == numberOfForkedChildren_) {return true;}
1039  assert(0<numberOfForkedChildren_);
1040  //do what we want done in common
1041  {
1042  beginJob(); //make sure this was run
1043  // make the services available
1045 
1046  InputSource::ItemType itemType;
1047  itemType = input_->nextItemType();
1048 
1049  assert(itemType == InputSource::IsFile);
1050  {
1051  readFile();
1052  }
1053  itemType = input_->nextItemType();
1054  assert(itemType == InputSource::IsRun);
1055 
1056  LogSystem("ForkingEventSetupPreFetching") << " prefetching for run " << input_->runAuxiliary()->run();
1057  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
1058  input_->runAuxiliary()->beginTime());
1059  espController_->eventSetupForInstance(ts);
1060  EventSetup const& es = esp_->eventSetup();
1061 
1062  //now get all the data available in the EventSetup
1063  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
1064  es.fillAvailableRecordKeys(recordKeys);
1065  std::vector<eventsetup::DataKey> dataKeys;
1066  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
1067  itKey != itEnd;
1068  ++itKey) {
1069  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
1070  //see if this is on our exclusion list
1071  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
1072  ExcludedData const* excludedData(0);
1073  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
1074  excludedData = &(itExcludeRec->second);
1075  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
1076  //skip all items in this record
1077  continue;
1078  }
1079  }
1080  if(0 != recordPtr) {
1081  dataKeys.clear();
1082  recordPtr->fillRegisteredDataKeys(dataKeys);
1083  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
1084  itDataKey != itDataKeyEnd;
1085  ++itDataKey) {
1086  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1087  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
1088  LogInfo("ForkingEventSetupPreFetching") << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
1089  continue;
1090  }
1091  try {
1092  recordPtr->doGet(*itDataKey);
1093  } catch(cms::Exception& e) {
1094  LogWarning("ForkingEventSetupPreFetching") << e.what();
1095  }
1096  }
1097  }
1098  }
1099  }
1100  LogSystem("ForkingEventSetupPreFetching") <<" done prefetching";
1101  {
1102  // make the services available
1104  Service<JobReport> jobReport;
1105  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
1106 
1107  //Now actually do the forking
1108  actReg_->preForkReleaseResourcesSignal_();
1109  input_->doPreForkReleaseResources();
1110  schedule_->preForkReleaseResources();
1111  }
1112  installCustomHandler(SIGCHLD, ep_sigchld);
1113 
1114 
1115  unsigned int childIndex = 0;
1116  unsigned int const kMaxChildren = numberOfForkedChildren_;
1117  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
1118  std::vector<pid_t> childrenIds;
1119  childrenIds.reserve(kMaxChildren);
1120  std::vector<int> childrenSockets;
1121  childrenSockets.reserve(kMaxChildren);
1122  std::vector<int> childrenPipes;
1123  childrenPipes.reserve(kMaxChildren);
1124  std::vector<int> childrenSocketsCopy;
1125  childrenSocketsCopy.reserve(kMaxChildren);
1126  std::vector<int> childrenPipesCopy;
1127  childrenPipesCopy.reserve(kMaxChildren);
1128  int pipes[2];
1129 
1130  {
1131  // make the services available
1133  Service<JobReport> jobReport;
1134  int sockets[2], fd_flags;
1135  for(; childIndex < kMaxChildren; ++childIndex) {
1136  // Create a UNIX_DGRAM socket pair
1137  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)) {
1138  printf("Error creating communication socket (errno=%d, %s)\n", errno, strerror(errno));
1139  exit(EXIT_FAILURE);
1140  }
1141  if (pipe(pipes)) {
1142  printf("Error creating communication pipes (errno=%d, %s)\n", errno, strerror(errno));
1143  exit(EXIT_FAILURE);
1144  }
1145  // set CLOEXEC so the socket/pipe doesn't get leaked if the child exec's.
1146  if ((fd_flags = fcntl(sockets[1], F_GETFD, NULL)) == -1) {
1147  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1148  exit(EXIT_FAILURE);
1149  }
1150  // Mark socket as non-block. Child must be careful to do select prior
1151  // to reading from socket.
1152  if (fcntl(sockets[1], F_SETFD, fd_flags | FD_CLOEXEC | O_NONBLOCK) == -1) {
1153  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1154  exit(EXIT_FAILURE);
1155  }
1156  if ((fd_flags = fcntl(pipes[1], F_GETFD, NULL)) == -1) {
1157  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1158  exit(EXIT_FAILURE);
1159  }
1160  if (fcntl(pipes[1], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
1161  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1162  exit(EXIT_FAILURE);
1163  }
1164  // Linux man page notes there are some edge cases where reading from a
1165  // fd can block, even after a select.
1166  if ((fd_flags = fcntl(pipes[0], F_GETFD, NULL)) == -1) {
1167  printf("Failed to get fd flags: %d %s\n", errno, strerror(errno));
1168  exit(EXIT_FAILURE);
1169  }
1170  if (fcntl(pipes[0], F_SETFD, fd_flags | O_NONBLOCK) == -1) {
1171  printf("Failed to set new fd flags: %d %s\n", errno, strerror(errno));
1172  exit(EXIT_FAILURE);
1173  }
1174 
1175  childrenPipesCopy = childrenPipes;
1176  childrenSocketsCopy = childrenSockets;
1177 
1178  pid_t value = fork();
1179  if(value == 0) {
1180  // Close the parent's side of the socket and pipe which will talk to us.
1181  close(pipes[0]);
1182  close(sockets[0]);
1183  // Close our copies of the parent's other communication pipes.
1184  for(std::vector<int>::const_iterator it=childrenPipesCopy.begin(); it != childrenPipesCopy.end(); it++) {
1185  close(*it);
1186  }
1187  for(std::vector<int>::const_iterator it=childrenSocketsCopy.begin(); it != childrenSocketsCopy.end(); it++) {
1188  close(*it);
1189  }
1190 
1191  // this is the child process, redirect stdout and stderr to a log file
1192  fflush(stdout);
1193  fflush(stderr);
1194  std::stringstream stout;
1195  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
1196  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
1197  LogError("ForkingStdOutRedirect") << "Error during freopen of child process "<< childIndex;
1198  }
1199  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
1200  LogError("ForkingStdOutRedirect") << "Error during dup2 of child process"<< childIndex;
1201  }
1202 
1203  LogInfo("ForkingChild") << "I am child " << childIndex << " with pgid " << getpgrp();
1204  if(setCpuAffinity_) {
1205  // CPU affinity is handled differently on macosx.
1206  // We disable it and print a message until someone reads:
1207  //
1208  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
1209  //
1210  // and implements it.
1211 #ifdef __APPLE__
1212  LogInfo("ForkingChildAffinity") << "Architecture support for CPU affinity not implemented.";
1213 #else
1214  LogInfo("ForkingChildAffinity") << "Setting CPU affinity, setting this child to cpu " << childIndex;
1215  cpu_set_t mask;
1216  CPU_ZERO(&mask);
1217  CPU_SET(childIndex, &mask);
1218  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
1219  LogError("ForkingChildAffinity") << "Failed to set the cpu affinity, errno " << errno;
1220  exit(-1);
1221  }
1222 #endif
1223  }
1224  break;
1225  } else {
1226  //this is the parent
1227  close(pipes[1]);
1228  close(sockets[1]);
1229  }
1230  if(value < 0) {
1231  LogError("ForkingChild") << "failed to create a child";
1232  exit(-1);
1233  }
1234  childrenIds.push_back(value);
1235  childrenSockets.push_back(sockets[0]);
1236  childrenPipes.push_back(pipes[0]);
1237  }
1238 
1239  if(childIndex < kMaxChildren) {
1240  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1241  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1242 
1243  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(sockets[1], pipes[1]));
1244  input_->doPostForkReacquireResources(receiver);
1245  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1246  //NOTE: sources have to reset themselves by listening to the post fork message
1247  //rewindInput();
1248  return true;
1249  }
1250  jobReport->parentAfterFork(jobReportFile);
1251  }
1252 
1253  //this is the original, which is now the master for all the children
1254 
1255  //Need to wait for signals from the children or externally
1256  // To wait we must
1257  // 1) block the signals we want to wait on so we do not have a race condition
1258  // 2) check that we haven't already meet our ending criteria
1259  // 3) call sigsuspend, which unblocks the signals and waits until a signal is caught
1260  sigset_t blockingSigSet;
1261  sigset_t unblockingSigSet;
1262  sigset_t oldSigSet;
1263  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1264  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1265  sigaddset(&blockingSigSet, SIGCHLD);
1266  sigaddset(&blockingSigSet, SIGUSR2);
1267  sigaddset(&blockingSigSet, SIGINT);
1268  sigdelset(&unblockingSigSet, SIGCHLD);
1269  sigdelset(&unblockingSigSet, SIGUSR2);
1270  sigdelset(&unblockingSigSet, SIGINT);
1271  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1272 
1273  // If there are too many fd's (unlikely, but possible) for select, denote this
1274  // because the sender will fail.
1275  bool too_many_fds = false;
1276  if (pipes[1]+1 > FD_SETSIZE) {
1277  LogError("ForkingFileDescriptors") << "too many file descriptors for multicore job";
1278  too_many_fds = true;
1279  }
1280 
1281  //create a thread that sends the units of work to workers
1282  // we create it after all signals were blocked so that this
1283  // thread is never interupted by a signal
1284  MessageSenderToSource sender(childrenSockets, childrenPipes, numberOfSequentialEventsPerChild_);
1285  boost::thread senderThread(sender);
1286 
1287  while(!too_many_fds && !shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
1288  sigsuspend(&unblockingSigSet);
1289  LogInfo("ForkingAwake") << "woke from sigwait" << std::endl;
1290  }
1291  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1292 
1293  LogInfo("ForkingStopping") << "num children who have already stopped " << num_children_done;
1294  if(child_failed) {
1295  LogError("ForkingStopping") << "child failed";
1296  }
1297  if(shutdown_flag) {
1298  LogSystem("ForkingStopping") << "asked to shutdown";
1299  }
1300 
1301  if(too_many_fds || shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1302  LogInfo("ForkingStopping") << "must stop children" << std::endl;
1303  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1304  it != itEnd; ++it) {
1305  /* int result = */ kill(*it, SIGUSR2);
1306  }
1307  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1308  while(num_children_done != kMaxChildren) {
1309  sigsuspend(&unblockingSigSet);
1310  }
1311  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1312  }
1313  // The senderThread will notice the pipes die off, one by one. Once all children are gone, it will exit.
1314  senderThread.join();
1315  if(child_failed) {
1316  if (child_fail_signal) {
1317  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with signal " << child_fail_signal;
1318  } else if (child_fail_exit_status) {
1319  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1320  } else {
1321  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally for unknown reason";
1322  }
1323  }
1324  if(too_many_fds) {
1325  throw cms::Exception("ForkedParentFailed") << "hit select limit for number of fds";
1326  }
1327  return false;
1328  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:141
boost::shared_ptr< InputSource > input_
def pipe
Definition: pipe.py:5
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
volatile bool shutdown_flag
#define O_NONBLOCK
Definition: SysFile.h:21
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1340 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::init().

1340  {
1341  return schedule_->getAllModuleDescriptions();
1342  }
std::auto_ptr< Schedule > schedule_
State edm::EventProcessor::getState ( ) const
ServiceToken edm::EventProcessor::getToken ( )
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1370 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().

1370  {
1371  schedule_->getTriggerReport(rep);
1372  }
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::hasSubProcess ( ) const
inlineprivate

Definition at line 335 of file EventProcessor.h.

References subProcess_.

Referenced by beginJob(), and endJob().

335  {
336  return subProcess_.get() != 0;
337  }
std::auto_ptr< SubProcess > subProcess_
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 583 of file EventProcessor.cc.

References edm::ScheduleItems::act_table_, act_table_, edm::ScheduleItems::actReg_, actReg_, edm::ScheduleItems::addCPRandTNS(), edm::BranchIDListHelper::clearRegistries(), connectSigs(), emptyRunLumiMode_, esp_, espController_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::ParameterSet::getUntrackedParameterSet(), edm::ScheduleItems::initMisc(), edm::ScheduleItems::initSchedule(), edm::ScheduleItems::initServices(), input_, edm::eventsetup::heterocontainer::insert(), edm::serviceregistry::kConfigurationOverrides, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, cmsPerfStripChart::operate(), edm::popSubProcessParameterSet(), edm::ScheduleItems::preg_, preg_, principalCache_, edm::ScheduleItems::processConfiguration_, processConfiguration_, schedule_, serviceToken_, setCpuAffinity_, edm::IllegalParameters::setThrowAnException(), and subProcess_.

Referenced by EventProcessor().

585  {
586 
587  //std::cerr << processDesc->dump() << std::endl;
588  // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
589  // They must be cleared here because some processes run multiple EventProcessors in succession.
591 
592  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
593  //std::cerr << parameterSet->dump() << std::endl;
594 
595  // If there is a subprocess, pop the subprocess parameter set out of the process parameter set
596  boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*parameterSet).release());
597 
598  // Now set some parameters specific to the main process.
599  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
600  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
601  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
602  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
603  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
604  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
605  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
606  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
607  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
608  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
609  itPS != itPSEnd;
610  ++itPS) {
611  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
612  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
613  itPS->getUntrackedParameter<std::string>("label", "")));
614  }
615  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
616 
617  // Now do general initialization
618  ScheduleItems items;
619 
620  //initialize the services
621  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
622  ServiceToken token = items.initServices(*pServiceSets, *parameterSet, iToken, iLegacy, true);
623  serviceToken_ = items.addCPRandTNS(*parameterSet, token);
624 
625  //make the services available
627 
628  // intialize miscellaneous items
629  boost::shared_ptr<CommonParams> common(items.initMisc(*parameterSet));
630 
631  // intialize the event setup provider
632  esp_ = espController_->makeProvider(*parameterSet);
633 
634  // initialize the looper, if any
635  looper_ = fillLooper(*espController_, *esp_, *parameterSet);
636  if(looper_) {
637  looper_->setActionTable(items.act_table_.get());
638  looper_->attachTo(*items.actReg_);
639  }
640 
641  // initialize the input source
642  input_ = makeInput(*parameterSet, *common, *items.preg_, principalCache_, items.actReg_, items.processConfiguration_);
643 
644  // intialize the Schedule
645  schedule_ = items.initSchedule(*parameterSet,subProcessParameterSet.get());
646 
647  // set the data members
648  act_table_ = items.act_table_;
649  actReg_ = items.actReg_;
650  preg_ = items.preg_;
651  processConfiguration_ = items.processConfiguration_;
652 
653  FDEBUG(2) << parameterSet << std::endl;
654  connectSigs(this);
655 
656  // initialize the subprocess, if there is one
657  if(subProcessParameterSet) {
658  subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides));
659  }
660  espController_->clearComponents();
661  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:361
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
static void setThrowAnException(bool v)
boost::shared_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, PrincipalCache &pCache, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration)
boost::shared_ptr< ProcessConfiguration > processConfiguration_
void connectSigs(EventProcessor *ep)
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ActionTable const > act_table_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupsController &esController, eventsetup::EventSetupProvider &cp, ParameterSet &params)
bool insert(Storage &, ItemType *, const IdTag &)
PrincipalCache principalCache_
char const * edm::EventProcessor::msgName ( event_processor::Msg  m) const

Definition at line 1387 of file EventProcessor.cc.

References m.

1387  {
1388  return msgNames[m];
1389  }
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1902 of file EventProcessor.cc.

References FDEBUG.

1902  {
1903  if (fb_.get() != 0) {
1904  schedule_->openOutputFiles(*fb_);
1905  if(hasSubProcess()) subProcess_->openOutputFiles(*fb_);
1906  }
1907  FDEBUG(1) << "\topenOutputFiles\n";
1908  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal ( )
inline

signal is emitted after all modules have finished processing the Event

Definition at line 214 of file EventProcessor.h.

References postProcessEventSignal_.

214 {return postProcessEventSignal_;}
ActivityRegistry::PostProcessEvent postProcessEventSignal_
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1979 of file EventProcessor.cc.

References FDEBUG.

1979  {
1980  looper_->prepareForNextLoop(esp_.get());
1981  FDEBUG(1) << "\tprepareForNextLoop\n";
1982  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal ( )
inline

signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event

Definition at line 209 of file EventProcessor.h.

References preProcessEventSignal_.

209 {return preProcessEventSignal_;}
ActivityRegistry::PreProcessEvent preProcessEventSignal_
int edm::EventProcessor::readAndCacheLumi ( bool  merge)
virtual

Implements edm::IEventProcessor.

Definition at line 2109 of file EventProcessor.cc.

2109  {
2110  input_->readAndCacheLumi(merge, *historyAppender_);
2111  input_->markLumi();
2112  return input_->luminosityBlock();
2113  }
boost::shared_ptr< InputSource > input_
bool merge(LuminosityBlockRange &lh, LuminosityBlockRange &rh)
boost::scoped_ptr< HistoryAppender > historyAppender_
statemachine::Run edm::EventProcessor::readAndCacheRun ( bool  merge)
virtual

Implements edm::IEventProcessor.

Definition at line 2103 of file EventProcessor.cc.

References PDRates::Run.

2103  {
2104  input_->readAndCacheRun(merge, *historyAppender_);
2105  input_->markRun();
2106  return statemachine::Run(input_->reducedProcessHistoryID(), input_->run());
2107  }
boost::shared_ptr< InputSource > input_
bool merge(LuminosityBlockRange &lh, LuminosityBlockRange &rh)
boost::scoped_ptr< HistoryAppender > historyAppender_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 2139 of file EventProcessor.cc.

References edm::EventPrincipal::clearEventPrincipal(), FDEBUG, edm::EventPrincipal::id(), edm::ProcessingController::lastOperationSucceeded(), edm::ProcessingController::requestedTransition(), edm::ProcessingController::setLastOperationSucceeded(), edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, summarizeEdmComparisonLogfiles::succeeded, and edm::EventPrincipal::time().

2139  {
2140  EventPrincipal *pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
2141  FDEBUG(1) << "\treadEvent\n";
2142  assert(pep != 0);
2143 
2144  IOVSyncValue ts(pep->id(), pep->time());
2145  espController_->eventSetupForInstance(ts);
2146  EventSetup const& es = esp_->eventSetup();
2147  {
2148  typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
2149  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
2150  schedule_->processOneOccurrence<Traits>(*pep, es);
2151  if(hasSubProcess()) {
2152  subProcess_->doEvent(*pep, ts);
2153  }
2154  }
2155 
2156  if(looper_) {
2157  bool randomAccess = input_->randomAccess();
2158  ProcessingController::ForwardState forwardState = input_->forwardState();
2159  ProcessingController::ReverseState reverseState = input_->reverseState();
2160  ProcessingController pc(forwardState, reverseState, randomAccess);
2161 
2163  do {
2164  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
2165 
2166  bool succeeded = true;
2167  if(randomAccess) {
2168  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
2169  input_->skipEvents(-2);
2170  }
2171  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
2172  succeeded = input_->goToEvent(pc.specifiedEventTransition());
2173  }
2174  }
2175  pc.setLastOperationSucceeded(succeeded);
2176  } while(!pc.lastOperationSucceeded());
2177  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
2178 
2179  }
2180 
2181  FDEBUG(1) << "\tprocessEvent\n";
2182  pep->clearEventPrincipal();
2183  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::readFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1887 of file EventProcessor.cc.

References FDEBUG.

Referenced by Vispa.Plugins.EventBrowser.EventBrowserTabController.EventBrowserTabController::navigate(), Vispa.Main.TabController.TabController::open(), and Vispa.Main.TabController.TabController::refresh().

1887  {
1888  FDEBUG(1) << " \treadFile\n";
1889  fb_ = input_->readFile();
1890  if(numberOfForkedChildren_ > 0) {
1891  fb_->setNotFastClonable(FileBlock::ParallelProcesses);
1892  }
1893  }
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1926 of file EventProcessor.cc.

References FDEBUG.

1926  {
1927  if (fb_.get() != 0) {
1928  schedule_->respondToCloseInputFile(*fb_);
1929  if(hasSubProcess()) subProcess_->respondToCloseInputFile(*fb_);
1930  }
1931  FDEBUG(1) << "\trespondToCloseInputFile\n";
1932  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
void edm::EventProcessor::respondToCloseOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1942 of file EventProcessor.cc.

References FDEBUG.

1942  {
1943  if (fb_.get() != 0) {
1944  schedule_->respondToCloseOutputFiles(*fb_);
1945  if(hasSubProcess()) subProcess_->respondToCloseOutputFiles(*fb_);
1946  }
1947  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1948  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1918 of file EventProcessor.cc.

References FDEBUG.

1918  {
1919  if (fb_.get() != 0) {
1920  schedule_->respondToOpenInputFile(*fb_);
1921  if(hasSubProcess()) subProcess_->respondToOpenInputFile(*fb_);
1922  }
1923  FDEBUG(1) << "\trespondToOpenInputFile\n";
1924  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
void edm::EventProcessor::respondToOpenOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1934 of file EventProcessor.cc.

References FDEBUG.

1934  {
1935  if (fb_.get() != 0) {
1936  schedule_->respondToOpenOutputFiles(*fb_);
1937  if(hasSubProcess()) subProcess_->respondToOpenOutputFiles(*fb_);
1938  }
1939  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1940  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< FileBlock > fb_
bool hasSubProcess() const
void edm::EventProcessor::rewind ( )

Definition at line 708 of file EventProcessor.cc.

References beginJob(), changeState(), input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mInputRewind, edm::event_processor::mStopAsync, cmsPerfStripChart::operate(), serviceToken_, and edm::event_processor::StateSentry::succeeded().

708  {
709  beginJob(); //make sure this was called
712  {
713  StateSentry toerror(this);
714 
715  //make the services available
717 
718  {
719  input_->repeat();
720  input_->rewind();
721  }
723  toerror.succeeded();
724  }
726  }
boost::shared_ptr< InputSource > input_
ServiceToken serviceToken_
void changeState(event_processor::Msg)
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1973 of file EventProcessor.cc.

References FDEBUG.

1973  {
1974  input_->repeat();
1975  input_->rewind();
1976  FDEBUG(1) << "\trewind\n";
1977  }
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( int  numberEventsToProcess,
bool  repeatable = true 
)

Definition at line 729 of file EventProcessor.cc.

References runEventCount().

Referenced by Types.LuminosityBlockID::cppID().

729  {
730  return runEventCount(numberEventsToProcess);
731  }
virtual StatusCode runEventCount(int numberOfEventsToProcess)
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 403 of file EventProcessor.h.

Referenced by Types.LuminosityBlockID::cppID().

403  {
404  return run(-1, false);
405  }
void edm::EventProcessor::runAsync ( )

Definition at line 1543 of file EventProcessor.cc.

References bk::beginJob(), edm::hlt::Exception, and edm::event_processor::mRunAsync.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1543  {
1544  beginJob();
1545  {
1546  boost::mutex::scoped_lock sl(stop_lock_);
1547  if(id_set_ == true) {
1548  std::string err("runAsync called while async event loop already running\n");
1549  LogError("FwkJob") << err;
1550  throw cms::Exception("BadState") << err;
1551  }
1552 
1554 
1555  stop_count_ = 0;
1556  last_rc_ = epSuccess; // forget the last value!
1557  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1558  boost::xtime timeout;
1559  boost::xtime_get(&timeout, boost::TIME_UTC);
1560  timeout.sec += 60; // 60 seconds to start!!!!
1561  if(starter_.timed_wait(sl, timeout) == false) {
1562  // yikes - the thread did not start
1563  throw cms::Exception("BadState")
1564  << "Async run thread did not start in 60 seconds\n";
1565  }
1566  }
1567  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::mutex stop_lock_
static void asyncRun(EventProcessor *)
volatile Status last_rc_
void changeState(event_processor::Msg)
boost::condition starter_
EventProcessor::StatusCode edm::EventProcessor::runCommon ( bool  onlineStateTransitions,
int  numberOfEventsToProcess 
)
private

Definition at line 1665 of file EventProcessor.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::alreadyPrinted(), edm::convertException::badAllocToEDM(), bk::beginJob(), trackerHits::c, edm::convertException::charPtrToEDM(), edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, FDEBUG, dtDQMClient_cfg::fileMode, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, edm::errors::LogicError, edm::event_processor::mFinished, edm::event_processor::mInputExhausted, edm::event_processor::mRunCount, edm::event_processor::mShutdownSignal, statemachine::NOMERGE, cmsPerfStripChart::operate(), runEdmFileComparison::returnCode, alignCSCRings::s, edm::shutdown_flag, edm::event_processor::sShuttingDown, edm::event_processor::sStopping, edm::convertException::stdToEDM(), edm::convertException::stringToEDM(), edm::convertException::unknownToEDM(), and edm::usr2_lock.

1665  {
1666 
1667  // Reusable event principal
1668  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_, historyAppender_.get()));
1669  principalCache_.insert(ep);
1670 
1671  beginJob(); //make sure this was called
1672 
1673  if(!onlineStateTransitions) changeState(mRunCount);
1674 
1677 
1678  // make the services available
1680 
1681  if(machine_.get() == 0) {
1682 
1684  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1685  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1686  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1687  else {
1688  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1689  << fileMode_ << ".\n"
1690  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1691  }
1692 
1693  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1694  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1695  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1696  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1697  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1698  else {
1699  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1700  << emptyRunLumiMode_ << ".\n"
1701  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1702  }
1703 
1704  machine_.reset(new statemachine::Machine(this,
1705  fileMode,
1706  emptyRunLumiMode));
1707 
1708  machine_->initiate();
1709  }
1710 
1711  try {
1712  try {
1713 
1714  InputSource::ItemType itemType;
1715 
1716  int iEvents = 0;
1717 
1718  while(true) {
1719 
1720  itemType = input_->nextItemType();
1721 
1722  FDEBUG(1) << "itemType = " << itemType << "\n";
1723 
1724  // These are used for asynchronous running only and
1725  // and are checking to see if stopAsync or shutdownAsync
1726  // were called from another thread. In the future, we
1727  // may need to do something better than polling the state.
1728  // With the current code this is the simplest thing and
1729  // it should always work. If the interaction between
1730  // threads becomes more complex this may cause problems.
1731  if(state_ == sStopping) {
1732  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1733  forceLooperToEnd_ = true;
1734  machine_->process_event(statemachine::Stop());
1735  forceLooperToEnd_ = false;
1736  break;
1737  }
1738  else if(state_ == sShuttingDown) {
1739  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1740  forceLooperToEnd_ = true;
1741  machine_->process_event(statemachine::Stop());
1742  forceLooperToEnd_ = false;
1743  break;
1744  }
1745 
1746  // Look for a shutdown signal
1747  {
1748  boost::mutex::scoped_lock sl(usr2_lock);
1749  if(shutdown_flag) {
1751  returnCode = epSignal;
1752  forceLooperToEnd_ = true;
1753  machine_->process_event(statemachine::Stop());
1754  forceLooperToEnd_ = false;
1755  break;
1756  }
1757  }
1758 
1759  if(itemType == InputSource::IsStop) {
1760  machine_->process_event(statemachine::Stop());
1761  }
1762  else if(itemType == InputSource::IsFile) {
1763  machine_->process_event(statemachine::File());
1764  }
1765  else if(itemType == InputSource::IsRun) {
1766  machine_->process_event(statemachine::Run(input_->reducedProcessHistoryID(), input_->run()));
1767  }
1768  else if(itemType == InputSource::IsLumi) {
1769  machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
1770  }
1771  else if(itemType == InputSource::IsEvent) {
1772  machine_->process_event(statemachine::Event());
1773  ++iEvents;
1774  if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
1775  returnCode = epCountComplete;
1777  FDEBUG(1) << "Event count complete, pausing event loop\n";
1778  break;
1779  }
1780  }
1781  // This should be impossible
1782  else {
1784  << "Unknown next item type passed to EventProcessor\n"
1785  << "Please report this error to the Framework group\n";
1786  }
1787 
1788  if(machine_->terminated()) {
1790  break;
1791  }
1792  } // End of loop over state machine events
1793  } // Try block
1794  catch (cms::Exception& e) { throw; }
1795  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
1796  catch (std::exception& e) { convertException::stdToEDM(e); }
1797  catch(std::string& s) { convertException::stringToEDM(s); }
1798  catch(char const* c) { convertException::charPtrToEDM(c); }
1799  catch (...) { convertException::unknownToEDM(); }
1800  } // Try block
1801  // Some comments on exception handling related to the boost state machine:
1802  //
1803  // Some states used in the machine are special because they
1804  // perform actions while the machine is being terminated, actions
1805  // such as close files, call endRun, call endLumi etc ... Each of these
1806  // states has two functions that perform these actions. The functions
1807  // are almost identical. The major difference is that one version
1808  // catches all exceptions and the other lets exceptions pass through.
1809  // The destructor catches them and the other function named "exit" lets
1810  // them pass through. On a normal termination, boost will always call
1811  // "exit" and then the state destructor. In our state classes, the
1812  // the destructors do nothing if the exit function already took
1813  // care of things. Here's the interesting part. When boost is
1814  // handling an exception the "exit" function is not called (a boost
1815  // feature).
1816  //
1817  // If an exception occurs while the boost machine is in control
1818  // (which usually means inside a process_event call), then
1819  // the boost state machine destroys its states and "terminates" itself.
1820  // This already done before we hit the catch blocks below. In this case
1821  // the call to terminateMachine below only destroys an already
1822  // terminated state machine. Because exit is not called, the state destructors
1823  // handle cleaning up lumis, runs, and files. The destructors swallow
1824  // all exceptions and only pass through the exceptions messages, which
1825  // are tacked onto the original exception below.
1826  //
1827  // If an exception occurs when the boost state machine is not
1828  // in control (outside the process_event functions), then boost
1829  // cannot destroy its own states. The terminateMachine function
1830  // below takes care of that. The flag "alreadyHandlingException"
1831  // is set true so that the state exit functions do nothing (and
1832  // cannot throw more exceptions while handling the first). Then the
1833  // state destructors take care of this because exit did nothing.
1834  //
1835  // In both cases above, the EventProcessor::endOfLoop function is
1836  // not called because it can throw exceptions.
1837  //
1838  // One tricky aspect of the state machine is that things that can
1839  // throw should not be invoked by the state machine while another
1840  // exception is being handled.
1841  // Another tricky aspect is that it appears to be important to
1842  // terminate the state machine before invoking its destructor.
1843  // We've seen crashes that are not understood when that is not
1844  // done. Maintainers of this code should be careful about this.
1845 
1846  catch (cms::Exception & e) {
1848  terminateMachine();
1849  alreadyHandlingException_ = false;
1850  if (!exceptionMessageLumis_.empty()) {
1852  if (e.alreadyPrinted()) {
1853  LogAbsolute("Additional Exceptions") << exceptionMessageLumis_;
1854  }
1855  }
1856  if (!exceptionMessageRuns_.empty()) {
1858  if (e.alreadyPrinted()) {
1859  LogAbsolute("Additional Exceptions") << exceptionMessageRuns_;
1860  }
1861  }
1862  if (!exceptionMessageFiles_.empty()) {
1864  if (e.alreadyPrinted()) {
1865  LogAbsolute("Additional Exceptions") << exceptionMessageFiles_;
1866  }
1867  }
1868  throw;
1869  }
1870 
1871  if(machine_->terminated()) {
1872  FDEBUG(1) << "The state machine reports it has been terminated\n";
1873  machine_.reset();
1874  }
1875 
1876  if(!onlineStateTransitions) changeState(mFinished);
1877 
1879  throw cms::Exception("BadState")
1880  << "The boost state machine in the EventProcessor exited after\n"
1881  << "entering the Error state.\n";
1882  }
1883 
1884  return returnCode;
1885  }
std::string emptyRunLumiMode_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
void insert(boost::shared_ptr< RunPrincipal > rp)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
bool alreadyPrinted() const
Definition: Exception.cc:251
ServiceToken serviceToken_
std::string exceptionMessageLumis_
void stdToEDM(std::exception const &e)
volatile event_processor::State state_
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< statemachine::Machine > machine_
std::string exceptionMessageFiles_
void charPtrToEDM(char const *c)
void changeState(event_processor::Msg)
void stringToEDM(std::string &s)
volatile bool shutdown_flag
boost::mutex usr2_lock
PrincipalCache principalCache_
boost::scoped_ptr< HistoryAppender > historyAppender_
EventProcessor::StatusCode edm::EventProcessor::runEventCount ( int  numberOfEventsToProcess)
virtual

Implements edm::IEventProcessor.

Definition at line 1652 of file EventProcessor.cc.

References runEdmFileComparison::returnCode, and edm::event_processor::StateSentry::succeeded().

Referenced by run().

1652  {
1653 
1654  StateSentry toerror(this);
1655 
1656  bool onlineStateTransitions = false;
1657  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1658 
1659  toerror.succeeded();
1660 
1661  return returnCode;
1662  }
StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess)
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( bool  onlineStateTransitions)
virtual

Implements edm::IEventProcessor.

Definition at line 1633 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, runEdmFileComparison::returnCode, and edm::event_processor::StateSentry::succeeded().

Referenced by asyncRun().

1633  {
1634 
1635  StateSentry toerror(this);
1636 
1637  int numberOfEventsToProcess = -1;
1638  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1639 
1640  if(machine_.get() != 0) {
1642  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1643  << "Please report this error to the Framework group\n";
1644  }
1645 
1646  toerror.succeeded();
1647 
1648  return returnCode;
1649  }
std::auto_ptr< statemachine::Machine > machine_
StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2191 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2191  {
2193  }
std::string exceptionMessageFiles_
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2199 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2199  {
2201  }
std::string exceptionMessageLumis_
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 2195 of file EventProcessor.cc.

References python.rootplot.argparse::message.

2195  {
2197  }
std::string exceptionMessageRuns_
void edm::EventProcessor::setRunNumber ( RunNumber_t  runNumber)

Definition at line 1402 of file EventProcessor.cc.

References bk::beginJob(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1402  {
1403  if(runNumber == 0) {
1404  runNumber = 1;
1405  LogWarning("Invalid Run")
1406  << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
1407  << "Run number was set to 1 instead\n";
1408  }
1409 
1410  // inside of beginJob there is a check to see if it has been called before
1411  beginJob();
1413 
1414  // interface not correct yet
1415  input_->setRunNumber(runNumber);
1416  }
boost::shared_ptr< InputSource > input_
void changeState(event_processor::Msg)
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1984 of file EventProcessor.cc.

References FDEBUG.

1984  {
1985  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1986  return hasSubProcess() ? subProcess_->shouldWeCloseOutput() : schedule_->shouldWeCloseOutput();
1987  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 2185 of file EventProcessor.cc.

References FDEBUG.

2185  {
2186  FDEBUG(1) << "\tshouldWeStop\n";
2187  if(shouldWeStop_) return true;
2188  return (schedule_->terminate() || (hasSubProcess() && subProcess_->terminate()));
2189  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
bool hasSubProcess() const
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1492 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mShutdownAsync.

1492  {
1495  if(rc != epTimedOut) changeState(mFinished);
1496  else errorState();
1497  return rc;
1498  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
EventProcessor::StatusCode edm::EventProcessor::skip ( int  numberToSkip)

Definition at line 734 of file EventProcessor.cc.

References beginJob(), changeState(), edm::IEventProcessor::epSuccess, input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mSkip, cmsPerfStripChart::operate(), serviceToken_, and edm::event_processor::StateSentry::succeeded().

734  {
735  beginJob(); //make sure this was called
737  {
738  StateSentry toerror(this);
739 
740  //make the services available
742 
743  {
744  input_->skipEvents(numberToSkip);
745  }
747  toerror.succeeded();
748  }
750  return epSuccess;
751  }
boost::shared_ptr< InputSource > input_
ServiceToken serviceToken_
void changeState(event_processor::Msg)
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1950 of file EventProcessor.cc.

References FDEBUG.

1950  {
1951  shouldWeStop_ = false;
1952  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1953  // until after we've called beginOfJob
1954  if(looper_ && looperBeginJobRun_) {
1955  looper_->doStartingNewLoop();
1956  }
1957  FDEBUG(1) << "\tstartingNewLoop\n";
1958  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * edm::EventProcessor::stateName ( event_processor::State  s) const
EventProcessor::StatusCode edm::EventProcessor::statusAsync ( ) const

Definition at line 1395 of file EventProcessor.cc.

Referenced by evf::FUEventProcessor::enableCommon(), and evf::FUEventProcessor::enableForkInEDM().

1395  {
1396  // the thread will record exception/error status in the event processor
1397  // for us to look at and report here
1398  return last_rc_;
1399  }
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::stopAsync ( unsigned int  timeout_secs = 60 * 2)

Definition at line 1484 of file EventProcessor.cc.

References edm::event_processor::mFinished, and edm::event_processor::mStopAsync.

1484  {
1487  if(rc != epTimedOut) changeState(mFinished);
1488  else errorState();
1489  return rc;
1490  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::terminateMachine ( )
private

Definition at line 2207 of file EventProcessor.cc.

References FDEBUG.

Referenced by endJob(), and ~EventProcessor().

2207  {
2208  if(machine_.get() != 0) {
2209  if(!machine_->terminated()) {
2210  forceLooperToEnd_ = true;
2211  machine_->process_event(statemachine::Stop());
2212  forceLooperToEnd_ = false;
2213  }
2214  else {
2215  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
2216  }
2217  if(machine_->terminated()) {
2218  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
2219  }
2220  machine_.reset();
2221  }
2222  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< statemachine::Machine > machine_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1345 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1345  {
1346  return schedule_->totalEvents();
1347  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1355 of file EventProcessor.cc.

1355  {
1356  return schedule_->totalEventsFailed();
1357  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1350 of file EventProcessor.cc.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1350  {
1351  return schedule_->totalEventsPassed();
1352  }
std::auto_ptr< Schedule > schedule_
EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion ( unsigned int  timeout_seconds)
private

Definition at line 1429 of file EventProcessor.cc.

1429  {
1430  bool rc = true;
1431  boost::xtime timeout;
1432  boost::xtime_get(&timeout, boost::TIME_UTC);
1433  timeout.sec += timeout_seconds;
1434 
1435  // make sure to include a timeout here so we don't wait forever
1436  // I suspect there are still timing issues with thread startup
1437  // and the setting of the various control variables (stop_count, id_set)
1438  {
1439  boost::mutex::scoped_lock sl(stop_lock_);
1440 
1441  // look here - if runAsync not active, just return the last return code
1442  if(stop_count_ < 0) return last_rc_;
1443 
1444  if(timeout_seconds == 0) {
1445  while(stop_count_ == 0) stopper_.wait(sl);
1446  } else {
1447  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1448  }
1449 
1450  if(rc == false) {
1451  // timeout occurred
1452  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1453  // this is a temporary hack until we get the input source
1454  // upgraded to allow blocking input sources to be unblocked
1455 
1456  // the next line is dangerous and causes all sorts of trouble
1457  if(id_set_) pthread_cancel(event_loop_id_);
1458 
1459  // we will not do anything yet
1460  LogWarning("timeout")
1461  << "An asynchronous request was made to shut down "
1462  << "the event loop "
1463  << "and the event loop did not shutdown after "
1464  << timeout_seconds << " seconds\n";
1465  } else {
1466  event_loop_->join();
1467  event_loop_.reset();
1468  id_set_ = false;
1469  stop_count_ = -1;
1470  }
1471  }
1472  return rc == false ? epTimedOut : last_rc_;
1473  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::condition stopper_
boost::mutex stop_lock_
volatile pthread_t event_loop_id_
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync ( unsigned int  timeout_seconds = 0)

Definition at line 1476 of file EventProcessor.cc.

References edm::event_processor::mCountComplete.

Referenced by evf::FWEPWrapper::stop().

1476  {
1477  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1479  else errorState();
1480  return rc;
1481  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 2127 of file EventProcessor.cc.

References FDEBUG.

2127  {
2128  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
2129  if(hasSubProcess()) subProcess_->writeLumi(phid, run, lumi);
2130  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
2131  }
tuple lumi
Definition: fjr2json.py:35
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 2115 of file EventProcessor.cc.

References FDEBUG, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

2115  {
2116  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
2117  if(hasSubProcess()) subProcess_->writeRun(run.processHistoryID(), run.runNumber());
2118  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
2119  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
std::auto_ptr< SubProcess > subProcess_
PrincipalCache principalCache_
bool hasSubProcess() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const

Friends And Related Function Documentation

friend class event_processor::StateSentry
friend

Definition at line 396 of file EventProcessor.h.

Member Data Documentation

boost::shared_ptr<ActionTable const> edm::EventProcessor::act_table_
private

Definition at line 354 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private

Definition at line 348 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), and ~EventProcessor().

bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 385 of file EventProcessor.h.

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 381 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private

Definition at line 353 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

boost::scoped_ptr<eventsetup::EventSetupsController> edm::EventProcessor::espController_
private

Definition at line 352 of file EventProcessor.h.

Referenced by init(), and ~EventProcessor().

boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_
private

Definition at line 361 of file EventProcessor.h.

volatile pthread_t edm::EventProcessor::event_loop_id_
private

Definition at line 371 of file EventProcessor.h.

Referenced by asyncRun().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 395 of file EventProcessor.h.

Referenced by init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 382 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 384 of file EventProcessor.h.

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 383 of file EventProcessor.h.

boost::shared_ptr<FileBlock> edm::EventProcessor::fb_
private

Definition at line 373 of file EventProcessor.h.

std::string edm::EventProcessor::fileMode_
private

Definition at line 380 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 388 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 386 of file EventProcessor.h.

boost::scoped_ptr<HistoryAppender> edm::EventProcessor::historyAppender_
private

Definition at line 358 of file EventProcessor.h.

volatile bool edm::EventProcessor::id_set_
private

Definition at line 370 of file EventProcessor.h.

Referenced by asyncRun().

boost::shared_ptr<InputSource> edm::EventProcessor::input_
private

Definition at line 351 of file EventProcessor.h.

Referenced by beginJob(), endJob(), init(), rewind(), skip(), and ~EventProcessor().

std::string edm::EventProcessor::last_error_text_
private

Definition at line 369 of file EventProcessor.h.

Referenced by asyncRun().

volatile Status edm::EventProcessor::last_rc_
private

Definition at line 368 of file EventProcessor.h.

Referenced by asyncRun().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 387 of file EventProcessor.h.

std::auto_ptr<statemachine::Machine> edm::EventProcessor::machine_
private

Definition at line 376 of file EventProcessor.h.

int edm::EventProcessor::my_sig_num_
private

Definition at line 372 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 390 of file EventProcessor.h.

Referenced by init().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 391 of file EventProcessor.h.

Referenced by init().

ActivityRegistry::PostProcessEvent edm::EventProcessor::postProcessEventSignal_
private

Definition at line 347 of file EventProcessor.h.

Referenced by connectSigs(), and postProcessEventSignal().

boost::shared_ptr<SignallingProductRegistry> edm::EventProcessor::preg_
private

Definition at line 349 of file EventProcessor.h.

Referenced by init().

ActivityRegistry::PreProcessEvent edm::EventProcessor::preProcessEventSignal_
private

Definition at line 346 of file EventProcessor.h.

Referenced by connectSigs(), and preProcessEventSignal().

PrincipalCache edm::EventProcessor::principalCache_
private

Definition at line 377 of file EventProcessor.h.

Referenced by init().

boost::shared_ptr<ProcessConfiguration> edm::EventProcessor::processConfiguration_
private

Definition at line 355 of file EventProcessor.h.

Referenced by init().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 350 of file EventProcessor.h.

Referenced by beginJob(), endJob(), getToken(), init(), rewind(), and skip().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 392 of file EventProcessor.h.

Referenced by init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 378 of file EventProcessor.h.

boost::condition edm::EventProcessor::starter_
private

Definition at line 366 of file EventProcessor.h.

Referenced by asyncRun().

volatile event_processor::State edm::EventProcessor::state_
private

Definition at line 360 of file EventProcessor.h.

Referenced by beginJob().

boost::mutex edm::EventProcessor::state_lock_
private

Definition at line 363 of file EventProcessor.h.

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 379 of file EventProcessor.h.

volatile int edm::EventProcessor::stop_count_
private

Definition at line 367 of file EventProcessor.h.

Referenced by asyncRun().

boost::mutex edm::EventProcessor::stop_lock_
private

Definition at line 364 of file EventProcessor.h.

Referenced by asyncRun().

boost::condition edm::EventProcessor::stopper_
private

Definition at line 365 of file EventProcessor.h.

Referenced by asyncRun().

std::auto_ptr<SubProcess> edm::EventProcessor::subProcess_
private