#include <EventProcessor.h>
Public Member Functions | |
virtual bool | alreadyHandlingException () const |
void | beginJob () |
virtual void | beginLumi (ProcessHistoryID const &phid, int run, int lumi) |
virtual void | beginRun (statemachine::Run const &run) |
void | clearCounters () |
Clears counters used by trigger report. | |
virtual void | closeInputFile () |
virtual void | closeOutputFiles () |
char const * | currentStateName () const |
void | declareRunNumber (RunNumber_t runNumber) |
virtual void | deleteLumiFromCache (ProcessHistoryID const &phid, int run, int lumi) |
virtual void | deleteRunFromCache (statemachine::Run const &run) |
virtual void | doErrorStuff () |
void | enableEndPaths (bool active) |
void | endJob () |
virtual void | endLumi (ProcessHistoryID const &phid, int run, int lumi) |
virtual bool | endOfLoop () |
bool | endPathsEnabled () const |
virtual void | endRun (statemachine::Run const &run) |
EventProcessor (std::string const &config, bool isPython) | |
meant for unit tests | |
EventProcessor (std::string const &config, std::vector< std::string > const &defaultServices, std::vector< std::string > const &forcedServices=std::vector< std::string >()) | |
EventProcessor (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy legacy) | |
EventProcessor (std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >()) | |
bool | forkProcess (std::string const &jobReportFile) |
std::vector< ModuleDescription const * > | getAllModuleDescriptions () const |
event_processor::State | getState () const |
ServiceToken | getToken () |
void | getTriggerReport (TriggerReport &rep) const |
char const * | msgName (event_processor::Msg m) const |
virtual void | openOutputFiles () |
ActivityRegistry::PostProcessEvent & | postProcessEventSignal () |
virtual void | prepareForNextLoop () |
ActivityRegistry::PreProcessEvent & | preProcessEventSignal () |
virtual int | readAndCacheLumi () |
virtual statemachine::Run | readAndCacheRun () |
virtual void | readAndProcessEvent () |
virtual void | readFile () |
virtual void | respondToCloseInputFile () |
virtual void | respondToCloseOutputFiles () |
virtual void | respondToOpenInputFile () |
virtual void | respondToOpenOutputFiles () |
void | rewind () |
virtual void | rewindInput () |
StatusCode | run (int numberEventsToProcess, bool repeatable=true) |
StatusCode | run () |
void | runAsync () |
virtual StatusCode | runEventCount (int numberOfEventsToProcess) |
virtual StatusCode | runToCompletion (bool onlineStateTransitions) |
virtual void | setExceptionMessageFiles (std::string &message) |
virtual void | setExceptionMessageLumis (std::string &message) |
virtual void | setExceptionMessageRuns (std::string &message) |
void | setRunNumber (RunNumber_t runNumber) |
virtual bool | shouldWeCloseOutput () const |
virtual bool | shouldWeStop () const |
StatusCode | shutdownAsync (unsigned int timeout_secs=60 *2) |
StatusCode | skip (int numberToSkip) |
virtual void | startingNewLoop () |
char const * | stateName (event_processor::State s) const |
StatusCode | statusAsync () const |
StatusCode | stopAsync (unsigned int timeout_secs=60 *2) |
int | totalEvents () const |
int | totalEventsFailed () const |
int | totalEventsPassed () const |
StatusCode | waitTillDoneAsync (unsigned int timeout_seconds=0) |
virtual void | writeLumi (ProcessHistoryID const &phid, int run, int lumi) |
virtual void | writeRun (statemachine::Run const &run) |
~EventProcessor () | |
Private Types | |
typedef std::set< std::pair < std::string, std::string > > | ExcludedData |
typedef std::map< std::string, ExcludedData > | ExcludedDataMap |
Private Member Functions | |
void | changeState (event_processor::Msg) |
void | connectSigs (EventProcessor *ep) |
StatusCode | doneAsync (event_processor::Msg m) |
void | errorState () |
void | init (boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy) |
StatusCode | runCommon (bool onlineStateTransitions, int numberOfEventsToProcess) |
void | setupSignal () |
void | terminateMachine () |
StatusCode | waitForAsyncCompletion (unsigned int timeout_seconds) |
Static Private Member Functions | |
static void | asyncRun (EventProcessor *) |
Private Attributes | |
boost::shared_ptr< ActionTable const > | act_table_ |
boost::shared_ptr < ActivityRegistry > | actReg_ |
bool | alreadyHandlingException_ |
std::string | emptyRunLumiMode_ |
boost::shared_ptr < eventsetup::EventSetupProvider > | esp_ |
boost::shared_ptr< boost::thread > | event_loop_ |
volatile pthread_t | event_loop_id_ |
ExcludedDataMap | eventSetupDataToExcludeFromPrefetching_ |
std::string | exceptionMessageFiles_ |
std::string | exceptionMessageLumis_ |
std::string | exceptionMessageRuns_ |
boost::shared_ptr< FileBlock > | fb_ |
std::string | fileMode_ |
bool | forceESCacheClearOnNewRun_ |
bool | forceLooperToEnd_ |
volatile bool | id_set_ |
boost::shared_ptr< InputSource > | input_ |
std::string | last_error_text_ |
volatile Status | last_rc_ |
boost::shared_ptr< EDLooperBase > | looper_ |
bool | looperBeginJobRun_ |
std::auto_ptr < statemachine::Machine > | machine_ |
int | my_sig_num_ |
int | numberOfForkedChildren_ |
unsigned int | numberOfSequentialEventsPerChild_ |
ActivityRegistry::PostProcessEvent | postProcessEventSignal_ |
boost::shared_ptr < SignallingProductRegistry > | preg_ |
ActivityRegistry::PreProcessEvent | preProcessEventSignal_ |
PrincipalCache | principalCache_ |
boost::shared_ptr < ProcessConfiguration > | processConfiguration_ |
std::auto_ptr< Schedule > | schedule_ |
ServiceToken | serviceToken_ |
bool | setCpuAffinity_ |
bool | shouldWeStop_ |
boost::condition | starter_ |
volatile event_processor::State | state_ |
boost::mutex | state_lock_ |
bool | stateMachineWasInErrorState_ |
volatile int | stop_count_ |
boost::mutex | stop_lock_ |
boost::condition | stopper_ |
Friends | |
class | event_processor::StateSentry |
Definition at line 65 of file EventProcessor.h.
typedef std::set<std::pair<std::string, std::string> > edm::EventProcessor::ExcludedData [private] |
Definition at line 382 of file EventProcessor.h.
typedef std::map<std::string, ExcludedData> edm::EventProcessor::ExcludedDataMap [private] |
Definition at line 383 of file EventProcessor.h.
edm::EventProcessor::EventProcessor | ( | std::string const & | config, |
ServiceToken const & | token = ServiceToken() , |
||
serviceregistry::ServiceLegacy | iLegacy = serviceregistry::kOverlapIsError , |
||
std::vector< std::string > const & | defaultServices = std::vector<std::string>() , |
||
std::vector< std::string > const & | forcedServices = std::vector<std::string>() |
||
) | [explicit] |
Definition at line 353 of file EventProcessor.cc.
References init(), and PythonProcessDesc::processDesc().
// Constructor taking a configuration string plus an explicit service token
// and legacy mode. Every listed member gets a defined starting value; the
// body then turns `config` into a ProcessDesc through PythonProcessDesc,
// registers the default/forced services on it and delegates to init().
: preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false)
{
  // init() takes the descriptor by non-const reference, so it needs a named
  // object rather than a temporary.
  boost::shared_ptr<ProcessDesc> desc = PythonProcessDesc(config).processDesc();
  desc->addServices(defaultServices, forcedServices);
  init(desc, iToken, iLegacy);
}
edm::EventProcessor::EventProcessor | ( | std::string const & | config, |
std::vector< std::string > const & | defaultServices, | ||
std::vector< std::string > const & | forcedServices = std::vector<std::string>() |
||
) |
Definition at line 395 of file EventProcessor.cc.
References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::processDesc().
// Constructor taking a configuration string and service lists only. It
// behaves like the token-taking overload but always passes a default
// ServiceToken and serviceregistry::kOverlapIsError to init().
: preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false)
{
  // Build the process description from the configuration text, then attach
  // the requested services before handing it to init().
  boost::shared_ptr<ProcessDesc> desc = PythonProcessDesc(config).processDesc();
  desc->addServices(defaultServices, forcedServices);
  init(desc, ServiceToken(), serviceregistry::kOverlapIsError);
}
edm::EventProcessor::EventProcessor | ( | boost::shared_ptr< ProcessDesc > & | processDesc, |
ServiceToken const & | token, | ||
serviceregistry::ServiceLegacy | legacy | ||
) |
Definition at line 435 of file EventProcessor.cc.
References init().
// Constructor taking an already-built ProcessDesc together with the service
// token and legacy mode; all real setup is delegated to init().
: preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  // The three forking-related members below are given the same starting
  // values as in the config-string constructors, so they are never read
  // indeterminate (forkProcess() tests numberOfForkedChildren_ first thing).
  // NOTE(review): init() is not visible here and may assign them again.
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false)
{
  init(processDesc, token, legacy);
}
edm::EventProcessor::EventProcessor | ( | std::string const & | config, |
bool | isPython | ||
) |
meant for unit tests
Definition at line 472 of file EventProcessor.cc.
References init(), edm::serviceregistry::kOverlapIsError, and PythonProcessDesc::processDesc().
// Constructor meant for unit tests: `isPython` selects whether `config` is
// handed to PythonProcessDesc or directly to ProcessDesc. Either way init()
// is called with a default ServiceToken and kOverlapIsError.
: preProcessEventSignal_(),
  postProcessEventSignal_(),
  actReg_(new ActivityRegistry),
  preg_(new SignallingProductRegistry),
  serviceToken_(),
  input_(),
  esp_(),
  act_table_(),
  processConfiguration_(),
  schedule_(),
  state_(sInit),
  event_loop_(),
  state_lock_(),
  stop_lock_(),
  stopper_(),
  stop_count_(-1),
  last_rc_(epSuccess),
  last_error_text_(),
  id_set_(false),
  event_loop_id_(),
  my_sig_num_(getSigNum()),
  fb_(),
  looper_(),
  shouldWeStop_(false),
  stateMachineWasInErrorState_(false),
  alreadyHandlingException_(false),
  forceLooperToEnd_(false),
  looperBeginJobRun_(false),
  forceESCacheClearOnNewRun_(false),
  // Same starting values as the config-string constructors, so the forking
  // members are never read indeterminate (forkProcess() tests
  // numberOfForkedChildren_ immediately).
  // NOTE(review): init() is not visible here and may assign them again.
  numberOfForkedChildren_(0),
  numberOfSequentialEventsPerChild_(1),
  setCpuAffinity_(false)
{
  // Only the way the descriptor is produced depends on isPython; the init()
  // call is identical for both cases, so it is issued once.
  boost::shared_ptr<ProcessDesc> processDesc;
  if(isPython) {
    processDesc = PythonProcessDesc(config).processDesc();
  }
  else {
    processDesc.reset(new ProcessDesc(config));
  }
  init(processDesc, ServiceToken(), serviceregistry::kOverlapIsError);
}
edm::EventProcessor::~EventProcessor | ( | ) |
Definition at line 616 of file EventProcessor.cc.
References actReg_, changeState(), edm::BranchIDListHelper::clearRegistries(), edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), ExpressReco_HICollisions_FallBack::e, esp_, cms::Exception::explainSelf(), edm::detail::ThreadSafeRegistry< KEY, T, E >::extra(), getToken(), input_, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), looper_, edm::event_processor::mDtor, schedule_, and terminateMachine().
{
  // Keep the services usable for the whole teardown sequence below.
  ServiceToken svcToken = getToken();
  ServiceRegistry::Operate servicesGuard(svcToken);

  // Normally the state machine is already gone by now (endJob, or the end of
  // event processing). If it is not, it is shut down here as a last resort.
  // That is risky when an exception is already in flight and another one is
  // thrown from here; critical executables should call endJob explicitly or
  // use runToCompletion so this call has nothing left to do.
  terminateMachine();

  try {
    changeState(mDtor);
  }
  catch(cms::Exception& ex) {
    LogError("System") << ex.explainSelf() << "\n";
  }

  // Release these members by hand while the services are still available.
  esp_.reset();
  schedule_.reset();
  input_.reset();
  looper_.reset();
  actReg_.reset();

  // Empty the parameter-set registry and reset its stored ID.
  pset::Registry* psets = pset::Registry::instance();
  psets->data().clear();
  psets->extra().setID(ParameterSetID());

  // Empty the remaining registries.
  EntryDescriptionRegistry::instance()->data().clear();
  ParentageRegistry::instance()->data().clear();
  ProcessConfigurationRegistry::instance()->data().clear();
  ProcessHistoryRegistry::instance()->data().clear();
  BranchIDListHelper::clearRegistries();
}
bool edm::EventProcessor::alreadyHandlingException | ( | ) | const [virtual] |
Implements edm::IEventProcessor.
Definition at line 1939 of file EventProcessor.cc.
References alreadyHandlingException_.
{
  // Plain accessor for the alreadyHandlingException_ flag.
  return alreadyHandlingException_;
}
void edm::EventProcessor::asyncRun | ( | EventProcessor * | me | ) | [static, private] |
Definition at line 1323 of file EventProcessor.cc.
References ExpressReco_HICollisions_FallBack::e, edm::IEventProcessor::epException, edm::IEventProcessor::epOther, event_loop_id_, exception, cms::Exception::explainSelf(), FDEBUG, id_set_, last_error_text_, last_rc_, runToCompletion(), starter_, stop_count_, stop_lock_, and stopper_.
Referenced by runAsync().
{
  // Body of the asynchronous event-loop thread (started from runAsync()).
  // It publishes its thread id, runs runToCompletion(true), stores the
  // outcome in last_rc_ / last_error_text_, and finally bumps stop_count_
  // and wakes everybody waiting on stopper_. No exception may leave here.
  //
  // Cancellation setup: asynchronous cancellation is enabled as a temporary
  // hack until the input source can be woken up from other threads (this
  // mimics EventFilter/Processor). Allowing cancels means the thread can
  // simply vanish at certain points, which is bad for C++ stack variables.
  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
  //pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, 0);
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);

  {
    // The lock object must be NAMED. The former spelling
    // `boost::mutex::scoped_lock(me->stop_lock_);` created a temporary that
    // was destroyed at the end of that statement, so the id/flag update and
    // the notification below ran without holding stop_lock_.
    boost::mutex::scoped_lock sl(me->stop_lock_);
    me->event_loop_id_ = pthread_self();
    me->id_set_ = true;
    me->starter_.notify_all();
  }

  // Pessimistic default: it stays epException for the two typed catch
  // clauses below and becomes epOther only for an unknown exception.
  Status rc = epException;
  FDEBUG(2) << "asyncRun starting ......................\n";

  try {
    bool onlineStateTransitions = true;
    rc = me->runToCompletion(onlineStateTransitions);
  }
  catch (cms::Exception& e) {
    LogError("FwkJob") << "cms::Exception caught in "
                       << "EventProcessor::asyncRun"
                       << "\n"
                       << e.explainSelf();
    me->last_error_text_ = e.explainSelf();
  }
  catch (std::exception& e) {
    LogError("FwkJob") << "Standard library exception caught in "
                       << "EventProcessor::asyncRun"
                       << "\n"
                       << e.what();
    me->last_error_text_ = e.what();
  }
  catch (...) {
    LogError("FwkJob") << "Unknown exception caught in "
                       << "EventProcessor::asyncRun"
                       << "\n";
    me->last_error_text_ = "Unknown exception caught";
    rc = epOther;
  }

  me->last_rc_ = rc;

  {
    // notify anyone waiting for exit that we are doing so now
    boost::mutex::scoped_lock sl(me->stop_lock_);
    ++me->stop_count_;
    me->stopper_.notify_all();
  }
  FDEBUG(2) << "asyncRun ending ......................\n";
}
void edm::EventProcessor::beginJob | ( | void | ) |
This should be called before the first call to 'run'. If this is not called in time, it will automatically be called the first time 'run' is called.
Definition at line 705 of file EventProcessor.cc.
References actReg_, changeState(), ExpressReco_HICollisions_FallBack::e, exception, input_, edm::event_processor::mBeginJob, schedule_, serviceToken_, edm::event_processor::sInit, and state_.
Referenced by declareRunNumber(), forkProcess(), rewind(), runAsync(), runCommon(), setRunNumber(), and skip().
{
  // Anything other than the initial state means there is nothing to do.
  if(state_ != sInit) {
    return;
  }

  bk::beginJob();

  // This transition is only legal from the initial state.
  changeState(mBeginJob);

  // Open question kept from the original: wrap the rest in a StateSentry?
  // StateSentry toerror(this);

  // Services must be reachable for everything that follows.
  ServiceRegistry::Operate servicesGuard(serviceToken_);

  // NOTE: "job" is taken to mean a single call of EventProcessor::run. If it
  // should mean once per application this code has to change, and so does
  // the case of run / add a new Module / run again, where the new Module
  // still needs its own beginJob.
  //
  // NOTE: eventually the looper should get a beginOfJob without arguments;
  // until then its beginOfJob is postponed to the first beginOfRun.
  //if(looper_) {
  //   looper_->beginOfJob(es);
  //}

  try {
    input_->doBeginJob();
  }
  catch(cms::Exception& ex) {
    // Log, append the same context to the exception itself, and propagate.
    LogError("BeginJob") << "A cms::Exception happened while processing the beginJob of the 'source'\n";
    ex << "A cms::Exception happened while processing the beginJob of the 'source'\n";
    throw;
  }
  catch(std::exception&) {
    LogError("BeginJob") << "A std::exception happened while processing the beginJob of the 'source'\n";
    throw;
  }
  catch(...) {
    LogError("BeginJob") << "An unknown exception happened while processing the beginJob of the 'source'\n";
    throw;
  }

  schedule_->beginJob();
  actReg_->postBeginJobSignal_();

  // Counterpart of the StateSentry question above.
  // toerror.succeeded();
}
void edm::EventProcessor::beginLumi | ( | ProcessHistoryID const & | phid, |
int | run, | ||
int | lumi | ||
) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1788 of file EventProcessor.cc.
References actReg_, edm::LuminosityBlockPrincipal::beginTime(), esp_, FDEBUG, input_, edm::Service< T >::isAvailable(), looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), principalCache_, edm::LuminosityBlockPrincipal::run(), and schedule_.
{
  // Fetch the cached principal for this lumi and let the source begin it.
  LuminosityBlockPrincipal& lbp = principalCache_.lumiPrincipal(phid, run, lumi);
  input_->doBeginLumi(lbp);

  // When a random-number service is present, notify it before the modules run.
  Service<RandomNumberGenerator> randomService;
  if(randomService.isAvailable()) {
    LuminosityBlock lumiBlock(lbp, ModuleDescription());
    randomService->preBeginLumi(lumiBlock);
  }

  // NOTE: event number 0 stands in for "start of the lumi block", which is a
  // poor choice; lumi blocks know their begin/end times, so why not their
  // first/last events too?
  IOVSyncValue syncValue(EventID(lbp.run(), lbp.luminosityBlock(), 0), lbp.beginTime());
  EventSetup const& setup = esp_->eventSetupForInstance(syncValue);

  {
    // The sentry lives exactly as long as the schedule call.
    typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> BeginLumiTraits;
    ScheduleSignalSentry<BeginLumiTraits> signalSentry(actReg_.get(), &lbp, &setup);
    schedule_->processOneOccurrence<BeginLumiTraits>(lbp, setup);
  }

  FDEBUG(1) << "\tbeginLumi " << run << "/" << lumi << "\n";

  if(looper_) {
    looper_->doBeginLuminosityBlock(lbp, setup);
  }
}
void edm::EventProcessor::beginRun | ( | statemachine::Run const & | run | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1745 of file EventProcessor.cc.
References actReg_, edm::RunPrincipal::beginTime(), esp_, FDEBUG, forceESCacheClearOnNewRun_, input_, looper_, looperBeginJobRun_, principalCache_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.
{
  // Fetch the cached principal for this run and let the source begin it.
  RunPrincipal& rp = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
  input_->doBeginRun(rp);

  IOVSyncValue syncValue(EventID(rp.run(), 0, 0), rp.beginTime());

  // Optionally drop the EventSetup cache before asking for the new IOV.
  if(forceESCacheClearOnNewRun_) {
    esp_->forceCacheClear();
  }
  EventSetup const& setup = esp_->eventSetupForInstance(syncValue);

  // First run seen with a looper: perform its one-time begin-of-job work now
  // and start its first loop.
  if(looper_ && !looperBeginJobRun_) {
    looper_->copyInfo(ScheduleInfo(schedule_.get()));
    looper_->beginOfJob(setup);
    looperBeginJobRun_ = true;
    looper_->doStartingNewLoop();
  }

  {
    // The sentry lives exactly as long as the schedule call.
    typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> BeginRunTraits;
    ScheduleSignalSentry<BeginRunTraits> signalSentry(actReg_.get(), &rp, &setup);
    schedule_->processOneOccurrence<BeginRunTraits>(rp, setup);
  }

  FDEBUG(1) << "\tbeginRun " << run.runNumber() << "\n";

  if(looper_) {
    looper_->doBeginRun(rp, setup);
  }
}
void edm::EventProcessor::changeState | ( | event_processor::Msg | msg | ) | [private] |
Definition at line 1267 of file EventProcessor.cc.
References cond::rpcobimon::current, edm::TransEntry::current, Exception, FDEBUG, edm::TransEntry::final, edm::event_processor::mAny, edm::TransEntry::message, msgName(), edm::event_processor::sInvalid, state_, state_lock_, stateName(), and edm::table.
Referenced by beginJob(), declareRunNumber(), doneAsync(), endJob(), rewind(), runAsync(), runCommon(), setRunNumber(), shutdownAsync(), skip(), stopAsync(), waitTillDoneAsync(), ~EventProcessor(), and edm::event_processor::StateSentry::~StateSentry().
{
  // Access to the state is serialized through state_lock_.
  boost::mutex::scoped_lock sl(state_lock_);
  State curr = state_;

  // Walk the transition table until either the sInvalid sentinel row or the
  // first row whose state equals the current one and whose message is the
  // one received (or the mAny wildcard).
  int row = 0;
  while(table[row].current != sInvalid) {
    bool const sameState = (curr == table[row].current);
    bool const messageAccepted = (msg == table[row].message) || (table[row].message == mAny);
    if(sameState && messageAccepted) {
      break;
    }
    ++row;
  }

  // Reaching the sentinel means no transition exists for (state, message).
  if(table[row].current == sInvalid) {
    throw cms::Exception("BadState")
      << "A member function of EventProcessor has been called in an"
      << " inappropriate order.\n"
      << "Bad transition from " << stateName(curr) << " "
      << "using message " << msgName(msg) << "\n"
      << "No where to go from here.\n";
  }

  FDEBUG(1) << "changeState: current=" << stateName(curr)
            << ", message=" << msgName(msg)
            << " -> new=" << stateName(table[row].final) << "\n";

  state_ = table[row].final;
}
void edm::EventProcessor::clearCounters | ( | ) |
Clears counters used by trigger report.
Definition at line 1129 of file EventProcessor.cc.
References schedule_.
{
  // Forwarded unchanged to the schedule.
  schedule_->clearCounters();
}
void edm::EventProcessor::closeInputFile | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1660 of file EventProcessor.cc.
void edm::EventProcessor::closeOutputFiles | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1670 of file EventProcessor.cc.
void edm::EventProcessor::connectSigs | ( | EventProcessor * | ep | ) | [private] |
Definition at line 1085 of file EventProcessor.cc.
References actReg_, postProcessEventSignal_, and preProcessEventSignal_.
Referenced by init().
char const * edm::EventProcessor::currentStateName | ( | ) | const |
Member functions to support asynchronous interface.
Definition at line 1133 of file EventProcessor.cc.
References getState(), and stateName().
Referenced by evf::FWEPWrapper::microState(), and evf::FWEPWrapper::monitoring().
void edm::EventProcessor::declareRunNumber | ( | RunNumber_t | runNumber | ) |
Definition at line 1173 of file EventProcessor.cc.
References beginJob(), changeState(), and edm::event_processor::mSetRun.
{
  // beginJob() itself checks whether it has been called before, so calling
  // it unconditionally is safe.
  beginJob();
  changeState(mSetRun);

  // The run number is not passed on to the input source yet: the interface
  // was still waiting on the run/lumi loop work (original note dated
  // 21-Jun-2007).
  //input_->declareRunNumber(runNumber);
}
void edm::EventProcessor::deleteLumiFromCache | ( | ProcessHistoryID const & | phid, |
int | run, | ||
int | lumi | ||
) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1860 of file EventProcessor.cc.
References edm::PrincipalCache::deleteLumi(), FDEBUG, and principalCache_.
void edm::EventProcessor::deleteRunFromCache | ( | statemachine::Run const & | run | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1850 of file EventProcessor.cc.
References edm::PrincipalCache::deleteRun(), FDEBUG, principalCache_, statemachine::Run::processHistoryID(), and statemachine::Run::runNumber().
{
  // Remove the run, identified by process history ID and run number, from
  // the principal cache.
  principalCache_.deleteRun(run.processHistoryID(), run.runNumber());

  FDEBUG(1) << "\tdeleteRunFromCache " << run.runNumber() << "\n";
}
void edm::EventProcessor::doErrorStuff | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1734 of file EventProcessor.cc.
References FDEBUG, and stateMachineWasInErrorState_.
{
  FDEBUG(1) << "\tdoErrorStuff\n";

  // Log the failure and record that the error state was entered.
  LogError("StateMachine")
    << "The EventProcessor state machine encountered an unexpected event\n"
    << "and went to the error state\n"
    << "Will attempt to terminate processing normally\n"
    << "(IF using the looper the next loop will be attempted)\n"
    << "This likely indicates a bug in an input module or corrupted input or both\n";

  stateMachineWasInErrorState_ = true;
}
EventProcessor::StatusCode edm::EventProcessor::doneAsync | ( | event_processor::Msg | m | ) | [private] |
Definition at line 1259 of file EventProcessor.cc.
References changeState(), and waitForAsyncCompletion().
{
  // The wait is bounded so it cannot hang forever; there may still be timing
  // issues around thread start-up and the control variables (stop_count,
  // id_set).
  changeState(m);

  unsigned int const timeoutSeconds = 60 * 2;
  return waitForAsyncCompletion(timeoutSeconds);
}
void edm::EventProcessor::enableEndPaths | ( | bool | active | ) |
Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.
Definition at line 1114 of file EventProcessor.cc.
References schedule_.
{
  // Forwarded unchanged to the schedule.
  schedule_->enableEndPaths(active);
}
void edm::EventProcessor::endJob | ( | void | ) |
This should be called before the EventProcessor is destroyed. Throws if any module's endJob throws an exception.
Definition at line 749 of file EventProcessor.cc.
References actReg_, trackerHits::c, edm::ExceptionCollector::call(), changeState(), edm::InputSource::doEndJob(), edm::Schedule::endJob(), edm::EDLooperBase::endOfJob(), edm::ExceptionCollector::hasThrown(), input_, looper_, edm::event_processor::mEndJob, cmsCodeRules::cppFunctionSkipper::operator, edm::ExceptionCollector::rethrow(), schedule_, serviceToken_, and terminateMachine().
Referenced by evf::FWEPWrapper::stopAndHalt().
{
  // Each step goes through the collector so a throwing step does not stop
  // the later ones from running; any collected failure is rethrown at the end.
  ExceptionCollector collector;

  // Per the original note, legal only from sIdle, sJobReady or sRunGiven.
  collector.call(boost::bind(&EventProcessor::changeState, this, mEndJob));

  // Services must be reachable for the remaining steps.
  ServiceRegistry::Operate servicesGuard(serviceToken_);

  collector.call(boost::bind(&EventProcessor::terminateMachine, this));
  collector.call(boost::bind(&Schedule::endJob, schedule_.get()));
  collector.call(boost::bind(&InputSource::doEndJob, input_));
  if(looper_) {
    collector.call(boost::bind(&EDLooperBase::endOfJob, looper_));
  }
  collector.call(boost::bind(&ActivityRegistry::PostEndJob::operator(), &actReg_->postEndJobSignal_));

  if(collector.hasThrown()) {
    collector.rethrow();
  }
}
void edm::EventProcessor::endLumi | ( | ProcessHistoryID const & | phid, |
int | run, | ||
int | lumi | ||
) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1813 of file EventProcessor.cc.
References actReg_, edm::LuminosityBlockPrincipal::endTime(), esp_, FDEBUG, input_, looper_, edm::LuminosityBlockPrincipal::luminosityBlock(), edm::PrincipalCache::lumiPrincipal(), edm::EventID::maxEventNumber(), principalCache_, edm::LuminosityBlockPrincipal::run(), and schedule_.
{
  // Fetch the cached principal for this lumi and let the source end it.
  LuminosityBlockPrincipal& lbp = principalCache_.lumiPrincipal(phid, run, lumi);
  input_->doEndLumi(lbp);

  // NOTE: the maximum event number stands in for "end of the lumi block",
  // which is a poor choice; lumi blocks know their begin/end times, so why
  // not their first/last events too?
  IOVSyncValue syncValue(EventID(lbp.run(), lbp.luminosityBlock(), EventID::maxEventNumber()),
                         lbp.endTime());
  EventSetup const& setup = esp_->eventSetupForInstance(syncValue);

  {
    // The sentry lives exactly as long as the schedule call.
    typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> EndLumiTraits;
    ScheduleSignalSentry<EndLumiTraits> signalSentry(actReg_.get(), &lbp, &setup);
    schedule_->processOneOccurrence<EndLumiTraits>(lbp, setup);
  }

  FDEBUG(1) << "\tendLumi " << run << "/" << lumi << "\n";

  if(looper_) {
    looper_->doEndLuminosityBlock(lbp, setup);
  }
}
bool edm::EventProcessor::endOfLoop | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1705 of file EventProcessor.cc.
References esp_, FDEBUG, forceLooperToEnd_, edm::EDLooperBase::kContinue, looper_, schedule_, and ntuplemaker::status.
{
  // Returns true when looping should stop. Without a looper that is always
  // the case. With one, the looper decides through doEndOfLoop(), unless
  // forceLooperToEnd_ overrides it.
  if(looper_) {
    // The looper only borrows this stack-local changer for the duration of
    // the doEndOfLoop() call.
    ModuleChanger changer(schedule_.get());
    looper_->setModuleChanger(&changer);

    EDLooperBase::Status status = EDLooperBase::kContinue;
    try {
      status = looper_->doEndOfLoop(esp_->eventSetup());
    }
    catch(...) {
      // Detach the changer on the exception path too, so the looper is not
      // left holding a pointer to a destroyed local object.
      looper_->setModuleChanger(0);
      throw;
    }
    looper_->setModuleChanger(0);

    return status != EDLooperBase::kContinue || forceLooperToEnd_;
  }

  // This trace is only reached when there is no looper.
  FDEBUG(1) << "\tendOfLoop\n";
  return true;
}
bool edm::EventProcessor::endPathsEnabled | ( | ) | const |
Return true if end_paths are active, and false if they are inactive.
Definition at line 1119 of file EventProcessor.cc.
References schedule_.
{
  // The schedule owns this setting; report what it says.
  return schedule_->endPathsEnabled();
}
void edm::EventProcessor::endRun | ( | statemachine::Run const & | run | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1771 of file EventProcessor.cc.
References actReg_, edm::RunPrincipal::endTime(), esp_, FDEBUG, input_, looper_, edm::EventID::maxEventNumber(), edm::LuminosityBlockID::maxLuminosityBlockNumber(), principalCache_, statemachine::Run::processHistoryID(), edm::RunPrincipal::run(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.
{
  // Fetch the cached principal for this run and let the source end it.
  RunPrincipal& rp = principalCache_.runPrincipal(run.processHistoryID(), run.runNumber());
  input_->doEndRun(rp);

  // The end of the run is expressed as the largest lumi/event numbers
  // together with the run's end time.
  IOVSyncValue syncValue(EventID(rp.run(),
                                 LuminosityBlockID::maxLuminosityBlockNumber(),
                                 EventID::maxEventNumber()),
                         rp.endTime());
  EventSetup const& setup = esp_->eventSetupForInstance(syncValue);

  {
    // The sentry lives exactly as long as the schedule call.
    typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> EndRunTraits;
    ScheduleSignalSentry<EndRunTraits> signalSentry(actReg_.get(), &rp, &setup);
    schedule_->processOneOccurrence<EndRunTraits>(rp, setup);
  }

  FDEBUG(1) << "\tendRun " << run.runNumber() << "\n";

  if(looper_) {
    looper_->doEndRun(rp, setup);
  }
}
void edm::EventProcessor::errorState | ( | ) | [private] |
Definition at line 1254 of file EventProcessor.cc.
References edm::event_processor::sError, and state_.
Referenced by shutdownAsync(), stopAsync(), and waitTillDoneAsync().
bool edm::EventProcessor::forkProcess | ( | std::string const & | jobReportFile | ) |
Definition at line 863 of file EventProcessor.cc.
References actReg_, beginJob(), ExpressReco_HICollisions_FallBack::cerr, gather_cfg::cout, edm::eventsetup::EventSetupRecord::doGet(), ExpressReco_HICollisions_FallBack::e, esp_, eventSetupDataToExcludeFromPrefetching_, Exception, cmsRelvalreport::exit, edm::eventsetup::EventSetupRecord::fillRegisteredDataKeys(), edm::eventsetup::EventSetupRecord::find(), input_, edm::installCustomHandler(), edm::InputSource::IsFile, edm::InputSource::IsRun, NULL, numberOfForkedChildren_, numberOfSequentialEventsPerChild_, readFile(), schedule_, serviceToken_, setCpuAffinity_, edm::shutdown_flag, and cms::Exception::what().
// forkProcess(jobReportFile): optionally split the job into forked children.
//
// Returns true when the caller should go on processing events itself: either
// no forking was requested (numberOfForkedChildren_ == 0) or this process is
// one of the freshly forked children. Returns false in the parent once the
// children have been waited for.
//
// Visible steps, in order:
//   1. Common setup: beginJob(), read the first file, then prefetch every
//      registered EventSetup data item for the upcoming run, skipping the
//      records/items listed in eventSetupDataToExcludeFromPrefetching_
//      (an empty set or a leading "*" entry skips the whole record).
//      cms::Exception from a prefetch is logged as a warning only.
//   2. Tell the JobReport, the ActivityRegistry, the input source and the
//      schedule to release resources before forking; install the SIGCHLD
//      handler; create a private message queue with msgget (the process
//      exits if that fails).
//   3. fork() numberOfForkedChildren_ times. Each child redirects stdout and
//      stderr to "redirectout_<pgid>_<index>.log", optionally pins itself to
//      the CPU with its child index (not on __APPLE__), reacquires resources
//      and returns true. A failed fork removes the queue and exits.
//   4. The parent blocks SIGCHLD/SIGUSR2/SIGINT, starts a thread running
//      MessageSenderToSource, and sleeps in sigsuspend until shutdown_flag
//      or child_failed is set or all children are done. If it must stop
//      early it sends SIGUSR2 to every child and waits for them to finish.
//   5. The queue is removed, the sender thread joined, and a
//      cms::Exception("ForkedChildFailed") is thrown if a child failed.
//
// NOTE(review): this listing has lost its original line breaks, so the `//`
// comments and the #ifdef/#else/#endif directives below run into the
// statements that followed them in the real source file.
{ if(0 == numberOfForkedChildren_) {return true;} assert(0<numberOfForkedChildren_); //do what we want done in common { beginJob(); //make sure this was run // make the services available ServiceRegistry::Operate operate(serviceToken_); InputSource::ItemType itemType; itemType = input_->nextItemType(); assert(itemType == InputSource::IsFile); { readFile(); } itemType = input_->nextItemType(); assert(itemType == InputSource::IsRun); std::cout << " prefetching for run " << input_->runAuxiliary()->run() << std::endl; IOVSyncValue ts(EventID(input_->runAuxiliary()->run(), 0, 0), input_->runAuxiliary()->beginTime()); EventSetup const& es = esp_->eventSetupForInstance(ts); //now get all the data available in the EventSetup std::vector<eventsetup::EventSetupRecordKey> recordKeys; es.fillAvailableRecordKeys(recordKeys); std::vector<eventsetup::DataKey> dataKeys; for(std::vector<eventsetup::EventSetupRecordKey>::const_iterator itKey = recordKeys.begin(), itEnd = recordKeys.end(); itKey != itEnd; ++itKey) { eventsetup::EventSetupRecord const* recordPtr = es.find(*itKey); //see if this is on our exclusion list ExcludedDataMap::const_iterator itExcludeRec = eventSetupDataToExcludeFromPrefetching_.find(itKey->type().name()); ExcludedData const* excludedData(0); if(itExcludeRec != eventSetupDataToExcludeFromPrefetching_.end()) { excludedData = &(itExcludeRec->second); if(excludedData->size() == 0 || excludedData->begin()->first == "*") { //skip all items in this record continue; } } if(0 != recordPtr) { dataKeys.clear(); recordPtr->fillRegisteredDataKeys(dataKeys); for(std::vector<eventsetup::DataKey>::const_iterator itDataKey = dataKeys.begin(), itDataKeyEnd = dataKeys.end(); itDataKey != itDataKeyEnd; ++itDataKey) { //std::cout << " " << itDataKey->type().name() << " " << itDataKey->name().value() << std::endl; if(0 != excludedData && excludedData->find(std::make_pair(itDataKey->type().name(), itDataKey->name().value())) != excludedData->end()) { std::cout << " excluding:" << 
itDataKey->type().name() << " " << itDataKey->name().value() << std::endl; continue; } try { recordPtr->doGet(*itDataKey); } catch(cms::Exception& e) { LogWarning("EventSetupPreFetching") << e.what(); } } } } } std::cout << " done prefetching" << std::endl; { // make the services available ServiceRegistry::Operate operate(serviceToken_); Service<JobReport> jobReport; jobReport->parentBeforeFork(jobReportFile, numberOfForkedChildren_); //Now actually do the forking actReg_->preForkReleaseResourcesSignal_(); input_->doPreForkReleaseResources(); schedule_->preForkReleaseResources(); } installCustomHandler(SIGCHLD, ep_sigchld); //Create a message queue used to set what events each child processes int queueID; if(-1 == (queueID = msgget(IPC_PRIVATE, IPC_CREAT|0660))) { printf("Error obtaining message queue\n"); exit(EXIT_FAILURE); } unsigned int childIndex = 0; unsigned int const kMaxChildren = numberOfForkedChildren_; unsigned int const numberOfDigitsInIndex = numberOfDigitsInChildIndex(kMaxChildren); std::vector<pid_t> childrenIds; childrenIds.reserve(kMaxChildren); { // make the services available ServiceRegistry::Operate operate(serviceToken_); Service<JobReport> jobReport; for(; childIndex < kMaxChildren; ++childIndex) { pid_t value = fork(); if(value == 0) { // this is the child process, redirect stdout and stderr to a log file fflush(stdout); fflush(stderr); std::stringstream stout; stout << "redirectout_" << getpgrp() << "_" << std::setw(numberOfDigitsInIndex) << std::setfill('0') << childIndex << ".log"; if(0 == freopen(stout.str().c_str(), "w", stdout)) { std::cerr << "Error during freopen of child process " << childIndex << std::endl; } if(dup2(fileno(stdout), fileno(stderr)) < 0) { std::cerr << "Error during dup2 of child process" << childIndex << std::endl; } std::cout << "I am child " << childIndex << " with pgid " << getpgrp() << std::endl; if(setCpuAffinity_) { // CPU affinity is handled differently on macosx. 
// We disable it and print a message until someone reads: // // http://developer.apple.com/mac/library/releasenotes/Performance/RN-AffinityAPI/index.html // // and implements it. #ifdef __APPLE__ std::cout << "Architecture support for CPU affinity not implemented." << std::endl; #else std::cout << "Setting CPU affinity, setting this child to cpu " << childIndex << std::endl; cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(childIndex, &mask); if(sched_setaffinity(0, sizeof(mask), &mask) != 0) { std::cerr << "Failed to set the cpu affinity, errno " << errno << std::endl; exit(-1); } #endif } break; } if(value < 0) { std::cerr << "failed to create a child" << std::endl; //message queue is a system resource so must be cleaned up // before parent goes away msgctl(queueID, IPC_RMID, 0); exit(-1); } childrenIds.push_back(value); } if(childIndex < kMaxChildren) { jobReport->childAfterFork(jobReportFile, childIndex, kMaxChildren); actReg_->postForkReacquireResourcesSignal_(childIndex, kMaxChildren); boost::shared_ptr<multicore::MessageReceiverForSource> receiver(new multicore::MessageReceiverForSource(queueID)); input_->doPostForkReacquireResources(receiver); schedule_->postForkReacquireResources(childIndex, kMaxChildren); //NOTE: sources have to reset themselves by listening to the post fork message //rewindInput(); return true; } jobReport->parentAfterFork(jobReportFile); } //this is the original which is now the master for all the children //Need to wait for signals from the children or externally // To wait we must // 1) block the signals we want to wait on so we do not have a race condition // 2) check that we haven't already meet our ending criteria // 3) call sigsuspend which unblocks the signals and waits until a signal is caught sigset_t blockingSigSet; sigset_t unblockingSigSet; sigset_t oldSigSet; pthread_sigmask(SIG_SETMASK, NULL, &unblockingSigSet); pthread_sigmask(SIG_SETMASK, NULL, &blockingSigSet); sigaddset(&blockingSigSet, SIGCHLD); sigaddset(&blockingSigSet, 
SIGUSR2); sigaddset(&blockingSigSet, SIGINT); sigdelset(&unblockingSigSet, SIGCHLD); sigdelset(&unblockingSigSet, SIGUSR2); sigdelset(&unblockingSigSet, SIGINT); pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet); //create a thread which sends the units of work to workers // we create it after all signals were blocked so that this // thread is never interupted by a signal MessageSenderToSource sender(queueID, numberOfSequentialEventsPerChild_); boost::thread senderThread(sender); while(!shutdown_flag && !child_failed && (childrenIds.size() != num_children_done)) { sigsuspend(&unblockingSigSet); std::cout << "woke from sigwait" << std::endl; } pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL); std::cout << "num children who have already stopped " << num_children_done << std::endl; if(child_failed) { std::cout << "child failed" << std::endl; } if(shutdown_flag) { std::cout << "asked to shutdown" << std::endl; } if(shutdown_flag || (child_failed && (num_children_done != childrenIds.size()))) { std::cout << "must stop children" << std::endl; for(std::vector<pid_t>::iterator it = childrenIds.begin(), itEnd = childrenIds.end(); it != itEnd; ++it) { /* int result = */ kill(*it, SIGUSR2); } pthread_sigmask(SIG_BLOCK, &blockingSigSet, &oldSigSet); while(num_children_done != kMaxChildren) { sigsuspend(&unblockingSigSet); } pthread_sigmask(SIG_SETMASK, &oldSigSet, NULL); } //remove message queue, this will cause the message thread to terminate // kill it now since all children already stopped msgctl(queueID, IPC_RMID, 0); //now wait for the sender thread to finish. This should be quick since // we killed the message queue senderThread.join(); if(child_failed) { throw cms::Exception("ForkedChildFailed") << "child process ended abnormally with exit code " << child_fail_exit_status; } return false; }
std::vector< ModuleDescription const * > edm::EventProcessor::getAllModuleDescriptions | ( | ) | const |
Return a vector allowing const access to all the ModuleDescriptions for this EventProcessor. *** N.B. *** Ownership of the ModuleDescriptions is *not* passed to the caller. Do not call delete on these pointers!
Definition at line 1094 of file EventProcessor.cc.
References schedule_.
Referenced by evf::FWEPWrapper::init().
{
  // Forward the request to the schedule. The caller receives const access
  // only and must not delete the returned pointers.
  return schedule_->getAllModuleDescriptions();
}
State edm::EventProcessor::getState | ( | ) | const |
Definition at line 1145 of file EventProcessor.cc.
References state_.
Referenced by currentStateName(), evf::FWEPWrapper::microState(), evf::FWEPWrapper::monitoring(), evf::FWEPWrapper::stop(), and evf::FWEPWrapper::stopAndHalt().
{
  // Report the state currently stored in state_.
  return state_;
}
ServiceToken edm::EventProcessor::getToken | ( | ) |
Definition at line 772 of file EventProcessor.cc.
References serviceToken_.
Referenced by evf::FWEPWrapper::moduleWeb(), evf::FWEPWrapper::serviceWeb(), evf::FWEPWrapper::taskWebPage(), and ~EventProcessor().
{
  // Returned by value: the caller gets its own copy of the service token.
  return serviceToken_;
}
void edm::EventProcessor::getTriggerReport | ( | TriggerReport & | rep | ) | const |
Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.
Definition at line 1124 of file EventProcessor.cc.
References schedule_.
Referenced by evf::FWEPWrapper::getTriggerReport(), evf::FWEPWrapper::init(), and evf::FWEPWrapper::taskWebPage().
{
  // Delegate to the schedule, passing the caller's report object through.
  schedule_->getTriggerReport(rep);
}
void edm::EventProcessor::init | ( | boost::shared_ptr< ProcessDesc > & | processDesc, |
ServiceToken const & | token, | ||
serviceregistry::ServiceLegacy | iLegacy | ||
) | [private] |
Definition at line 514 of file EventProcessor.cc.
References act_table_, actReg_, edm::BranchIDListHelper::clearRegistries(), connectSigs(), edm::ServiceToken::copySlotsTo(), edm::ServiceRegistry::createContaining(), edm::ServiceRegistry::createSet(), emptyRunLumiMode_, esp_, eventSetupDataToExcludeFromPrefetching_, FDEBUG, fileMode_, edm::fillLooper(), forceESCacheClearOnNewRun_, edm::getPassID(), edm::getReleaseVersion(), edm::ParameterSet::getUntrackedParameterSet(), input_, edm::eventsetup::heterocontainer::insert(), edm::ServiceRegistry::instance(), edm::serviceregistry::kOverlapIsError, looper_, edm::makeInput(), numberOfForkedChildren_, numberOfSequentialEventsPerChild_, preg_, principalCache_, processConfiguration_, schedule_, serviceToken_, and setCpuAffinity_.
Referenced by EventProcessor().
{
  // BranchIDListRegistry and ProductIDListRegistry are indexed singleton
  // registries. Some processes run several EventProcessors in succession,
  // so the registries are cleared before anything else is set up.
  BranchIDListHelper::clearRegistries();

  boost::shared_ptr<ParameterSet> parameterSet = processDesc->getProcessPSet();

  // Untracked "options" block: file/empty-run handling and cache policy.
  ParameterSet const& optionsPset(parameterSet->getUntrackedParameterSet("options", ParameterSet()));
  fileMode_ = optionsPset.getUntrackedParameter<std::string>("fileMode", "");
  emptyRunLumiMode_ = optionsPset.getUntrackedParameter<std::string>("emptyRunLumiMode", "");
  forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun", false);

  // Nested untracked "multiProcesses" block: settings used when forking.
  ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet());
  numberOfForkedChildren_ = forking.getUntrackedParameter<int>("maxChildProcesses", 0);
  numberOfSequentialEventsPerChild_ = forking.getUntrackedParameter<unsigned int>("maxSequentialEventsPerChild", 1);
  setCpuAffinity_ = forking.getUntrackedParameter<bool>("setCpuAffinity", false);

  // Record the (type, label) pairs, keyed by record name, that are to be
  // excluded from EventSetup prefetching. "type" defaults to "*" and
  // "label" to the empty string; "record" has no default.
  std::vector<ParameterSet> const& excluded =
    forking.getUntrackedParameterSetVector("eventSetupDataToExcludeFromPrefetching",
                                           std::vector<ParameterSet>());
  for(std::vector<ParameterSet>::const_iterator exclusion = excluded.begin(),
                                                exclusionEnd = excluded.end();
      exclusion != exclusionEnd;
      ++exclusion) {
    eventSetupDataToExcludeFromPrefetching_[exclusion->getUntrackedParameter<std::string>("record")].insert(
      std::make_pair(exclusion->getUntrackedParameter<std::string>("type", "*"),
                     exclusion->getUntrackedParameter<std::string>("label", "")));
  }

  boost::shared_ptr<std::vector<ParameterSet> > pServiceSets = processDesc->getServicesPSets();
  //makeParameterSets(config, parameterSet, pServiceSets);

  // Create the services.
  ServiceToken servicesToken(ServiceRegistry::createSet(*pServiceSets, iToken, iLegacy));

  // Services flagged with "@save_config" get their PSet stored in the
  // process parameter set under their "@service_type" name.
  for(std::vector<ParameterSet>::const_iterator service = pServiceSets->begin(),
                                                serviceEnd = pServiceSets->end();
      service != serviceEnd;
      ++service) {
    if(service->exists("@save_config")) {
      parameterSet->addParameter(service->getParameter<std::string>("@service_type"), *service);
    }
  }

  // Copy slots that hold all the registered callback functions like
  // PostBeginJob into an ActivityRegistry that is owned by EventProcessor.
  servicesToken.copySlotsTo(*actReg_);

  // Add the ProductRegistry as a service ONLY for the construction phase.
  typedef serviceregistry::ServiceWrapper<ConstProductRegistry> RegistryWrapper;
  boost::shared_ptr<RegistryWrapper> registryService(
    new RegistryWrapper(std::auto_ptr<ConstProductRegistry>(new ConstProductRegistry(*preg_))));
  ServiceToken tokenWithRegistry(
    ServiceRegistry::createContaining(registryService, servicesToken, serviceregistry::kOverlapIsError));

  // The next thing is ugly: pull out the trigger path pset and
  // create a service and extra token for it.
  std::string processName = parameterSet->getParameter<std::string>("@process_name");

  typedef service::TriggerNamesService TNS;
  typedef serviceregistry::ServiceWrapper<TNS> TriggerNamesWrapper;

  boost::shared_ptr<TriggerNamesWrapper> triggerNamesService(
    new TriggerNamesWrapper(std::auto_ptr<TNS>(new TNS(*parameterSet))));

  serviceToken_ =
    ServiceRegistry::createContaining(triggerNamesService, tokenWithRegistry, serviceregistry::kOverlapIsError);

  // Make the services available for the remainder of this function.
  ServiceRegistry::Operate operate(serviceToken_);

  act_table_.reset(new ActionTable(*parameterSet));

  // Event and luminosity-block limits both default to -1 when not configured.
  CommonParams common = CommonParams(
    processName,
    getReleaseVersion(),
    getPassID(),
    parameterSet->getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1),
    parameterSet->getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1));

  std::auto_ptr<eventsetup::EventSetupsController> espController(new eventsetup::EventSetupsController);
  esp_ = espController->makeProvider(*parameterSet, common);
  processConfiguration_.reset(new ProcessConfiguration(processName, getReleaseVersion(), getPassID()));

  // A looper is optional; when present it is wired to the action table
  // and to the activity registry.
  looper_ = fillLooper(*esp_, *parameterSet, common);
  if(looper_) {
    looper_->setActionTable(act_table_.get());
    looper_->attachTo(*actReg_);
  }

  input_ = makeInput(*parameterSet, common, *preg_, principalCache_, actReg_, processConfiguration_);

  schedule_ = std::auto_ptr<Schedule>(
    new Schedule(*parameterSet,
                 ServiceRegistry::instance().get<TNS>(),
                 *preg_,
                 *act_table_,
                 actReg_,
                 processConfiguration_));

  // initialize(iToken, iLegacy);
  FDEBUG(2) << parameterSet << std::endl;
  connectSigs(this);
}
char const * edm::EventProcessor::msgName | ( | event_processor::Msg | m | ) | const |
Definition at line 1141 of file EventProcessor.cc.
References m.
Referenced by changeState().
{
  // Plain table lookup: m indexes msgNames directly, with no range check here.
  return msgNames[m];
}
void edm::EventProcessor::openOutputFiles | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1665 of file EventProcessor.cc.
ActivityRegistry::PostProcessEvent& edm::EventProcessor::postProcessEventSignal | ( | ) | [inline] |
signal is emitted after all modules have finished processing the Event
Definition at line 210 of file EventProcessor.h.
References postProcessEventSignal_.
{
  // Expose the signal member by reference.
  return postProcessEventSignal_;
}
void edm::EventProcessor::prepareForNextLoop | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1724 of file EventProcessor.cc.
ActivityRegistry::PreProcessEvent& edm::EventProcessor::preProcessEventSignal | ( | ) | [inline] |
signal is emitted after the Event has been created by the InputSource but before any modules have seen the Event
Definition at line 205 of file EventProcessor.h.
References preProcessEventSignal_.
{
  // Expose the signal member by reference.
  return preProcessEventSignal_;
}
int edm::EventProcessor::readAndCacheLumi | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1839 of file EventProcessor.cc.
References input_.
statemachine::Run edm::EventProcessor::readAndCacheRun | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1832 of file EventProcessor.cc.
References input_, and PDRates::Run.
{
  // Have the input source read and cache the run, then mark it.
  input_->readAndCacheRun();
  input_->markRun();

  // Identify the run to the state machine by process history ID and run number.
  return statemachine::Run(input_->processHistoryID(), input_->run());
}
void edm::EventProcessor::readAndProcessEvent | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1865 of file EventProcessor.cc.
References act_table_, alignmentValidation::action, actReg_, cms::Exception::category(), edm::EventPrincipal::clearEventPrincipal(), ExpressReco_HICollisions_FallBack::e, esp_, FDEBUG, edm::EventPrincipal::id(), input_, edm::EDLooperBase::kContinue, edm::ProcessingController::kToPreviousEvent, edm::ProcessingController::kToSpecifiedEvent, edm::ProcessingController::lastOperationSucceeded(), looper_, edm::PrincipalCache::lumiPrincipalPtr(), principalCache_, edm::ProcessingController::requestedTransition(), edm::actions::Rethrow, cms::Exception::rootCause(), schedule_, edm::ProcessingController::setLastOperationSucceeded(), shouldWeStop_, edm::ProcessingController::specifiedEventTransition(), ntuplemaker::status, summarizeEdmComparisonLogfiles::succeeded, edm::EventPrincipal::time(), and cms::Exception::what().
{
  // Step 1: read the next event from the input source.
  EventPrincipal* eventPrincipal = 0;
  try {
    eventPrincipal = input_->readEvent(principalCache_.lumiPrincipalPtr());
    FDEBUG(1) << "\treadEvent\n";
  }
  catch(cms::Exception& e) {
    // The action table decides whether a read failure is rethrown or
    // whether this event is skipped with a warning.
    actions::ActionCodes action = act_table_->find(e.rootCause());
    if(action == actions::Rethrow) {
      throw;
    }
    LogWarning(e.category())
      << "an exception occurred and all paths for the event are being skipped: \n"
      << e.what();
    return;
  }
  assert(eventPrincipal != 0);

  // Step 2: obtain the EventSetup for this event's id and time, then run
  // the schedule on it.
  IOVSyncValue syncValue(eventPrincipal->id(), eventPrincipal->time());
  EventSetup const& eventSetup = esp_->eventSetupForInstance(syncValue);
  {
    typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
    // The sentry lives only for the duration of the schedule call.
    ScheduleSignalSentry<Traits> sentry(actReg_.get(), eventPrincipal, &eventSetup);
    schedule_->processOneOccurrence<Traits>(*eventPrincipal, eventSetup);
  }

  // Step 3: give the looper, if any, its turn with the event.
  if(looper_) {
    bool randomAccess = input_->randomAccess();
    ProcessingController::ForwardState forwardState = input_->forwardState();
    ProcessingController::ReverseState reverseState = input_->reverseState();
    ProcessingController controller(forwardState, reverseState, randomAccess);

    EDLooperBase::Status status = EDLooperBase::kContinue;
    do {
      status = looper_->doDuringLoop(*eventPrincipal, esp_->eventSetup(), controller);

      // Transitions are honoured only for random-access sources. The loop
      // repeats for as long as the last requested operation failed.
      bool succeeded = true;
      if(randomAccess) {
        if(controller.requestedTransition() == ProcessingController::kToPreviousEvent) {
          input_->skipEvents(-2);
        }
        else if(controller.requestedTransition() == ProcessingController::kToSpecifiedEvent) {
          succeeded = input_->goToEvent(controller.specifiedEventTransition());
        }
      }
      controller.setLastOperationSucceeded(succeeded);
    } while(!controller.lastOperationSucceeded());

    if(status != EDLooperBase::kContinue) {
      shouldWeStop_ = true;
    }
  }

  FDEBUG(1) << "\tprocessEvent\n";
  eventPrincipal->clearEventPrincipal();
}
void edm::EventProcessor::readFile | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1652 of file EventProcessor.cc.
References fb_, FDEBUG, input_, numberOfForkedChildren_, and edm::FileBlock::ParallelProcesses.
Referenced by forkProcess().
{
  FDEBUG(1) << " \treadFile\n";
  fb_ = input_->readFile();

  // With forked children configured, the file block is flagged as not
  // fast-clonable, with ParallelProcesses given as the reason.
  if(numberOfForkedChildren_ > 0) {
    fb_->setNotFastClonable(FileBlock::ParallelProcesses);
  }
}
void edm::EventProcessor::respondToCloseInputFile | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1680 of file EventProcessor.cc.
void edm::EventProcessor::respondToCloseOutputFiles | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1690 of file EventProcessor.cc.
void edm::EventProcessor::respondToOpenInputFile | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1675 of file EventProcessor.cc.
void edm::EventProcessor::respondToOpenOutputFiles | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1685 of file EventProcessor.cc.
void edm::EventProcessor::rewind | ( | ) |
Definition at line 659 of file EventProcessor.cc.
References beginJob(), changeState(), input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mInputRewind, edm::event_processor::mStopAsync, serviceToken_, and edm::event_processor::StateSentry::succeeded().
{
  beginJob(); // make sure this was called

  changeState(mStopAsync);
  changeState(mInputRewind);
  {
    // The sentry and the service scope are both released at the end of
    // this block, before the final transition to mFinished.
    StateSentry sentry(this);

    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);

    input_->repeat();
    input_->rewind();

    changeState(mCountComplete);
    sentry.succeeded();
  }
  changeState(mFinished);
}
void edm::EventProcessor::rewindInput | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1718 of file EventProcessor.cc.
EventProcessor::StatusCode edm::EventProcessor::run | ( | int | numberEventsToProcess, |
bool | repeatable = true |
||
) |
Definition at line 680 of file EventProcessor.cc.
References runEventCount().
{
  // Only the event count is forwarded; 'repeatable' is not used by this body.
  return runEventCount(numberEventsToProcess);
}
EventProcessor::StatusCode edm::EventProcessor::run | ( | void | ) | [inline] |
Definition at line 392 of file EventProcessor.h.
{
  // Convenience overload: calls run(int, bool) with a count of -1 and
  // repeatable set to false.
  return run(-1, false);
}
void edm::EventProcessor::runAsync | ( | ) |
Definition at line 1297 of file EventProcessor.cc.
References asyncRun(), beginJob(), changeState(), edm::IEventProcessor::epSuccess, event_loop_, Exception, id_set_, last_rc_, edm::event_processor::mRunAsync, starter_, stop_count_, and stop_lock_.
{
  beginJob();
  {
    // stop_lock_ is held for the whole start-up sequence, including the
    // wait on starter_ below.
    boost::mutex::scoped_lock lock(stop_lock_);

    if(id_set_ == true) {
      std::string err("runAsync called while async event loop already running\n");
      LogError("FwkJob") << err;
      throw cms::Exception("BadState") << err;
    }

    changeState(mRunAsync);

    stop_count_ = 0;
    last_rc_ = epSuccess; // forget the last value!

    // Launch the thread that executes asyncRun on this processor.
    event_loop_.reset(new boost::thread(boost::bind(EventProcessor::asyncRun, this)));

    // Allow the thread 60 seconds to report in through starter_.
    boost::xtime timeout;
    boost::xtime_get(&timeout, boost::TIME_UTC);
    timeout.sec += 60;

    if(!starter_.timed_wait(lock, timeout)) {
      // yikes - the thread did not start
      throw cms::Exception("BadState")
        << "Async run thread did not start in 60 seconds\n";
    }
  }
}
EventProcessor::StatusCode edm::EventProcessor::runCommon | ( | bool | onlineStateTransitions, |
int | numberOfEventsToProcess | ||
) | [private] |
Definition at line 1419 of file EventProcessor.cc.
References alreadyHandlingException_, beginJob(), changeState(), edm::errors::Configuration, statemachine::doNotHandleEmptyRunsAndLumis, ExpressReco_HICollisions_FallBack::e, emptyRunLumiMode_, edm::IEventProcessor::epCountComplete, edm::IEventProcessor::epSignal, edm::IEventProcessor::epSuccess, exception, Exception, exceptionMessageFiles_, exceptionMessageLumis_, exceptionMessageRuns_, FDEBUG, harvesting_AlCaReco_cfg::fileMode, fileMode_, forceLooperToEnd_, statemachine::FULLMERGE, statemachine::handleEmptyRuns, statemachine::handleEmptyRunsAndLumis, input_, edm::PrincipalCache::insert(), edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::errors::LogicError, machine_, edm::event_processor::mFinished, edm::event_processor::mInputExhausted, edm::event_processor::mRunCount, edm::event_processor::mShutdownSignal, statemachine::NOMERGE, preg_, principalCache_, processConfiguration_, runEdmFileComparison::returnCode, serviceToken_, edm::shutdown_flag, edm::event_processor::sShuttingDown, edm::event_processor::sStopping, state_, stateMachineWasInErrorState_, terminateMachine(), and edm::usr2_lock.
Referenced by runEventCount(), and runToCompletion().
{
  // Drives the boost state machine from the items the input source reports
  // (file, run, lumi, event, stop) until the machine terminates, a stop or
  // shutdown is requested, or the requested number of events was processed.
  //
  // onlineStateTransitions  when false, this function itself performs the
  //                         mRunCount transition on entry and the mFinished
  //                         transition on exit.
  // numberOfEventsToProcess a positive value limits the number of events;
  //                         zero or a negative value means no limit.
  //
  // Returns epSuccess by default, epSignal when the shutdown flag was seen,
  // and epCountComplete when the event limit was reached.
  //
  // Throws a Configuration exception for an unrecognised fileMode or
  // emptyRunLumiMode value, and a cms::Exception for anything thrown inside
  // the processing loop or when the machine entered its Error state.

  // Reusable event principal
  boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
  principalCache_.insert(ep);

  beginJob(); //make sure this was called

  if(!onlineStateTransitions) changeState(mRunCount);

  StatusCode returnCode = epSuccess;
  stateMachineWasInErrorState_ = false;

  // make the services available
  ServiceRegistry::Operate operate(serviceToken_);

  // Build the state machine on first use. An empty configuration string
  // selects the default of each mode.
  if(machine_.get() == 0) {
    statemachine::FileMode fileMode;
    if(fileMode_.empty() || fileMode_ == "FULLMERGE") {
      fileMode = statemachine::FULLMERGE;
    }
    else if(fileMode_ == "NOMERGE") {
      fileMode = statemachine::NOMERGE;
    }
    else {
      throw Exception(errors::Configuration, "Illegal fileMode parameter value: ")
        << fileMode_ << ".\n"
        << "Legal values are 'NOMERGE' and 'FULLMERGE'.\n";
    }

    statemachine::EmptyRunLumiMode emptyRunLumiMode;
    if(emptyRunLumiMode_.empty() || emptyRunLumiMode_ == "handleEmptyRunsAndLumis") {
      emptyRunLumiMode = statemachine::handleEmptyRunsAndLumis;
    }
    else if(emptyRunLumiMode_ == "handleEmptyRuns") {
      emptyRunLumiMode = statemachine::handleEmptyRuns;
    }
    else if(emptyRunLumiMode_ == "doNotHandleEmptyRunsAndLumis") {
      emptyRunLumiMode = statemachine::doNotHandleEmptyRunsAndLumis;
    }
    else {
      // The message names the parameter as it is spelled in the configuration.
      throw Exception(errors::Configuration, "Illegal emptyRunLumiMode parameter value: ")
        << emptyRunLumiMode_ << ".\n"
        << "Legal values are 'handleEmptyRunsAndLumis', 'handleEmptyRuns', and 'doNotHandleEmptyRunsAndLumis'.\n";
    }

    machine_.reset(new statemachine::Machine(this, fileMode, emptyRunLumiMode));
    machine_->initiate();
  }

  try {
    InputSource::ItemType itemType;

    int iEvents = 0;

    while(true) {
      itemType = input_->nextItemType();

      FDEBUG(1) << "itemType = " << itemType << "\n";

      // These are used for asynchronous running only and
      // and are checking to see if stopAsync or shutdownAsync
      // were called from another thread. In the future, we
      // may need to do something better than polling the state.
      // With the current code this is the simplest thing and
      // it should always work. If the interaction between
      // threads becomes more complex this may cause problems.
      if(state_ == sStopping) {
        FDEBUG(1) << "In main processing loop, encountered sStopping state\n";
        forceLooperToEnd_ = true;
        machine_->process_event(statemachine::Stop());
        forceLooperToEnd_ = false;
        break;
      }
      else if(state_ == sShuttingDown) {
        FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n";
        forceLooperToEnd_ = true;
        machine_->process_event(statemachine::Stop());
        forceLooperToEnd_ = false;
        break;
      }

      // Look for a shutdown signal
      {
        boost::mutex::scoped_lock sl(usr2_lock);
        if(shutdown_flag) {
          changeState(mShutdownSignal);
          returnCode = epSignal;
          forceLooperToEnd_ = true;
          machine_->process_event(statemachine::Stop());
          forceLooperToEnd_ = false;
          break;
        }
      }

      // Translate the input item into the matching state machine event.
      // This stays an if/else chain: the 'break' in the event branch has
      // to leave the enclosing while loop.
      if(itemType == InputSource::IsStop) {
        machine_->process_event(statemachine::Stop());
      }
      else if(itemType == InputSource::IsFile) {
        machine_->process_event(statemachine::File());
      }
      else if(itemType == InputSource::IsRun) {
        machine_->process_event(statemachine::Run(input_->processHistoryID(), input_->run()));
      }
      else if(itemType == InputSource::IsLumi) {
        machine_->process_event(statemachine::Lumi(input_->luminosityBlock()));
      }
      else if(itemType == InputSource::IsEvent) {
        machine_->process_event(statemachine::Event());
        ++iEvents;
        if(numberOfEventsToProcess > 0 && iEvents >= numberOfEventsToProcess) {
          returnCode = epCountComplete;
          changeState(mInputExhausted);
          FDEBUG(1) << "Event count complete, pausing event loop\n";
          break;
        }
      }
      // This should be impossible
      else {
        throw Exception(errors::LogicError)
          << "Unknown next item type passed to EventProcessor\n"
          << "Please report this error to the Framework group\n";
      }

      if(machine_->terminated()) {
        changeState(mInputExhausted);
        break;
      }
    }  // End of loop over state machine events
  } // Try block

  // Some comments on exception handling related to the boost state machine:
  //
  // Some states used in the machine are special because they
  // perform actions while the machine is being terminated, actions
  // such as close files, call endRun, call endLumi etc ... Each of these
  // states has two functions that perform these actions. The functions
  // are almost identical. The major difference is that one version
  // catches all exceptions and the other lets exceptions pass through.
  // The destructor catches them and the other function named "exit" lets
  // them pass through. On a normal termination, boost will always call
  // "exit" and then the state destructor. In our state classes, the
  // the destructors do nothing if the exit function already took
  // care of things. Here's the interesting part. When boost is
  // handling an exception the "exit" function is not called (a boost
  // feature).
  //
  // If an exception occurs while the boost machine is in control
  // (which usually means inside a process_event call), then
  // the boost state machine destroys its states and "terminates" itself.
  // This already done before we hit the catch blocks below. In this case
  // the call to terminateMachine below only destroys an already
  // terminated state machine. Because exit is not called, the state destructors
  // handle cleaning up lumis, runs, and files. The destructors swallow
  // all exceptions and only pass through the exceptions messages which
  // are tacked onto the original exception below.
  //
  // If an exception occurs when the boost state machine is not
  // in control (outside the process_event functions), then boost
  // cannot destroy its own states. The terminateMachine function
  // below takes care of that. The flag "alreadyHandlingException"
  // is set true so that the state exit functions do nothing (and
  // cannot throw more exceptions while handling the first). Then the
  // state destructors take care of this because exit did nothing.
  //
  // In both cases above, the EventProcessor::endOfLoop function is
  // not called because it can throw exceptions.
  //
  // One tricky aspect of the state machine is that things which can
  // throw should not be invoked by the state machine while another
  // exception is being handled.
  // Another tricky aspect is that it appears to be important to
  // terminate the state machine before invoking its destructor.
  // We've seen crashes which are not understood when that is not
  // done. Maintainers of this code should be careful about this.

  catch (cms::Exception& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    e << "cms::Exception caught in EventProcessor and rethrown\n";
    e << exceptionMessageLumis_;
    e << exceptionMessageRuns_;
    e << exceptionMessageFiles_;
    throw;
  }
  catch (std::bad_alloc& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("std::bad_alloc")
      << "The EventProcessor caught a std::bad_alloc exception and converted it to a cms::Exception\n"
      << "The job has probably exhausted the virtual memory available to the process.\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }
  catch (std::exception& e) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("StdException")
      << "The EventProcessor caught a std::exception and converted it to a cms::Exception\n"
      << "Previous information:\n" << e.what() << "\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }
  catch (...) {
    alreadyHandlingException_ = true;
    terminateMachine();
    alreadyHandlingException_ = false;
    throw cms::Exception("Unknown")
      << "The EventProcessor caught an unknown exception type and converted it to a cms::Exception\n"
      << exceptionMessageLumis_
      << exceptionMessageRuns_
      << exceptionMessageFiles_;
  }

  // A terminated machine is discarded here; otherwise it is kept in machine_.
  if(machine_->terminated()) {
    FDEBUG(1) << "The state machine reports it has been terminated\n";
    machine_.reset();
  }

  if(!onlineStateTransitions) changeState(mFinished);

  if(stateMachineWasInErrorState_) {
    throw cms::Exception("BadState")
      << "The boost state machine in the EventProcessor exited after\n"
      << "entering the Error state.\n";
  }

  return returnCode;
}
EventProcessor::StatusCode edm::EventProcessor::runEventCount | ( | int | numberOfEventsToProcess | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1406 of file EventProcessor.cc.
References runEdmFileComparison::returnCode, runCommon(), and edm::event_processor::StateSentry::succeeded().
Referenced by run().
{
  // The sentry is marked as succeeded only if runCommon returns normally.
  StateSentry sentry(this);

  // This entry point always runs without online state transitions.
  bool const onlineStateTransitions = false;
  StatusCode const returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);

  sentry.succeeded();
  return returnCode;
}
EventProcessor::StatusCode edm::EventProcessor::runToCompletion | ( | bool | onlineStateTransitions | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1387 of file EventProcessor.cc.
References Exception, edm::errors::LogicError, machine_, runEdmFileComparison::returnCode, runCommon(), and edm::event_processor::StateSentry::succeeded().
Referenced by asyncRun().
{
  // The sentry is marked as succeeded only if everything below completes.
  StateSentry sentry(this);

  // -1 requests processing with no event limit.
  int const numberOfEventsToProcess = -1;
  StatusCode const returnCode = runCommon(onlineStateTransitions, numberOfEventsToProcess);

  // After a run to completion the state machine must have been released.
  if(machine_.get() != 0) {
    throw Exception(errors::LogicError)
      << "State machine not destroyed on exit from EventProcessor::runToCompletion\n"
      << "Please report this error to the Framework group\n";
  }

  sentry.succeeded();
  return returnCode;
}
void edm::EventProcessor::setExceptionMessageFiles | ( | std::string & | message | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1927 of file EventProcessor.cc.
References exceptionMessageFiles_, and argparse::message.
{
  // Store a copy of the message in exceptionMessageFiles_.
  exceptionMessageFiles_ = message;
}
void edm::EventProcessor::setExceptionMessageLumis | ( | std::string & | message | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1935 of file EventProcessor.cc.
References exceptionMessageLumis_, and argparse::message.
{
  // Store a copy of the message in exceptionMessageLumis_.
  exceptionMessageLumis_ = message;
}
void edm::EventProcessor::setExceptionMessageRuns | ( | std::string & | message | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1931 of file EventProcessor.cc.
References exceptionMessageRuns_, and argparse::message.
{
  // Store a copy of the message in exceptionMessageRuns_.
  exceptionMessageRuns_ = message;
}
void edm::EventProcessor::setRunNumber | ( | RunNumber_t | runNumber | ) |
Definition at line 1156 of file EventProcessor.cc.
References beginJob(), changeState(), input_, and edm::event_processor::mSetRun.
{
  // Run number 0 is rejected: fall back to 1 and warn about it.
  if(runNumber == 0) {
    runNumber = 1;
    LogWarning("Invalid Run")
      << "EventProcessor::setRunNumber was called with an invalid run number (0)\n"
      << "Run number was set to 1 instead\n";
  }

  // inside of beginJob there is a check to see if it has been called before
  beginJob();
  changeState(mSetRun);

  // interface not correct yet
  input_->setRunNumber(runNumber);
}
void edm::EventProcessor::setupSignal | ( | ) | [private] |
bool edm::EventProcessor::shouldWeCloseOutput | ( | ) | const [virtual] |
Implements edm::IEventProcessor.
Definition at line 1729 of file EventProcessor.cc.
bool edm::EventProcessor::shouldWeStop | ( | ) | const [virtual] |
Implements edm::IEventProcessor.
Definition at line 1921 of file EventProcessor.cc.
References FDEBUG, schedule_, and shouldWeStop_.
{
  FDEBUG(1) << "\tshouldWeStop\n";

  // The local stop flag wins; the schedule is consulted only when it is unset.
  return shouldWeStop_ || schedule_->terminate();
}
EventProcessor::StatusCode edm::EventProcessor::shutdownAsync | ( | unsigned int | timeout_secs = 60 * 2 | ) |
Definition at line 1246 of file EventProcessor.cc.
References changeState(), edm::IEventProcessor::epTimedOut, errorState(), edm::event_processor::mFinished, edm::event_processor::mShutdownAsync, and waitForAsyncCompletion().
{
  changeState(mShutdownAsync);

  // Wait for the asynchronous run to complete, up to the given timeout.
  StatusCode const rc = waitForAsyncCompletion(secs);

  // A timeout moves the processor to the error state; any other
  // outcome counts as finished.
  if(rc == epTimedOut) {
    errorState();
  }
  else {
    changeState(mFinished);
  }
  return rc;
}
EventProcessor::StatusCode edm::EventProcessor::skip | ( | int | numberToSkip | ) |
Definition at line 685 of file EventProcessor.cc.
References beginJob(), changeState(), edm::IEventProcessor::epSuccess, input_, edm::event_processor::mCountComplete, edm::event_processor::mFinished, edm::event_processor::mSkip, serviceToken_, and edm::event_processor::StateSentry::succeeded().
{
  beginJob(); // make sure this was called

  changeState(mSkip);
  {
    // The sentry and the service scope are both released at the end of
    // this block, before the final transition to mFinished.
    StateSentry sentry(this);

    // make the services available
    ServiceRegistry::Operate operate(serviceToken_);

    input_->skipEvents(numberToSkip);

    changeState(mCountComplete);
    sentry.succeeded();
  }
  changeState(mFinished);
  return epSuccess;
}
void edm::EventProcessor::startingNewLoop | ( | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1695 of file EventProcessor.cc.
References FDEBUG, looper_, looperBeginJobRun_, and shouldWeStop_.
{
  // Every loop starts with the stop request cleared.
  shouldWeStop_ = false;

  // For the first loop, 'doStartingNewLoop' has to be delayed until after
  // beginOfJob has been called; looperBeginJobRun_ gates that.
  if(looper_) {
    if(looperBeginJobRun_) {
      looper_->doStartingNewLoop();
    }
  }
  FDEBUG(1) << "\tstartingNewLoop\n";
}
char const * edm::EventProcessor::stateName | ( | event_processor::State | s | ) | const |
Definition at line 1137 of file EventProcessor.cc.
References asciidump::s.
Referenced by changeState(), currentStateName(), evf::FWEPWrapper::publishConfigAndMonitorItems(), and evf::FWEPWrapper::stop().
{
  // Return the entry of the stateNames table indexed by state `s`.
  // NOTE(review): no bounds check is visible here; presumably `s` is always
  // a valid event_processor::State value - confirm against the table size.
  return stateNames[s];
}
EventProcessor::StatusCode edm::EventProcessor::statusAsync | ( | ) | const |
Definition at line 1149 of file EventProcessor.cc.
References last_rc_.
{
  // The event-loop thread records its exception/error status in this
  // event processor (last_rc_); this accessor simply reports that value.
  return last_rc_;
}
EventProcessor::StatusCode edm::EventProcessor::stopAsync | ( | unsigned int | timeout_secs = 60 * 2 | ) |
Definition at line 1238 of file EventProcessor.cc.
References changeState(), edm::IEventProcessor::epTimedOut, errorState(), edm::event_processor::mFinished, edm::event_processor::mStopAsync, and waitForAsyncCompletion().
{
  // Announce the asynchronous stop request, then wait for the event loop
  // to complete (or for the wait to time out).
  changeState(mStopAsync);
  StatusCode const outcome = waitForAsyncCompletion(secs);

  if(outcome == epTimedOut) {
    // The wait timed out: go to the error state.
    errorState();
  } else {
    changeState(mFinished);
  }
  return outcome;
}
void edm::EventProcessor::terminateMachine | ( | ) | [private] |
Definition at line 1943 of file EventProcessor.cc.
References FDEBUG, forceLooperToEnd_, and machine_.
Referenced by endJob(), runCommon(), and ~EventProcessor().
{
  // Nothing to tear down when no state machine exists.
  if(machine_.get() == 0) {
    return;
  }

  if(machine_->terminated()) {
    FDEBUG(1) << "EventProcess::terminateMachine The state machine was already terminated \n";
  } else {
    // forceLooperToEnd_ is raised only while the Stop event is processed.
    forceLooperToEnd_ = true;
    machine_->process_event(statemachine::Stop());
    forceLooperToEnd_ = false;
  }

  if(machine_->terminated()) {
    FDEBUG(1) << "The state machine reports it has been terminated (3)\n";
  }

  // Release the machine regardless of which branch ran above.
  machine_.reset();
}
int edm::EventProcessor::totalEvents | ( | ) | const |
Return the number of events this EventProcessor has tried to process (includes both successes and failures, including failures due to exceptions during processing).
Definition at line 1099 of file EventProcessor.cc.
References schedule_.
Referenced by evf::FWEPWrapper::getTriggerReport(), and evf::FWEPWrapper::monitoring().
{
  // The count is kept by the schedule; this simply forwards its value.
  return schedule_->totalEvents();
}
int edm::EventProcessor::totalEventsFailed | ( | ) | const |
Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
Definition at line 1109 of file EventProcessor.cc.
References schedule_.
{
  // The count is kept by the schedule; this simply forwards its value.
  return schedule_->totalEventsFailed();
}
int edm::EventProcessor::totalEventsPassed | ( | ) | const |
Return the number of events processed by this EventProcessor which have been passed by one or more trigger paths.
Definition at line 1104 of file EventProcessor.cc.
References schedule_.
Referenced by evf::FWEPWrapper::getTriggerReport(), and evf::FWEPWrapper::monitoring().
{
  // The count is kept by the schedule; this simply forwards its value.
  return schedule_->totalEventsPassed();
}
EventProcessor::StatusCode edm::EventProcessor::waitForAsyncCompletion | ( | unsigned int | timeout_seconds | ) | [private] |
Definition at line 1183 of file EventProcessor.cc.
References edm::IEventProcessor::epTimedOut, event_loop_, event_loop_id_, id_set_, last_rc_, stop_count_, stop_lock_, and stopper_.
Referenced by doneAsync(), shutdownAsync(), stopAsync(), and waitTillDoneAsync().
{
  // Absolute deadline for the timed wait: current UTC time plus the timeout.
  boost::xtime deadline;
  boost::xtime_get(&deadline, boost::TIME_UTC);
  deadline.sec += timeout_seconds;

  // Stays true unless a timed wait reports that the deadline passed.
  bool finishedInTime = true;

  // make sure to include a timeout here so we don't wait forever
  // I suspect there are still timing issues with thread startup
  // and the setting of the various control variables (stop_count, id_set)
  {
    boost::mutex::scoped_lock guard(stop_lock_);

    // look here - if runAsync not active, just return the last return code
    if(stop_count_ < 0) {
      return last_rc_;
    }

    if(timeout_seconds == 0) {
      // A timeout of zero means: wait without any deadline.
      while(stop_count_ == 0) {
        stopper_.wait(guard);
      }
    } else {
      while(stop_count_ == 0) {
        finishedInTime = stopper_.timed_wait(guard, deadline);
        if(!finishedInTime) {
          break;
        }
      }
    }

    if(!finishedInTime) {
      // timeout occurred
      // if(id_set_) pthread_kill(event_loop_id_, my_sig_num_);
      // this is a temporary hack until we get the input source
      // upgraded to allow blocking input sources to be unblocked

      // the next line is dangerous and causes all sorts of trouble
      if(id_set_) {
        pthread_cancel(event_loop_id_);
      }

      // we will not do anything yet
      LogWarning("timeout")
        << "An asynchronous request was made to shut down "
        << "the event loop "
        << "and the event loop did not shutdown after "
        << timeout_seconds << " seconds\n";
    } else {
      // The loop signalled completion: reap the thread and mark the
      // asynchronous run as inactive (stop_count_ < 0).
      event_loop_->join();
      event_loop_.reset();
      id_set_ = false;
      stop_count_ = -1;
    }
  }

  if(!finishedInTime) {
    return epTimedOut;
  }
  return last_rc_;
}
EventProcessor::StatusCode edm::EventProcessor::waitTillDoneAsync | ( | unsigned int | timeout_seconds = 0 | ) |
Definition at line 1230 of file EventProcessor.cc.
References changeState(), edm::IEventProcessor::epTimedOut, errorState(), edm::event_processor::mCountComplete, and waitForAsyncCompletion().
Referenced by evf::FWEPWrapper::stop().
{
  // Wait for the asynchronous event loop, then record the outcome in the
  // processor state.
  StatusCode const outcome = waitForAsyncCompletion(timeout_value_secs);

  if(outcome == epTimedOut) {
    // The wait timed out: go to the error state.
    errorState();
  } else {
    changeState(mCountComplete);
  }
  return outcome;
}
void edm::EventProcessor::writeLumi | ( | ProcessHistoryID const & | phid, |
int | run, | ||
int | lumi | ||
) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1855 of file EventProcessor.cc.
References FDEBUG, edm::PrincipalCache::lumiPrincipal(), principalCache_, and schedule_.
void edm::EventProcessor::writeRun | ( | statemachine::Run const & | run | ) | [virtual] |
Implements edm::IEventProcessor.
Definition at line 1845 of file EventProcessor.cc.
References FDEBUG, principalCache_, statemachine::Run::processHistoryID(), statemachine::Run::runNumber(), edm::PrincipalCache::runPrincipal(), and schedule_.
friend class event_processor::StateSentry [friend] |
Definition at line 385 of file EventProcessor.h.
boost::shared_ptr<ActionTable const> edm::EventProcessor::act_table_ [private] |
Definition at line 345 of file EventProcessor.h.
Referenced by init(), and readAndProcessEvent().
boost::shared_ptr<ActivityRegistry> edm::EventProcessor::actReg_ [private] |
Definition at line 340 of file EventProcessor.h.
Referenced by beginJob(), beginLumi(), beginRun(), connectSigs(), endJob(), endLumi(), endRun(), forkProcess(), init(), readAndProcessEvent(), and ~EventProcessor().
bool edm::EventProcessor::alreadyHandlingException_ [private] |
Definition at line 374 of file EventProcessor.h.
Referenced by alreadyHandlingException(), and runCommon().
std::string edm::EventProcessor::emptyRunLumiMode_ [private] |
Definition at line 370 of file EventProcessor.h.
Referenced by init(), and runCommon().
boost::shared_ptr<eventsetup::EventSetupProvider> edm::EventProcessor::esp_ [private] |
Definition at line 344 of file EventProcessor.h.
Referenced by beginLumi(), beginRun(), endLumi(), endOfLoop(), endRun(), forkProcess(), init(), prepareForNextLoop(), readAndProcessEvent(), and ~EventProcessor().
boost::shared_ptr<boost::thread> edm::EventProcessor::event_loop_ [private] |
Definition at line 350 of file EventProcessor.h.
Referenced by runAsync(), and waitForAsyncCompletion().
volatile pthread_t edm::EventProcessor::event_loop_id_ [private] |
Definition at line 360 of file EventProcessor.h.
Referenced by asyncRun(), and waitForAsyncCompletion().
Definition at line 384 of file EventProcessor.h.
Referenced by forkProcess(), and init().
std::string edm::EventProcessor::exceptionMessageFiles_ [private] |
Definition at line 371 of file EventProcessor.h.
Referenced by runCommon(), and setExceptionMessageFiles().
std::string edm::EventProcessor::exceptionMessageLumis_ [private] |
Definition at line 373 of file EventProcessor.h.
Referenced by runCommon(), and setExceptionMessageLumis().
std::string edm::EventProcessor::exceptionMessageRuns_ [private] |
Definition at line 372 of file EventProcessor.h.
Referenced by runCommon(), and setExceptionMessageRuns().
boost::shared_ptr<FileBlock> edm::EventProcessor::fb_ [private] |
Definition at line 362 of file EventProcessor.h.
Referenced by closeInputFile(), openOutputFiles(), readFile(), respondToCloseInputFile(), respondToCloseOutputFiles(), respondToOpenInputFile(), and respondToOpenOutputFiles().
std::string edm::EventProcessor::fileMode_ [private] |
Definition at line 369 of file EventProcessor.h.
Referenced by init(), and runCommon().
bool edm::EventProcessor::forceESCacheClearOnNewRun_ [private] |
Definition at line 377 of file EventProcessor.h.
Referenced by beginRun(), and init().
bool edm::EventProcessor::forceLooperToEnd_ [private] |
Definition at line 375 of file EventProcessor.h.
Referenced by endOfLoop(), runCommon(), and terminateMachine().
volatile bool edm::EventProcessor::id_set_ [private] |
Definition at line 359 of file EventProcessor.h.
Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().
boost::shared_ptr<InputSource> edm::EventProcessor::input_ [private] |
Definition at line 343 of file EventProcessor.h.
Referenced by beginJob(), beginLumi(), beginRun(), closeInputFile(), endJob(), endLumi(), endRun(), forkProcess(), init(), readAndCacheLumi(), readAndCacheRun(), readAndProcessEvent(), readFile(), rewind(), rewindInput(), runCommon(), setRunNumber(), skip(), and ~EventProcessor().
std::string edm::EventProcessor::last_error_text_ [private] |
Definition at line 358 of file EventProcessor.h.
Referenced by asyncRun().
volatile Status edm::EventProcessor::last_rc_ [private] |
Definition at line 357 of file EventProcessor.h.
Referenced by asyncRun(), runAsync(), statusAsync(), and waitForAsyncCompletion().
boost::shared_ptr<EDLooperBase> edm::EventProcessor::looper_ [private] |
Definition at line 363 of file EventProcessor.h.
Referenced by beginLumi(), beginRun(), endJob(), endLumi(), endOfLoop(), endRun(), init(), prepareForNextLoop(), readAndProcessEvent(), startingNewLoop(), and ~EventProcessor().
bool edm::EventProcessor::looperBeginJobRun_ [private] |
Definition at line 376 of file EventProcessor.h.
Referenced by beginRun(), and startingNewLoop().
std::auto_ptr<statemachine::Machine> edm::EventProcessor::machine_ [private] |
Definition at line 365 of file EventProcessor.h.
Referenced by runCommon(), runToCompletion(), and terminateMachine().
int edm::EventProcessor::my_sig_num_ [private] |
Definition at line 361 of file EventProcessor.h.
int edm::EventProcessor::numberOfForkedChildren_ [private] |
Definition at line 379 of file EventProcessor.h.
Referenced by forkProcess(), init(), and readFile().
unsigned int edm::EventProcessor::numberOfSequentialEventsPerChild_ [private] |
Definition at line 380 of file EventProcessor.h.
Referenced by forkProcess(), and init().
Definition at line 339 of file EventProcessor.h.
Referenced by connectSigs(), and postProcessEventSignal().
boost::shared_ptr<SignallingProductRegistry> edm::EventProcessor::preg_ [private] |
Definition at line 341 of file EventProcessor.h.
Referenced by init(), and runCommon().
Definition at line 338 of file EventProcessor.h.
Referenced by connectSigs(), and preProcessEventSignal().
Definition at line 366 of file EventProcessor.h.
Referenced by beginLumi(), beginRun(), deleteLumiFromCache(), deleteRunFromCache(), endLumi(), endRun(), init(), readAndProcessEvent(), runCommon(), writeLumi(), and writeRun().
boost::shared_ptr<ProcessConfiguration> edm::EventProcessor::processConfiguration_ [private] |
Definition at line 346 of file EventProcessor.h.
Referenced by init(), and runCommon().
std::auto_ptr<Schedule> edm::EventProcessor::schedule_ [private] |
Definition at line 347 of file EventProcessor.h.
Referenced by beginJob(), beginLumi(), beginRun(), clearCounters(), closeOutputFiles(), enableEndPaths(), endJob(), endLumi(), endOfLoop(), endPathsEnabled(), endRun(), forkProcess(), getAllModuleDescriptions(), getTriggerReport(), init(), openOutputFiles(), readAndProcessEvent(), respondToCloseInputFile(), respondToCloseOutputFiles(), respondToOpenInputFile(), respondToOpenOutputFiles(), shouldWeCloseOutput(), shouldWeStop(), totalEvents(), totalEventsFailed(), totalEventsPassed(), writeLumi(), writeRun(), and ~EventProcessor().
Definition at line 342 of file EventProcessor.h.
Referenced by beginJob(), endJob(), forkProcess(), getToken(), init(), rewind(), runCommon(), and skip().
bool edm::EventProcessor::setCpuAffinity_ [private] |
Definition at line 381 of file EventProcessor.h.
Referenced by forkProcess(), and init().
bool edm::EventProcessor::shouldWeStop_ [private] |
Definition at line 367 of file EventProcessor.h.
Referenced by readAndProcessEvent(), shouldWeStop(), and startingNewLoop().
boost::condition edm::EventProcessor::starter_ [private] |
Definition at line 355 of file EventProcessor.h.
Referenced by asyncRun(), and runAsync().
volatile event_processor::State edm::EventProcessor::state_ [private] |
Definition at line 349 of file EventProcessor.h.
Referenced by beginJob(), changeState(), errorState(), getState(), and runCommon().
boost::mutex edm::EventProcessor::state_lock_ [private] |
Definition at line 352 of file EventProcessor.h.
Referenced by changeState().
bool edm::EventProcessor::stateMachineWasInErrorState_ [private] |
Definition at line 368 of file EventProcessor.h.
Referenced by doErrorStuff(), and runCommon().
volatile int edm::EventProcessor::stop_count_ [private] |
Definition at line 356 of file EventProcessor.h.
Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().
boost::mutex edm::EventProcessor::stop_lock_ [private] |
Definition at line 353 of file EventProcessor.h.
Referenced by asyncRun(), runAsync(), and waitForAsyncCompletion().
boost::condition edm::EventProcessor::stopper_ [private] |
Definition at line 354 of file EventProcessor.h.
Referenced by asyncRun(), and waitForAsyncCompletion().