CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends
edm::EventProcessor Class Reference

#include <EventProcessor.h>

Inheritance diagram for edm::EventProcessor:
edm::IEventProcessor

Public Member Functions

virtual bool alreadyHandlingException () const
 
void beginJob ()
 
virtual void beginLumi (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void beginRun (statemachine::Run const &run)
 
void clearCounters ()
 Clears counters used by trigger report. More...
 
virtual void closeInputFile ()
 
virtual void closeOutputFiles ()
 
char const * currentStateName () const
 
void declareRunNumber (RunNumber_t runNumber)
 
virtual void deleteLumiFromCache (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void deleteRunFromCache (statemachine::Run const &run)
 
virtual void doErrorStuff ()
 
void enableEndPaths (bool active)
 
void endJob ()
 
virtual void endLumi (ProcessHistoryID const &phid, int run, int lumi)
 
virtual bool endOfLoop ()
 
bool endPathsEnabled () const
 
virtual void endRun (statemachine::Run const &run)
 
 EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >())
 
 EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy)
 
 EventProcessor (std::string const &config, bool isPython)
 meant for unit tests More...
 
bool forkProcess (std::string const &jobReportFile)
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
event_processor::State getState () const
 
ServiceToken getToken ()
 
void getTriggerReport (TriggerReport &rep) const
 
char const * msgName (event_processor::Msg m) const
 
virtual void openOutputFiles ()
 
ActivityRegistry::PostProcessEventpostProcessEventSignal ()
 
virtual void prepareForNextLoop ()
 
ActivityRegistry::PreProcessEventpreProcessEventSignal ()
 
virtual int readAndCacheLumi ()
 
virtual statemachine::Run readAndCacheRun ()
 
virtual void readAndProcessEvent ()
 
virtual void readFile ()
 
virtual void respondToCloseInputFile ()
 
virtual void respondToCloseOutputFiles ()
 
virtual void respondToOpenInputFile ()
 
virtual void respondToOpenOutputFiles ()
 
void rewind ()
 
virtual void rewindInput ()
 
StatusCode run (int numberEventsToProcess, bool repeatable=true)
 
StatusCode run ()
 
void runAsync ()
 
virtual StatusCode runEventCount (int numberOfEventsToProcess)
 
virtual StatusCode runToCompletion (bool onlineStateTransitions)
 
virtual void setExceptionMessageFiles (std::string &message)
 
virtual void setExceptionMessageLumis (std::string &message)
 
virtual void setExceptionMessageRuns (std::string &message)
 
void setRunNumber (RunNumber_t runNumber)
 
virtual bool shouldWeCloseOutput () const
 
virtual bool shouldWeStop () const
 
StatusCode shutdownAsync (unsigned int timeout_secs=60 *2)
 
StatusCode skip (int numberToSkip)
 
virtual void startingNewLoop ()
 
char const * stateName (event_processor::State s) const
 
StatusCode statusAsync () const
 
StatusCode stopAsync (unsigned int timeout_secs=60 *2)
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
StatusCode waitTillDoneAsync (unsigned int timeout_seconds=0)
 
virtual void writeLumi (ProcessHistoryID const &phid, int run, int lumi)
 
virtual void writeRun (statemachine::Run const &run)
 
 ~EventProcessor ()
 
- Public Member Functions inherited from edm::IEventProcessor
virtual ~IEventProcessor ()
 

Private Types

typedef std::set< std::pair
< std::string, std::string > > 
ExcludedData
 
typedef std::map< std::string,
ExcludedData
ExcludedDataMap
 

Private Member Functions

void changeState (event_processor::Msg)
 
void connectSigs (EventProcessor *ep)
 
StatusCode doneAsync (event_processor::Msg m)
 
void errorState ()
 
void init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
 
StatusCode runCommon (bool onlineStateTransitions, int numberOfEventsToProcess)
 
void setupSignal ()
 
void terminateMachine ()
 
StatusCode waitForAsyncCompletion (unsigned int timeout_seconds)
 

Static Private Member Functions

static void asyncRun (EventProcessor *)
 

Private Attributes

boost::shared_ptr< ActionTable
const > 
act_table_
 
boost::shared_ptr
< ActivityRegistry
actReg_
 
bool alreadyHandlingException_
 
std::string emptyRunLumiMode_
 
boost::shared_ptr
< eventsetup::EventSetupProvider
esp_
 
boost::shared_ptr< boost::thread > event_loop_
 
volatile pthread_t event_loop_id_
 
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
 
std::string exceptionMessageFiles_
 
std::string exceptionMessageLumis_
 
std::string exceptionMessageRuns_
 
boost::shared_ptr< FileBlockfb_
 
std::string fileMode_
 
bool forceESCacheClearOnNewRun_
 
bool forceLooperToEnd_
 
volatile bool id_set_
 
boost::shared_ptr< InputSourceinput_
 
std::string last_error_text_
 
volatile Status last_rc_
 
boost::shared_ptr< EDLooperBaselooper_
 
bool looperBeginJobRun_
 
std::auto_ptr
< statemachine::Machine
machine_
 
int my_sig_num_
 
int numberOfForkedChildren_
 
unsigned int numberOfSequentialEventsPerChild_
 
ActivityRegistry::PostProcessEvent postProcessEventSignal_
 
boost::shared_ptr
< SignallingProductRegistry
preg_
 
ActivityRegistry::PreProcessEvent preProcessEventSignal_
 
PrincipalCache principalCache_
 
boost::shared_ptr
< ProcessConfiguration
processConfiguration_
 
std::auto_ptr< Scheduleschedule_
 
ServiceToken serviceToken_
 
bool setCpuAffinity_
 
bool shouldWeStop_
 
boost::condition starter_
 
volatile event_processor::State state_
 
boost::mutex state_lock_
 
bool stateMachineWasInErrorState_
 
volatile int stop_count_
 
boost::mutex stop_lock_
 
boost::condition stopper_
 

Friends

class event_processor::StateSentry
 

Additional Inherited Members

- Public Types inherited from edm::IEventProcessor
enum  Status {
  epSuccess =0, epException =1, epOther =2, epSignal =3,
  epInputComplete =4, epTimedOut =5, epCountComplete =6
}
 
typedef Status StatusCode
 

Detailed Description

Definition at line 65 of file EventProcessor.h.

Member Typedef Documentation

typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData
private

Definition at line 382 of file EventProcessor.h.

typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap
private

Definition at line 383 of file EventProcessor.h.

Constructor & Destructor Documentation

edm::EventProcessor::EventProcessor ( std::string const &  config,
ServiceToken const &  token = ServiceToken(),
serviceregistry::ServiceLegacy  iLegacy = serviceregistry::kOverlapIsError,
std::vector< std::string > const &  defaultServices = std::vector<std::string>(),
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)
explicit

Definition at line 353 of file EventProcessor.cc.

References init(), and PythonProcessDesc::processDesc().

357  :
362  serviceToken_(),
363  input_(),
364  esp_(),
365  act_table_(),
367  schedule_(),
368  state_(sInit),
369  event_loop_(),
370  state_lock_(),
371  stop_lock_(),
372  stopper_(),
373  stop_count_(-1),
376  id_set_(false),
377  event_loop_id_(),
379  fb_(),
380  looper_(),
381  shouldWeStop_(false),
384  forceLooperToEnd_(false),
385  looperBeginJobRun_(false),
389  setCpuAffinity_(false) {
390  boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
391  processDesc->addServices(defaultServices, forcedServices);
392  init(processDesc, iToken, iLegacy);
393  }
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
volatile Status last_rc_
boost::shared_ptr< edm::ProcessDesc > processDesc()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::shared_ptr< ActionTable const > act_table_
tuple config
Definition: cmsDriver.py:17
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
edm::EventProcessor::EventProcessor ( std::string const &  config,
std::vector< std::string > const &  defaultServices,
std::vector< std::string > const &  forcedServices = std::vector<std::string>() 
)

Definition at line 395 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::processDesc().

397  :
402  serviceToken_(),
403  input_(),
404  esp_(),
405  act_table_(),
407  schedule_(),
408  state_(sInit),
409  event_loop_(),
410  state_lock_(),
411  stop_lock_(),
412  stopper_(),
413  stop_count_(-1),
416  id_set_(false),
417  event_loop_id_(),
419  fb_(),
420  looper_(),
421  shouldWeStop_(false),
424  forceLooperToEnd_(false),
425  looperBeginJobRun_(false),
429  setCpuAffinity_(false) {
430  boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
431  processDesc->addServices(defaultServices, forcedServices);
433  }
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
volatile Status last_rc_
boost::shared_ptr< edm::ProcessDesc > processDesc()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::shared_ptr< ActionTable const > act_table_
tuple config
Definition: cmsDriver.py:17
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
edm::EventProcessor::EventProcessor ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  legacy 
)

Definition at line 435 of file EventProcessor.cc.

References init().

437  :
442  serviceToken_(),
443  input_(),
444  esp_(),
445  act_table_(),
447  schedule_(),
448  state_(sInit),
449  event_loop_(),
450  state_lock_(),
451  stop_lock_(),
452  stopper_(),
453  stop_count_(-1),
456  id_set_(false),
457  event_loop_id_(),
459  fb_(),
460  looper_(),
461  shouldWeStop_(false),
464  forceLooperToEnd_(false),
465  looperBeginJobRun_(false),
467  {
468  init(processDesc, token, legacy);
469  }
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
volatile Status last_rc_
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::shared_ptr< ActionTable const > act_table_
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
edm::EventProcessor::EventProcessor ( std::string const &  config,
bool  isPython 
)

meant for unit tests

Definition at line 472 of file EventProcessor.cc.

References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::processDesc().

472  :
477  serviceToken_(),
478  input_(),
479  esp_(),
480  act_table_(),
482  schedule_(),
483  state_(sInit),
484  event_loop_(),
485  state_lock_(),
486  stop_lock_(),
487  stopper_(),
488  stop_count_(-1),
491  id_set_(false),
492  event_loop_id_(),
494  fb_(),
495  looper_(),
496  shouldWeStop_(false),
499  forceLooperToEnd_(false),
500  looperBeginJobRun_(false),
502  {
503  if(isPython) {
504  boost::shared_ptr<ProcessDesc> processDesc = PythonProcessDesc(config).processDesc();
506  }
507  else {
508  boost::shared_ptr<ProcessDesc> processDesc(new ProcessDesc(config));
510  }
511  }
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
int getSigNum()
boost::shared_ptr< ActivityRegistry > actReg_
boost::mutex state_lock_
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
boost::mutex stop_lock_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< Schedule > schedule_
volatile pthread_t event_loop_id_
volatile Status last_rc_
boost::shared_ptr< edm::ProcessDesc > processDesc()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::shared_ptr< ActionTable const > act_table_
tuple config
Definition: cmsDriver.py:17
ActivityRegistry::PreProcessEvent preProcessEventSignal_
boost::shared_ptr< FileBlock > fb_
edm::EventProcessor::~EventProcessor ( )

Definition at line 616 of file EventProcessor.cc.

References actReg_, changeState(), edm::BranchIDListHelper::clearRegistries(), edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), ExpressReco_HICollisions_FallBack::e, esp_, cms::Exception::explainSelf(), edm::detail::ThreadSafeRegistry< KEY, T, E >::extra(), getToken(), input_, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), looper_, edm::event_processor::mDtor, schedule_, and terminateMachine().

616  {
617  // Make the services available while everything is being deleted.
618  ServiceToken token = getToken();
619  ServiceRegistry::Operate op(token);
620 
621  // The state machine should have already been cleaned up
622  // and destroyed at this point by a call to EndJob or
623  // earlier when it completed processing events, but if it
624  // has not been we'll take care of it here at the last moment.
625  // This could cause problems if we are already handling an
626  // exception and another one is thrown here ... For a critical
627  // executable the solution to this problem is for the code using
628  // the EventProcessor to explicitly call EndJob or use runToCompletion,
629  // then the next line of code is never executed.
631 
632  try {
634  }
635  catch(cms::Exception& e) {
636  LogError("System")
637  << e.explainSelf() << "\n";
638  }
639 
640  // manually destroy all these thing that may need the services around
641  esp_.reset();
642  schedule_.reset();
643  input_.reset();
644  looper_.reset();
645  actReg_.reset();
646 
647  pset::Registry* psetRegistry = pset::Registry::instance();
648  psetRegistry->data().clear();
649  psetRegistry->extra().setID(ParameterSetID());
650 
652  ParentageRegistry::instance()->data().clear();
656  }
boost::shared_ptr< InputSource > input_
virtual std::string explainSelf() const
Definition: Exception.cc:56
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
Hash< ParameterSetType > ParameterSetID
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
ServiceToken getToken()
static ThreadSafeRegistry * instance()
collection_type & data()
Provide access to the contained collection.

Member Function Documentation

bool edm::EventProcessor::alreadyHandlingException ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1939 of file EventProcessor.cc.

References alreadyHandlingException_.

1939  {
1941  }
void edm::EventProcessor::asyncRun ( EventProcessor me)
staticprivate

Definition at line 1323 of file EventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, edm::IEventProcessor::epException, edm::IEventProcessor::epOther, event_loop_id_, cmsCodeRules.cppFunctionSkipper::exception, cms::Exception::explainSelf(), FDEBUG, id_set_, last_error_text_, last_rc_, runToCompletion(), starter_, stop_count_, stop_lock_, and stopper_.

Referenced by runAsync().

1323  {
1324  // set up signals to allow for interruptions
1325  // ignore all other signals
1326  // make sure no exceptions escape out
1327 
1328  // temporary hack until we modify the input source to allow
1329  // wakeup calls from other threads. This mimics the solution
1330  // in EventFilter/Processor, which I do not like.
1331  // allowing cancels means that the thread just disappears at
1332  // certain points. This is bad for C++ stack variables.
1333  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
1334  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
1335  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
1336  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
1337 
1338  {
1339  boost::mutex::scoped_lock(me->stop_lock_);
1340  me->event_loop_id_ = pthread_self();
1341  me->id_set_ = true;
1342  me->starter_.notify_all();
1343  }
1344 
1345  Status rc = epException;
1346  FDEBUG(2) << "asyncRun starting ......................\n";
1347 
1348  try {
1349  bool onlineStateTransitions = true;
1350  rc = me->runToCompletion(onlineStateTransitions);
1351  }
1352  catch (cms::Exception& e) {
1353  LogError("FwkJob") << "cms::Exception caught in "
1354  << "EventProcessor::asyncRun"
1355  << "\n"
1356  << e.explainSelf();
1357  me->last_error_text_ = e.explainSelf();
1358  }
1359  catch (std::exception& e) {
1360  LogError("FwkJob") << "Standard library exception caught in "
1361  << "EventProcessor::asyncRun"
1362  << "\n"
1363  << e.what();
1364  me->last_error_text_ = e.what();
1365  }
1366  catch (...) {
1367  LogError("FwkJob") << "Unknown exception caught in "
1368  << "EventProcessor::asyncRun"
1369  << "\n";
1370  me->last_error_text_ = "Unknown exception caught";
1371  rc = epOther;
1372  }
1373 
1374  me->last_rc_ = rc;
1375 
1376  {
1377  // notify anyone waiting for exit that we are doing so now
1378  boost::mutex::scoped_lock sl(me->stop_lock_);
1379  ++me->stop_count_;
1380  me->stopper_.notify_all();
1381  }
1382  FDEBUG(2) << "asyncRun ending ......................\n";
1383  }
virtual std::string explainSelf() const
Definition: Exception.cc:56
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void edm::EventProcessor::beginJob ( void  )

This should be called before the first call to 'run' If this is not called in time, it will automatically be called the first time 'run' is called

Definition at line 705 of file EventProcessor.cc.

References actReg_, bk::beginJob(), changeState(), ExpressReco_HICollisions_FallBack::e, cmsCodeRules.cppFunctionSkipper::exception, input_, edm::event_processor::mBeginJob, schedule_, serviceToken_, edm::event_processor::sInit, and state_.

Referenced by evf::FUEventProcessor::configuring(), declareRunNumber(), evf::FUEventProcessor::enabling(), forkProcess(), rewind(), runAsync(), runCommon(), setRunNumber(), and skip().

705  {
706  if(state_ != sInit) return;
707  bk::beginJob();
708  // can only be run if in the initial state
710 
711  // StateSentry toerror(this); // should we add this ?
712  //make the services available
714 
715  //NOTE: This implementation assumes 'Job' means one call
716  // the EventProcessor::run
717  // If it really means once per 'application' then this code will
718  // have to be changed.
719  // Also have to deal with case where have 'run' then new Module
720  // added and do 'run'
721  // again. In that case the newly added Module needs its 'beginJob'
722  // to be called.
723 
724  //NOTE: in future we should have a beginOfJob for looper which takes no arguments
725  // For now we delay calling beginOfJob until first beginOfRun
726  //if(looper_) {
727  // looper_->beginOfJob(es);
728  //}
729  try {
730  input_->doBeginJob();
731  } catch(cms::Exception& e) {
732  LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
733  e << "A cms::Exception happened while processing the beginJob of the 'source'\n";
734  throw;
735  } catch(std::exception& e) {
736  LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
737  throw;
738  } catch(...) {
739  LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
740  throw;
741  }
742 
743  schedule_->beginJob();
744  actReg_->postBeginJobSignal_();
745  // toerror.succeeded(); // should we add this?
746  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< ActivityRegistry > actReg_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
volatile event_processor::State state_
std::auto_ptr< Schedule > schedule_
void changeState(event_processor::Msg)
void edm::EventProcessor::beginLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1788 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), principalCache_, edm::LuminosityBlockPrincipal::run(), and schedule_.

1788  {
1789  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1790  input_->doBeginLumi(lumiPrincipal);
1791 
1793  if(rng.isAvailable()) {
1794  LuminosityBlock lb(lumiPrincipal, ModuleDescription());
1795  rng->preBeginLumi(lb);
1796  }
1797 
1798  // NOTE: Using 0 as the event number for the begin of a lumi block is a bad idea
1799  // lumi blocks know their start and end times why not also start and end events?
1800  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0), lumiPrincipal.beginTime());
1801  EventSetup const& es = esp_->eventSetupForInstance(ts);
1802  {
1803  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
1804  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
1805  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
1806  }
1807  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";
1808  if(looper_) {
1809  looper_->doBeginLuminosityBlock(lumiPrincipal, es);
1810  }
1811  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:41
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, int run, int lumi)
PrincipalCache principalCache_
void edm::EventProcessor::beginRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1745 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::beginTime(), esp_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, principalCache_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.

1745  {
1746  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1747  input_->doBeginRun(runPrincipal);
1748  IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0),
1749  runPrincipal.beginTime());
1751  esp_->forceCacheClear();
1752  }
1753  EventSetup const& es = esp_->eventSetupForInstance(ts);
1754  if(looper_ && looperBeginJobRun_== false) {
1755  looper_->copyInfo(ScheduleInfo(schedule_.get()));
1756  looper_->beginOfJob(es);
1757  looperBeginJobRun_ = true;
1758  looper_->doStartingNewLoop();
1759  }
1760  {
1761  typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
1762  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1763  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1764  }
1765  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";
1766  if(looper_) {
1767  looper_->doBeginRun(runPrincipal, es);
1768  }
1769  }
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, int run)
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
PrincipalCache principalCache_
void edm::EventProcessor::changeState ( event_processor::Msg  msg)
private

Definition at line 1267 of file EventProcessor.cc.

References cond::rpcobimon::current, edm::TransEntry::current, edm::hlt::Exception, FDEBUG, edm::TransEntry::final, edm::event_processor::mAny, edm::TransEntry::message, msgName(), edm::event_processor::sInvalid, state_, state_lock_, stateName(), and edm::table.

Referenced by beginJob(), declareRunNumber(), doneAsync(), endJob(), rewind(), runAsync(), runCommon(), setRunNumber(), shutdownAsync(), skip(), stopAsync(), waitTillDoneAsync(), ~EventProcessor(), and edm::event_processor::StateSentry::~StateSentry().

1267  {
1268  // most likely need to serialize access to this routine
1269 
1270  boost::mutex::scoped_lock sl(state_lock_);
1271  State curr = state_;
1272  int rc;
1273  // found if(not end of table) and
1274  // (state == table.state && (msg == table.message || msg == any))
1275  for(rc = 0;
1276  table[rc].current != sInvalid &&
1277  (curr != table[rc].current ||
1278  (curr == table[rc].current &&
1279  msg != table[rc].message && table[rc].message != mAny));
1280  ++rc);
1281 
1282  if(table[rc].current == sInvalid)
1283  throw cms::Exception("BadState")
1284  << "A member function of EventProcessor has been called in an"
1285  << " inappropriate order.\n"
1286  << "Bad transition from " << stateName(curr) << " "
1287  << "using message " << msgName(msg) << "\n"
1288  << "No where to go from here.\n";
1289 
1290  FDEBUG(1) << "changeState: current=" << stateName(curr)
1291  << ", message=" << msgName(msg)
1292  << " -> new=" << stateName(table[rc].final) << "\n";
1293 
1294  state_ = table[rc].final;
1295  }
boost::mutex state_lock_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * msgName(event_processor::Msg m) const
TransEntry table[]
volatile event_processor::State state_
char const * stateName(event_processor::State s) const
void edm::EventProcessor::clearCounters ( )

Clears counters used by trigger report.

Definition at line 1129 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FUEventProcessor::enableCommon().

1129  {
1130  schedule_->clearCounters();
1131  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::closeInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1660 of file EventProcessor.cc.

References fb_, FDEBUG, and input_.

1660  {
1661  input_->closeFile(fb_);
1662  FDEBUG(1) << "\tcloseInputFile\n";
1663  }
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::closeOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1670 of file EventProcessor.cc.

References FDEBUG, and schedule_.

1670  {
1671  schedule_->closeOutputFiles();
1672  FDEBUG(1) << "\tcloseOutputFiles\n";
1673  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::connectSigs ( EventProcessor ep)
private

Definition at line 1085 of file EventProcessor.cc.

References actReg_, postProcessEventSignal_, and preProcessEventSignal_.

Referenced by init().

1085  {
1086  // When the FwkImpl signals are given, pass them to the
1087  // appropriate EventProcessor signals so that the outside world
1088  // can see the signal.
1089  actReg_->preProcessEventSignal_.connect(ep->preProcessEventSignal_);
1090  actReg_->postProcessEventSignal_.connect(ep->postProcessEventSignal_);
1091  }
boost::shared_ptr< ActivityRegistry > actReg_
char const * edm::EventProcessor::currentStateName ( ) const

Member functions to support asynchronous interface.

Definition at line 1133 of file EventProcessor.cc.

References getState(), and stateName().

Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().

1133  {
1134  return stateName(getState());
1135  }
event_processor::State getState() const
char const * stateName(event_processor::State s) const
void edm::EventProcessor::declareRunNumber ( RunNumber_t  runNumber)

Definition at line 1173 of file EventProcessor.cc.

References beginJob(), changeState(), and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon().

1173  {
1174  // inside of beginJob there is a check to see if it has been called before
1175  beginJob();
1177 
1178  // interface not correct yet - wait for Bill to be done with run/lumi loop stuff 21-Jun-2007
1179  //input_->declareRunNumber(runNumber);
1180  }
void changeState(event_processor::Msg)
void edm::EventProcessor::deleteLumiFromCache ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1860 of file EventProcessor.cc.

References edm::PrincipalCache::deleteLumi(), FDEBUG, and principalCache_.

1860  {
1862  FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
1863  }
tuple lumi
Definition: fjr2json.py:41
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteLumi(ProcessHistoryID const &phid, int run, int lumi)
PrincipalCache principalCache_
void edm::EventProcessor::deleteRunFromCache ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1850 of file EventProcessor.cc.

References edm::PrincipalCache::deleteRun(), FDEBUG, principalCache_, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().

1850  {
1851  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());
1852  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
1853  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void deleteRun(ProcessHistoryID const &phid, int run)
PrincipalCache principalCache_
void edm::EventProcessor::doErrorStuff ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1734 of file EventProcessor.cc.

References FDEBUG, and stateMachineWasInErrorState_.

1734  {
1735  FDEBUG(1) << "\tdoErrorStuff\n";
1736  LogError("StateMachine")
1737  << "The EventProcessor state machine encountered an unexpected event\n"
1738  << "and went to the error state\n"
1739  << "Will attempt to terminate processing normally\n"
1740  << "(IF using the looper the next loop will be attempted)\n"
1741  << "This likely indicates a bug in an input module or corrupted input or both\n";
1743  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::doneAsync ( event_processor::Msg  m)
private

Definition at line 1259 of file EventProcessor.cc.

References changeState(), and waitForAsyncCompletion().

1259  {
1260  // make sure to include a timeout here so we don't wait forever
1261  // I suspect there are still timing issues with thread startup
1262  // and the setting of the various control variables (stop_count, id_set)
1263  changeState(m);
1264  return waitForAsyncCompletion(60*2);
1265  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 1114 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FUEventProcessor::actionPerformed().

1114  {
1115  schedule_->enableEndPaths(active);
1116  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endJob ( void  )

This should be called before the EventProcessor is destroyed throws if any module's endJob throws an exception.

Definition at line 749 of file EventProcessor.cc.

References actReg_, trackerHits::c, edm::ExceptionCollector::call(), changeState(), edm::InputSource::doEndJob(), edm::Schedule::endJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), input_, looper_, edm::event_processor::mEndJob, cmsCodeRules.cppFunctionSkipper::operator, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and terminateMachine().

Referenced by evf::FWEPWrapper::stopAndHalt().

749  {
750  // Collects exceptions, so we don't throw before all operations are performed.
751  ExceptionCollector c;
752 
753  // only allowed to run if state is sIdle, sJobReady, sRunGiven
754  c.call(boost::bind(&EventProcessor::changeState, this, mEndJob));
755 
756  //make the services available
758 
759  c.call(boost::bind(&EventProcessor::terminateMachine, this));
760  c.call(boost::bind(&Schedule::endJob, schedule_.get()));
761  c.call(boost::bind(&InputSource::doEndJob, input_));
762  if(looper_) {
763  c.call(boost::bind(&EDLooperBase::endOfJob, looper_));
764  }
765  c.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));
766  if(c.hasThrown()) {
767  c.rethrow();
768  }
769  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
ServiceToken serviceToken_
virtual void endOfJob()
Definition: EDLooperBase.cc:76
std::auto_ptr< Schedule > schedule_
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:237
void changeState(event_processor::Msg)
void endJob()
Definition: Schedule.cc:471
void edm::EventProcessor::endLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1813 of file EventProcessor.cc.

References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, FDEBUG, input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::EventID::maxEventNumber(), principalCache_, edm::LuminosityBlockPrincipal::run(), and schedule_.

Referenced by Types.EventRange::cppID().

1813  {
1814  LuminosityBlockPrincipal& lumiPrincipal = principalCache_.lumiPrincipal(phid, run, lumi);
1815  input_->doEndLumi(lumiPrincipal);
1816  //NOTE: Using the max event number for the end of a lumi block is a bad idea
1817  // lumi blocks know their start and end times why not also start and end events?
1818  IOVSyncValue ts(EventID(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), EventID::maxEventNumber()),
1819  lumiPrincipal.endTime());
1820  EventSetup const& es = esp_->eventSetupForInstance(ts);
1821  {
1822  typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
1823  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &lumiPrincipal, &es);
1824  schedule_->processOneOccurrence<Traits>(lumiPrincipal, es);
1825  }
1826  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";
1827  if(looper_) {
1828  looper_->doEndLuminosityBlock(lumiPrincipal, es);
1829  }
1830  }
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
tuple lumi
Definition: fjr2json.py:41
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, int run, int lumi)
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
PrincipalCache principalCache_
bool edm::EventProcessor::endOfLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1705 of file EventProcessor.cc.

References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, schedule_, and ntuplemaker::status.

1705  {
1706  if(looper_) {
1707  ModuleChanger changer(schedule_.get());
1708  looper_->setModuleChanger(&changer);
1709  EDLooperBase::Status status = looper_->doEndOfLoop(esp_->eventSetup());
1710  looper_->setModuleChanger(0);
1711  if(status != EDLooperBase::kContinue || forceLooperToEnd_) return true;
1712  else return false;
1713  }
1714  FDEBUG(1) << "\tendOfLoop\n";
1715  return true;
1716  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
tuple status
Definition: ntuplemaker.py:245
bool edm::EventProcessor::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 1119 of file EventProcessor.cc.

References schedule_.

1119  {
1120  return schedule_->endPathsEnabled();
1121  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::endRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1771 of file EventProcessor.cc.

References actReg_, edm::RunPrincipal::endTime(), esp_, FDEBUG, input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), principalCache_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.

1771  {
1772  RunPrincipal& runPrincipal = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
1773  input_->doEndRun(runPrincipal);
1775  runPrincipal.endTime());
1776  EventSetup const& es = esp_->eventSetupForInstance(ts);
1777  {
1778  typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
1779  ScheduleSignalSentry<Traits> sentry(actReg_.get(), &runPrincipal, &es);
1780  schedule_->processOneOccurrence<Traits>(runPrincipal, es);
1781  }
1782  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";
1783  if(looper_) {
1784  looper_->doEndRun(runPrincipal, es);
1785  }
1786  }
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, int run)
boost::shared_ptr< InputSource > input_
boost::shared_ptr< EDLooperBase > looper_
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
static EventNumber_t maxEventNumber()
Definition: EventID.h:106
PrincipalCache principalCache_
void edm::EventProcessor::errorState ( )
private

Definition at line 1254 of file EventProcessor.cc.

References edm::event_processor::sError, and state_.

Referenced by shutdownAsync(), stopAsync(), and waitTillDoneAsync().

1254  {
1255  state_ = sError;
1256  }
volatile event_processor::State state_
bool edm::EventProcessor::forkProcess ( std::string const &  jobReportFile)

Definition at line 863 of file EventProcessor.cc.

References actReg_, beginJob(), ExpressReco_HICollisions_FallBack::cerr, gather_cfg::cout, edm::eventsetup::EventSetupRecord::doGet(), ExpressReco_HICollisions_FallBack::e, esp_, eventSetupDataToExcludeFromPrefetching_, edm::hlt::Exception, cmsRelvalreport::exit, edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::eventsetup::EventSetupRecord::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().

863  {
864 
865  if(0 == numberOfForkedChildren_) {return true;}
866  assert(0<numberOfForkedChildren_);
867  //do what we want done in common
868  {
869  beginJob(); //make sure this was run
870  // make the services available
872 
873  InputSource::ItemType itemType;
874  itemType = input_->nextItemType();
875 
876  assert(itemType == InputSource::IsFile);
877  {
878  readFile();
879  }
880  itemType = input_->nextItemType();
881  assert(itemType == InputSource::IsRun);
882 
883  std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl;
884  IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0),
885  input_->runAuxiliary()->beginTime());
886  EventSetup const& es = esp_->eventSetupForInstance(ts);
887 
888  //now get all the data available in the EventSetup
889  std::vector<eventsetup::EventSetupRecordKey> recordKeys;
890  es.fillAvailableRecordKeys(recordKeys);
891  std::vector<eventsetup::DataKey> dataKeys;
892  for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end();
893  itKey != itEnd;
894  ++itKey) {
895  eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey);
896  //see if this is on our exclusion list
897  ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name());
898  ExcludedData const* excludedData(0);
899  if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) {
900  excludedData = &(itExcludeRec->second);
901  if(excludedData->size() == 0 || excludedData->begin()->first == "*") {
902  //skip all items in this record
903  continue;
904  }
905  }
906  if(0 != recordPtr) {
907  dataKeys.clear();
908  recordPtr->fillRegisteredDataKeys(dataKeys);
909  for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end();
910  itDataKey != itDataKeyEnd;
911  ++itDataKey) {
912  //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
913  if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) {
914  std::cout << " excluding:" << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl;
915  continue;
916  }
917  try {
918  recordPtr->doGet(*itDataKey);
919  } catch(cms::Exception& e) {
920  LogWarning("EventSetupPreFetching") << e.what();
921  }
922  }
923  }
924  }
925  }
926  std::cout << " done prefetching" << std::endl;
927 {
928  // make the services available
930  Service<JobReport> jobReport;
931  jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_);
932 
933  //Now actually do the forking
934  actReg_->preForkReleaseResourcesSignal_();
935  input_->doPreForkReleaseResources();
936  schedule_->preForkReleaseResources();
937 }
938  installCustomHandler(SIGCHLD, ep_sigchld);
939 
940  //Create a message queue used to set what events each child processes
941  int queueID;
942  if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) {
943  printf("Error obtaining message queue\n");
944  exit(EXIT_FAILURE);
945  }
946 
947  unsigned int childIndex = 0;
948  unsigned int const kMaxChildren = numberOfForkedChildren_;
949  unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren);
950  std::vector<pid_t> childrenIds;
951  childrenIds.reserve(kMaxChildren);
952 {
953  // make the services available
955  Service<JobReport> jobReport;
956  for(; childIndex < kMaxChildren; ++childIndex) {
957  pid_t value = fork();
958  if(value == 0) {
959  // this is the child process, redirect stdout and stderr to a log file
960  fflush(stdout);
961  fflush(stderr);
962  std::stringstream stout;
963  stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log";
964  if(0 == freopen(stout.str().c_str(), "w", stdout)) {
965  std::cerr << "Error during freopen of child process "
966  << childIndex << std::endl;
967  }
968  if(dup2(fileno(stdout), fileno(stderr)) < 0) {
969  std::cerr << "Error during dup2 of child process"
970  << childIndex << std::endl;
971  }
972 
973  std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl;
974  if(setCpuAffinity_) {
975  // CPU affinity is handled differently on macosx.
976  // We disable it and print a message until someone reads:
977  //
978  // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html
979  //
980  // and implements it.
981 #ifdef __APPLE__
982  std::cout << "Architecture support for CPU affinity not implemented." << std::endl;
983 #else
984  std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl;
985  cpu_set_t mask;
986  CPU_ZERO(&mask);
987  CPU_SET(childIndex, &mask);
988  if(sched_setaffinity(0, sizeof(mask), &mask) != 0) {
989  std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl;
990  exit(-1);
991  }
992 #endif
993  }
994  break;
995  }
996  if(value < 0) {
997  std::cerr << "failed to create a child" << std::endl;
998  //message queue is a system resource so must be cleaned up
999  // before parent goes away
1000  msgctl(queueID, IPC_RMID, 0);
1001  exit(-1);
1002  }
1003  childrenIds.push_back(value);
1004  }
1005 
1006  if(childIndex < kMaxChildren) {
1007  jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren);
1008  actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren);
1009 
1010  boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID));
1011  input_->doPostForkReacquireResources(receiver);
1012  schedule_->postForkReacquireResources(childIndex, kMaxChildren);
1013  //NOTE: sources have to reset themselves by listening to the post fork message
1014  //rewindInput();
1015  return true;
1016  }
1017  jobReport->parentAfterFork(jobReportFile);
1018 }
1019 
1020  //this is the original which is now the master for all the children
1021 
1022  //Need to wait for signals from the children or externally
1023  // To wait we must
1024  // 1) block the signals we want to wait on so we do not have a race condition
1025  // 2) check that we haven't already meet our ending criteria
1026  // 3) call sigsuspend which unblocks the signals and waits until a signal is caught
1027  sigset_t blockingSigSet;
1028  sigset_t unblockingSigSet;
1029  sigset_t oldSigSet;
1030  pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet);
1031  pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet);
1032  sigaddset(&blockingSigSet, SIGCHLD);
1033  sigaddset(&blockingSigSet, SIGUSR2);
1034  sigaddset(&blockingSigSet, SIGINT);
1035  sigdelset(&unblockingSigSet, SIGCHLD);
1036  sigdelset(&unblockingSigSet, SIGUSR2);
1037  sigdelset(&unblockingSigSet, SIGINT);
1038  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1039 
1040  //create a thread which sends the units of work to workers
1041  // we create it after all signals were blocked so that this
1042  // thread is never interupted by a signal
1043  MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_);
1044  boost::thread senderThread(sender);
1045 
1046  while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) {
1047  sigsuspend(&unblockingSigSet);
1048  std::cout << "woke from sigwait" << std::endl;
1049  }
1050  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1051 
1052  std::cout << "num children who have already stopped " << num_children_done << std::endl;
1053  if(child_failed) {
1054  std::cout << "child failed" << std::endl;
1055  }
1056  if(shutdown_flag) {
1057  std::cout << "asked to shutdown" << std::endl;
1058  }
1059 
1060  if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) {
1061  std::cout << "must stop children" << std::endl;
1062  for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end();
1063  it != itEnd; ++it) {
1064  /* int result = */ kill(*it, SIGUSR2);
1065  }
1066  pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet);
1067  while(num_children_done != kMaxChildren) {
1068  sigsuspend(&unblockingSigSet);
1069  }
1070  pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL);
1071  }
1072  //remove message queue, this will cause the message thread to terminate
1073  // kill it now since all children already stopped
1074  msgctl(queueID, IPC_RMID, 0);
1075  //now wait for the sender thread to finish. This should be quick since
1076  // we killed the message queue
1077  senderThread.join();
1078  if(child_failed) {
1079  throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status;
1080  }
1081  return false;
1082  }
unsigned int numberOfSequentialEventsPerChild_
virtual char const * what() const
Definition: Exception.cc:97
boost::shared_ptr< InputSource > input_
boost::shared_ptr< ActivityRegistry > actReg_
#define NULL
Definition: scimark2.h:8
void installCustomHandler(int signum, CFUNC func)
std::set< std::pair< std::string, std::string > > ExcludedData
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::auto_ptr< Schedule > schedule_
volatile bool shutdown_flag
tuple cout
Definition: gather_cfg.py:41
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this EventProccessor. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 1094 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FWEPWrapper::init().

1094  {
1095  return schedule_->getAllModuleDescriptions();
1096  }
std::auto_ptr< Schedule > schedule_
State edm::EventProcessor::getState ( ) const
ServiceToken edm::EventProcessor::getToken ( )
void edm::EventProcessor::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1124 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().

1124  {
1125  schedule_->getTriggerReport(rep);
1126  }
std::auto_ptr< Schedule > schedule_
void edm::EventProcessor::init ( boost::shared_ptr< ProcessDesc > &  processDesc,
ServiceToken const &  token,
serviceregistry::ServiceLegacy  iLegacy 
)
private

Definition at line 514 of file EventProcessor.cc.

References act_table_, actReg_, edm::BranchIDListHelper::clearRegistries(), connectSigs(), edm::ServiceToken::copySlotsTo(), edm::ServiceRegistry::createContaining(), edm::ServiceRegistry::createSet(), emptyRunLumiMode_, esp_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::getPassID(), edm::getReleaseVersion(), edm::ParameterSet::getUntrackedParameterSet(), input_, edm::eventsetup::heterocontainer::insert(), edm::ServiceRegistry::instance(), edm::serviceregistry::kOverlapIsError, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, preg_, principalCache_, processConfiguration_, schedule_, serviceToken_, and setCpuAffinity_.

Referenced by EventProcessor().

516  {
517 
518  // The BranchIDListRegistry and ProductIDListRegistry are indexed registries, and are singletons.
519  // They must be cleared here because some processes run multiple EventProcessors in succession.
521 
522  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();
523 
524  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
525  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
526  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
527  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);
528  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
529  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
530  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
531  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);
532  std::vector<ParameterSet> const& excluded = forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching", std::vector<ParameterSet>());
533  for(std::vector<ParameterSet>::const_iterator itPS = excluded.begin(), itPSEnd = excluded.end();
534  itPS != itPSEnd;
535  ++itPS) {
536  eventSetupDataToExcludeFromPrefetching_[itPS->getUntrackedParameter<std::string>("record")].insert(
537  std::make_pair(itPS->getUntrackedParameter<std::string>("type", "*"),
538  itPS->getUntrackedParameter<std::string>("label", "")));
539  }
540 
541  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
542  //makeParameterSets(config, parameterSet, pServiceSets);
543 
544  //create the services
545  ServiceToken tempToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));
546 
547  //see if any of the Services have to have their PSets stored
548  for(std::vector<ParameterSet>::const_iterator it = pServiceSets->begin(), itEnd = pServiceSets->end();
549  it != itEnd;
550  ++it) {
551  if(it->exists("@save_config")) {
552  parameterSet->addParameter(it->getParameter<std::string>("@service_type"), *it);
553  }
554  }
555  // Copy slots that hold all the registered callback functions like
556  // PostBeginJob into an ActivityRegistry that is owned by EventProcessor
557  tempToken.copySlotsTo(*actReg_);
558 
559  //add the ProductRegistry as a service ONLY for the construction phase
560  typedef serviceregistry::ServiceWrapper<ConstProductRegistry> w_CPR;
561  boost::shared_ptr<w_CPR>
562  reg(new w_CPR(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(*preg_))));
564  tempToken,
566 
567  // the next thing is ugly: pull out the trigger path pset and
568  // create a service and extra token for it
569  std::string processName = parameterSet->getParameter<std::string>("@process_name");
570 
571  typedef service::TriggerNamesService TNS;
572  typedef serviceregistry::ServiceWrapper<TNS> w_TNS;
573 
574  boost::shared_ptr<w_TNS> tnsptr
575  (new w_TNS(std::auto_ptr<TNS>(new TNS(*parameterSet))));
576 
578  tempToken2,
580 
581  //make the services available
583 
584  act_table_.reset(new ActionTable(*parameterSet));
585  CommonParams common = CommonParams(processName,
587  getPassID(),
588  parameterSet->getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1),
589  parameterSet->getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1));
590 
591  std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
592  esp_ = espController->makeProvider(*parameterSet, common);
593  processConfiguration_.reset(new ProcessConfiguration(processName, getReleaseVersion(), getPassID()));
594 
595  looper_ = fillLooper(*esp_, *parameterSet, common);
596  if(looper_) {
597  looper_->setActionTable(act_table_.get());
598  looper_->attachTo(*actReg_);
599  }
600 
601  input_ = makeInput(*parameterSet, common, *preg_, principalCache_, actReg_, processConfiguration_);
602 
603  schedule_ = std::auto_ptr<Schedule>
604  (new Schedule(*parameterSet,
605  ServiceRegistry::instance().get<TNS>(),
606  *preg_,
607  *act_table_,
608  actReg_,
610 
611  // initialize(iToken, iLegacy);
612  FDEBUG(2) << parameterSet << std::endl;
613  connectSigs(this);
614  }
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
std::string getPassID()
Definition: GetPassID.h:8
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
static ServiceToken createSet(std::vector< ParameterSet > &)
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
static ServiceRegistry & instance()
static ServiceToken createContaining(std::auto_ptr< T > iService)
create a service token that holds the service defined by iService
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ServiceToken serviceToken_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
boost::shared_ptr< InputSource > makeInput(ParameterSet &params, CommonParams const &common, ProductRegistry &preg, PrincipalCache &pCache, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration)
boost::shared_ptr< ProcessConfiguration > processConfiguration_
void connectSigs(EventProcessor *ep)
boost::shared_ptr< EDLooperBase > fillLooper(eventsetup::EventSetupProvider &cp, ParameterSet &params, CommonParams const &common)
std::auto_ptr< Schedule > schedule_
std::string getReleaseVersion()
boost::shared_ptr< ActionTable const > act_table_
bool insert(Storage &, ItemType *, const IdTag &)
PrincipalCache principalCache_
char const * edm::EventProcessor::msgName ( event_processor::Msg  m) const

Definition at line 1141 of file EventProcessor.cc.

References m.

Referenced by changeState().

1141  {
1142  return msgNames[m];
1143  }
void edm::EventProcessor::openOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1665 of file EventProcessor.cc.

References fb_, FDEBUG, and schedule_.

1665  {
1666  schedule_->openOutputFiles(*fb_);
1667  FDEBUG(1) << "\topenOutputFiles\n";
1668  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< FileBlock > fb_
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal ( )
inline

signal is emitted after all modules have finished processing the Event

Definition at line 210 of file EventProcessor.h.

References postProcessEventSignal_.

210 {return postProcessEventSignal_;}
ActivityRegistry::PostProcessEvent postProcessEventSignal_
void edm::EventProcessor::prepareForNextLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1724 of file EventProcessor.cc.

References esp_, FDEBUG, and looper_.

1724  {
1725  looper_->prepareForNextLoop(esp_.get());
1726  FDEBUG(1) << "\tprepareForNextLoop\n";
1727  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal ( )
inline

signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event

Definition at line 205 of file EventProcessor.h.

References preProcessEventSignal_.

205 {return preProcessEventSignal_;}
ActivityRegistry::PreProcessEvent preProcessEventSignal_
int edm::EventProcessor::readAndCacheLumi ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1839 of file EventProcessor.cc.

References input_.

1839  {
1840  input_->readAndCacheLumi();
1841  input_->markLumi();
1842  return input_->luminosityBlock();
1843  }
boost::shared_ptr< InputSource > input_
statemachine::Run edm::EventProcessor::readAndCacheRun ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1832 of file EventProcessor.cc.

References input_, and PDRates::Run.

1832  {
1833 
1834  input_->readAndCacheRun();
1835  input_->markRun();
1836  return statemachine::Run(input_->processHistoryID(), input_->run());
1837  }
boost::shared_ptr< InputSource > input_
void edm::EventProcessor::readAndProcessEvent ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1865 of file EventProcessor.cc.

References act_table_, alignmentValidation::action, actReg_, cms::Exception::category(), edm::EventPrincipal::clearEventPrincipal(), ExpressReco_HICollisions_FallBack::e, esp_, FDEBUG, edm::EventPrincipal::id(), input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), principalCache_, edm::ProcessingController::requestedTransition(), edm::actions::Rethrow, cms::Exception::rootCause(), schedule_, edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, summarizeEdmComparisonLogfiles::succeeded, edm::EventPrincipal::time(), and cms::Exception::what().

1865  {
1866  EventPrincipal *pep = 0;
1867  try {
1868  pep = input_->readEvent(principalCache_.lumiPrincipalPtr());
1869  FDEBUG(1) << "\treadEvent\n";
1870  }
1871  catch(cms::Exception& e) {
1873  if(action == actions::Rethrow) {
1874  throw;
1875  } else {
1876  LogWarning(e.category())
1877  << "an exception occurred and all paths for the event are being skipped: \n"
1878  << e.what();
1879  return;
1880  }
1881  }
1882  assert(pep != 0);
1883 
1884  IOVSyncValue ts(pep->id(), pep->time());
1885  EventSetup const& es = esp_->eventSetupForInstance(ts);
1886  {
1887  typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
1888  ScheduleSignalSentry<Traits> sentry(actReg_.get(), pep, &es);
1889  schedule_->processOneOccurrence<Traits>(*pep, es);
1890  }
1891 
1892  if(looper_) {
1893  bool randomAccess = input_->randomAccess();
1894  ProcessingController::ForwardState forwardState = input_->forwardState();
1895  ProcessingController::ReverseState reverseState = input_->reverseState();
1896  ProcessingController pc(forwardState, reverseState, randomAccess);
1897 
1899  do {
1900  status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc);
1901 
1902  bool succeeded = true;
1903  if(randomAccess) {
1904  if(pc.requestedTransition() == ProcessingController::kToPreviousEvent) {
1905  input_->skipEvents(-2);
1906  }
1907  else if(pc.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
1908  succeeded = input_->goToEvent(pc.specifiedEventTransition());
1909  }
1910  }
1911  pc.setLastOperationSucceeded(succeeded);
1912  } while(!pc.lastOperationSucceeded());
1913  if(status != EDLooperBase::kContinue) shouldWeStop_ = true;
1914 
1915  }
1916 
1917  FDEBUG(1) << "\tprocessEvent\n";
1918  pep->clearEventPrincipal();
1919  }
virtual char const * what() const
Definition: Exception.cc:97
boost::shared_ptr< InputSource > input_
std::string rootCause() const
Definition: Exception.cc:78
boost::shared_ptr< EDLooperBase > looper_
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< LuminosityBlockPrincipal > lumiPrincipalPtr(ProcessHistoryID const &phid, int run, int lumi)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< ActionTable const > act_table_
tuple status
Definition: ntuplemaker.py:245
PrincipalCache principalCache_
std::string category() const
Definition: Exception.cc:74
void edm::EventProcessor::readFile ( )
virtual
void edm::EventProcessor::respondToCloseInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1680 of file EventProcessor.cc.

References fb_, FDEBUG, and schedule_.

1680  {
1681  schedule_->respondToCloseInputFile(*fb_);
1682  FDEBUG(1) << "\trespondToCloseInputFile\n";
1683  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::respondToCloseOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1690 of file EventProcessor.cc.

References fb_, FDEBUG, and schedule_.

1690  {
1691  schedule_->respondToCloseOutputFiles(*fb_);
1692  FDEBUG(1) << "\trespondToCloseOutputFiles\n";
1693  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::respondToOpenInputFile ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1675 of file EventProcessor.cc.

References fb_, FDEBUG, and schedule_.

1675  {
1676  schedule_->respondToOpenInputFile(*fb_);
1677  FDEBUG(1) << "\trespondToOpenInputFile\n";
1678  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::respondToOpenOutputFiles ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1685 of file EventProcessor.cc.

References fb_, FDEBUG, and schedule_.

1685  {
1686  schedule_->respondToOpenOutputFiles(*fb_);
1687  FDEBUG(1) << "\trespondToOpenOutputFiles\n";
1688  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
boost::shared_ptr< FileBlock > fb_
void edm::EventProcessor::rewind ( )

Definition at line 659 of file EventProcessor.cc.

References beginJob(), changeState(), input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mInputRewind, edm::event_processor::mStopAsync, serviceToken_, and edm::event_processor::StateSentry::succeeded().

659  {
660  beginJob(); //make sure this was called
663  {
664  StateSentry toerror(this);
665 
666  //make the services available
668 
669  {
670  input_->repeat();
671  input_->rewind();
672  }
674  toerror.succeeded();
675  }
677  }
boost::shared_ptr< InputSource > input_
ServiceToken serviceToken_
void changeState(event_processor::Msg)
void edm::EventProcessor::rewindInput ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1718 of file EventProcessor.cc.

References FDEBUG, and input_.

1718  {
1719  input_->repeat();
1720  input_->rewind();
1721  FDEBUG(1) << "\trewind\n";
1722  }
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
EventProcessor::StatusCode edm::EventProcessor::run ( int  numberEventsToProcess,
bool  repeatable = true 
)

Definition at line 680 of file EventProcessor.cc.

References runEventCount().

Referenced by Types.LuminosityBlockID::cppID().

680  {
681  return runEventCount(numberEventsToProcess);
682  }
virtual StatusCode runEventCount(int numberOfEventsToProcess)
EventProcessor::StatusCode edm::EventProcessor::run ( void  )
inline

Definition at line 392 of file EventProcessor.h.

Referenced by Types.LuminosityBlockID::cppID().

392  {
393  return run(-1, false);
394  }
void edm::EventProcessor::runAsync ( )

Definition at line 1297 of file EventProcessor.cc.

References asyncRun(), beginJob(), changeState(), edm::IEventProcessor::epSuccess, event_loop_, edm::hlt::Exception, id_set_, last_rc_, edm::event_processor::mRunAsync, starter_, stop_count_, and stop_lock_.

Referenced by evf::FUEventProcessor::enableCommon().

1297  {
1298  beginJob();
1299  {
1300  boost::mutex::scoped_lock sl(stop_lock_);
1301  if(id_set_ == true) {
1302  std::string err("runAsync called while async event loop already running\n");
1303  LogError("FwkJob") << err;
1304  throw cms::Exception("BadState") << err;
1305  }
1306 
1308 
1309  stop_count_ = 0;
1310  last_rc_ = epSuccess; // forget the last value!
1311  event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));
1312  boost::xtime timeout;
1313  boost::xtime_get(&timeout, boost::TIME_UTC);
1314  timeout.sec += 60; // 60 seconds to start!!!!
1315  if(starter_.timed_wait(sl, timeout) == false) {
1316  // yikes - the thread did not start
1317  throw cms::Exception("BadState")
1318  << "Async run thread did not start in 60 seconds\n";
1319  }
1320  }
1321  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::mutex stop_lock_
static void asyncRun(EventProcessor *)
volatile Status last_rc_
void changeState(event_processor::Msg)
boost::condition starter_
EventProcessor::StatusCode edm::EventProcessor::runCommon ( bool  onlineStateTransitions,
int  numberOfEventsToProcess 
)
private

Definition at line 1419 of file EventProcessor.cc.

References alreadyHandlingException_, beginJob(), changeState(), edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, ExpressReco_HICollisions_FallBack::e, emptyRunLumiMode_, edm::IEventProcessor::epCountComplete, edm::IEventProcessor::epSignal, edm::IEventProcessor::epSuccess, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, FDEBUG, harvesting_AlCaReco_cfg::fileMode, fileMode_, forceLooperToEnd_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, input_, edm::PrincipalCache::insert(), edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::errors::LogicError, machine_, edm::event_processor::mFinished, edm::event_processor::mInputExhausted, edm::event_processor::mRunCount, edm::event_processor::mShutdownSignal, statemachine::NOMERGE, preg_, principalCache_, processConfiguration_, runEdmFileComparison::returnCode, serviceToken_, edm::shutdown_flag, edm::event_processor::sShuttingDown, edm::event_processor::sStopping, state_, stateMachineWasInErrorState_, terminateMachine(), and edm::usr2_lock.

Referenced by runEventCount(), and runToCompletion().

1419  {
1420 
1421  // Reusable event principal
1422  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
1423  principalCache_.insert(ep);
1424 
1425  beginJob(); //make sure this was called
1426 
1427  if(!onlineStateTransitions) changeState(mRunCount);
1428 
1431 
1432  // make the services available
1434 
1435  if(machine_.get() == 0) {
1436 
1438  if(fileMode_.empty()) fileMode = statemachine::FULLMERGE;
1439  else if(fileMode_ == std::string("NOMERGE")) fileMode = statemachine::NOMERGE;
1440  else if(fileMode_ == std::string("FULLMERGE")) fileMode = statemachine::FULLMERGE;
1441  else {
1442  throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
1443  << fileMode_ << ".\n"
1444  << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
1445  }
1446 
1447  statemachine::EmptyRunLumiMode emptyRunLumiMode;
1448  if(emptyRunLumiMode_.empty()) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1449  else if(emptyRunLumiMode_ == std::string("handleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
1450  else if(emptyRunLumiMode_ == std::string("handleEmptyRuns")) emptyRunLumiMode = statemachine::handleEmptyRuns;
1451  else if(emptyRunLumiMode_ == std::string("doNotHandleEmptyRunsAndLumis")) emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
1452  else {
1453  throw Exception(errors::Configuration, "Illegal emptyMode parameter value: ")
1454  << emptyRunLumiMode_ << ".\n"
1455  << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
1456  }
1457 
1458  machine_.reset(new statemachine::Machine(this,
1459  fileMode,
1460  emptyRunLumiMode));
1461 
1462  machine_->initiate();
1463  }
1464 
1465  try {
1466 
1467  InputSource::ItemType itemType;
1468 
1469  int iEvents = 0;
1470 
1471  while(true) {
1472 
1473  itemType = input_->nextItemType();
1474 
1475  FDEBUG(1) << "itemType = " << itemType << "\n";
1476 
1477  // These are used for asynchronous running only and
1478  // and are checking to see if stopAsync or shutdownAsync
1479  // were called from another thread. In the future, we
1480  // may need to do something better than polling the state.
1481  // With the current code this is the simplest thing and
1482  // it should always work. If the interaction between
1483  // threads becomes more complex this may cause problems.
1484  if(state_ == sStopping) {
1485  FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
1486  forceLooperToEnd_ = true;
1487  machine_->process_event(statemachine::Stop());
1488  forceLooperToEnd_ = false;
1489  break;
1490  }
1491  else if(state_ == sShuttingDown) {
1492  FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
1493  forceLooperToEnd_ = true;
1494  machine_->process_event(statemachine::Stop());
1495  forceLooperToEnd_ = false;
1496  break;
1497  }
1498 
1499  // Look for a shutdown signal
1500  {
1501  boost::mutex::scoped_lock sl(usr2_lock);
1502  if(shutdown_flag) {
1504  returnCode = epSignal;
1505  forceLooperToEnd_ = true;
1506  machine_->process_event(statemachine::Stop());
1507  forceLooperToEnd_ = false;
1508  break;
1509  }
1510  }
1511 
1512  if(itemType == InputSource::IsStop) {
1513  machine_->process_event(statemachine::Stop());
1514  }
1515  else if(itemType == InputSource::IsFile) {
1516  machine_->process_event(statemachine::File());
1517  }
1518  else if(itemType == InputSource::IsRun) {
1519  machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
1520  }
1521  else if(itemType == InputSource::IsLumi) {
1522  machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
1523  }
1524  else if(itemType == InputSource::IsEvent) {
1525  machine_->process_event(statemachine::Event());
1526  ++iEvents;
1527  if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
1528  returnCode = epCountComplete;
1530  FDEBUG(1) << "Event count complete, pausing event loop\n";
1531  break;
1532  }
1533  }
1534  // This should be impossible
1535  else {
1537  << "Unknown next item type passed to EventProcessor\n"
1538  << "Please report this error to the Framework group\n";
1539  }
1540 
1541  if(machine_->terminated()) {
1543  break;
1544  }
1545  } // End of loop over state machine events
1546  } // Try block
1547 
1548  // Some comments on exception handling related to the boost state machine:
1549  //
1550  // Some states used in the machine are special because they
1551  // perform actions while the machine is being terminated, actions
1552  // such as close files, call endRun, call endLumi etc ... Each of these
1553  // states has two functions that perform these actions. The functions
1554  // are almost identical. The major difference is that one version
1555  // catches all exceptions and the other lets exceptions pass through.
1556  // The destructor catches them and the other function named "exit" lets
1557  // them pass through. On a normal termination, boost will always call
1558  // "exit" and then the state destructor. In our state classes, the
1559  // the destructors do nothing if the exit function already took
1560  // care of things. Here's the interesting part. When boost is
1561  // handling an exception the "exit" function is not called (a boost
1562  // feature).
1563  //
1564  // If an exception occurs while the boost machine is in control
1565  // (which usually means inside a process_event call), then
1566  // the boost state machine destroys its states and "terminates" itself.
1567  // This already done before we hit the catch blocks below. In this case
1568  // the call to terminateMachine below only destroys an already
1569  // terminated state machine. Because exit is not called, the state destructors
1570  // handle cleaning up lumis, runs, and files. The destructors swallow
1571  // all exceptions and only pass through the exceptions messages which
1572  // are tacked onto the original exception below.
1573  //
1574  // If an exception occurs when the boost state machine is not
1575  // in control (outside the process_event functions), then boost
1576  // cannot destroy its own states. The terminateMachine function
1577  // below takes care of that. The flag "alreadyHandlingException"
1578  // is set true so that the state exit functions do nothing (and
1579  // cannot throw more exceptions while handling the first). Then the
1580  // state destructors take care of this because exit did nothing.
1581  //
1582  // In both cases above, the EventProcessor::endOfLoop function is
1583  // not called because it can throw exceptions.
1584  //
1585  // One tricky aspect of the state machine is that things which can
1586  // throw should not be invoked by the state machine while another
1587  // exception is being handled.
1588  // Another tricky aspect is that it appears to be important to
1589  // terminate the state machine before invoking its destructor.
1590  // We've seen crashes which are not understood when that is not
1591  // done. Maintainers of this code should be careful about this.
1592 
1593  catch (cms::Exception& e) {
1595  terminateMachine();
1596  alreadyHandlingException_ = false;
1597  e << "cms::Exception caught in EventProcessor and rethrown\n";
1599  e << exceptionMessageRuns_;
1601  throw;
1602  }
1603  catch (std::bad_alloc& e) {
1605  terminateMachine();
1606  alreadyHandlingException_ = false;
1607  throw cms::Exception("std::bad_alloc")
1608  << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
1609  << "The job has probably exhausted the virtual memory available to the process.\n"
1613  }
1614  catch (std::exception& e) {
1616  terminateMachine();
1617  alreadyHandlingException_ = false;
1618  throw cms::Exception("StdException")
1619  << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
1620  << "Previous information:\n" << e.what() << "\n"
1624  }
1625  catch (...) {
1627  terminateMachine();
1628  alreadyHandlingException_ = false;
1629  throw cms::Exception("Unknown")
1630  << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
1634  }
1635 
1636  if(machine_->terminated()) {
1637  FDEBUG(1) << "The state machine reports it has been terminated\n";
1638  machine_.reset();
1639  }
1640 
1641  if(!onlineStateTransitions) changeState(mFinished);
1642 
1644  throw cms::Exception("BadState")
1645  << "The boost state machine in the EventProcessor exited after\n"
1646  << "entering the Error state.\n";
1647  }
1648 
1649  return returnCode;
1650  }
std::string emptyRunLumiMode_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string exceptionMessageRuns_
ServiceToken serviceToken_
std::string exceptionMessageLumis_
volatile event_processor::State state_
boost::shared_ptr< ProcessConfiguration > processConfiguration_
std::auto_ptr< statemachine::Machine > machine_
std::string exceptionMessageFiles_
void changeState(event_processor::Msg)
bool insert(boost::shared_ptr< RunPrincipal > rp)
volatile bool shutdown_flag
boost::mutex usr2_lock
PrincipalCache principalCache_
EventProcessor::StatusCode edm::EventProcessor::runEventCount ( int  numberOfEventsToProcess)
virtual

Implements edm::IEventProcessor.

Definition at line 1406 of file EventProcessor.cc.

References runEdmFileComparison::returnCode, runCommon(), and edm::event_processor::StateSentry::succeeded().

Referenced by run().

1406  {
1407 
1408  StateSentry toerror(this);
1409 
1410  bool onlineStateTransitions = false;
1411  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1412 
1413  toerror.succeeded();
1414 
1415  return returnCode;
1416  }
StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess)
EventProcessor::StatusCode edm::EventProcessor::runToCompletion ( bool  onlineStateTransitions)
virtual

Implements edm::IEventProcessor.

Definition at line 1387 of file EventProcessor.cc.

References edm::hlt::Exception, edm::errors::LogicError, machine_, runEdmFileComparison::returnCode, runCommon(), and edm::event_processor::StateSentry::succeeded().

Referenced by asyncRun().

1387  {
1388 
1389  StateSentry toerror(this);
1390 
1391  int numberOfEventsToProcess = -1;
1392  StatusCode returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);
1393 
1394  if(machine_.get() != 0) {
1396  << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
1397  << "Please report this error to the Framework group\n";
1398  }
1399 
1400  toerror.succeeded();
1401 
1402  return returnCode;
1403  }
std::auto_ptr< statemachine::Machine > machine_
StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess)
void edm::EventProcessor::setExceptionMessageFiles ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1927 of file EventProcessor.cc.

References exceptionMessageFiles_, and argparse::message.

1927  {
1929  }
std::string exceptionMessageFiles_
string message
Definition: argparse.py:126
void edm::EventProcessor::setExceptionMessageLumis ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1935 of file EventProcessor.cc.

References exceptionMessageLumis_, and argparse::message.

1935  {
1937  }
std::string exceptionMessageLumis_
string message
Definition: argparse.py:126
void edm::EventProcessor::setExceptionMessageRuns ( std::string &  message)
virtual

Implements edm::IEventProcessor.

Definition at line 1931 of file EventProcessor.cc.

References exceptionMessageRuns_, and argparse::message.

1931  {
1933  }
std::string exceptionMessageRuns_
string message
Definition: argparse.py:126
void edm::EventProcessor::setRunNumber ( RunNumber_t  runNumber)

Definition at line 1156 of file EventProcessor.cc.

References beginJob(), changeState(), input_, and edm::event_processor::mSetRun.

Referenced by evf::FUEventProcessor::enableCommon().

1156  {
1157  if(runNumber == 0) {
1158  runNumber = 1;
1159  LogWarning("Invalid Run")
1160  << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
1161  << "Run number was set to 1 instead\n";
1162  }
1163 
1164  // inside of beginJob there is a check to see if it has been called before
1165  beginJob();
1167 
1168  // interface not correct yet
1169  input_->setRunNumber(runNumber);
1170  }
boost::shared_ptr< InputSource > input_
void changeState(event_processor::Msg)
void edm::EventProcessor::setupSignal ( )
private
bool edm::EventProcessor::shouldWeCloseOutput ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1729 of file EventProcessor.cc.

References FDEBUG, and schedule_.

1729  {
1730  FDEBUG(1) << "\tshouldWeCloseOutput\n";
1731  return schedule_->shouldWeCloseOutput();
1732  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
bool edm::EventProcessor::shouldWeStop ( ) const
virtual

Implements edm::IEventProcessor.

Definition at line 1921 of file EventProcessor.cc.

References FDEBUG, schedule_, and shouldWeStop_.

1921  {
1922  FDEBUG(1) << "\tshouldWeStop\n";
1923  if(shouldWeStop_) return true;
1924  return schedule_->terminate();
1925  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync ( unsigned int  timeout_secs = 60 * 2)
EventProcessor::StatusCode edm::EventProcessor::skip ( int  numberToSkip)

Definition at line 685 of file EventProcessor.cc.

References beginJob(), changeState(), edm::IEventProcessor::epSuccess, input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mSkip, serviceToken_, and edm::event_processor::StateSentry::succeeded().

685  {
686  beginJob(); //make sure this was called
688  {
689  StateSentry toerror(this);
690 
691  //make the services available
693 
694  {
695  input_->skipEvents(numberToSkip);
696  }
698  toerror.succeeded();
699  }
701  return epSuccess;
702  }
boost::shared_ptr< InputSource > input_
ServiceToken serviceToken_
void changeState(event_processor::Msg)
void edm::EventProcessor::startingNewLoop ( )
virtual

Implements edm::IEventProcessor.

Definition at line 1695 of file EventProcessor.cc.

References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.

1695  {
1696  shouldWeStop_ = false;
1697  //NOTE: for first loop, need to delay calling 'doStartingNewLoop'
1698  // until after we've called beginOfJob
1699  if(looper_ && looperBeginJobRun_) {
1700  looper_->doStartingNewLoop();
1701  }
1702  FDEBUG(1) << "\tstartingNewLoop\n";
1703  }
boost::shared_ptr< EDLooperBase > looper_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
char const * edm::EventProcessor::stateName ( event_processor::State  s) const

Definition at line 1137 of file EventProcessor.cc.

References asciidump::s.

Referenced by changeState(), currentStateName(), evf::FWEPWrapper::publishConfigAndMonitorItems(), and evf::FWEPWrapper::stop().

1137  {
1138  return stateNames[s];
1139  }
string s
Definition: asciidump.py:422
EventProcessor::StatusCode edm::EventProcessor::statusAsync ( ) const

Definition at line 1149 of file EventProcessor.cc.

References last_rc_.

Referenced by evf::FUEventProcessor::enableCommon().

1149  {
1150  // the thread will record exception/error status in the event processor
1151  // for us to look at and report here
1152  return last_rc_;
1153  }
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::stopAsync ( unsigned int  timeout_secs = 60 * 2)
void edm::EventProcessor::terminateMachine ( )
private

Definition at line 1943 of file EventProcessor.cc.

References FDEBUG, forceLooperToEnd_, and machine_.

Referenced by endJob(), runCommon(), and ~EventProcessor().

1943  {
1944  if(machine_.get() != 0) {
1945  if(!machine_->terminated()) {
1946  forceLooperToEnd_ = true;
1947  machine_->process_event(statemachine::Stop());
1948  forceLooperToEnd_ = false;
1949  }
1950  else {
1951  FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
1952  }
1953  if(machine_->terminated()) {
1954  FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
1955  }
1956  machine_.reset();
1957  }
1958  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< statemachine::Machine > machine_
int edm::EventProcessor::totalEvents ( ) const

Return the number of events this EventProcessor has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 1099 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1099  {
1100  return schedule_->totalEvents();
1101  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsFailed ( ) const

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 1109 of file EventProcessor.cc.

References schedule_.

1109  {
1110  return schedule_->totalEventsFailed();
1111  }
std::auto_ptr< Schedule > schedule_
int edm::EventProcessor::totalEventsPassed ( ) const

Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.

Definition at line 1104 of file EventProcessor.cc.

References schedule_.

Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::monitoring(), and evf::FUEventProcessor::receivingAndMonitor().

1104  {
1105  return schedule_->totalEventsPassed();
1106  }
std::auto_ptr< Schedule > schedule_
EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion ( unsigned int  timeout_seconds)
private

Definition at line 1183 of file EventProcessor.cc.

References edm::IEventProcessor::epTimedOut, event_loop_, event_loop_id_, id_set_, last_rc_, stop_count_, stop_lock_, and stopper_.

Referenced by doneAsync(), shutdownAsync(), stopAsync(), and waitTillDoneAsync().

1183  {
1184  bool rc = true;
1185  boost::xtime timeout;
1186  boost::xtime_get(&timeout, boost::TIME_UTC);
1187  timeout.sec += timeout_seconds;
1188 
1189  // make sure to include a timeout here so we don't wait forever
1190  // I suspect there are still timing issues with thread startup
1191  // and the setting of the various control variables (stop_count, id_set)
1192  {
1193  boost::mutex::scoped_lock sl(stop_lock_);
1194 
1195  // look here - if runAsync not active, just return the last return code
1196  if(stop_count_ < 0) return last_rc_;
1197 
1198  if(timeout_seconds == 0) {
1199  while(stop_count_ == 0) stopper_.wait(sl);
1200  } else {
1201  while(stop_count_ == 0 && (rc = stopper_.timed_wait(sl, timeout)) == true);
1202  }
1203 
1204  if(rc == false) {
1205  // timeout occurred
1206  // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
1207  // this is a temporary hack until we get the input source
1208  // upgraded to allow blocking input sources to be unblocked
1209 
1210  // the next line is dangerous and causes all sorts of trouble
1211  if(id_set_) pthread_cancel(event_loop_id_);
1212 
1213  // we will not do anything yet
1214  LogWarning("timeout")
1215  << "An asynchronous request was made to shut down "
1216  << "the event loop "
1217  << "and the event loop did not shutdown after "
1218  << timeout_seconds << " seconds\n";
1219  } else {
1220  event_loop_->join();
1221  event_loop_.reset();
1222  id_set_ = false;
1223  stop_count_ = -1;
1224  }
1225  }
1226  return rc == false ? epTimedOut : last_rc_;
1227  }
volatile int stop_count_
volatile bool id_set_
boost::shared_ptr< boost::thread > event_loop_
boost::condition stopper_
boost::mutex stop_lock_
volatile pthread_t event_loop_id_
volatile Status last_rc_
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync ( unsigned int  timeout_seconds = 0)

Definition at line 1230 of file EventProcessor.cc.

References changeState(), edm::IEventProcessor::epTimedOut, errorState(), edm::event_processor::mCountComplete, and waitForAsyncCompletion().

Referenced by evf::FWEPWrapper::stop().

1230  {
1231  StatusCode rc = waitForAsyncCompletion(timeout_value_secs);
1233  else errorState();
1234  return rc;
1235  }
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
void changeState(event_processor::Msg)
void edm::EventProcessor::writeLumi ( ProcessHistoryID const &  phid,
int  run,
int  lumi 
)
virtual

Implements edm::IEventProcessor.

Definition at line 1855 of file EventProcessor.cc.

References FDEBUG, edm::PrincipalCache::lumiPrincipal(), principalCache_, and schedule_.

1855  {
1856  schedule_->writeLumi(principalCache_.lumiPrincipal(phid, run, lumi));
1857  FDEBUG(1) << "\twriteLumi " << run << "/" << lumi << "\n";
1858  }
tuple lumi
Definition: fjr2json.py:41
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, int run, int lumi)
PrincipalCache principalCache_
void edm::EventProcessor::writeRun ( statemachine::Run const &  run)
virtual

Implements edm::IEventProcessor.

Definition at line 1845 of file EventProcessor.cc.

References FDEBUG, principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.

1845  {
1846  schedule_->writeRun(principalCache_.runPrincipal(run.processHistoryID(), run.runNumber()));
1847  FDEBUG(1) << "\twriteRun " << run.runNumber() << "\n";
1848  }
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, int run)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::auto_ptr< Schedule > schedule_
PrincipalCache principalCache_

Friends And Related Function Documentation

friend class event_processor::StateSentry
friend

Definition at line 385 of file EventProcessor.h.

Member Data Documentation

boost::shared_ptr<ActionTable const> edm::EventProcessor::act_table_
private

Definition at line 345 of file EventProcessor.h.

Referenced by init(), and readAndProcessEvent().

boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_
private
bool edm::EventProcessor::alreadyHandlingException_
private

Definition at line 374 of file EventProcessor.h.

Referenced by alreadyHandlingException(), and runCommon().

std::string edm::EventProcessor::emptyRunLumiMode_
private

Definition at line 370 of file EventProcessor.h.

Referenced by init(), and runCommon().

boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_
private
boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_
private

Definition at line 350 of file EventProcessor.h.

Referenced by runAsync(), and waitForAsyncCompletion().

volatile pthread_t edm::EventProcessor::event_loop_id_
private

Definition at line 360 of file EventProcessor.h.

Referenced by asyncRun(), and waitForAsyncCompletion().

ExcludedDataMap edm::EventProcessor::eventSetupDataToExcludeFromPrefetching_
private

Definition at line 384 of file EventProcessor.h.

Referenced by forkProcess(), and init().

std::string edm::EventProcessor::exceptionMessageFiles_
private

Definition at line 371 of file EventProcessor.h.

Referenced by runCommon(), and setExceptionMessageFiles().

std::string edm::EventProcessor::exceptionMessageLumis_
private

Definition at line 373 of file EventProcessor.h.

Referenced by runCommon(), and setExceptionMessageLumis().

std::string edm::EventProcessor::exceptionMessageRuns_
private

Definition at line 372 of file EventProcessor.h.

Referenced by runCommon(), and setExceptionMessageRuns().

boost::shared_ptr<FileBlock> edm::EventProcessor::fb_
private
std::string edm::EventProcessor::fileMode_
private

Definition at line 369 of file EventProcessor.h.

Referenced by init(), and runCommon().

bool edm::EventProcessor::forceESCacheClearOnNewRun_
private

Definition at line 377 of file EventProcessor.h.

Referenced by beginRun(), and init().

bool edm::EventProcessor::forceLooperToEnd_
private

Definition at line 375 of file EventProcessor.h.

Referenced by endOfLoop(), runCommon(), and terminateMachine().

volatile bool edm::EventProcessor::id_set_
private

Definition at line 359 of file EventProcessor.h.

Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().

boost::shared_ptr<InputSource> edm::EventProcessor::input_
private
std::string edm::EventProcessor::last_error_text_
private

Definition at line 358 of file EventProcessor.h.

Referenced by asyncRun().

volatile Status edm::EventProcessor::last_rc_
private

Definition at line 357 of file EventProcessor.h.

Referenced by asyncRun(), runAsync(), statusAsync(), and waitForAsyncCompletion().

boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_
private
bool edm::EventProcessor::looperBeginJobRun_
private

Definition at line 376 of file EventProcessor.h.

Referenced by beginRun(), and startingNewLoop().

std::auto_ptr<statemachine::Machine> edm::EventProcessor::machine_
private

Definition at line 365 of file EventProcessor.h.

Referenced by runCommon(), runToCompletion(), and terminateMachine().

int edm::EventProcessor::my_sig_num_
private

Definition at line 361 of file EventProcessor.h.

int edm::EventProcessor::numberOfForkedChildren_
private

Definition at line 379 of file EventProcessor.h.

Referenced by forkProcess(), init(), and readFile().

unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_
private

Definition at line 380 of file EventProcessor.h.

Referenced by forkProcess(), and init().

ActivityRegistry::PostProcessEvent edm::EventProcessor::postProcessEventSignal_
private

Definition at line 339 of file EventProcessor.h.

Referenced by connectSigs(), and postProcessEventSignal().

boost::shared_ptr<SignallingProductRegistry> edm::EventProcessor::preg_
private

Definition at line 341 of file EventProcessor.h.

Referenced by init(), and runCommon().

ActivityRegistry::PreProcessEvent edm::EventProcessor::preProcessEventSignal_
private

Definition at line 338 of file EventProcessor.h.

Referenced by connectSigs(), and preProcessEventSignal().

PrincipalCache edm::EventProcessor::principalCache_
private
boost::shared_ptr<ProcessConfiguration> edm::EventProcessor::processConfiguration_
private

Definition at line 346 of file EventProcessor.h.

Referenced by init(), and runCommon().

std::auto_ptr<Schedule> edm::EventProcessor::schedule_
private
ServiceToken edm::EventProcessor::serviceToken_
private

Definition at line 342 of file EventProcessor.h.

Referenced by beginJob(), endJob(), forkProcess(), getToken(), init(), rewind(), runCommon(), and skip().

bool edm::EventProcessor::setCpuAffinity_
private

Definition at line 381 of file EventProcessor.h.

Referenced by forkProcess(), and init().

bool edm::EventProcessor::shouldWeStop_
private

Definition at line 367 of file EventProcessor.h.

Referenced by readAndProcessEvent(), shouldWeStop(), and startingNewLoop().

boost::condition edm::EventProcessor::starter_
private

Definition at line 355 of file EventProcessor.h.

Referenced by asyncRun(), and runAsync().

volatile event_processor::State edm::EventProcessor::state_
private

Definition at line 349 of file EventProcessor.h.

Referenced by beginJob(), changeState(), errorState(), getState(), and runCommon().

boost::mutex edm::EventProcessor::state_lock_
private

Definition at line 352 of file EventProcessor.h.

Referenced by changeState().

bool edm::EventProcessor::stateMachineWasInErrorState_
private

Definition at line 368 of file EventProcessor.h.

Referenced by doErrorStuff(), and runCommon().

volatile int edm::EventProcessor::stop_count_
private

Definition at line 356 of file EventProcessor.h.

Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().

boost::mutex edm::EventProcessor::stop_lock_
private

Definition at line 353 of file EventProcessor.h.

Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().

boost::condition edm::EventProcessor::stopper_
private

Definition at line 354 of file EventProcessor.h.

Referenced by asyncRun(), and waitForAsyncCompletion().