#include <FUEventProcessor.h>
Public Member Functions | |
void | actionPerformed (xdata::Event &e) |
bool | configuring (toolbox::task::WorkLoop *wl) |
void | css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
bool | enabling (toolbox::task::WorkLoop *wl) |
xoap::MessageReference | fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception) |
FUEventProcessor (xdaq::ApplicationStub *s) | |
bool | halting (toolbox::task::WorkLoop *wl) |
void | microState (xgi::Input *in, xgi::Output *out) |
void | moduleWeb (xgi::Input *in, xgi::Output *out) |
void | pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | procStat (xgi::Input *in, xgi::Output *out) |
void | scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | sendMessageOverMonitorQueue (MsgBuf &) |
void | serviceWeb (xgi::Input *in, xgi::Output *out) |
void | spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
bool | stopping (toolbox::task::WorkLoop *wl) |
void | subWeb (xgi::Input *in, xgi::Output *out) |
void | updater (xgi::Input *in, xgi::Output *out) |
XDAQ_INSTANTIATOR () | |
virtual | ~FUEventProcessor () |
Private Member Functions | |
void | attachDqmToShm () throw (evf::Exception) |
void | detachDqmFromShm () throw (evf::Exception) |
bool | enableClassic () |
bool | enableCommon () |
bool | enableMPEPSlave () |
void | localLog (std::string) |
std::string | logsAsString () |
void | makeStaticInfo () |
bool | receiving (toolbox::task::WorkLoop *wl) |
bool | receivingAndMonitor (toolbox::task::WorkLoop *wl) |
bool | scalers (toolbox::task::WorkLoop *wl) |
void | startReceivingLoop () |
void | startReceivingMonitorLoop () |
void | startScalersWorkLoop () throw (evf::Exception) |
void | startSummarizeWorkLoop () throw (evf::Exception) |
void | startSupervisorLoop () |
bool | stopClassic () |
void | stopSlavesAndAcknowledge () |
bool | summarize (toolbox::task::WorkLoop *wl) |
bool | supervisor (toolbox::task::WorkLoop *wl) |
Private Attributes | |
int | anonymousPipe_ [2] |
xdata::InfoSpace * | applicationInfoSpace_ |
toolbox::task::ActionSignature * | asReceiveMsgAndExecute_ |
toolbox::task::ActionSignature * | asReceiveMsgAndRead_ |
toolbox::task::ActionSignature * | asScalers_ |
toolbox::task::ActionSignature * | asSummarize_ |
toolbox::task::ActionSignature * | asSupervisor_ |
xdata::Boolean | autoRestartSlaves_ |
xdata::String | class_ |
xdata::String | configString_ |
std::string | configuration_ |
CPUStat * | cpustat_ |
Css | css_ |
xdata::Boolean | epInitialized_ |
FWEPWrapper | evtProcessor_ |
xdata::Boolean | exitOnError_ |
evf::StateMachine | fsm_ |
xdata::Boolean | hasModuleWebRegistry_ |
xdata::Boolean | hasPrescaleService_ |
xdata::Boolean | hasServiceWebRegistry_ |
xdata::Boolean | hasShMem_ |
xdata::Boolean | iDieStatisticsGathering_ |
xdata::String | iDieUrl_ |
xdata::UnsignedInteger32 | instance_ |
xdata::Boolean | isRunNumberSetter_ |
Logger | log_ |
std::vector< std::string > | logRing_ |
unsigned int | logRingIndex_ |
bool | logWrap_ |
MsgBuf | master_message_prg_ |
MsgBuf | master_message_prr_ |
MsgBuf | master_message_trr_ |
xdata::InfoSpace * | monitorInfoSpace_ |
xdata::InfoSpace * | monitorLegendaInfoSpace_ |
SubProcess * | myProcess_ |
std::list< std::string > | names_ |
xdata::Serializable * | nbAccepted |
unsigned int | nbdead_ |
unsigned int | nblive_ |
xdata::Serializable * | nbProcessed |
xdata::UnsignedInteger32 | nbSubProcesses_ |
xdata::UnsignedInteger32 | nbSubProcessesReporting_ |
unsigned int | nbTotalDQM_ |
bool | outprev_ |
xdata::Boolean | outPut_ |
pthread_mutex_t | pickup_lock_ |
RateStat * | ratestat_ |
std::string | reasonForFailedState_ |
bool | receiving_ |
bool | receivingM_ |
xdata::UnsignedInteger32 | runNumber_ |
xdata::InfoSpace * | scalersInfoSpace_ |
xdata::InfoSpace * | scalersLegendaInfoSpace_ |
unsigned int | scalersUpdates_ |
MsgBuf | slave_message_monitoring_ |
MsgBuf | slave_message_prr_ |
xdata::UnsignedInteger32 | slaveRestartDelaySecs_ |
std::string | sourceId_ |
xdata::Vector< xdata::Integer > | spMStates_ |
xdata::Vector< xdata::Integer > | spmStates_ |
SquidNet | squidnet_ |
xdata::Boolean | squidPresent_ |
pthread_mutex_t | start_lock_ |
pthread_mutex_t | stop_lock_ |
std::vector< SubProcess > | subs_ |
xdata::UnsignedInteger32 | superSleepSec_ |
bool | supervising_ |
std::string | updaterStatic_ |
xdata::String | url_ |
pid_t | vp_ |
Vulture * | vulture_ |
toolbox::task::WorkLoop * | wlReceiving_ |
toolbox::task::WorkLoop * | wlReceivingMonitor_ |
toolbox::task::WorkLoop * | wlScalers_ |
bool | wlScalersActive_ |
toolbox::task::WorkLoop * | wlSummarize_ |
bool | wlSummarizeActive_ |
toolbox::task::WorkLoop * | wlSupervising_ |
Static Private Attributes | |
static const unsigned int | logRingSize_ = 50 |
Definition at line 58 of file FUEventProcessor.h.
FUEventProcessor::FUEventProcessor | ( | xdaq::ApplicationStub * | s | ) |
Definition at line 61 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, css(), defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), evf::StateMachine::foundRcmsStateListener(), fsm_, reco::get(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.
// Constructor: initialises all members, publishes the configuration and
// monitoring items in the application/monitor/scalers info spaces, binds the
// web (xgi) callbacks, creates the message service presence, looks up the
// RCMSStateListener and evf::iDie application descriptors, initialises the
// three mutexes and finally starts the supervisor loop.
//
// @param s  application stub handed to the xdaq::Application base class.
: xdaq::Application(s)
, fsm_(this)
, log_(getApplicationLogger())
, evtProcessor_(log_, getApplicationDescriptor()->getInstance())
, runNumber_(0)
, epInitialized_(false)
, outPut_(true)
, autoRestartSlaves_(false)
, slaveRestartDelaySecs_(10)
, hasShMem_(true)
, hasPrescaleService_(true)
, hasModuleWebRegistry_(true)
, hasServiceWebRegistry_(true)
, isRunNumberSetter_(true)
, iDieStatisticsGathering_(false)
, outprev_(true)
, exitOnError_(true)
, reasonForFailedState_()
  // NOTE(review): plain http:// scheme restored here (was https://);
  // presumably the probe goes through a local HTTP proxy - confirm against
  // SquidNet::check().
, squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
, logRing_(logRingSize_)
, logRingIndex_(logRingSize_)
, logWrap_(false)
, nbSubProcesses_(0)
, nbSubProcessesReporting_(0)
, nblive_(0)
, nbdead_(0)
, nbTotalDQM_(0)
, wlReceiving_(0)
, asReceiveMsgAndExecute_(0)
, receiving_(false)
, wlReceivingMonitor_(0)
, asReceiveMsgAndRead_(0)
, receivingM_(false)
, myProcess_(0)
, wlSupervising_(0)
, asSupervisor_(0)
, supervising_(false)
, monitorInfoSpace_(0)
, monitorLegendaInfoSpace_(0)
, applicationInfoSpace_(0)
, nbProcessed(0)
, nbAccepted(0)
, scalersInfoSpace_(0)
, scalersLegendaInfoSpace_(0)
, wlScalers_(0)
, asScalers_(0)
, wlScalersActive_(false)
, scalersUpdates_(0)
, wlSummarize_(0)
, asSummarize_(0)
, wlSummarizeActive_(false)
, superSleepSec_(1)
, iDieUrl_("none")
, vulture_(0)
, vp_(0)
, cpustat_(0)
, ratestat_(0)
, master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
, master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
, slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
, master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
{
  using namespace utils;

  names_.push_back("nbProcessed"    );
  names_.push_back("nbAccepted"     );
  names_.push_back("epMacroStateInt");
  names_.push_back("epMicroStateInt");

  // create pipe for web communication
  int retpipe = pipe(anonymousPipe_);
  if(retpipe != 0)
    LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");

  // check squid presence
  squidPresent_ = squidnet_.check();

  // pass application parameters to FWEPWrapper
  evtProcessor_.setAppDesc(getApplicationDescriptor());
  evtProcessor_.setAppCtxt(getApplicationContext());

  // bind relevant callbacks to finite state machine
  fsm_.initialize<evf::FUEventProcessor>(this);

  // set sourceId_ (built from class name and instance number)
  url_ =
    getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
    getApplicationDescriptor()->getURN();
  class_   =getApplicationDescriptor()->getClassName();
  instance_=getApplicationDescriptor()->getInstance();
  sourceId_=class_.toString()+"_"+instance_.toString();
  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );

  // getenv() returns a null pointer when the variable is unset; streaming a
  // null char* is undefined behaviour, so substitute a placeholder.
  const char *cmsswBase = getenv("CMSSW_BASE");
  LOG4CPLUS_INFO(getApplicationLogger(),
                 "CMSSW_BASE:"<<(cmsswBase != 0 ? cmsswBase : "(not set)"));

  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");

  xdata::InfoSpace *ispace = getApplicationInfoSpace();
  applicationInfoSpace_ = ispace;

  // default configuration
  ispace->fireItemAvailable("parameterSet",           &configString_                );
  ispace->fireItemAvailable("epInitialized",          &epInitialized_               );
  ispace->fireItemAvailable("stateName",               fsm_.stateName()             );
  ispace->fireItemAvailable("runNumber",              &runNumber_                   );
  ispace->fireItemAvailable("outputEnabled",          &outPut_                      );
  ispace->fireItemAvailable("hasSharedMemory",        &hasShMem_);
  ispace->fireItemAvailable("hasPrescaleService",     &hasPrescaleService_          );
  ispace->fireItemAvailable("hasModuleWebRegistry",   &hasModuleWebRegistry_        );
  ispace->fireItemAvailable("hasServiceWebRegistry",  &hasServiceWebRegistry_       );
  ispace->fireItemAvailable("isRunNumberSetter",      &isRunNumberSetter_           );
  ispace->fireItemAvailable("iDieStatisticsGathering",&iDieStatisticsGathering_);
  ispace->fireItemAvailable("rcmsStateListener",       fsm_.rcmsStateListener()     );
  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
  ispace->fireItemAvailable("nbSubProcesses",         &nbSubProcesses_              );
  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_     );
  ispace->fireItemAvailable("superSleepSec",          &superSleepSec_               );
  ispace->fireItemAvailable("autoRestartSlaves",      &autoRestartSlaves_           );
  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_       );
  ispace->fireItemAvailable("iDieUrl",                &iDieUrl_                     );

  // Add infospace listeners for exporting data values
  // (changes to these two items are delivered to actionPerformed()).
  getApplicationInfoSpace()->addItemChangedListener("parameterSet",        this);
  getApplicationInfoSpace()->addItemChangedListener("outputEnabled",       this);

  // findRcmsStateListener
  fsm_.findRcmsStateListener();

  // initialize monitoring infospace
  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  monitorInfoSpace_->fireItemAvailable("url",                      &url_            );
  monitorInfoSpace_->fireItemAvailable("class",                    &class_          );
  monitorInfoSpace_->fireItemAvailable("instance",                 &instance_       );
  monitorInfoSpace_->fireItemAvailable("runNumber",                &runNumber_      );
  monitorInfoSpace_->fireItemAvailable("stateName",                 fsm_.stateName());
  monitorInfoSpace_->fireItemAvailable("squidPresent",             &squidPresent_   );

  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
  scalersInfoSpace_->fireItemAvailable("instance", &instance_);

  evtProcessor_.setApplicationInfoSpace(ispace);
  evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
  evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);

  // subprocess state vectors for MP
  monitorInfoSpace_->fireItemAvailable("epMacroStateInt",             &spMStates_);
  monitorInfoSpace_->fireItemAvailable("epMicroStateInt",             &spmStates_);

  // Bind web interface
  xgi::bind(this, &FUEventProcessor::css,              "styles.css");
  xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
  xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
  xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
  xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
  xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
  xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
  xgi::bind(this, &FUEventProcessor::microState,       "microState");
  xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
  xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );

  // instantiate the plugin manager, not referenced here after!
  edm::AssertHandler ah;

  // Best effort: any failure while creating the presence is only logged.
  try{
    LOG4CPLUS_DEBUG(getApplicationLogger(),
                    "Trying to create message service presence ");
    edm::PresenceFactory *pf = edm::PresenceFactory::get();
    if(pf != 0) {
      // release(): the returned owner gives up the presence on purpose
      pf->makePresence("MessageServicePresence").release();
    }
    else {
      LOG4CPLUS_ERROR(getApplicationLogger(),
                      "Unable to create message service presence ");
    }
  }
  catch(...) {
    LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
  }
  ML::MLlog4cplus::setAppl(this);

  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
  typedef AppDescSet_t::iterator                 AppDescIter_t;

  AppDescSet_t rcms=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("RCMSStateListener");
  if(rcms.size()==0)
    {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
      // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
    }
  else
    {
      // only the first descriptor found is handed to the wrapper
      AppDescIter_t it = rcms.begin();
      evtProcessor_.setRcms(*it);
    }

  pthread_mutex_init(&start_lock_,0);
  pthread_mutex_init(&stop_lock_,0);
  pthread_mutex_init(&pickup_lock_,0);

  makeStaticInfo();
  startSupervisorLoop();

  if(vulture_==0) vulture_ = new Vulture(true);

  AppDescSet_t setOfiDie=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("evf::iDie");

  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
    if ((*it)->getInstance()==0) // there has to be only one instance of iDie
      iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
}
FUEventProcessor::~FUEventProcessor | ( | ) | [virtual] |
Definition at line 300 of file FUEventProcessor.cc.
References vulture_.
void FUEventProcessor::actionPerformed | ( | xdata::Event & | e | ) |
Definition at line 551 of file FUEventProcessor.cc.
References epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().
{
  // Info-space listener callback. Only "ItemChangedEvent" notifications are
  // handled, and only while the state machine is not in "Halted".
  if (e.type() != "ItemChangedEvent" || fsm_.stateName()->toString() == "Halted")
    return;

  const std::string changedItem = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();

  if (changedItem == "parameterSet") {
    // a new menu invalidates the current EventProcessor initialisation
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "HLT Menu changed, force reinitialization of EventProcessor");
    epInitialized_ = false;
  }
  else if (changedItem == "outputEnabled") {
    // act only on an actual change with respect to the last applied value
    if (outprev_ != outPut_) {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     (outprev_ ? "Disabling " : "Enabling ")<<"global output");
      evtProcessor_->enableEndPaths(outPut_);
      outprev_ = outPut_;
    }
  }
  else if (changedItem == "globalInputPrescale") {
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "Setting global input prescale has no effect "
                   <<"in this version of the code");
  }
  else if (changedItem == "globalOutputPrescale") {
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "Setting global output prescale has no effect "
                   <<"in this version of the code");
  }
}
void FUEventProcessor::attachDqmToShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 780 of file FUEventProcessor.cc.
References evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by enableCommon().
{
  // Asks the FUShmDQMOutputService (when available) to attach to shared
  // memory. Raises evf::Exception if the service is unavailable, the attach
  // call reports failure, or a cms::Exception is thrown on the way.
  std::string failure;
  try {
    // service calls below run under the event processor's service token
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    bool attached = false;
    if (edm::Service<FUShmDQMOutputService>().isAvailable())
      attached = edm::Service<FUShmDQMOutputService>()->attachToShm();
    if (!attached)
      failure = "Failed to attach DQM service to shared memory";
  }
  catch (cms::Exception& e) {
    failure = "Failed to attach DQM service to shared memory: " + std::string(e.what());
  }
  if (!failure.empty()) XCEPT_RAISE(evf::Exception,failure);
}
bool FUEventProcessor::configuring | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 315 of file FUEventProcessor.cc.
References configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, epInitialized_, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sInit, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.
{
  // Work-loop action for the "configuring" transition: sizes the sub-process
  // state vectors, initialises the event processor from configString_,
  // (re)creates the CPU/rate statistics helpers and fires "ConfigureDone".
  // Any exception is converted into a failed state whose reason is stored in
  // reasonForFailedState_ and written to the local log ring.
  //
  // @param wl  work loop invoking this action (unused here).
  // @return    always false.

  //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
  //             << ((instance_.value_==0) ? 0x8 : 0) << " "
  //             << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
  //             << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
  //             << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;

  // bit mask of optional features handed to FWEPWrapper::init()
  unsigned short smap
    = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
    + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
    + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
    + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
    + (hasPrescaleService_.value_ ? 0x1 : 0);

  if(nbSubProcesses_.value_==0)
    {
      spMStates_.setSize(1);
      spmStates_.setSize(1);
    }
  else
    {
      spMStates_.setSize(nbSubProcesses_.value_);
      spmStates_.setSize(nbSubProcesses_.value_);
      for(unsigned int i = 0; i < spMStates_.size(); i++)
        {
          spMStates_[i] = edm::event_processor::sInit;
          spmStates_[i] = 0;
        }
    }

  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
    std::string cfg = configString_.toString();
    evtProcessor_.init(smap,cfg);
    epInitialized_=true;
    if(evtProcessor_)
      {
        // moved to wrapper class
        configuration_ = evtProcessor_.configuration();
        if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
        evtProcessor_->beginJob();

        // replace any statistics helpers left over from a previous configure
        if(cpustat_) {delete cpustat_; cpustat_=0;}
        cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
                               nbSubProcesses_.value_,
                               instance_.value_,
                               iDieUrl_.value_);
        if(ratestat_) {delete ratestat_; ratestat_=0;}
        ratestat_ = new RateStat(iDieUrl_.value_);

        if(iDieStatisticsGathering_.value_){
          // sending the legenda is best effort: an evf::Exception is only logged
          try{
            cpustat_->sendLegenda(evtProcessor_.getmicromap());
            xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
            if(legenda !=0){
              std::string slegenda = ((xdata::String*)legenda)->value_;
              ratestat_->sendLegenda(slegenda);
            }
          }
          catch(evf::Exception &e){
            LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
                           << e.what());
          }
        }

        fsm_.fireEvent("ConfigureDone",this);
        LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
        localLog("-I- Configuration completed");
      }
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(cms::Exception &e) {
    reasonForFailedState_ = e.explainSelf();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(std::exception &e) {
    reasonForFailedState_ = e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(...) {
    // Record and log the reason like the other handlers do, so the failure
    // is visible in reasonForFailedState_ and in the local log ring.
    reasonForFailedState_ = "Unknown Exception";
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  // create the vulture process once, if it does not exist yet
  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();

  return false;
}
void evf::FUEventProcessor::css | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [inline] |
Definition at line 101 of file FUEventProcessor.h.
References evf::Css::css(), and css_.
Referenced by FUEventProcessor().
void FUEventProcessor::defaultWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 653 of file FUEventProcessor.cc.
References nbSubProcesses_, and dbtoconf::out.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Emits a minimal HTML page whose title is "<class name>[MP] <instance>"
  // and which immediately redirects the browser to the static base page.
  //
  // @param in   incoming request (unused).
  // @param out  response stream the page is written to.
  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
       << "<html><head><title>" << getApplicationDescriptor()->getClassName()
       << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
       << getApplicationDescriptor()->getInstance() << "</title>"
       // The refresh pragma attribute is spelled "http-equiv"; the previous
       // "https-equiv" is not an HTML attribute, so browsers ignored it and
       // the redirect never happened.
       << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
       << "</head></html>";
}
void FUEventProcessor::detachDqmFromShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 798 of file FUEventProcessor.cc.
References evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by stopClassic().
{
  // Asks the FUShmDQMOutputService (when available) to detach from shared
  // memory. Raises evf::Exception if the service is unavailable, the detach
  // call reports failure, or a cms::Exception is thrown on the way.
  std::string failure;
  try {
    // service calls below run under the event processor's service token
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    bool detached = false;
    if (edm::Service<FUShmDQMOutputService>().isAvailable())
      detached = edm::Service<FUShmDQMOutputService>()->detachFromShm();
    if (!detached)
      failure = "Failed to detach DQM service from shared memory";
  }
  catch (cms::Exception& e) {
    failure = "Failed to detach DQM service from shared memory: " + std::string(e.what());
  }
  if (!failure.empty()) XCEPT_RAISE(evf::Exception,failure);
}
bool FUEventProcessor::enableClassic | ( | ) | [private] |
Definition at line 1575 of file FUEventProcessor.cc.
References enableCommon(), evtProcessor_, localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.
Referenced by enabling().
{
  // Enable path used when no sub-processes are configured: runs the common
  // enable sequence, then polls once per second until the event processor
  // reports the sRunning state.
  //
  // @return whatever enableCommon() returned.
  const bool commonResult = enableCommon();

  for (;;) {
    if (evtProcessor_->getState() == edm::event_processor::sRunning)
      break;
    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
    ::sleep(1);
  }

  // implementation moved to EPWrapper
  // startScalersWorkLoop(); // this is now not done any longer
  localLog("-I- Start completed");
  return commonResult;
}
bool FUEventProcessor::enableCommon | ( | ) | [private] |
Definition at line 1516 of file FUEventProcessor.cc.
References attachDqmToShm(), gather_cfg::cout, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, runNumber_, and stor::utils::sleep().
Referenced by enableClassic(), and enableMPEPSlave().
{
  // Enable steps shared by the classic and the slave path: optionally attach
  // the DQM service to shared memory, reset the counters, set or declare the
  // run number, start the event processor asynchronously and, on success,
  // fire "EnableDone". Every failure fires a failed transition with the reason
  // kept in reasonForFailedState_ and in the local log.
  //
  // @return always false, on success as well as on failure.
  try {
    if (hasShMem_) attachDqmToShm();

    evtProcessor_->clearCounters();
    if (isRunNumberSetter_)
      evtProcessor_->setRunNumber(runNumber_.value_);
    else
      evtProcessor_->declareRunNumber(runNumber_.value_);

    int asyncStatus = 0;
    try {
      ::sleep(1);
      evtProcessor_->runAsync();
      asyncStatus = evtProcessor_->statusAsync();
    }
    catch (cms::Exception &e) {
      reasonForFailedState_ = e.explainSelf();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
    catch (std::exception &e) {
      reasonForFailedState_ = e.what();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
    catch (...) {
      reasonForFailedState_ = "Unknown Exception";
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }

    // a non-zero status code from the asynchronous start is a failure too
    if (asyncStatus != 0) {
      std::ostringstream report;
      report<<"EventProcessor::runAsync returned status code " << asyncStatus;
      reasonForFailedState_ = report.str();
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
      return false;
    }
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "enabling FAILED: " + std::string(e.what());
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
    return false;
  }

  try {
    fsm_.fireEvent("EnableDone",this);
  }
  catch (xcept::Exception &e) {
    // report on stdout, then let the caller see the exception
    std::cout << "exception " << std::string(e.what()) << std::endl;
    throw;
  }

  return false;
}
bool FUEventProcessor::enableMPEPSlave | ( | ) | [private] |
Definition at line 1588 of file FUEventProcessor.cc.
References enableCommon(), evtProcessor_, Exception, evf::StateMachine::fireFailed(), fsm_, reco::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().
Referenced by enabling(), and supervisor().
{
  // Enable path executed in a forked child: starts the receiving, monitor
  // and scalers loops, waits until the wrapper reports it is waiting for a
  // lumi section, recreates the message service presence and finally runs
  // the common enable sequence.
  //
  // @return whatever enableCommon() returned.

  //all this happens only in the child process
  startReceivingLoop();
  startReceivingMonitorLoop();
  evtProcessor_.resetWaiting();
  startScalersWorkLoop();

  // poll once per second until the wrapper is waiting for a lumi section
  while (!evtProcessor_.isWaitingForLs())
    ::sleep(1);

  // @EM test do not run monitor loop in slave, only receiving&Monitor
  // evtProcessor_.startMonitoringWorkLoop();
  try {
    // evtProcessor_.makeServicesOnly();
    try {
      LOG4CPLUS_DEBUG(getApplicationLogger(),
                      "Trying to create message service presence ");
      edm::PresenceFactory *factory = edm::PresenceFactory::get();
      if (factory == 0) {
        LOG4CPLUS_ERROR(getApplicationLogger(),
                        "Unable to create message service presence ");
      }
      else {
        factory->makePresence("MessageServicePresence").release();
      }
    }
    catch (...) {
      LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
    }
    ML::MLlog4cplus::setAppl(this);
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "enabling FAILED: " + std::string(e.what());
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  const bool commonResult = enableCommon();
  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
  //   LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
  //   ::sleep(1);
  // }
  return commonResult;
}
bool FUEventProcessor::enabling | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 410 of file FUEventProcessor.cc.
References toolbox::mem::_s_mutex_ptr_, configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, evf::StateMachine::disableRcmsStateNotification(), enableClassic(), enableMPEPSlave(), epInitialized_, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), myProcess_, nbSubProcesses_, nbTotalDQM_, ratestat_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), evf::Vulture::start(), start_lock_, startSummarizeWorkLoop(), stop_lock_, subs_, vp_, and vulture_.
{
  // Work-loop action for the "enabling" transition.
  // Resets the DQM/scalers counters, (re)initialises the event processor and,
  // depending on nbSubProcesses_, either enables in-process (enableClassic)
  // or forks one child per sub-process; each child continues in
  // enableMPEPSlave() while the parent starts the summarize loop and the
  // vulture and fires "EnableDone".
  nbTotalDQM_ = 0;
  scalersUpdates_ = 0;
  // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
  // << ((instance_.value_==0) ? 0x8 : 0) << " "
  // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
  // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
  // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
  // bit mask of optional features handed to FWEPWrapper::init()
  unsigned short smap = ((nbSubProcesses_.value_!=0) ? 0x10 : 0) + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
    + (hasServiceWebRegistry_.value_ ? 0x4 : 0) + (hasModuleWebRegistry_.value_ ? 0x2 : 0) + (hasPrescaleService_.value_ ? 0x1 : 0);
  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
  if(!epInitialized_){ evtProcessor_.forceInitEventProcessorMaybe(); }
  std::string cfg = configString_.toString();
  evtProcessor_.init(smap,cfg);
  // epInitialized_ is cleared by actionPerformed() when "parameterSet"
  // changes; in that case redo the one-time setup here.
  if(!epInitialized_){
    evtProcessor_->beginJob();
    // replace any statistics helpers left over from a previous run
    if(cpustat_) {delete cpustat_; cpustat_=0;}
    cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(), nbSubProcesses_.value_, instance_.value_, iDieUrl_.value_);
    if(ratestat_) {delete ratestat_; ratestat_=0;}
    ratestat_ = new RateStat(iDieUrl_.value_);
    if(iDieStatisticsGathering_.value_){
      // sending the legenda is best effort: failures are only logged
      try {
        cpustat_->sendLegenda(evtProcessor_.getmicromap());
        xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
        if(legenda !=0) {
          std::string slegenda = ((xdata::String*)legenda)->value_;
          ratestat_->sendLegenda(slegenda);
        }
      }
      catch(evf::Exception &e) {
        LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda" << e.what());
      }
      catch (xcept::Exception& e) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda." << e.what());
      }
    }
    epInitialized_ = true;
  }
  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
  evtProcessor_.resetLumiSectionReferenceIndex();
  //classic appl will return here
  if(nbSubProcesses_.value_==0) return enableClassic();
  //protect manipulation of subprocess array
  pthread_mutex_lock(&start_lock_);
  subs_.clear();
  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
  pid_t retval = -1;
  for(unsigned int i=0; i<nbSubProcesses_.value_; i++) {
    subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
  }
  pthread_mutex_unlock(&start_lock_);
  // NOTE(review): the fork loop below reads and writes subs_ without holding
  // start_lock_ - confirm that no other thread touches subs_ at this point.
  for(unsigned int i=0; i<nbSubProcesses_.value_; i++) {
    retval = subs_[i].forkNew();
    // a zero return value selects the child branch (see the in-line notes)
    if(retval==0) {
      myProcess_ = &subs_[i];
      // dirty hack: delete/recreate global binary semaphore for later use in child
      delete toolbox::mem::_s_mutex_ptr_;
      toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
      // this inner 'retval' (int) shadows the outer pid_t 'retval'
      int retval = pthread_mutex_destroy(&stop_lock_);
      if(retval != 0) perror("error");
      retval = pthread_mutex_init(&stop_lock_,0);
      if(retval != 0) perror("error");
      fsm_.disableRcmsStateNotification();
      return enableMPEPSlave(); // the loop is broken in the child
    }
  }
  // only the parent process gets here
  startSummarizeWorkLoop();
  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
  fsm_.fireEvent("EnableDone",this);
  localLog("-I- Start completed");
  return false;
}
xoap::MessageReference FUEventProcessor::fsmCallback | ( | xoap::MessageReference | msg | ) | throw (xoap::exception::Exception) |
Definition at line 543 of file FUEventProcessor.cc.
References evf::StateMachine::commandCallback(), fsm_, and MatrixRunner::msg.
{
  // Forwards the incoming message unchanged to the state machine's command
  // callback and hands its reply back to the caller.
  xoap::MessageReference reply = fsm_.commandCallback(msg);
  return reply;
}
bool FUEventProcessor::halting | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 519 of file FUEventProcessor.cc.
References evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().
{
  // Work-loop action for the "halting" transition: stops the slaves when
  // sub-processes are configured, halts the event processor and fires
  // "HaltDone". An evf::Exception from the wrapper additionally fires a
  // failed transition before "HaltDone" is fired.
  //
  // @param wl  work loop invoking this action (unused here).
  // @return    always false.
  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");

  const bool hasSlaves = (nbSubProcesses_.value_ != 0);
  if (hasSlaves)
    stopSlavesAndAcknowledge();

  try {
    evtProcessor_.stopAndHalt();
  }
  catch (evf::Exception &e) {
    reasonForFailedState_ = "halting FAILED: " + std::string(e.what());
    localLog(reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
  }
  // if(hasShMem_) detachDqmFromShm();

  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
  fsm_.fireEvent("HaltDone",this);
  localLog("-I- Halt completed");
  return false;
}
void FUEventProcessor::localLog | ( | std::string | m | ) | [private] |
Definition at line 832 of file FUEventProcessor.cc.
References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.
Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().
{
  // Stores message m, suffixed with " at <local date/time>", in the log ring.
  // The ring is filled from the highest index downwards; when index 0 has
  // been used the index restarts at logRingSize_ and logWrap_ is set.
  timeval now;
  gettimeofday(&now,0);
  tm *localNow = localtime(&now.tv_sec);
  char stamp[256];
  strftime(stamp, sizeof(stamp),"%c", localNow);

  if (logRingIndex_ == 0) {
    logWrap_ = true;
    logRingIndex_ = logRingSize_;
  }
  --logRingIndex_;

  m += " at ";
  m += stamp;
  logRing_[logRingIndex_] = m;
}
std::string FUEventProcessor::logsAsString | ( | ) | [private] |
Definition at line 815 of file FUEventProcessor.cc.
References i, logRing_, logRingIndex_, and logWrap_.
{
  // Returns the log ring as one string, one entry per line: first the
  // entries from logRingIndex_ to the end of the ring and, once the ring has
  // wrapped, also the entries stored before logRingIndex_.
  std::ostringstream text;

  const unsigned int ringSize = logRing_.size();
  for (unsigned int pos = logRingIndex_; pos < ringSize; ++pos)
    text << logRing_[pos] << '\n';

  if (logWrap_) {
    for (unsigned int pos = 0; pos < logRingIndex_; ++pos)
      text << logRing_[pos] << '\n';
  }

  return text.str();
}
void FUEventProcessor::makeStaticInfo | ( | ) | [private] |
Definition at line 1900 of file FUEventProcessor.cc.
References applicationInfoSpace_, evf::utils::cDiv(), Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.
Referenced by FUEventProcessor().
{
  // Build the static part of the updater page (release, configuration flags,
  // optional info-space values, host information) and cache it in
  // updaterStatic_.
  using namespace utils;
  std::ostringstream ost;

  mDiv(&ost,"ve");
  ost << "$Revision: 1.134 $ (" << edm::getReleaseVersion() << ")";
  cDiv(&ost);
  mDiv(&ost,"ou",outPut_.toString());
  mDiv(&ost,"sh",hasShMem_.toString());
  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());

  // Look the two optional parameters up independently: with a single shared
  // try block a failure on "monSleepSec" silently skipped "lsTimeOut" too.
  // Failures are deliberately ignored (best effort), and the exception is
  // caught by reference rather than copied.
  xdata::Serializable *monsleep = 0;
  xdata::Serializable *lstimeout = 0;
  try {
    monsleep = applicationInfoSpace_->find("monSleepSec");
  }
  catch(xdata::exception::Exception &) {
  }
  try {
    lstimeout = applicationInfoSpace_->find("lsTimeOut");
  }
  catch(xdata::exception::Exception &) {
  }
  if(monsleep!=0) mDiv(&ost,"ms",monsleep->toString());
  if(lstimeout!=0) mDiv(&ost,"lst",lstimeout->toString());

  // Use a real, zero-initialised utsname object instead of aliasing a char
  // array; if uname() fails the fields print as empty strings, not garbage.
  struct utsname sysinfo = {};
  uname(&sysinfo);
  mDiv(&ost,"sysinfo");
  ost << sysinfo.sysname << " " << sysinfo.nodename << " " << sysinfo.release
      << " " << sysinfo.version << " " << sysinfo.machine;
  cDiv(&ost);

  updaterStatic_ = ost.str();
}
void FUEventProcessor::microState | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 1706 of file FUEventProcessor.cc.
References autoRestartSlaves_, gather_cfg::cout, evtProcessor_, exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().
Referenced by FUEventProcessor().
{
  // Emit the HTML table rows describing this process and, when sub-processes
  // are configured, one row per slot in subs_.
  //   in  : request, forwarded unchanged to evtProcessor_.microState()
  //   out : stream receiving the generated rows
  // Exceptions are logged and swallowed; nothing propagates to the caller.

  // Scope-bound mutex holder: the original explicit unlock calls were skipped
  // whenever an exception left the locked region, leaving pickup_lock_ or
  // start_lock_ held.
  struct ScopedMutex {
    explicit ScopedMutex(pthread_mutex_t *m) : m_(m) { pthread_mutex_lock(m_); }
    ~ScopedMutex() { pthread_mutex_unlock(m_); }
    pthread_mutex_t *m_;
  };

  std::string urn = getApplicationDescriptor()->getURN();
  try{
    evtProcessor_.stateNameFromIndex(0);
    evtProcessor_.moduleNameFromIndex(0);
    if(myProcess_) {
      std::cout << "microstate called for child! bail out" << std::endl;
      return;
    }

    // Row for this process.
    *out << "<tr><td>" << fsm_.stateName()->toString()
         << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>"
         << nblive_ << "</td><td>" << nbdead_
         << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
    evtProcessor_.microState(in,out);
    *out << "<td></td><td>" << nbTotalDQM_ << "</td><td>"
         << evtProcessor_.getScalersUpdates() << "</td></tr>";

    if(nbSubProcesses_.value_!=0 && !myProcess_) {
      ScopedMutex startGuard(&start_lock_);
      for(unsigned int i = 0; i < subs_.size(); i++) {
        try{
          if(subs_[i].alive()>0) {
            // NOTE(review): ls is printed on both sides of the "/" below;
            // kept as in the original - confirm which value was intended.
            *out << "<tr><td bgcolor=\"#00FF00\" id=\"a" << i << "\">""Alive</td><td>S</td><td>"
                 << subs_[i].queueId() << "<td>"
                 << subs_[i].queueStatus()<< "/"
                 << subs_[i].queueOccupancy() << "/"
                 << subs_[i].queuePidOfLastSend() << "/"
                 << subs_[i].queuePidOfLastReceive()
                 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
                 << subs_[i].pid() << "&method=procStat\">"
                 << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
                 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
                 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
                 << subs_[i].params().nba << "/" << subs_[i].params().nbp
                 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
                 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
                 << "</td><td>" << subs_[i].params().ps
                 << "</td><td"
                 << ((subs_[i].params().eols<subs_[i].params().ls) ?
                     " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
                 << ">" << subs_[i].params().eols
                 << "</td><td>" << subs_[i].params().dqm
                 << "</td><td>" << subs_[i].params().trp << "</td>";
          }
          else {
            // Held for the whole not-alive row, released on scope exit
            // (normal or exceptional).
            ScopedMutex pickupGuard(&pickup_lock_);
            *out << "<tr><td id=\"a"<< i << "\" ";
            if(subs_[i].alive()==-1000)
              *out << " bgcolor=\"#bbaabb\">NotInitialized";
            else
              *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
            *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
                 << subs_[i].queueStatus() << "/"
                 << subs_[i].queueOccupancy() << "/"
                 << subs_[i].queuePidOfLastSend() << "/"
                 << subs_[i].queuePidOfLastReceive()
                 << "</td><td id=\"p"<< i << "\">"
                 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
            if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) {
              if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
                *out << " will restart in " << subs_[i].countdown() << " s";
              else if(autoRestartSlaves_)
                *out << " reached maximum restart count";
              else
                *out << " autoRestart is disabled ";
            }
            *out << "</td><td"
                 << ((subs_[i].params().eols<subs_[i].params().ls) ?
                     " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
                 << ">" << subs_[i].params().eols
                 << "</td><td>" << subs_[i].params().dqm
                 << "</td><td>" << subs_[i].params().trp << "</td>";
          }
          *out << "</tr>";
        }
        catch(evf::Exception &e){
          // Slot did not answer: emit a "NotResp" row instead.
          *out << "<tr><td id=\"a"<< i << "\" "
               <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
               << subs_[i].queueStatus() << "/"
               << subs_[i].queueOccupancy() << "/"
               << subs_[i].queuePidOfLastSend() << "/"
               << subs_[i].queuePidOfLastReceive()
               << "</td><td id=\"p"<< i << "\">"
               <<subs_[i].pid()<<"</td>";
        }
      }
    }
  }
  catch(evf::Exception &e) {
    LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
  }
  catch(cms::Exception &e) {
    LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
  }
  catch(std::exception &e) {
    LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
  }
  catch(...) {
    LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
  }
}
void evf::FUEventProcessor::moduleWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | [inline] |
Definition at line 107 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::moduleWeb().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Pure delegation: the wrapped event processor renders the module page.
  evtProcessor_.moduleWeb(in, out);
}
void FUEventProcessor::pathNames | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 767 of file FUEventProcessor.cc.
References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.
Referenced by FUEventProcessor().
{
  // Write the "scalersLegenda" string from the scalers legenda info space to
  // the response. Nothing is written when no event processor is present or
  // the item is unavailable.
  if(evtProcessor_ != 0){
    xdata::Serializable *legenda = 0;
    // Guard the lookup the same way the other info-space lookups in this
    // class are guarded: this method may only throw
    // xgi::exception::Exception, so an xdata exception must not escape.
    try {
      legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
    }
    catch(xdata::exception::Exception &) {
      // Best effort: treat an unavailable item as "nothing to report".
    }
    if(legenda != 0){
      std::string slegenda = ((xdata::String*)legenda)->value_;
      *out << slegenda << std::endl;
    }
  }
}
void FUEventProcessor::procStat | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 1890 of file FUEventProcessor.cc.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // The request object is not used; the utility helper writes to `out`.
  evf::utils::procStat(out);
}
bool FUEventProcessor::receiving | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 904 of file FUEventProcessor.cc.
References evf::StateMachine::fireEvent(), fsm_, reco::get(), edm::PresenceFactory::makePresence(), MatrixRunner::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), stop_lock_, and stopClassic().
Referenced by startReceivingLoop().
{
  // Work-loop callback: receive one message through myProcess_ and act on
  // STOP / FSTOP requests. Always returns true; on STOP or FSTOP the process
  // terminates via _exit() and never returns.
  MsgBuf msg;
  try{
    myProcess_->rcvSlave(msg,false); //will receive only messages from Master
    if(msg->mtype==MSQM_MESSAGE_TYPE_STOP) {
      // stop_lock_ is held for the whole stop sequence and released just
      // before the process exits.
      pthread_mutex_lock(&stop_lock_);
      fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
      try{
        LOG4CPLUS_DEBUG(getApplicationLogger(),
                        "Trying to create message service presence ");
        edm::PresenceFactory *pf = edm::PresenceFactory::get();
        if(pf != 0) {
          // release() gives up ownership, so the presence object is never
          // destroyed by this code.
          pf->makePresence("MessageServicePresence").release();
        }
        else {
          LOG4CPLUS_ERROR(getApplicationLogger(),
                          "Unable to create message service presence ");
        }
      }
      catch(...) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
      }
      stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
      // Acknowledge the stop with an empty STOP message, then exit.
      MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
      myProcess_->postSlave(msg1,false);
      pthread_mutex_unlock(&stop_lock_);
      fclose(stdout);
      fclose(stderr);
      _exit(EXIT_SUCCESS);
    }
    // Forced stop: exit immediately, without the stop sequence or an
    // acknowledgement.
    if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
      _exit(EXIT_SUCCESS);
  }
  catch(evf::Exception &e){} // receive/post failures are ignored here
  return true;
}
bool FUEventProcessor::receivingAndMonitor | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1372 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, AlCaHLTBitMon_QueryRunRegistry::data, defaultWebPage(), evf::prg::dqm, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), edm::event_processor::sStopping, evf::StateMachine::stateName(), stop_lock_, EcalElecEmulTccFlatFileProducerFromTPG_cfg::tokens, evf::prg::trp, and TablePrint::write.
Referenced by startReceivingMonitorLoop().
{
  // Work-loop callback: receive one request on the monitoring queue and
  // answer it.
  //   MCS : reply with the micro-state HTML fragment
  //   PRG : fill and post the progress record (prg)
  //   WEB : render the requested page, stream it through the anonymous pipe
  //         in chunks and announce each chunk size on the queue
  //   TRP : no action
  // Always returns true; evf::Exception is reported on stdout and swallowed.
  try{
    myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
    switch(slave_message_monitoring_->mtype) {
    case MSQM_MESSAGE_TYPE_MCS:
      {
        xgi::Input *in = 0;
        xgi::Output out;
        evtProcessor_.microState(in,&out);
        // Take one copy of the rendered text instead of calling str() three times.
        const std::string fragment = out.str();
        MsgBuf msg1(fragment.size(),MSQS_MESSAGE_TYPE_MCR);
        strncpy(msg1->mtext,fragment.c_str(),fragment.size());
        myProcess_->postSlave(msg1,true);
        break;
      }
    case MSQM_MESSAGE_TYPE_PRG:
      {
        xdata::Serializable *dqmp = 0;
        xdata::UnsignedInteger32 *dqm = 0;
        evtProcessor_.monitoring(0);
        try{
          dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
        }
        catch(const xdata::exception::Exception &){} // by reference, not by value
        if(dqmp!=0) dqm = (xdata::UnsignedInteger32*)dqmp;

        // monitorInfoSpace_->lock();
        prg * data = (prg*)slave_message_prr_->mtext;
        data->ls = evtProcessor_.lsid_;
        data->eols = evtProcessor_.lastLumiUsingEol_;
        data->ps = evtProcessor_.psid_;
        data->nbp = evtProcessor_->totalEvents();
        data->nba = evtProcessor_->totalEventsPassed();
        data->Ms = evtProcessor_.epMAltState_.value_;
        data->ms = evtProcessor_.epmAltState_.value_;
        if(dqm) data->dqm = dqm->value_;
        else data->dqm = 0;
        data->trp = scalersUpdates_;
        // monitorInfoSpace_->unlock();
        myProcess_->postSlave(slave_message_prr_,true);

        if(exitOnError_.value_) {
          // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
          // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
          if(data->Ms == edm::event_processor::sStopping ||
             data->Ms == edm::event_processor::sError) {
            bool running = true;
            int count = 0;
            // Poll the state machine once per second; give up and exit
            // once the counter has passed five.
            while(running){
              int retval = pthread_mutex_lock(&stop_lock_);
              if(retval != 0) perror("error");
              running = fsm_.stateName()->toString()=="Enabled";
              if(count>5) _exit(-1);
              pthread_mutex_unlock(&stop_lock_);
              if(running) {::sleep(1); count++;}
            }
          }
        }
        // scalersUpdates_++;
        break;
      }
    case MSQM_MESSAGE_TYPE_WEB:
      {
        xgi::Input *in = 0;
        xgi::Output out;
        MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);

        // The request text is "<method>" or "<method>&<args>".
        std::string query = slave_message_monitoring_->mtext;
        size_t pos = query.find_first_of("&");
        std::string method;
        std::string args;
        if(pos!=std::string::npos) {
          method = query.substr(0,pos);
          args = query.substr(pos+1,query.length()-pos-1);
        }
        else
          method=query;

        if(method=="Spotlight") {
          spotlightWebPage(in,&out);
        }
        else if(method=="procStat") {
          procStat(in,&out);
        }
        else if(method=="moduleWeb") {
          // args is a ';'-separated list of "key%value" pairs that become
          // the CGI environment of the forwarded request.
          internal::MyCgi mycgi;
          boost::char_separator<char> sep(";");
          boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
          for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
               tok_iter != tokens.end(); ++tok_iter){
            size_t pos = (*tok_iter).find_first_of("%");
            if(pos != std::string::npos){
              std::string first = (*tok_iter).substr(0 , pos);
              std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
              mycgi.getEnvironment()[first]=second;
            }
          }
          moduleWeb(&mycgi,&out);
        }
        else if(method=="Default") {
          defaultWebPage(in,&out);
        }
        else {
          out << "Error 404!!!!!!!!" << std::endl;
        }

        // Render once; the original re-copied the whole page via out.str()
        // for every chunk written.
        const std::string page = out.str();
        unsigned int bytesToSend = page.size();
        unsigned int offset = 0;
        // The reader stops on the first announced size below
        // MAX_PIPE_BUFFER_SIZE, so an empty page, or a page whose last
        // chunk is exactly full, needs an explicit "0" message. The
        // full-last-chunk case was missing and left the reader waiting.
        bool needTerminator = (bytesToSend == 0);
        while(bytesToSend !=0){
          unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
          // NOTE(review): the result of write() is still not checked; a short
          // write would desynchronise the announced and delivered byte counts.
          write(anonymousPipe_[PIPE_WRITE], page.c_str()+offset, msgSize);
          snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%u", msgSize); // %u: value is unsigned
          myProcess_->postSlave(msg1,true);
          bytesToSend -= msgSize;
          offset += msgSize;
          needTerminator = (bytesToSend == 0 && msgSize == MAX_PIPE_BUFFER_SIZE);
        }
        if(needTerminator) {
          snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%u", 0u);
          myProcess_->postSlave(msg1,true);
        }
        break;
      }
    case MSQM_MESSAGE_TYPE_TRP:
      {
        break;
      }
    }
  }
  catch(evf::Exception &e){
    std::cout << "exception caught in recevingM: " << e.what() << std::endl;
  }
  return true;
}
bool FUEventProcessor::scalers | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1252 of file FUEventProcessor.cc.
References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), runTheMatrix::ret, scalersUpdates_, and wlScalersActive_.
Referenced by startScalersWorkLoop().
{
  // Work-loop callback producing one trigger-report update per invocation.
  // Returns false (after clearing wlScalersActive_) when there is no event
  // processor or no trigger report could be obtained; true otherwise.
  bool reportReady = false;
  if(evtProcessor_) {
    reportReady = evtProcessor_.getTriggerReport(true);
  }
  else {
    std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
  }
  if(!reportReady) {
    wlScalersActive_ = false;
    return false;
  }

  // Without a myProcess_ handle the update is fired locally.
  if(!myProcess_) {
    evtProcessor_.fireScalersUpdate();
    return true;
  }

  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
  const int rc = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
  if(rc!=0)
    std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
  scalersUpdates_++;
  return true;
}
void FUEventProcessor::scalersWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 754 of file FUEventProcessor.cc.
References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.
Referenced by FUEventProcessor().
{
  // Serve the packed trigger report as a raw binary blob.
  out->getHTTPResponseHeader().addHeader("Content-Type",
                                         "application/octet-stream");
  out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding",
                                         "binary");

  // Without an event processor only the headers are sent.
  if(evtProcessor_ != 0) {
    TriggerReportStatic *report = evtProcessor_.getPackedTriggerReportAsStruct();
    out->write(reinterpret_cast<char*>(report), sizeof(TriggerReportStatic));
  }
}
void FUEventProcessor::sendMessageOverMonitorQueue | ( | MsgBuf & | buf | ) |
Definition at line 1895 of file FUEventProcessor.cc.
References myProcess_, and evf::SubProcess::postSlave().
{
  // No-op unless a myProcess_ handle exists.
  if(!myProcess_)
    return;
  myProcess_->postSlave(buf, true);
}
void evf::FUEventProcessor::serviceWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | [inline] |
Definition at line 108 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::serviceWeb().
Referenced by FUEventProcessor().
{
  // Pure delegation: the wrapped event processor renders the service page.
  evtProcessor_.serviceWeb(in, out);
}
void FUEventProcessor::spotlightWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 669 of file FUEventProcessor.cc.
References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Render the "MAIN" spotlight page: HTML head, icon/link banner, the
  // task or summary view of the event processor, and finally the
  // configuration text in a textarea.
  std::string urn = getApplicationDescriptor()->getURN();

  // --- document head ---------------------------------------------------
  *out << "<!-- base href=\"/" << urn << "\"> -->" << std::endl;
  *out << "<html>" << std::endl
       << "<head>" << std::endl
       << "<link type=\"text/css\" rel=\"stylesheet\""
       << " href=\"/evf/html/styles.css\"/>" << std::endl;
  *out << "<title>" << getApplicationDescriptor()->getClassName()
       << getApplicationDescriptor()->getInstance()
       << " MAIN</title>" << std::endl;
  *out << "</head>" << std::endl
       << "<body>" << std::endl;

  // --- banner: application icon, name and state ------------------------
  *out << "<table border=\"0\" width=\"100%\">" << std::endl
       << "<tr>" << std::endl
       << " <td align=\"left\">" << std::endl
       << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/evf/images/spoticon.jpg\"" << std::endl
       << " alt=\"main\"" << std::endl
       << " width=\"64\"" << std::endl
       << " height=\"64\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " <b>" << std::endl;
  *out << getApplicationDescriptor()->getClassName()
       << getApplicationDescriptor()->getInstance() << std::endl;
  *out << " " << fsm_.stateName()->toString() << std::endl;
  *out << " </b>" << std::endl
       << " </td>" << std::endl;

  // --- banner: HyperDAQ link --------------------------------------------
  *out << " <td width=\"32\">" << std::endl
       << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl
       << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl
       << " alt=\"HyperDAQ\"" << std::endl
       << " width=\"32\"" << std::endl
       << " height=\"32\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " </a>" << std::endl
       << " </td>" << std::endl
       << " <td width=\"32\">" << std::endl
       << " </td>" << std::endl;

  // --- banner: link back to this application ----------------------------
  *out << " <td width=\"32\">" << std::endl;
  *out << " <a href=\"/" << urn << "/\">" << std::endl;
  *out << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/evf/images/epicon.jpg\"" << std::endl
       << " alt=\"main\"" << std::endl
       << " width=\"32\"" << std::endl
       << " height=\"32\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " </a>" << std::endl
       << " </td>" << std::endl
       << "</tr>" << std::endl
       << "</table>" << std::endl
       << "<hr/>" << std::endl;

  // The module-page URL prefix depends on whether a myProcess_ handle exists.
  std::ostringstream suffix;
  if(myProcess_)
    suffix << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
  else
    suffix << "/moduleWeb?";
  urn += suffix.str();

  if(evtProcessor_) {
    if(myProcess_ || nbSubProcesses_.value_==0)
      evtProcessor_.taskWebPage(in,out,urn);
    else
      evtProcessor_.summaryWebPage(in,out,urn);
  }
  else {
    *out << "<td>HLT Unconfigured</td>" << std::endl;
  }

  // --- configuration dump and document tail -----------------------------
  *out << "</table>" << std::endl;
  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
  *out << configuration_ << std::endl;
  *out << "</textarea><P>" << std::endl
       << "</body>" << std::endl
       << "</html>" << std::endl;
}
void FUEventProcessor::startReceivingLoop | ( | ) | [private] |
Definition at line 867 of file FUEventProcessor.cc.
References asReceiveMsgAndExecute_, Exception, MatrixRunner::msg, receiving(), receiving_, and wlReceiving_.
Referenced by enableMPEPSlave().
{
  // Fetch the "Receiving" work loop from the factory, activate it if
  // needed, and submit receiving() to it. Any xcept::Exception is rethrown
  // as evf::Exception.
  try {
    wlReceiving_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving", "waiting");
    if(!wlReceiving_->isActive()) {
      wlReceiving_->activate();
    }
    asReceiveMsgAndExecute_ =
      toolbox::task::bind(this, &FUEventProcessor::receiving, "Receiving");
    wlReceiving_->submit(asReceiveMsgAndExecute_);
    receiving_ = true;
  }
  catch(xcept::Exception& cause) {
    std::string failure("Failed to start workloop 'Receiving'.");
    XCEPT_RETHROW(evf::Exception, failure, cause);
  }
}
void FUEventProcessor::startReceivingMonitorLoop | ( | ) | [private] |
Definition at line 884 of file FUEventProcessor.cc.
References asReceiveMsgAndRead_, Exception, MatrixRunner::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.
Referenced by enableMPEPSlave().
{
  // Fetch the "ReceivingM" work loop from the factory, activate it if
  // needed, and submit receivingAndMonitor() to it. Any xcept::Exception is
  // rethrown as evf::Exception.
  try {
    wlReceivingMonitor_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM", "waiting");
    if(!wlReceivingMonitor_->isActive()) {
      wlReceivingMonitor_->activate();
    }
    asReceiveMsgAndRead_ =
      toolbox::task::bind(this, &FUEventProcessor::receivingAndMonitor, "ReceivingM");
    wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
    receivingM_ = true;
  }
  catch(xcept::Exception& cause) {
    std::string failure("Failed to start workloop 'ReceivingM'.");
    XCEPT_RETHROW(evf::Exception, failure, cause);
  }
}
void FUEventProcessor::startScalersWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1209 of file FUEventProcessor.cc.
References asScalers_, Exception, MatrixRunner::msg, scalers(), wlScalers_, and wlScalersActive_.
Referenced by enableMPEPSlave().
{
  // Fetch the "Scalers" work loop from the factory, activate it if needed,
  // and submit scalers() to it. Any xcept::Exception is rethrown as
  // evf::Exception.
  try {
    wlScalers_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers", "waiting");
    if(!wlScalers_->isActive()) {
      wlScalers_->activate();
    }
    asScalers_ =
      toolbox::task::bind(this, &FUEventProcessor::scalers, "Scalers");
    wlScalers_->submit(asScalers_);
    wlScalersActive_ = true;
  }
  catch(xcept::Exception& cause) {
    std::string failure("Failed to start workloop 'Scalers'.");
    XCEPT_RETHROW(evf::Exception, failure, cause);
  }
}
void FUEventProcessor::startSummarizeWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1230 of file FUEventProcessor.cc.
References asSummarize_, Exception, MatrixRunner::msg, summarize(), wlSummarize_, and wlSummarizeActive_.
Referenced by enabling().
{
  // Fetch the "Summary" work loop from the factory, activate it if needed,
  // and submit summarize() to it. Any xcept::Exception is rethrown as
  // evf::Exception.
  try {
    wlSummarize_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary", "waiting");
    if(!wlSummarize_->isActive()) {
      wlSummarize_->activate();
    }
    asSummarize_ =
      toolbox::task::bind(this, &FUEventProcessor::summarize, "Summary");
    wlSummarize_->submit(asSummarize_);
    wlSummarizeActive_ = true;
  }
  catch(xcept::Exception& cause) {
    std::string failure("Failed to start workloop 'Summarize'.");
    XCEPT_RETHROW(evf::Exception, failure, cause);
  }
}
void FUEventProcessor::startSupervisorLoop | ( | ) | [private] |
Definition at line 849 of file FUEventProcessor.cc.
References asSupervisor_, Exception, MatrixRunner::msg, supervising_, supervisor(), and wlSupervising_.
Referenced by FUEventProcessor().
{
  // Fetch the "Supervisor" work loop from the factory, activate it if
  // needed, and submit supervisor() to it. Any xcept::Exception is rethrown
  // as evf::Exception.
  try {
    wlSupervising_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor", "waiting");
    if(!wlSupervising_->isActive()) {
      wlSupervising_->activate();
    }
    asSupervisor_ =
      toolbox::task::bind(this, &FUEventProcessor::supervisor, "Supervisor");
    wlSupervising_->submit(asSupervisor_);
    supervising_ = true;
  }
  catch(xcept::Exception& cause) {
    std::string failure("Failed to start workloop 'Supervisor'.");
    XCEPT_RETHROW(evf::Exception, failure, cause);
  }
}
bool FUEventProcessor::stopClassic | ( | ) | [private] |
Definition at line 1631 of file FUEventProcessor.cc.
References detachDqmFromShm(), edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, and evf::FWEPWrapper::stop().
Referenced by receiving(), and stopping().
{
  // Stop the event processor and drive the state machine accordingly:
  // "StopDone" on success, fireFailed() with a reason otherwise. The DQM
  // shared-memory attachment is dropped when hasShMem_ is set.
  // Always returns false.
  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
    edm::EventProcessor::StatusCode status = evtProcessor_.stop();

    if(rcIsFailure(status)) {
      // unreachable placeholder - see below
    }
  }
  catch (xcept::Exception &e) {
  }
  return false;
}
bool FUEventProcessor::stopping | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 509 of file FUEventProcessor.cc.
References nbSubProcesses_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.
Referenced by supervisor().
{
  // Stop sequence: first the sub-processes (when any are configured), then
  // the vulture, then this process's own event processor.
  const bool hasSubProcesses = (nbSubProcesses_.value_ != 0);
  if(hasSubProcesses) {
    stopSlavesAndAcknowledge();
  }
  vulture_->stop();

  const bool result = stopClassic();
  return result;
}
void FUEventProcessor::stopSlavesAndAcknowledge | ( | ) | [private] |
Definition at line 1660 of file FUEventProcessor.cc.
References i, localLog(), MAX_MSG_SIZE, MatrixRunner::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.
Referenced by halting(), and stopping().
{
  // Post a STOP message to every live slot in subs_, then collect the
  // acknowledgement from each of them, wait for the process to go away and
  // disconnect from it. Failures to receive an acknowledgement are logged
  // and the slot is skipped.

  // Scope-bound holder for stop_lock_: the explicit unlock calls of the
  // original were skipped if alive() or post() threw, leaving the mutex held.
  struct ScopedMutex {
    explicit ScopedMutex(pthread_mutex_t *m) : m_(m) { pthread_mutex_lock(m_); }
    ~ScopedMutex() { pthread_mutex_unlock(m_); }
    pthread_mutex_t *m_;
  };

  MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
  MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);

  // One flag per slot actually iterated. Sizing by nbSubProcesses_ indexed
  // out of bounds whenever subs_ held more entries than that.
  std::vector<bool> processes_to_stop(subs_.size(),false);

  // Pass 1: ask every live sub-process to stop.
  for(unsigned int i = 0; i < subs_.size(); i++) {
    ScopedMutex guard(&stop_lock_);
    if(subs_[i].alive()>0){
      processes_to_stop[i] = true;
      subs_[i].post(msg,false);
    }
  }

  // Pass 2: wait for the acknowledgement of each process asked to stop.
  for(unsigned int i = 0; i < subs_.size(); i++) {
    bool acknowledged = false;
    {
      ScopedMutex guard(&stop_lock_);
      if(processes_to_stop[i]){
        try{
          subs_[i].rcv(msg1,false);
          acknowledged = true;
        }
        catch(evf::Exception &e){
          std::ostringstream ost;
          ost << "failed to get STOP - errno ->" << errno << " " << e.what();
          reasonForFailedState_ = ost.str();
          LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
          // fsm_.fireFailed(reasonForFailedState_,this);
          localLog(reasonForFailedState_);
        }
      }
    }
    if(!acknowledged)
      continue;

    // The lock is released here, as in the original, so that waiting for
    // the process to disappear does not block other users of stop_lock_.
    if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
      while(subs_[i].alive()>0) ::usleep(10000);
    subs_[i].disconnect();
  }
  // subs_.clear();
}
void FUEventProcessor::subWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 585 of file FUEventProcessor.cc.
References anonymousPipe_, gather_cfg::cout, generateEDF::done, asciidump::els, i, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), MatrixRunner::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.
Referenced by FUEventProcessor().
{
  // Forward a web request to one sub-process and relay its answer.
  // The form must carry "method" (page to render) and "process" (pid of the
  // target); the request is posted on the monitor queue and the rendered
  // page is read back from the anonymous pipe in chunks whose sizes are
  // announced on the queue.
  using namespace cgicc;
  pid_t pid = 0;

  // Serialise the CGI environment as "&key%value;key%value;...".
  std::ostringstream ost;
  ost << "&";
  Cgicc cgi(in);
  internal::MyCgi *mycgi = (internal::MyCgi*)in;
  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
        mycgi->getEnvironment().begin();
      mit != mycgi->getEnvironment().end(); mit++)
    ost << mit->first << "%" << mit->second << ";";
  const std::string envText = ost.str();

  std::vector<FormEntry> el1;
  cgi.getElement("method",el1);
  std::vector<FormEntry> el2;
  cgi.getElement("process",el2);
  if(el1.size()!=0) {
    std::string meth = el1[0].getValue();
    if(el2.size()!=0) {
      unsigned int i = 0;
      std::string mod = el2[0].getValue();
      pid = atoi(mod.c_str()); // get the process id to be polled
      for(; i < subs_.size(); i++)
        if(subs_[i].pid()==pid) break;
      if(i>=subs_.size()){ //process was not found, let the browser know
        *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
        return;
      }
      if(subs_[i].alive() != 1){
        *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
        return;
      }

      // Request text is "<method><env>". Terminate it explicitly: strncpy()
      // with the exact length does not write the trailing NUL.
      MsgBuf msg1(meth.length()+envText.length()+1,MSQM_MESSAGE_TYPE_WEB);
      strncpy(msg1->mtext,meth.c_str(),meth.length());
      strncpy(msg1->mtext+meth.length(),envText.c_str(),envText.length());
      msg1->mtext[meth.length()+envText.length()] = '\0';
      subs_[i].post(msg1,true);

      // superSleepSec_ is raised tenfold while the answer is collected and
      // restored afterwards.
      unsigned int keep_supersleep_original_value = superSleepSec_.value_;
      superSleepSec_.value_=10*keep_supersleep_original_value;

      MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
      bool done = false;
      // Collect the page in a length-aware string. The original streamed raw
      // char buffers that were never NUL-terminated (reading past their
      // end) and leaked them if anything threw before the final delete[].
      std::string page;
      std::vector<char> chunk;
      while(!done){
        unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
        if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
          ::sleep(1);
          continue;
        }
        // A negative size would otherwise wrap to a huge allocation.
        const int announced = atoi(msg->mtext);
        const unsigned int nbytes = announced > 0 ? announced : 0;
        if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
        if(nbytes == 0)
          continue;
        chunk.resize(nbytes);
        ssize_t retval = read(anonymousPipe_[PIPE_READ],&chunk[0],nbytes);
        if(retval < 0 || static_cast<unsigned int>(retval) != nbytes)
          std::cout << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
        // Keep only the bytes that were really delivered.
        if(retval > 0)
          page.append(&chunk[0], static_cast<std::string::size_type>(retval));
      }
      superSleepSec_.value_=keep_supersleep_original_value;

      *out << page;
    }
  }
}
bool FUEventProcessor::summarize | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1280 of file FUEventProcessor.cc.
References gather_cfg::cout, cpustat_, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), runTheMatrix::ret, evf::CPUStat::sendStat(), evf::RateStat::sendStat(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.
Referenced by startSummarizeWorkLoop().
{
  // Work-loop callback: collect one trigger report from every live slot in
  // subs_, sum the reports for the current reference lumi section and fire a
  // scalers update. Returns false once the state machine is no longer
  // "Enabled", true otherwise.
  evtProcessor_.resetPackedTriggerReport();
  bool atLeastOneProcessUpdatedSuccessfully = false;
  int msgCount = 0;
  for (unsigned int i = 0; i < subs_.size(); i++) {
    if(subs_[i].alive()>0) {
      int ret = 0;
      // A report postponed in an earlier pass is used before the queue is read.
      if(subs_[i].check_postponed_trigger_update(master_message_trr_,
                                                 evtProcessor_.getLumiSectionReferenceIndex())) {
        ret = MSQS_MESSAGE_TYPE_TRR;
        std::cout << "using postponed report from slot " << i
                  << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
      }
      else{
        // Read until a report for the reference lumi section (or a later
        // one) arrives; reports for earlier sections are dropped.
        // NOTE(review): the loop only ends on such a report or on an
        // exception - a persistent non-TRR return value keeps it spinning.
        bool insync = false;
        bool exception_caught = false;
        while(!insync){
          try{
            ret = subs_[i].rcv(master_message_trr_,false);
          }
          catch(evf::Exception &e) {
            std::cout << "exception in msgrcv on " << i << " " << subs_[i].alive()
                      << " " << strerror(errno) << std::endl;
            exception_caught = true;
            break; //do nothing special
          }
          if(ret==MSQS_MESSAGE_TYPE_TRR) {
            TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
            if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
              insync = true;
            }
          }
        }
        // This slot is skipped and not counted in msgCount.
        if(exception_caught) continue;
      }
      msgCount++;
      if(ret==MSQS_MESSAGE_TYPE_TRR) {
        TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
        if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
          // Report is ahead of the reference: keep it for a later pass.
          std::cout << "postpone handling of msg from slot " << i
                    << " with Ls " << trp->lumiSection
                    << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
          subs_[i].add_postponed_trigger_update(master_message_trr_);
        }else{
          atLeastOneProcessUpdatedSuccessfully = true;
          evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
        }
      }
      else
        std::cout << "msgrcv returned error " << errno << std::endl;
    }
  }
  if(atLeastOneProcessUpdatedSuccessfully){
    // Publish the expected and reporting counts with the summed report.
    nbSubProcessesReporting_.value_ = msgCount;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
    evtProcessor_.updateRollingReport();
    evtProcessor_.fireScalersUpdate();
  }
  else{
    LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
    // With no message at all the lumi-section increment is withdrawn.
    if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
    nbSubProcessesReporting_.value_ = 0;
    ::sleep(10);
  }
  if(fsm_.stateName()->toString()!="Enabled"){
    // NOTE(review): this clears wlScalersActive_, whereas
    // startSummarizeWorkLoop() sets wlSummarizeActive_ - confirm which flag
    // is meant here.
    wlScalersActive_ = false;
    return false;
  }
  // cpustat_->printStat();
  if(iDieStatisticsGathering_.value_){
    try{
      TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
      cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
      cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
      ratestat_->sendStat((unsigned char*)trsp,
                          sizeof(TriggerReportStatic),
                          evtProcessor_.getLumiSectionReferenceIndex());
    }catch(evf::Exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics" << e.what());
    }
  }
  // Reset happens on every pass, whether or not statistics were sent.
  cpustat_->reset();
  return true;
}
bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *wl) [private]
Definition at line 943 of file FUEventProcessor.cc.
References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, enableMPEPSlave(), evtProcessor_, exception, Exception, spr::find(), evf::StateMachine::fireEvent(), fsm_, i, localLog(), log_, evf::prg::ls, python::rootplot::utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::utils::pid, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.
Referenced by startSupervisorLoop().
{
  // One pass of the supervisor work loop. The wl argument is not used.
  //
  //  1. Under stop_lock_: bring the per-slot bookkeeping (subs_, spMStates_,
  //     spmStates_) to nbSubProcesses_ entries, then reap exited children
  //     with a non-blocking waitpid() and record why each one ended.
  //  2. If the FSM state is "stopping": return true right after reaping.
  //  3. If the FSM state is "Enabled" and autoRestartSlaves_ is set: re-fork
  //     dead slots whose restart countdown has reached zero.
  //  4. If the FSM state is "Enabled": poll every live slot over its message
  //     queue and accumulate the counters into the info-space variables.
  //  5. Fire the monitorInfoSpace_ item-group update and sleep for
  //     superSleepSec_ seconds.
  //
  // Returns true on every path taken by the supervising process and false
  // in a freshly forked child (presumably "true" asks the work loop to
  // reschedule this action - confirm against toolbox::task::WorkLoop).

  pthread_mutex_lock(&stop_lock_);
  if(subs_.size()!=nbSubProcesses_.value_)
    {
      subs_.resize(nbSubProcesses_.value_);
      spMStates_.resize(nbSubProcesses_.value_);
      spmStates_.resize(nbSubProcesses_.value_);
      for(unsigned int i = 0; i < spMStates_.size(); i++)
        {
          spMStates_[i] = edm::event_processor::sInit;
          spmStates_[i] = 0;
        }
    }

  // Snapshot the FSM state once per pass; both flags are used after
  // stop_lock_ has been released.
  bool running = fsm_.stateName()->toString()=="Enabled";
  bool stopping = fsm_.stateName()->toString()=="stopping";

  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      // NOTE(review): -1000 looks like a "slot not in use / already handled"
      // marker - confirm against SubProcess::alive().
      if(subs_[i].alive()==-1000) continue;

      int sl;
      pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);

      // Only a child that has actually changed state is handled below:
      // status 0 for a normal exit, -1 otherwise.
      if(killedOrNot==subs_[i].pid())
        subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
      else
        continue;

      pthread_mutex_lock(&pickup_lock_);
      std::ostringstream ost;
      if(subs_[i].alive()==0)
        ost << " process exited with status " << WEXITSTATUS(sl);
      else if(WIFSIGNALED(sl)!=0)
        ost << " process terminated with signal " << WTERMSIG(sl);
      else
        ost << " process stopped ";
      subs_[i].countdown()=slaveRestartDelaySecs_.value_;
      subs_[i].setReasonForFailed(ost.str());
      spMStates_[i] = evtProcessor_.notstarted_state_code();
      spmStates_[i] = 0;
      std::ostringstream ost1;
      ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
      localLog(ost1.str());
      // Without auto-restart the slot will not be reused, so drop its
      // connection right away.
      if(!autoRestartSlaves_.value_) subs_[i].disconnect();
      pthread_mutex_unlock(&pickup_lock_);
    }
  pthread_mutex_unlock(&stop_lock_);

  if(stopping) return true; // nothing more to do while stopping

  if(running)
    {
      // While enabled, periodically check whether the restart countdown of a
      // dead slave has expired and, if so, restart it. stop_lock_ is held for
      // the whole scan so that slaves are not killed by a stop meanwhile.
      if(autoRestartSlaves_.value_){
        pthread_mutex_lock(&stop_lock_);
        for(unsigned int i = 0; i < subs_.size(); i++)
          {
            if(subs_[i].alive() != 1){
              if(subs_[i].countdown() == 0)
                {
                  if(subs_[i].restartCount()>2){
                    // Restart budget exhausted: log, notify and give up on
                    // this slot.
                    LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i << " - maximum restart count reached");
                    std::ostringstream ost1;
                    ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
                    localLog(ost1.str());
                    // Push the countdown below zero so this branch is not
                    // entered again for the same slot.
                    subs_[i].countdown()--;
                    XCEPT_DECLARE(evf::Exception, sentinelException, ost1.str());
                    notifyQualified("error",sentinelException);
                    subs_[i].disconnect();
                    continue;
                  }
                  subs_[i].restartCount()++;
                  pid_t rr = subs_[i].forkNew();
                  if(rr==0)
                    {
                      // ---- child process ----
                      myProcess_=&subs_[i];
                      scalersUpdates_ = 0;
                      // stop_lock_ was taken just before this loop, so the
                      // child re-creates it instead of unlocking it.
                      int retval = pthread_mutex_destroy(&stop_lock_);
                      if(retval != 0) perror("error");
                      retval = pthread_mutex_init(&stop_lock_,0);
                      if(retval != 0) perror("error");
                      fsm_.disableRcmsStateNotification();
                      // Walk the child's FSM through Stop -> StopDone ->
                      // Enable; the state has to be set in the FSM first to
                      // allow the StopDone transition.
                      fsm_.fireEvent("Stop",this);
                      fsm_.fireEvent("StopDone",this);
                      fsm_.fireEvent("Enable",this);
                      try{
                        xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
                        if(lsid) {
                          // Reset to the value before the end of the lumi
                          // section in which the process died.
                          ((xdata::UnsignedInteger32*)(lsid))->value_--;
                        }
                      }
                      catch(...){
                        std::cout << "trouble with lsindex during restart" << std::endl;
                      }
                      try{
                        xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
                        if(lstb) {
                          // Do not issue eol/bol for all lumi sections when
                          // restarting.
                          ((xdata::Boolean*)(lstb))->value_ = false;
                        }
                      }
                      catch(...){
                        std::cout << "trouble with resetting flag for eol recovery " << std::endl;
                      }
                      evtProcessor_.adjustLsIndexForRestart();
                      evtProcessor_.resetPackedTriggerReport();
                      enableMPEPSlave();
                      return false; // leave the supervisor loop immediately in the child
                    }
                  else
                    {
                      // ---- parent process ----
                      std::ostringstream ost1;
                      ost1 << "-I- New Process " << rr << " forked for slot " << i;
                      localLog(ost1.str());
                    }
                }
              if(subs_[i].countdown()>=0) subs_[i].countdown()--;
            }
          }
        pthread_mutex_unlock(&stop_lock_);
      } // finished handling replacement of dead slaves once they've been reaped
    }

  xdata::Serializable *lsid = 0;
  xdata::Serializable *psid = 0;
  xdata::Serializable *dqmp = 0;
  xdata::UnsignedInteger32 *dqm = 0;

  if(running){
    try{
      lsid = applicationInfoSpace_->find("lumiSectionIndex");
      psid = applicationInfoSpace_->find("prescaleSetIndex");
      nbProcessed = monitorInfoSpace_->find("nbProcessed");
      nbAccepted = monitorInfoSpace_->find("nbAccepted");
      dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
    }
    catch(xdata::exception::Exception &e){
      // Caught by reference, like the other handlers in this function.
      LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
    }

    try{
      if(nbProcessed !=0 && nbAccepted !=0)
        {
          xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
          xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
          // ls / ps stay null when the lookup above threw before reaching
          // them; nbProcessed / nbAccepted are members and may still be set
          // from an earlier pass, so both are checked before use below.
          xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
          xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
          if(dqmp!=0)
            dqm = (xdata::UnsignedInteger32*)dqmp;
          if(dqm) dqm->value_ = 0;
          nbTotalDQM_ = 0;
          nbp->value_ = 0;
          nba->value_ = 0;
          nblive_ = 0;
          nbdead_ = 0;
          scalersUpdates_ = 0;

          for(unsigned int i = 0; i < subs_.size(); i++)
            {
              if(subs_[i].alive()>0)
                {
                  nblive_++;
                  try{
                    // Post the request message, then try to pick up the
                    // slave's reply without blocking.
                    subs_[i].post(master_message_prg_,true);

                    unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
                    if(retval == (unsigned long) master_message_prr_->mtype){
                      prg* p = (struct prg*)(master_message_prr_->mtext);
                      subs_[i].setParams(p);
                      spMStates_[i] = p->Ms;
                      spmStates_[i] = p->ms;
                      cpustat_->addEntry(p->ms);
                      if(!subs_[i].inInconsistentState() &&
                         (p->Ms == edm::event_processor::sError
                          || p->Ms == edm::event_processor::sInvalid
                          || p->Ms == edm::event_processor::sStopping))
                        {
                          std::ostringstream ost;
                          ost << "edm::eventprocessor slot " << i << " process id "
                              << subs_[i].pid() << " not in Running state : Mstate="
                              << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
                              << evtProcessor_.moduleNameFromIndex(p->ms)
                              << " - Look into possible error messages from HLT process";
                          LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
                        }
                      nbp->value_ += subs_[i].params().nbp;
                      nba->value_ += subs_[i].params().nba;
                      if(dqm)dqm->value_ += p->dqm;
                      nbTotalDQM_ += p->dqm;
                      scalersUpdates_ += p->trp;
                      // Track the highest lumi section and the latest
                      // prescale index reported by any slave.
                      if(ls && p->ls > ls->value_) ls->value_ = p->ls;
                      if(ps && p->ps != ps->value_) ps->value_ = p->ps;
                    }
                    else{
                      // No fresh reply: fall back to the saved counters.
                      nbp->value_ += subs_[i].get_save_nbp();
                      nba->value_ += subs_[i].get_save_nba();
                    }
                  }
                  catch(evf::Exception &e){
                    LOG4CPLUS_INFO(getApplicationLogger(),
                                   "could not send/receive msg on slot "
                                   << i << " - " << e.what());
                  }
                }
              else
                {
                  nbp->value_ += subs_[i].get_save_nbp();
                  nba->value_ += subs_[i].get_save_nba();
                  nbdead_++;
                }
            }

          // Stuck-slave detection starts only once more than 64 events have
          // been processed in total (the original comment suggests this
          // should eventually equal the number of raw cells).
          if(nbp->value_>64){
            for(unsigned int i = 0; i < subs_.size(); i++)
              {
                if(subs_[i].params().nbp == 0){ // this slave has processed 0 events
                  // Seen alive but still reporting micro-state 0 (Invalid).
                  if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
                    {
                      subs_[i].found_invalid(); // increase the "found_invalid" counter
                      // Wait for more than 60 monitor cycles before acting on
                      // a stuck slave.
                      if(subs_[i].nfound_invalid() > 60){
                        MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
                        subs_[i].post(msg3,false); // send a force-stop signal
                        std::ostringstream ost1;
                        ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
                        localLog(ost1.str());
                        LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
                        XCEPT_DECLARE(evf::Exception, sentinelException, ost1.str());
                        notifyQualified("error",sentinelException);
                      }
                    }
                }
              }
          }
        }
    }
    catch(std::exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
    }
    catch(...){
      LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
    }
  }
  else{
    // Not enabled: reset the published states of the slots marked -1000.
    for(unsigned int i = 0; i < subs_.size(); i++)
      {
        if(subs_[i].alive()==-1000)
          {
            spMStates_[i] = edm::event_processor::sInit;
            spmStates_[i] = 0;
          }
      }
  }

  // NOTE(review): if fireItemGroupChanged() throws, unlock() is skipped -
  // confirm whether the info-space lock can be left held on that path.
  try{
    monitorInfoSpace_->lock();
    monitorInfoSpace_->fireItemGroupChanged(names_,0);
    monitorInfoSpace_->unlock();
  }
  catch(xdata::exception::Exception &e)
    {
      LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
      // localLog(e.what());
    }
  ::sleep(superSleepSec_.value_);
  return true;
}
void FUEventProcessor::updater(xgi::Input *in, xgi::Output *out)
Definition at line 1817 of file FUEventProcessor.cc.
References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.
Referenced by FUEventProcessor().
{
  // Streams the current status of this application to *out: first the
  // pre-built updaterStatic_ text, then one keyed entry per status item.
  // The in argument is not used.
  // NOTE(review): mDiv()/cDiv()/uptime() come from the utils namespace;
  // presumably mDiv opens (or writes a complete) named element and cDiv
  // closes it - confirm against the utils helpers.
  using namespace utils;

  *out << updaterStatic_;

  // Uptime / load figures.
  mDiv(out,"loads");
  uptime(out);
  cDiv(out);

  // FSM state, run number and sub-process counts.
  mDiv(out,"st",fsm_.stateName()->toString());
  mDiv(out,"ru",runNumber_.toString());
  mDiv(out,"nsl",nbSubProcesses_.value_);
  mDiv(out,"nsr",nbSubProcessesReporting_.value_);

  // Class name, suffixed with "MP " when sub-processes are configured.
  const char *classSuffix = (nbSubProcesses_.value_ > 0) ? "MP " : " ";
  mDiv(out,"cl");
  *out << getApplicationDescriptor()->getClassName() << classSuffix;
  cDiv(out);

  mDiv(out,"in",getApplicationDescriptor()->getInstance());

  // The configuration link is only offered outside "Halted" / "halting".
  if(fsm_.stateName()->toString() == "Halted" ||
     fsm_.stateName()->toString() == "halting")
    {
      mDiv(out,"hlt","Not yet...");
    }
  else
    {
      mDiv(out,"hlt");
      *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
      cDiv(out);
      *out << std::endl;
    }

  mDiv(out,"sq",squidPresent_.toString());

  const char *supervisorLoopState = supervising_ ? "Active" : "Not Initialized";
  mDiv(out,"vwl",supervisorLoopState);
  mDiv(out,"mwl",evtProcessor_.wlMonitoring());

  // Processed / accepted totals; zeros until both counters are available.
  if(nbProcessed == 0 || nbAccepted == 0)
    {
      mDiv(out,"tt",0);
      mDiv(out,"ac",0);
    }
  else
    {
      mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
      mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
    }

  // With myProcess_ set the scalers loop flag is reported, otherwise the
  // summarize loop flag.
  const bool reportedLoopActive = myProcess_ ? wlScalersActive_ : wlSummarizeActive_;
  const char *reportedLoopState = reportedLoopActive ? "Active" : "Inactive";
  mDiv(out,"swl",reportedLoopState);

  mDiv(out,"idi",iDieUrl_.value_);

  // Vulture status: the pid entry is written only when vp_ is non-zero.
  const char *vultureState;
  if(vp_ == 0)
    {
      vultureState = (vulture_ == 0) ? "Nope" : "Hatching";
    }
  else
    {
      mDiv(out,"vpi",static_cast<unsigned int>(vp_));
      vultureState = (vulture_->hasStarted() >= 0) ? "Prowling" : "Dead";
    }
  mDiv(out,"vul",vultureState);

  // Last lumi triplet as "ls,proc,acc".
  if(evtProcessor_)
    {
      mDiv(out,"ll");
      *out << evtProcessor_.lastLumi().ls << ","
           << evtProcessor_.lastLumi().proc << ","
           << evtProcessor_.lastLumi().acc;
      cDiv(out);
    }

  // Log ring: entries from logRingIndex_ to the end first, then - once the
  // ring has wrapped - the entries before logRingIndex_.
  mDiv(out,"lg");
  for(unsigned int slot = logRingIndex_; slot < logRingSize_; ++slot)
    {
      *out << logRing_[slot] << std::endl;
    }
  if(logWrap_)
    {
      for(unsigned int slot = 0; slot < logRingIndex_; ++slot)
        {
          *out << logRing_[slot] << std::endl;
        }
    }
  cDiv(out);

  // CPU chart, when statistics are available.
  mDiv(out,"cha");
  if(cpustat_)
    {
      *out << cpustat_->getChart();
    }
  cDiv(out);
}
evf::FUEventProcessor::XDAQ_INSTANTIATOR()
int evf::FUEventProcessor::anonymousPipe_[2] [private] |
Definition at line 244 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().
xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_ [private] |
Definition at line 222 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), makeStaticInfo(), receivingAndMonitor(), and supervisor().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_ [private] |
Definition at line 210 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_ [private] |
Definition at line 213 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_ [private] |
Definition at line 236 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_ [private] |
Definition at line 242 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_ [private] |
Definition at line 217 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().
xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_ [private] |
Definition at line 167 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), microState(), and supervisor().
xdata::String evf::FUEventProcessor::class_ [private] |
Definition at line 158 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::String evf::FUEventProcessor::configString_ [private] |
Definition at line 162 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
std::string evf::FUEventProcessor::configuration_ [private] |
Definition at line 163 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and spotlightWebPage().
CPUStat* evf::FUEventProcessor::cpustat_ [private] |
Definition at line 252 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().
Css evf::FUEventProcessor::css_ [private] |
Definition at line 189 of file FUEventProcessor.h.
Referenced by css().
xdata::Boolean evf::FUEventProcessor::epInitialized_ [private] |
Definition at line 161 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().
Definition at line 154 of file FUEventProcessor.h.
Referenced by actionPerformed(), attachDqmToShm(), configuring(), detachDqmFromShm(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), FUEventProcessor(), halting(), microState(), moduleWeb(), pathNames(), receivingAndMonitor(), scalers(), scalersWeb(), serviceWeb(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::exitOnError_ [private] |
Definition at line 186 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
evf::StateMachine evf::FUEventProcessor::fsm_ [private] |
Definition at line 148 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), enableCommon(), enableMPEPSlave(), enabling(), fsmCallback(), FUEventProcessor(), halting(), microState(), receiving(), receivingAndMonitor(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_ [private] |
Definition at line 172 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasPrescaleService_ [private] |
Definition at line 171 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_ [private] |
Definition at line 173 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasShMem_ [private] |
Definition at line 170 of file FUEventProcessor.h.
Referenced by enableCommon(), FUEventProcessor(), makeStaticInfo(), and stopClassic().
xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_ [private] |
Definition at line 175 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().
xdata::String evf::FUEventProcessor::iDieUrl_ [private] |
Definition at line 249 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::instance_ [private] |
Definition at line 159 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_ [private] |
Definition at line 174 of file FUEventProcessor.h.
Referenced by enableCommon(), and FUEventProcessor().
Logger evf::FUEventProcessor::log_ [private] |
Definition at line 151 of file FUEventProcessor.h.
Referenced by supervisor().
std::vector<std::string> evf::FUEventProcessor::logRing_ [private] |
Definition at line 195 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
unsigned int evf::FUEventProcessor::logRingIndex_ [private] |
Definition at line 196 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
const unsigned int evf::FUEventProcessor::logRingSize_ = 50 [static, private] |
Definition at line 197 of file FUEventProcessor.h.
Referenced by localLog(), and updater().
bool evf::FUEventProcessor::logWrap_ [private] |
Definition at line 198 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
Definition at line 255 of file FUEventProcessor.h.
Referenced by supervisor().
Definition at line 256 of file FUEventProcessor.h.
Referenced by supervisor().
Definition at line 259 of file FUEventProcessor.h.
Referenced by summarize().
xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_ [private] |
Definition at line 220 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_ [private] |
Definition at line 221 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
SubProcess* evf::FUEventProcessor::myProcess_ [private] |
Definition at line 215 of file FUEventProcessor.h.
Referenced by enabling(), microState(), receiving(), receivingAndMonitor(), scalers(), sendMessageOverMonitorQueue(), spotlightWebPage(), supervisor(), and updater().
std::list<std::string> evf::FUEventProcessor::names_ [private] |
Definition at line 248 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbAccepted [private] |
Definition at line 228 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
unsigned int evf::FUEventProcessor::nbdead_ [private] |
Definition at line 204 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
unsigned int evf::FUEventProcessor::nblive_ [private] |
Definition at line 203 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbProcessed [private] |
Definition at line 227 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_ [private] |
Definition at line 200 of file FUEventProcessor.h.
Referenced by configuring(), defaultWebPage(), enabling(), FUEventProcessor(), halting(), microState(), spotlightWebPage(), stopping(), stopSlavesAndAcknowledge(), summarize(), supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_ [private] |
Definition at line 201 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), summarize(), and updater().
unsigned int evf::FUEventProcessor::nbTotalDQM_ [private] |
Definition at line 206 of file FUEventProcessor.h.
Referenced by enabling(), microState(), and supervisor().
bool evf::FUEventProcessor::outprev_ [private] |
Definition at line 176 of file FUEventProcessor.h.
Referenced by actionPerformed().
xdata::Boolean evf::FUEventProcessor::outPut_ [private] |
Definition at line 165 of file FUEventProcessor.h.
Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().
pthread_mutex_t evf::FUEventProcessor::pickup_lock_ [private] |
Definition at line 225 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), microState(), and supervisor().
RateStat* evf::FUEventProcessor::ratestat_ [private] |
Definition at line 253 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and summarize().
std::string evf::FUEventProcessor::reasonForFailedState_ [private] |
Definition at line 192 of file FUEventProcessor.h.
Referenced by configuring(), enableCommon(), enableMPEPSlave(), halting(), stopClassic(), and stopSlavesAndAcknowledge().
bool evf::FUEventProcessor::receiving_ [private] |
Definition at line 211 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
bool evf::FUEventProcessor::receivingM_ [private] |
Definition at line 214 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_ [private] |
Definition at line 160 of file FUEventProcessor.h.
Referenced by enableCommon(), enabling(), FUEventProcessor(), and updater().
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_ [private] |
Definition at line 231 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_ [private] |
Definition at line 232 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().
unsigned int evf::FUEventProcessor::scalersUpdates_ [private] |
Definition at line 238 of file FUEventProcessor.h.
Referenced by enabling(), receivingAndMonitor(), scalers(), and supervisor().
Definition at line 258 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
Definition at line 257 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_ [private] |
Definition at line 168 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
std::string evf::FUEventProcessor::sourceId_ [private] |
Definition at line 179 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_ [private] |
Definition at line 245 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_ [private] |
Definition at line 246 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
SquidNet evf::FUEventProcessor::squidnet_ [private] |
Definition at line 194 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::squidPresent_ [private] |
Definition at line 182 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and updater().
pthread_mutex_t evf::FUEventProcessor::start_lock_ [private] |
Definition at line 224 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), and microState().
pthread_mutex_t evf::FUEventProcessor::stop_lock_ [private] |
Definition at line 223 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), receiving(), receivingAndMonitor(), stopSlavesAndAcknowledge(), and supervisor().
std::vector<SubProcess> evf::FUEventProcessor::subs_ [private] |
Definition at line 202 of file FUEventProcessor.h.
Referenced by enabling(), microState(), stopSlavesAndAcknowledge(), subWeb(), summarize(), and supervisor().
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_ [private] |
Definition at line 247 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), subWeb(), and supervisor().
bool evf::FUEventProcessor::supervising_ [private] |
Definition at line 218 of file FUEventProcessor.h.
Referenced by startSupervisorLoop(), and updater().
std::string evf::FUEventProcessor::updaterStatic_ [private] |
Definition at line 226 of file FUEventProcessor.h.
Referenced by makeStaticInfo(), and updater().
xdata::String evf::FUEventProcessor::url_ [private] |
Definition at line 157 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
pid_t evf::FUEventProcessor::vp_ [private] |
Definition at line 251 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and updater().
Vulture* evf::FUEventProcessor::vulture_ [private] |
Definition at line 250 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), stopping(), updater(), and ~FUEventProcessor().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_ [private] |
Definition at line 209 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_ [private] |
Definition at line 212 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_ [private] |
Definition at line 235 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
bool evf::FUEventProcessor::wlScalersActive_ [private] |
Definition at line 237 of file FUEventProcessor.h.
Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_ [private] |
Definition at line 241 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
bool evf::FUEventProcessor::wlSummarizeActive_ [private] |
Definition at line 243 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_ [private] |
Definition at line 216 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().