#include <FUEventProcessor.h>
Public Member Functions | |
void | actionPerformed (xdata::Event &e) |
bool | configuring (toolbox::task::WorkLoop *wl) |
void | css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
bool | enabling (toolbox::task::WorkLoop *wl) |
xoap::MessageReference | fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception) |
FUEventProcessor (xdaq::ApplicationStub *s) | |
bool | halting (toolbox::task::WorkLoop *wl) |
void | microState (xgi::Input *in, xgi::Output *out) |
void | moduleWeb (xgi::Input *in, xgi::Output *out) |
void | pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | procStat (xgi::Input *in, xgi::Output *out) |
void | scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | sendMessageOverMonitorQueue (MsgBuf &) |
void | serviceWeb (xgi::Input *in, xgi::Output *out) |
void | spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
bool | stopping (toolbox::task::WorkLoop *wl) |
void | subWeb (xgi::Input *in, xgi::Output *out) |
void | updater (xgi::Input *in, xgi::Output *out) |
XDAQ_INSTANTIATOR () | |
virtual | ~FUEventProcessor () |
Static Public Member Functions | |
static void | forkProcessFromEDM_helper (void *addr) |
Private Member Functions | |
void | attachDqmToShm () throw (evf::Exception) |
void | detachDqmFromShm () throw (evf::Exception) |
bool | doEndRunInEDM () |
bool | enableClassic () |
bool | enableCommon () |
bool | enableForkInEDM () |
bool | enableMPEPSlave () |
void | forkProcessesFromEDM () |
void | localLog (std::string) |
std::string | logsAsString () |
void | makeStaticInfo () |
bool | receiving (toolbox::task::WorkLoop *wl) |
bool | receivingAndMonitor (toolbox::task::WorkLoop *wl) |
bool | restartForkInEDM (unsigned int slotId) |
bool | scalers (toolbox::task::WorkLoop *wl) |
void | setAttachDqmToShm () throw (evf::Exception) |
void | startReceivingLoop () |
void | startReceivingMonitorLoop () |
void | startScalersWorkLoop () throw (evf::Exception) |
void | startSummarizeWorkLoop () throw (evf::Exception) |
void | startSupervisorLoop () |
bool | stopClassic () |
void | stopSlavesAndAcknowledge () |
bool | summarize (toolbox::task::WorkLoop *wl) |
bool | supervisor (toolbox::task::WorkLoop *wl) |
Private Attributes | |
int | anonymousPipe_ [2] |
xdata::InfoSpace * | applicationInfoSpace_ |
toolbox::task::ActionSignature * | asReceiveMsgAndExecute_ |
toolbox::task::ActionSignature * | asReceiveMsgAndRead_ |
toolbox::task::ActionSignature * | asScalers_ |
toolbox::task::ActionSignature * | asSummarize_ |
toolbox::task::ActionSignature * | asSupervisor_ |
xdata::Boolean | autoRestartSlaves_ |
xdata::String | class_ |
xdata::String | configString_ |
std::string | configuration_ |
CPUStat * | cpustat_ |
Css | css_ |
bool | edm_init_done_ |
xdata::Boolean | epInitialized_ |
FWEPWrapper | evtProcessor_ |
xdata::Boolean | exitOnError_ |
xdata::UnsignedInteger32 | forkInEDM_ |
moduleweb::ForkInfoObj * | forkInfoObj_ |
pthread_mutex_t | forkObjLock_ |
evf::StateMachine | fsm_ |
xdata::Boolean | hasModuleWebRegistry_ |
xdata::Boolean | hasPrescaleService_ |
xdata::Boolean | hasServiceWebRegistry_ |
xdata::Boolean | hasShMem_ |
xdata::Boolean | iDieStatisticsGathering_ |
xdata::String | iDieUrl_ |
xdata::UnsignedInteger32 | instance_ |
xdata::Boolean | isRunNumberSetter_ |
Logger | log_ |
std::vector< std::string > | logRing_ |
unsigned int | logRingIndex_ |
bool | logWrap_ |
MsgBuf | master_message_prg_ |
MsgBuf | master_message_prr_ |
MsgBuf | master_message_trr_ |
xdata::InfoSpace * | monitorInfoSpace_ |
xdata::InfoSpace * | monitorLegendaInfoSpace_ |
ModuleWebRegistry * | mwrRef_ |
SubProcess * | myProcess_ |
std::list< std::string > | names_ |
xdata::Serializable * | nbAccepted |
unsigned int | nbdead_ |
unsigned int | nblive_ |
xdata::Serializable * | nbProcessed |
xdata::UnsignedInteger32 | nbSubProcesses_ |
xdata::UnsignedInteger32 | nbSubProcessesReporting_ |
unsigned int | nbTotalDQM_ |
bool | outprev_ |
xdata::Boolean | outPut_ |
pthread_mutex_t | pickup_lock_ |
RateStat * | ratestat_ |
std::string | reasonForFailedState_ |
bool | receiving_ |
bool | receivingM_ |
xdata::UnsignedInteger32 | runNumber_ |
xdata::InfoSpace * | scalersInfoSpace_ |
xdata::InfoSpace * | scalersLegendaInfoSpace_ |
unsigned int | scalersUpdates_ |
MsgBuf | slave_message_monitoring_ |
MsgBuf | slave_message_prr_ |
xdata::UnsignedInteger32 | slaveRestartDelaySecs_ |
ShmOutputModuleRegistry * | sorRef_ |
std::string | sourceId_ |
xdata::Vector< xdata::Integer > | spMStates_ |
xdata::Vector< xdata::Integer > | spmStates_ |
SquidNet | squidnet_ |
xdata::Boolean | squidPresent_ |
pthread_mutex_t | start_lock_ |
pthread_mutex_t | stop_lock_ |
std::vector< SubProcess > | subs_ |
xdata::UnsignedInteger32 | superSleepSec_ |
bool | supervising_ |
std::string | updaterStatic_ |
xdata::String | url_ |
pid_t | vp_ |
Vulture * | vulture_ |
toolbox::task::WorkLoop * | wlReceiving_ |
toolbox::task::WorkLoop * | wlReceivingMonitor_ |
toolbox::task::WorkLoop * | wlScalers_ |
bool | wlScalersActive_ |
toolbox::task::WorkLoop * | wlSummarize_ |
bool | wlSummarizeActive_ |
toolbox::task::WorkLoop * | wlSupervising_ |
Static Private Attributes | |
static const unsigned int | logRingSize_ = 50 |
Definition at line 61 of file FUEventProcessor.h.
FUEventProcessor::FUEventProcessor | ( | xdaq::ApplicationStub * | s | ) |
Definition at line 63 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, css(), defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), forkInEDM_, forkInfoObj_, forkObjLock_, evf::StateMachine::foundRcmsStateListener(), fsm_, reco::get(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, outPut_, pathNames(), pickup_lock_, testRegParams::pipe, procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.
// Constructor: initialises every member, creates the web-communication pipe,
// publishes configuration/monitoring items to the application and monitoring
// info spaces, binds the XGI web callbacks, creates the message service
// presence, initialises the mutexes and starts the supervisor loop.
// @param s application stub forwarded to the xdaq::Application base class.
: xdaq::Application(s)
  , fsm_(this)
  , log_(getApplicationLogger())
  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
  , runNumber_(0)
  , epInitialized_(false)
  , outPut_(true)
  , autoRestartSlaves_(false)
  , slaveRestartDelaySecs_(10)
  , hasShMem_(true)
  , hasPrescaleService_(true)
  , hasModuleWebRegistry_(true)
  , hasServiceWebRegistry_(true)
  , isRunNumberSetter_(true)
  , iDieStatisticsGathering_(false)
  , outprev_(true)
  , exitOnError_(true)
  , reasonForFailedState_()
  , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
  , logRing_(logRingSize_)
  , logRingIndex_(logRingSize_)
  , logWrap_(false)
  , nbSubProcesses_(0)
  , nbSubProcessesReporting_(0)
  , forkInEDM_(true)
  , nblive_(0)
  , nbdead_(0)
  , nbTotalDQM_(0)
  , wlReceiving_(0)
  , asReceiveMsgAndExecute_(0)
  , receiving_(false)
  , wlReceivingMonitor_(0)
  , asReceiveMsgAndRead_(0)
  , receivingM_(false)
  , myProcess_(0)
  , wlSupervising_(0)
  , asSupervisor_(0)
  , supervising_(false)
  , monitorInfoSpace_(0)
  , monitorLegendaInfoSpace_(0)
  , applicationInfoSpace_(0)
  , nbProcessed(0)
  , nbAccepted(0)
  , scalersInfoSpace_(0)
  , scalersLegendaInfoSpace_(0)
  , wlScalers_(0)
  , asScalers_(0)
  , wlScalersActive_(false)
  , scalersUpdates_(0)
  , wlSummarize_(0)
  , asSummarize_(0)
  , wlSummarizeActive_(false)
  , superSleepSec_(1)
  , iDieUrl_("none")
  , vulture_(0)
  , vp_(0)
  , cpustat_(0)
  , ratestat_(0)
  , mwrRef_(nullptr)
  , sorRef_(nullptr)
  , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
  , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
  , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
  , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
  , edm_init_done_(true)
{
  using namespace utils;

  names_.push_back("nbProcessed"    );
  names_.push_back("nbAccepted"     );
  names_.push_back("epMacroStateInt");
  names_.push_back("epMicroStateInt");

  // create pipe for web communication
  int retpipe = pipe(anonymousPipe_);
  if(retpipe != 0)
    LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");

  // check squid presence
  squidPresent_ = squidnet_.check();

  //pass application parameters to FWEPWrapper
  evtProcessor_.setAppDesc(getApplicationDescriptor());
  evtProcessor_.setAppCtxt(getApplicationContext());

  // bind relevant callbacks to finite state machine
  fsm_.initialize<evf::FUEventProcessor>(this);

  //set sourceId_
  url_ =
    getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
    getApplicationDescriptor()->getURN();
  class_   =getApplicationDescriptor()->getClassName();
  instance_=getApplicationDescriptor()->getInstance();
  sourceId_=class_.toString()+"_"+instance_.toString();
  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );

  // getenv() returns a null pointer when the variable is unset; streaming a
  // null const char* is undefined behaviour, so substitute a placeholder.
  const char* cmsswBase = getenv("CMSSW_BASE");
  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<(cmsswBase ? cmsswBase : "(not set)"));

  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");

  xdata::InfoSpace *ispace = getApplicationInfoSpace();
  applicationInfoSpace_ = ispace;

  // default configuration
  ispace->fireItemAvailable("parameterSet",           &configString_                );
  ispace->fireItemAvailable("epInitialized",          &epInitialized_               );
  ispace->fireItemAvailable("stateName",               fsm_.stateName()             );
  ispace->fireItemAvailable("runNumber",              &runNumber_                   );
  ispace->fireItemAvailable("outputEnabled",          &outPut_                      );
  ispace->fireItemAvailable("hasSharedMemory",        &hasShMem_);
  ispace->fireItemAvailable("hasPrescaleService",     &hasPrescaleService_          );
  ispace->fireItemAvailable("hasModuleWebRegistry",   &hasModuleWebRegistry_        );
  ispace->fireItemAvailable("hasServiceWebRegistry",  &hasServiceWebRegistry_       );
  ispace->fireItemAvailable("isRunNumberSetter",      &isRunNumberSetter_           );
  ispace->fireItemAvailable("iDieStatisticsGathering",&iDieStatisticsGathering_);
  ispace->fireItemAvailable("rcmsStateListener",       fsm_.rcmsStateListener()     );
  ispace->fireItemAvailable("foundRcmsStateListener",  fsm_.foundRcmsStateListener());
  ispace->fireItemAvailable("nbSubProcesses",         &nbSubProcesses_              );
  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_     );
  ispace->fireItemAvailable("forkInEDM"              ,&forkInEDM_                   );
  ispace->fireItemAvailable("superSleepSec",          &superSleepSec_               );
  ispace->fireItemAvailable("autoRestartSlaves",      &autoRestartSlaves_           );
  ispace->fireItemAvailable("slaveRestartDelaySecs",  &slaveRestartDelaySecs_       );
  ispace->fireItemAvailable("iDieUrl",                &iDieUrl_                     );

  // Add infospace listeners for exporting data values
  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);

  // findRcmsStateListener
  fsm_.findRcmsStateListener();

  // initialize monitoring infospace
  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  monitorInfoSpace_->fireItemAvailable("url",          &url_            );
  monitorInfoSpace_->fireItemAvailable("class",        &class_          );
  monitorInfoSpace_->fireItemAvailable("instance",     &instance_       );
  monitorInfoSpace_->fireItemAvailable("runNumber",    &runNumber_      );
  monitorInfoSpace_->fireItemAvailable("stateName",     fsm_.stateName());
  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_   );

  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());

  evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
  scalersInfoSpace_->fireItemAvailable("instance", &instance_);

  evtProcessor_.setApplicationInfoSpace(ispace);
  evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
  evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);

  //subprocess state vectors for MP
  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);

  // Bind web interface
  xgi::bind(this, &FUEventProcessor::css,              "styles.css");
  xgi::bind(this, &FUEventProcessor::defaultWebPage,   "Default"   );
  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
  xgi::bind(this, &FUEventProcessor::scalersWeb,       "scalersWeb");
  xgi::bind(this, &FUEventProcessor::pathNames,        "pathNames" );
  xgi::bind(this, &FUEventProcessor::subWeb,           "SubWeb"    );
  xgi::bind(this, &FUEventProcessor::moduleWeb,        "moduleWeb" );
  xgi::bind(this, &FUEventProcessor::serviceWeb,       "serviceWeb");
  xgi::bind(this, &FUEventProcessor::microState,       "microState");
  xgi::bind(this, &FUEventProcessor::updater,          "updater"   );
  xgi::bind(this, &FUEventProcessor::procStat,         "procStat"  );

  // instantiate the plugin manager, not referenced here after!
  edm::AssertHandler ah;

  try{
    LOG4CPLUS_DEBUG(getApplicationLogger(),
                    "Trying to create message service presence ");
    edm::PresenceFactory *pf = edm::PresenceFactory::get();
    if(pf != 0) {
      pf->makePresence("MessageServicePresence").release();
    }
    else {
      LOG4CPLUS_ERROR(getApplicationLogger(),
                      "Unable to create message service presence ");
    }
  }
  catch(...) {
    LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
  }
  ML::MLlog4cplus::setAppl(this);

  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
  typedef AppDescSet_t::iterator                 AppDescIter_t;

  AppDescSet_t rcms=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("RCMSStateListener");
  if(rcms.size()==0) {
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
    // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
  }
  else {
    AppDescIter_t it = rcms.begin();
    evtProcessor_.setRcms(*it);
  }

  pthread_mutex_init(&start_lock_,0);
  pthread_mutex_init(&stop_lock_,0);
  pthread_mutex_init(&pickup_lock_,0);

  forkInfoObj_=nullptr;
  pthread_mutex_init(&forkObjLock_,0);
  makeStaticInfo();
  startSupervisorLoop();

  if(vulture_==0) vulture_ = new Vulture(true);

  AppDescSet_t setOfiDie=
    getApplicationContext()->getDefaultZone()->
    getApplicationDescriptors("evf::iDie");

  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
    if ((*it)->getInstance()==0) // there has to be only one instance of iDie
      iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
}
FUEventProcessor::~FUEventProcessor | ( | ) | [virtual] |
Definition at line 309 of file FUEventProcessor.cc.
References vulture_.
void FUEventProcessor::actionPerformed | ( | xdata::Event & | e | ) |
Definition at line 679 of file FUEventProcessor.cc.
References alignCSCRings::e, epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().
{
  // Info-space listener callback. Only "ItemChangedEvent" notifications are
  // handled, and only while the state machine is not in "Halted".
  if (!(e.type()=="ItemChangedEvent")) return;
  if (fsm_.stateName()->toString()=="Halted") return;

  const std::string changedItem =
    dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();

  if (changedItem == "parameterSet") {
    // A new parameter set invalidates the current initialisation.
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "HLT Menu changed, force reinitialization of EventProcessor");
    epInitialized_ = false;
  }
  else if (changedItem == "outputEnabled") {
    // Act only when the flag really differs from the last applied value.
    if (outprev_ != outPut_) {
      LOG4CPLUS_WARN(getApplicationLogger(),
                     (outprev_ ? "Disabling " : "Enabling ")<<"global output");
      evtProcessor_->enableEndPaths(outPut_);
      outprev_ = outPut_;
    }
  }
  else if (changedItem == "globalInputPrescale") {
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "Setting global input prescale has no effect "
                   <<"in this version of the code");
  }
  else if (changedItem == "globalOutputPrescale") {
    LOG4CPLUS_WARN(getApplicationLogger(),
                   "Setting global output prescale has no effect "
                   <<"in this version of the code");
  }
}
void FUEventProcessor::attachDqmToShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 924 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by enableCommon(), and forkProcessesFromEDM().
{
  // Asks the FUShmDQMOutputService (when available) to attach to shared
  // memory. Raises evf::Exception if the service is unavailable, reports
  // failure, or a cms::Exception is thrown along the way.
  std::string failure;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    bool attached = false;
    if (edm::Service<FUShmDQMOutputService>().isAvailable()) {
      attached = edm::Service<FUShmDQMOutputService>()->attachToShm();
    }
    if (!attached) {
      failure = "Failed to attach DQM service to shared memory";
    }
  }
  catch (cms::Exception& e) {
    failure = "Failed to attach DQM service to shared memory: " +
      std::string(e.what());
  }

  if (failure.empty()) return;
  XCEPT_RAISE(evf::Exception,failure);
}
bool FUEventProcessor::configuring | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 324 of file FUEventProcessor.cc.
References configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, alignCSCRings::e, epInitialized_, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), mwrRef_, nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sInit, sorRef_, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.
{
  // Work-loop action for the "configuring" transition.
  // Builds the service bitmap, sizes the sub-process state vectors,
  // initialises the wrapped event processor and fires "ConfigureDone" on
  // success. Any exception is turned into an FSM failure whose reason is
  // stored in reasonForFailedState_ and written to the local log.
  // @param wl the work loop invoking this action (not used in the body).
  // @return always false.

  //   std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
  //             << ((instance_.value_==0) ? 0x8 : 0) << " "
  //             << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
  //             << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
  //             << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
  unsigned short smap
    = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
    + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
    + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
    + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
    + (hasPrescaleService_.value_ ? 0x1 : 0);

  if(nbSubProcesses_.value_==0)
    {
      spMStates_.setSize(1);
      spmStates_.setSize(1);
    }
  else
    {
      spMStates_.setSize(nbSubProcesses_.value_);
      spmStates_.setSize(nbSubProcesses_.value_);
      for(unsigned int i = 0; i < spMStates_.size(); i++)
        {
          spMStates_[i] = edm::event_processor::sInit;
          spmStates_[i] = 0;
        }
    }

  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
    std::string cfg = configString_.toString();
    evtProcessor_.init(smap,cfg);
    epInitialized_=true;
    if(evtProcessor_)
      {
        //get ref of mwr
        mwrRef_ = evtProcessor_.getModuleWebRegistry();
        sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
        // moved to wrapper class
        configuration_ = evtProcessor_.configuration();
        if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
        evtProcessor_->beginJob();

        // Recreate the statistics senders for the new configuration.
        if(cpustat_) {delete cpustat_; cpustat_=0;}
        cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
                               nbSubProcesses_.value_,
                               instance_.value_,
                               iDieUrl_.value_);
        if(ratestat_) {delete ratestat_; ratestat_=0;}
        ratestat_ = new RateStat(iDieUrl_.value_);

        if(iDieStatisticsGathering_.value_){
          try{
            cpustat_->sendLegenda(evtProcessor_.getmicromap());
            xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
            if(legenda !=0){
              std::string slegenda = ((xdata::String*)legenda)->value_;
              ratestat_->sendLegenda(slegenda);
            }
          }
          catch(evf::Exception &e){
            // Failing to send the legenda does not fail the transition.
            LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
                           << e.what());
          }
        }

        fsm_.fireEvent("ConfigureDone",this);
        LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
        localLog("-I- Configuration completed");
      }
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(cms::Exception &e) {
    reasonForFailedState_ = e.explainSelf();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(std::exception &e) {
    reasonForFailedState_ = e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch(...) {
    // Record and log the reason like the other handlers do, so an unknown
    // failure is visible in reasonForFailedState_ and the local log as well.
    reasonForFailedState_ = "Unknown Exception";
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  // Create the vulture process once, if a vulture exists and none was made.
  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();

  return false;
}
void evf::FUEventProcessor::css | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [inline] |
Definition at line 104 of file FUEventProcessor.h.
References evf::Css::css(), and css_.
Referenced by FUEventProcessor().
void FUEventProcessor::defaultWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 781 of file FUEventProcessor.cc.
References nbSubProcesses_, and dbtoconf::out.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Writes a minimal HTML page whose title is the application class name,
  // an "MP " marker when sub-processes are configured, and the instance
  // number; the page immediately redirects to the static default base page.
  // @param in  incoming request (not used in the body).
  // @param out stream receiving the generated HTML.
  //
  // The redirect uses the <meta http-equiv="REFRESH"> pragma; the attribute
  // name must be "http-equiv" for browsers to honour it.
  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
       << "<html><head><title>" << getApplicationDescriptor()->getClassName()
       << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
       << getApplicationDescriptor()->getInstance() << "</title>"
       << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
       << "</head></html>";
}
void FUEventProcessor::detachDqmFromShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 942 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by stopClassic().
{
  // Asks the FUShmDQMOutputService (when available) to detach from shared
  // memory. Raises evf::Exception if the service is unavailable, reports
  // failure, or a cms::Exception is thrown along the way.
  std::string failure;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    bool detached = false;
    if (edm::Service<FUShmDQMOutputService>().isAvailable()) {
      detached = edm::Service<FUShmDQMOutputService>()->detachFromShm();
    }
    if (!detached) {
      failure = "Failed to detach DQM service from shared memory";
    }
  }
  catch (cms::Exception& e) {
    failure = "Failed to detach DQM service from shared memory: " +
      std::string(e.what());
  }

  if (failure.empty()) return;
  XCEPT_RAISE(evf::Exception,failure);
}
bool FUEventProcessor::doEndRunInEDM | ( | ) | [private] |
Definition at line 574 of file FUEventProcessor.cc.
References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, edm_init_done_, evtProcessor_, forkInfoObj_, evf::moduleweb::ForkInfoObj::lock(), log_, edm::event_processor::sJobReady, stor::utils::sleep(), edm::event_processor::sRunning, edm::event_processor::sStopping, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().
Referenced by halting(), and stopping().
{
  // Requests the master event processor to stop and waits until it reports
  // a stopping or job-ready state.
  // @return true when there is no fork info object, when the processor is
  //         already job-ready, or when it leaves the running state in time;
  //         false on an unexpected state or on timeout.
  if (!forkInfoObj_) return true;

  // taking care that EP in master is in stoppable state:
  // wait up to 30 one-second steps for EDM initialisation to finish.
  int secondsLeft = 30;
  while (!edm_init_done_ && secondsLeft) {
    ::sleep(1);
    if (secondsLeft%5==0)
      LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<secondsLeft<< "sec");
    secondsLeft--;
  }
  //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);

  if (evtProcessor_->getState()==edm::event_processor::sJobReady)
    return true; //we are already done

  // Raise the stop condition under the fork-info lock and post the semaphore.
  forkInfoObj_->lock();
  forkInfoObj_->stopCondition=true;
  sem_post(forkInfoObj_->control_sem_);
  forkInfoObj_->unlock();

  // Poll at most 30 times, sleeping 100 ms while the processor still runs.
  edm::event_processor::State st;
  bool leftRunning = false;
  for (int poll = 0; poll < 30 && !leftRunning; ++poll) {
    st = evtProcessor_->getState();
    if (st==edm::event_processor::sRunning) {
      ::usleep(100000);
      continue;
    }
    if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
      leftRunning = true;
    }
    else {
      LOG4CPLUS_ERROR(getApplicationLogger(),
                      "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping");
      return false;
    }
  }

  if (!leftRunning) {
    LOG4CPLUS_ERROR(getApplicationLogger(),
                    "Timeout waiting for Master edm::EventProcessor to go stopping state:"<<evtProcessor_->stateName(st));
    return false;
  }
  return true;
}
bool FUEventProcessor::enableClassic | ( | ) | [private] |
Definition at line 1964 of file FUEventProcessor.cc.
References enableCommon(), evtProcessor_, localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.
Referenced by enabling().
{
  // Runs the common enable sequence, then blocks (polling once per second)
  // until the event processor reports the running state.
  // @return whatever enableCommon() returned.
  const bool commonResult = enableCommon();

  for (;;) {
    if (evtProcessor_->getState()==edm::event_processor::sRunning) break;
    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
    ::sleep(1);
  }

  //  implementation moved to EPWrapper
  //  startScalersWorkLoop();  // this is now not done any longer
  localLog("-I- Start completed");
  return commonResult;
}
bool FUEventProcessor::enableCommon | ( | ) | [private] |
Definition at line 1674 of file FUEventProcessor.cc.
References attachDqmToShm(), gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, runNumber_, and stor::utils::sleep().
Referenced by enableClassic(), and enableMPEPSlave().
{
  // Shared enable sequence: optionally attaches DQM to shared memory, resets
  // counters, sets or declares the run number, starts the event processor
  // asynchronously and fires "EnableDone".
  // @return false on every path; failures are reported through the FSM.

  // Records the reason, fails the state machine and writes the local log.
  auto failWith = [this](const std::string& reason) -> bool {
    reasonForFailedState_ = reason;
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
    return false;
  };

  try {
    if(hasShMem_) attachDqmToShm();

    evtProcessor_->clearCounters();
    if(isRunNumberSetter_)
      evtProcessor_->setRunNumber(runNumber_.value_);
    else
      evtProcessor_->declareRunNumber(runNumber_.value_);

    int status = 0;
    try{
      ::sleep(1);
      evtProcessor_->runAsync();
      status = evtProcessor_->statusAsync();
    }
    catch(cms::Exception &e) {
      return failWith(e.explainSelf());
    }
    catch(std::exception &e) {
      return failWith(e.what());
    }
    catch(...) {
      return failWith("Unknown Exception");
    }

    if(status != 0) {
      std::ostringstream oss;
      oss<<"EventProcessor::runAsync returned status code " << status;
      return failWith(oss.str());
    }
  }
  catch (xcept::Exception &e) {
    return failWith("enabling FAILED: " + (std::string)e.what());
  }

  try{
    fsm_.fireEvent("EnableDone",this);
  }
  catch (xcept::Exception &e) {
    // Report on stdout, then let the exception continue to the caller.
    std::cout << "exception " << (std::string)e.what() << std::endl;
    throw;
  }

  return false;
}
bool FUEventProcessor::enableForkInEDM | ( | ) | [private] |
Definition at line 1885 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, exception, cms::Exception::explainSelf(), evf::StateMachine::fireFailed(), evf::moduleweb::ForkInfoObj::forkHandler, forkInfoObj_, forkObjLock_, evf::moduleweb::ForkInfoObj::forkParams, forkProcessFromEDM_helper(), fsm_, evf::moduleweb::ForkInfoObj::fuAddr, isRunNumberSetter_, log_, evf::moduleweb::ForkInfoObj::mst_lock_, mwrRef_, evf::ModuleWebRegistry::publishForkInfo(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), runNumber_, and evf::moduleweb::ForkInfoObj::stopCondition.
Referenced by enabling().
{
  // Enable path for fork-in-EDM mode: prepares a fresh ForkInfoObj, publishes
  // it to the "DaqSource" module through the module web registry (if one is
  // set) and starts the event processor asynchronously.
  // @return true on success; false after reporting a failure to the FSM.

  // Records the reason, fails the state machine and logs at FATAL level.
  auto failFatal = [this](const std::string& reason) -> bool {
    reasonForFailedState_ = reason;
    fsm_.fireFailed(reasonForFailedState_,this);
    LOG4CPLUS_FATAL(log_,reasonForFailedState_);
    return false;
  };

  evtProcessor_.resetWaiting();
  try {
    //set to connect to Shm later
    //if(hasShMem_) setAttachDqmToShm();

    //maybe not needed in MP mode
    evtProcessor_->clearCounters();
    if(isRunNumberSetter_)
      evtProcessor_->setRunNumber(runNumber_.value_);
    else
      evtProcessor_->declareRunNumber(runNumber_.value_);

    //prepare object used to communicate with DaqSource
    pthread_mutex_destroy(&forkObjLock_);
    pthread_mutex_init(&forkObjLock_,0);
    delete forkInfoObj_; // deleting a null pointer is a no-op
    forkInfoObj_ = new moduleweb::ForkInfoObj();
    forkInfoObj_->mst_lock_=&forkObjLock_;
    forkInfoObj_->fuAddr=(void*)this;
    forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
    forkInfoObj_->forkParams.slotId=-1;
    forkInfoObj_->forkParams.restart=0;
    forkInfoObj_->forkParams.isMaster=-1;
    forkInfoObj_->stopCondition=0;
    if (mwrRef_)
      mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);

    evtProcessor_->runAsync();
    const int status = evtProcessor_->statusAsync();
    if(status != 0) {
      std::ostringstream oss;
      oss<<"EventProcessor::runAsync returned status code " << status;
      return failFatal(oss.str());
    }
  }
  //catch exceptions on master side
  catch(cms::Exception &e) {
    return failFatal(e.explainSelf());
  }
  catch(std::exception &e) {
    return failFatal(e.what());
  }
  catch(...) {
    return failFatal("Unknown Exception");
  }
  return true;
}
bool FUEventProcessor::enableMPEPSlave | ( | ) | [private] |
Definition at line 1977 of file FUEventProcessor.cc.
References alignCSCRings::e, enableCommon(), evtProcessor_, Exception, evf::StateMachine::fireFailed(), fsm_, reco::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().
Referenced by enabling(), and supervisor().
{
  // Enable sequence for a slave: starts the receiving, monitor and scalers
  // work loops, waits until the wrapper reports it is waiting for a lumi
  // section, creates the message service presence and finally runs the
  // common enable sequence.
  // @return whatever enableCommon() returned.

  //all this happens only in the child process
  startReceivingLoop();
  startReceivingMonitorLoop();
  evtProcessor_.resetWaiting();
  startScalersWorkLoop();

  // Poll once per second until the wrapper is waiting for a lumi section.
  for (; !evtProcessor_.isWaitingForLs(); ) ::sleep(1);

  // @EM test do not run monitor loop in slave, only receiving&Monitor
  //  evtProcessor_.startMonitoringWorkLoop();
  try{
    //    evtProcessor_.makeServicesOnly();
    try{
      LOG4CPLUS_DEBUG(getApplicationLogger(),
                      "Trying to create message service presence ");
      edm::PresenceFactory *factory = edm::PresenceFactory::get();
      if(factory == 0) {
        LOG4CPLUS_ERROR(getApplicationLogger(),
                        "Unable to create message service presence ");
      }
      else {
        factory->makePresence("MessageServicePresence").release();
      }
    }
    catch(...) {
      LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
    }
    ML::MLlog4cplus::setAppl(this);
  }
  catch (xcept::Exception &e) {
    reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  //  while(evtProcessor_->getState()!= edm::event_processor::sRunning){
  //    LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
  //    ::sleep(1);
  //  }
  return enableCommon();
}
bool FUEventProcessor::enabling | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 422 of file FUEventProcessor.cc.
References toolbox::mem::_s_mutex_ptr_, configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, enableClassic(), enableForkInEDM(), enableMPEPSlave(), epInitialized_, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), forkInEDM_, fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), mwrRef_, myProcess_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, ratestat_, reasonForFailedState_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sError, edm::event_processor::sInvalid, sorRef_, edm::event_processor::sRunning, evf::Vulture::start(), start_lock_, startSummarizeWorkLoop(), stop_lock_, subs_, vp_, and vulture_.
{
  nbTotalDQM_ = 0;
  scalersUpdates_ = 0;

  // Bit mask handed to evtProcessor_.init(): one bit per configured feature.
  unsigned short smap = 0;
  smap += (nbSubProcesses_.value_!=0) ? 0x10 : 0;
  smap += ((instance_.value_%80)==0) ? 0x8 : 0; // have at least one legend per slice
  smap += hasServiceWebRegistry_.value_ ? 0x4 : 0;
  smap += hasModuleWebRegistry_.value_ ? 0x2 : 0;
  smap += hasPrescaleService_.value_ ? 0x1 : 0;

  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
  if (!epInitialized_) {
    evtProcessor_.forceInitEventProcessorMaybe();
  }
  std::string cfg = configString_.toString();
  evtProcessor_.init(smap,cfg);

  mwrRef_ = evtProcessor_.getModuleWebRegistry();
  sorRef_ = evtProcessor_.getShmOutputModuleRegistry();

  if (!epInitialized_) {
    evtProcessor_->beginJob();

    // Replace any statistics helpers left over from a previous pass.
    if (cpustat_) {
      delete cpustat_;
      cpustat_ = 0;
    }
    cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
                           nbSubProcesses_.value_,
                           instance_.value_,
                           iDieUrl_.value_);
    if (ratestat_) {
      delete ratestat_;
      ratestat_ = 0;
    }
    ratestat_ = new RateStat(iDieUrl_.value_);

    if (iDieStatisticsGathering_.value_) {
      try {
        cpustat_->sendLegenda(evtProcessor_.getmicromap());
        xdata::Serializable *legendEntry =
          scalersLegendaInfoSpace_->find("scalersLegenda");
        if (legendEntry != 0) {
          std::string legendText = ((xdata::String*)legendEntry)->value_;
          ratestat_->sendLegenda(legendText);
        }
      }
      catch (evf::Exception &e) {
        LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda" << e.what());
      }
      catch (xcept::Exception& e) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda." << e.what());
      }
    }
    epInitialized_ = true;
  }

  // get it again after init has been carried out...
  configuration_ = evtProcessor_.configuration();
  evtProcessor_.resetLumiSectionReferenceIndex();

  // classic appl will return here
  if (nbSubProcesses_.value_==0) {
    return enableClassic();
  }

  // protect manipulation of subprocess array
  pthread_mutex_lock(&start_lock_);
  pthread_mutex_lock(&pickup_lock_);
  subs_.clear();
  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
  pid_t retval = -1;
  for (unsigned int slot = 0; slot < nbSubProcesses_.value_; slot++) {
    // this will replace all the scattered variables
    subs_[slot] = SubProcess(slot,retval);
  }
  pthread_mutex_unlock(&pickup_lock_);
  pthread_mutex_unlock(&start_lock_);

  // set expected number of child EP's for the Init message(s) sent to the SM
  try {
    if (sorRef_) {
      unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
      if (nbExpectedEPWriters==0) {
        nbExpectedEPWriters = 1; // master instance processing
      }
      std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
      for (unsigned int m = 0; m < shmOutputs.size(); m++) {
        shmOutputs[m]->setNExpectedEPs(nbExpectedEPWriters);
      }
    }
  }
  catch (...) {
    LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
  }

  // use new method if configured
  edm_init_done_ = true;
  if (forkInEDM_.value_) {
    edm_init_done_ = false;
    enableForkInEDM();
  }
  else {
    for (unsigned int slot = 0; slot < nbSubProcesses_.value_; slot++) {
      retval = subs_[slot].forkNew();
      if (retval != 0) {
        continue; // not the child: keep forking the remaining slots
      }

      // From here on we are in the child process.
      myProcess_ = &subs_[slot];
      // dirty hack: delete/recreate global binary semaphore for later use in child
      delete toolbox::mem::_s_mutex_ptr_;
      toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
      int rc = pthread_mutex_destroy(&stop_lock_);
      if (rc != 0) perror("error");
      rc = pthread_mutex_init(&stop_lock_,0);
      if (rc != 0) perror("error");
      fsm_.disableRcmsStateNotification();
      return enableMPEPSlave(); // the loop is broken in the child
    }
  }

  if (!edm_init_done_) {
    // enable while we wait for beginRun/conditions to load
    LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
    fsm_.fireEvent("EnableDone",this);
    localLog("-I- Start completed");

    // Poll every 10 ms until edm_init_done_ is set or the EP reports a bad state.
    edm::event_processor::State st;
    while (!edm_init_done_) {
      usleep(10000);
      st = evtProcessor_->getState();
      if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
    }
    st = evtProcessor_->getState();

    // error handling: EP must fork during sRunning
    if (st!=edm::event_processor::sRunning) {
      reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
      localLog(reasonForFailedState_);
      fsm_.fireFailed(reasonForFailedState_,this);
      return false;
    }
    startSummarizeWorkLoop();
    vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
    return false;
  }

  startSummarizeWorkLoop();
  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
  fsm_.fireEvent("EnableDone",this);
  localLog("-I- Start completed");
  return false;
}
void FUEventProcessor::forkProcessesFromEDM | ( | ) | [private] |
Definition at line 1738 of file FUEventProcessor.cc.
References toolbox::mem::_s_mutex_ptr_, evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, attachDqmToShm(), gather_cfg::cout, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, evtProcessor_, exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, fsm_, reco::get(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasShMem_, i, evf::moduleweb::ForkParams::isMaster, evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), myProcess_, nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::resetPackedTriggerReport(), evf::moduleweb::ForkParams::restart, scalersUpdates_, ML::MLlog4cplus::setAppl(), evf::moduleweb::ForkParams::slotId, sorRef_, startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), stop_lock_, and subs_.
{
  // Forks the sub-process slots requested through forkInfoObj_->forkParams:
  // a single slot when slotId >= 0, otherwise every configured slot.
  // In a child the function sets up the per-process state and returns;
  // in the master it finishes the loop and sets edm_init_done_.
  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);

  unsigned int forkFrom=0;
  unsigned int forkTo=nbSubProcesses_.value_;
  if (forkParams->slotId>=0) {
    forkFrom=forkParams->slotId;
    forkTo=forkParams->slotId+1;
  }

  //before fork, make sure to disconnect output modules from Shm
  try {
    if (sorRef_) {
      std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
      for (unsigned int m=0;m<shmOutputs.size();m++) {
        //unregister PID from ShmBuffer/RB
        shmOutputs[m]->unregisterFromShm();
        //disconnect from Shm
        shmOutputs[m]->stop();
      }
    }
  }
  catch (std::exception &e)
  {
    reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
    LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }
  catch (...) {
    reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
    LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
    localLog(reasonForFailedState_);
  }

  //fork loop
  for(unsigned int i=forkFrom; i<forkTo; i++)
  {
    int retval = subs_[i].forkNew();
    if(retval==0)
    {
      // ---- child process ----
      forkParams->isMaster=0;
      myProcess_ = &subs_[i];
      // dirty hack: delete/recreate global binary semaphore for later use in child
      delete toolbox::mem::_s_mutex_ptr_;
      toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
      int rc = pthread_mutex_destroy(&stop_lock_);
      if(rc != 0) perror("error");
      rc = pthread_mutex_init(&stop_lock_,0);
      if(rc != 0) perror("error");
      fsm_.disableRcmsStateNotification();

      try{
        LOG4CPLUS_DEBUG(getApplicationLogger(),
                        "Trying to create message service presence ");
        //release the presense factory in master
        edm::PresenceFactory *pf = edm::PresenceFactory::get();
        if(pf != 0) {
          pf->makePresence("MessageServicePresence").release();
        }
        else {
          LOG4CPLUS_ERROR(getApplicationLogger(),
                          "Unable to create message service presence ");
        }
      }
      catch(...) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
      }

      ML::MLlog4cplus::setAppl(this);

      //reconnect to Shm from output modules
      try {
        if (sorRef_) {
          std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
          for (unsigned int m=0;m<shmOutputs.size();m++)
            shmOutputs[m]->start();
        }
      }
      catch (...)
      {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
      }

      if (forkParams->restart) {
        //do restart things
        scalersUpdates_ = 0;
        // Drive the FSM through Stop -> StopDone -> Enable so that the
        // EnableDone fired further down is a valid transition.
        fsm_.fireEvent("Stop",this);
        fsm_.fireEvent("StopDone",this);
        fsm_.fireEvent("Enable",this);
        try{
          xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
          if(lsid) {
            ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
          }
        }
        catch(...){
          std::cout << "trouble with lsindex during restart" << std::endl;
        }
        try{
          xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
          if(lstb) {
            ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
          }
        }
        catch(...){
          std::cout << "trouble with resetting flag for eol recovery " << std::endl;
        }
        evtProcessor_.adjustLsIndexForRestart();
        evtProcessor_.resetPackedTriggerReport();
      }

      //start other threads
      startReceivingLoop();
      startReceivingMonitorLoop();
      startScalersWorkLoop();
      while(!evtProcessor_.isWaitingForLs())
        ::usleep(100000);//wait for scalers loop to start

      //connect DQMShmOutputModule
      if(hasShMem_) attachDqmToShm();

      //catch transition error if we are already Enabled
      try {
        fsm_.fireEvent("EnableDone",this);
      }
      catch (...) {}

      //child return to DaqSource
      return ;
    }
    else {
      // ---- master process ----
      forkParams->isMaster=1;
      // Log the restart BEFORE clearing the request fields: forkParams points
      // at forkInfoObj_->forkParams, so testing `restart` after the reset (as
      // the code did previously) could never be true and the message was lost.
      if (forkParams->restart) {
        std::ostringstream ost1;
        ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
        localLog(ost1.str());
      }
      // Reset the request so the next fork defaults to "all slots, no restart".
      forkParams->slotId=-1;
      forkParams->restart=0;
    }
  }
  edm_init_done_=true;
}
void FUEventProcessor::forkProcessFromEDM_helper | ( | void * | addr | ) | [static] |
Definition at line 1734 of file FUEventProcessor.cc.
Referenced by enableForkInEDM().
{
  // Static trampoline: `addr` is the FUEventProcessor instance passed as an
  // opaque pointer; forward the call to its member function.
  FUEventProcessor *self = static_cast<FUEventProcessor*>(addr);
  self->forkProcessesFromEDM();
}
xoap::MessageReference FUEventProcessor::fsmCallback | ( | xoap::MessageReference | msg | ) | throw (xoap::exception::Exception) |
Definition at line 671 of file FUEventProcessor.cc.
References evf::StateMachine::commandCallback(), fsm_, and lumiQueryAPI::msg.
{
  // Hand the incoming message to the state machine's command callback and
  // return its reply unchanged.
  return fsm_.commandCallback(msg);
}
bool FUEventProcessor::halting | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 640 of file FUEventProcessor.cc.
References doEndRunInEDM(), alignCSCRings::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInEDM_, forkInfoObj_, fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().
{
  // Workloop action for the halting transition. Stops the sub-processes (if
  // any), halts the event processor and fires "HaltDone" on success.
  // Always returns false.
  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
  if(nbSubProcesses_.value_!=0) {
    stopSlavesAndAcknowledge();
    if (forkInEDM_.value_) doEndRunInEDM();
  }
  try{
    evtProcessor_.stopAndHalt();
    //cleanup forking variables
    if (forkInfoObj_) {
      delete forkInfoObj_;
      forkInfoObj_=0;
    }
  }
  catch (evf::Exception &e) {
    reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
    localLog(reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
    // The failure has been reported to the state machine: do not also fire
    // "HaltDone" or log a successful completion (same as the failure paths
    // of enabling()).
    return false;
  }
  // if(hasShMem_) detachDqmFromShm();

  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
  fsm_.fireEvent("HaltDone",this);
  localLog("-I- Halt completed");
  return false;
}
void FUEventProcessor::localLog | ( | std::string | m | ) | [private] |
Definition at line 976 of file FUEventProcessor.cc.
References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.
Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().
{
  // Appends " at <local date/time>" to the message and stores it in the log
  // ring. The ring is filled from the highest index downwards; logWrap_ is
  // set once the index wraps around from 0.
  timeval tv;
  gettimeofday(&tv,0);

  // localtime_r writes into a caller-owned buffer; plain localtime() returns
  // a pointer to shared static storage and is not safe when several
  // workloops log concurrently.
  tm uptm;
  char datestring[256];
  datestring[0] = '\0';
  if(localtime_r(&tv.tv_sec, &uptm) != 0) {
    // strftime leaves the buffer unspecified when it returns 0.
    if(strftime(datestring, sizeof(datestring),"%c", &uptm) == 0)
      datestring[0] = '\0';
  }

  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
  logRingIndex_--;

  std::ostringstream timestamp;
  timestamp << " at " << datestring;
  m += timestamp.str();
  logRing_[logRingIndex_] = m;
}
std::string FUEventProcessor::logsAsString | ( | ) | [private] |
Definition at line 959 of file FUEventProcessor.cc.
References i, logRing_, logRingIndex_, and logWrap_.
{
  // Render the log ring as text, one entry per line.
  std::ostringstream text;

  // Entries from the current index to the end of the ring are always printed.
  for (unsigned int pos = logRingIndex_; pos < logRing_.size(); pos++) {
    text << logRing_[pos] << std::endl;
  }

  // Once the ring has wrapped, the slots below the index follow.
  if (logWrap_) {
    for (unsigned int pos = 0; pos < logRingIndex_; pos++) {
      text << logRing_[pos] << std::endl;
    }
  }
  return text.str();
}
void FUEventProcessor::makeStaticInfo | ( | ) | [private] |
Definition at line 2291 of file FUEventProcessor.cc.
References applicationInfoSpace_, evf::utils::cDiv(), alignCSCRings::e, Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.
Referenced by FUEventProcessor().
{
  // Builds the static part of the updater output (revision, configuration
  // flags, optional monitoring parameters, system information) once and
  // stores it in updaterStatic_.
  using namespace utils;
  std::ostringstream ost;
  mDiv(&ost,"ve");
  ost<< "$Revision: 1.135 $ (" << edm::getReleaseVersion() <<")";
  cDiv(&ost);
  mDiv(&ost,"ou",outPut_.toString());
  mDiv(&ost,"sh",hasShMem_.toString());
  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());

  xdata::Serializable *monsleep = 0;
  xdata::Serializable *lstimeout = 0;
  try{
    monsleep = applicationInfoSpace_->find("monSleepSec");
    lstimeout = applicationInfoSpace_->find("lsTimeOut");
  }
  catch(const xdata::exception::Exception &){
    // Either parameter may be absent; the matching div is then omitted.
  }

  if(monsleep!=0)
    mDiv(&ost,"ms",monsleep->toString());
  if(lstimeout!=0)
    mDiv(&ost,"lst",lstimeout->toString());

  // Use a real utsname object (correct alignment) and only print its fields
  // when uname() succeeded; otherwise they would be uninitialised memory.
  struct utsname sysInfo;
  const bool haveSysInfo = (uname(&sysInfo) == 0);
  mDiv(&ost,"sysinfo");
  if(haveSysInfo) {
    ost << sysInfo.sysname << " " << sysInfo.nodename
        << " " << sysInfo.release << " " << sysInfo.version << " " << sysInfo.machine;
  }
  else {
    ost << "unknown";
  }
  cDiv(&ost);
  updaterStatic_ = ost.str();
}
void FUEventProcessor::microState | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 2097 of file FUEventProcessor.cc.
References autoRestartSlaves_, gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().
Referenced by FUEventProcessor().
{
  // Writes the micro-state HTML table rows: one row for this (master)
  // process and, when sub-processes are configured, one row per slot in
  // subs_. Bails out without output when called in a child process.

  // Scoped mutex holder local to this function: guarantees the unlock on
  // every exit path, including exceptions thrown while a row is written.
  struct ScopedMutex {
    explicit ScopedMutex(pthread_mutex_t *m) : m_(m) { pthread_mutex_lock(m_); }
    ~ScopedMutex() { pthread_mutex_unlock(m_); }
    pthread_mutex_t *m_;
  private:
    ScopedMutex(const ScopedMutex &);
    ScopedMutex &operator=(const ScopedMutex &);
  };

  std::string urn = getApplicationDescriptor()->getURN();
  try{
    evtProcessor_.stateNameFromIndex(0);
    evtProcessor_.moduleNameFromIndex(0);
    if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
    *out << "<tr><td>" << fsm_.stateName()->toString()
         << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
         << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
    evtProcessor_.microState(in,out);
    *out << "<td></td><td>" << nbTotalDQM_
         << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
    if(nbSubProcesses_.value_!=0 && !myProcess_)
    {
      // start_lock_ is held for the whole walk over subs_.
      ScopedMutex startGuard(&start_lock_);
      for(unsigned int i = 0; i < subs_.size(); i++)
      {
        try{
          if(subs_[i].alive()>0)
          {
            *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
                 << i << "\">""Alive</td><td>S</td><td>"
                 << subs_[i].queueId() << "<td>"
                 << subs_[i].queueStatus()<< "/"
                 << subs_[i].queueOccupancy() << "/"
                 << subs_[i].queuePidOfLastSend() << "/"
                 << subs_[i].queuePidOfLastReceive()
                 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
                 << subs_[i].pid() << "&method=procStat\">"
                 << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
                 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
                 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
                 << subs_[i].params().nba << "/" << subs_[i].params().nbp
                 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
                 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
                 << "</td><td>" << subs_[i].params().ps
                 << "</td><td"
                 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
                 << ">" << subs_[i].params().eols
                 << "</td><td>" << subs_[i].params().dqm
                 << "</td><td>" << subs_[i].params().trp << "</td>";
          }
          else
          {
            // pickup_lock_ is held while the state of a non-alive slot is read.
            ScopedMutex pickupGuard(&pickup_lock_);
            *out << "<tr><td id=\"a"<< i << "\" ";
            if(subs_[i].alive()==-1000)
              *out << " bgcolor=\"#bbaabb\">NotInitialized";
            else
              *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
            *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
                 << subs_[i].queueStatus() << "/"
                 << subs_[i].queueOccupancy() << "/"
                 << subs_[i].queuePidOfLastSend() << "/"
                 << subs_[i].queuePidOfLastReceive()
                 << "</td><td id=\"p"<< i << "\">"
                 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
            if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
            {
              if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
                *out << " will restart in " << subs_[i].countdown() << " s";
              else if(autoRestartSlaves_)
                *out << " reached maximum restart count";
              else *out << " autoRestart is disabled ";
            }
            *out << "</td><td"
                 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
                 << ">" << subs_[i].params().eols
                 << "</td><td>" << subs_[i].params().dqm
                 << "</td><td>" << subs_[i].params().trp << "</td>";
          }
          *out << "</tr>";
        }
        catch(evf::Exception &e){
          // The slot did not answer: emit a "NotResp" row instead.
          *out << "<tr><td id=\"a"<< i << "\" "
               <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
               << subs_[i].queueStatus() << "/"
               << subs_[i].queueOccupancy() << "/"
               << subs_[i].queuePidOfLastSend() << "/"
               << subs_[i].queuePidOfLastReceive()
               << "</td><td id=\"p"<< i << "\">"
               <<subs_[i].pid()<<"</td>";
        }
      }
    }
  }
  catch(evf::Exception &e)
  {
    LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
  }
  catch(cms::Exception &e)
  {
    LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
  }
  catch(std::exception &e)
  {
    LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
  }
  catch(...)
  {
    LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
  }
}
void evf::FUEventProcessor::moduleWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | [inline] |
Definition at line 110 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::moduleWeb().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Thin wrapper: pass the request and output stream straight through to
  // the event-processor wrapper's moduleWeb handler.
  evtProcessor_.moduleWeb(in,out);
}
void FUEventProcessor::pathNames | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 895 of file FUEventProcessor.cc.
References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.
Referenced by FUEventProcessor().
{
  // Writes the "scalersLegenda" entry of the scalers legenda info space to
  // the output, followed by a newline. Produces no output when the event
  // processor is not available or the entry is missing.
  if (evtProcessor_ != 0) {
    xdata::Serializable *legendEntry = scalersLegendaInfoSpace_->find("scalersLegenda");
    if (legendEntry == 0) {
      return;
    }
    std::string legendText = ((xdata::String*)legendEntry)->value_;
    *out << legendText << std::endl;
  }
}
void FUEventProcessor::procStat | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 2281 of file FUEventProcessor.cc.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Delegates to the evf::utils::procStat helper, which writes to `out`;
  // the `in` argument is not used here.
  evf::utils::procStat(out);
}
bool FUEventProcessor::receiving | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1048 of file FUEventProcessor.cc.
References alignCSCRings::e, evf::StateMachine::fireEvent(), fsm_, reco::get(), edm::PresenceFactory::makePresence(), lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), stop_lock_, and stopClassic().
Referenced by startReceivingLoop().
{
  // Workloop body of the "Receiving" loop (child process): waits for one
  // message from the master and reacts to STOP / FSTOP. Always returns true.
  MsgBuf msg;
  try {
    myProcess_->rcvSlave(msg,false); // will receive only messages from Master

    if (msg->mtype == MSQM_MESSAGE_TYPE_STOP) {
      pthread_mutex_lock(&stop_lock_);
      // need to set state in fsm first to allow stopDone transition
      fsm_.fireEvent("Stop",this);

      try {
        LOG4CPLUS_DEBUG(getApplicationLogger(),
                        "Trying to create message service presence ");
        edm::PresenceFactory *presenceFactory = edm::PresenceFactory::get();
        if (presenceFactory == 0) {
          LOG4CPLUS_ERROR(getApplicationLogger(),
                          "Unable to create message service presence ");
        }
        else {
          presenceFactory->makePresence("MessageServicePresence").release();
        }
      }
      catch (...) {
        LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
      }

      // call the normal sequence of stopping - as this is allowed to fail
      // provisions must be made ...@@@EM
      stopClassic();

      // Acknowledge the stop to the master, then terminate this process.
      MsgBuf stopAck(0,MSQS_MESSAGE_TYPE_STOP);
      myProcess_->postSlave(stopAck,false);
      pthread_mutex_unlock(&stop_lock_);
      fclose(stdout);
      fclose(stderr);
      _exit(EXIT_SUCCESS);
    }

    // Forced stop: exit immediately without running the stop sequence.
    if (msg->mtype == MSQM_MESSAGE_TYPE_FSTOP) {
      _exit(EXIT_SUCCESS);
    }
  }
  catch (evf::Exception &e) {
    // Intentionally ignored: the loop simply runs again.
  }
  return true;
}
bool FUEventProcessor::receivingAndMonitor | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1529 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, AlCaHLTBitMon_QueryRunRegistry::data, defaultWebPage(), evf::prg::dqm, alignCSCRings::e, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, PFRecoTauDiscriminationAgainstElectronMVA_cfi::method, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), edm::event_processor::sStopping, evf::StateMachine::stateName(), stop_lock_, evf::prg::trp, and TablePrint::write.
Referenced by startReceivingMonitorLoop().
{
  // Workloop body of the "ReceivingM" loop (child process): waits for one
  // monitoring request from the master and answers it.
  //   MCS - micro-state HTML fragment, returned in the message text
  //   PRG - progress counters, returned in slave_message_prr_
  //   WEB - a web page, streamed through the anonymous pipe in chunks of at
  //         most MAX_PIPE_BUFFER_SIZE bytes, each announced by a message
  //   TRP - ignored here
  // Always returns true.
  try{
    myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
    switch(slave_message_monitoring_->mtype)
    {
    case MSQM_MESSAGE_TYPE_MCS:
      {
        xgi::Input *in = 0;
        xgi::Output out;
        evtProcessor_.microState(in,&out);
        // Take one copy of the rendered text instead of one per use.
        const std::string payload = out.str();
        MsgBuf msg1(payload.size(),MSQS_MESSAGE_TYPE_MCR);
        strncpy(msg1->mtext,payload.c_str(),payload.size());
        myProcess_->postSlave(msg1,true);
        break;
      }

    case MSQM_MESSAGE_TYPE_PRG:
      {
        xdata::Serializable *dqmp = 0;
        xdata::UnsignedInteger32 *dqm = 0;
        evtProcessor_.monitoring(0);
        try{
          dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
        } catch(const xdata::exception::Exception &){
          // Entry not present: the dqm counter is reported as 0 below.
        }
        if(dqmp!=0)
          dqm = (xdata::UnsignedInteger32*)dqmp;

        // monitorInfoSpace_->lock();
        prg * data = (prg*)slave_message_prr_->mtext;
        data->ls = evtProcessor_.lsid_;
        data->eols = evtProcessor_.lastLumiUsingEol_;
        data->ps = evtProcessor_.psid_;
        data->nbp = evtProcessor_->totalEvents();
        data->nba = evtProcessor_->totalEventsPassed();
        data->Ms = evtProcessor_.epMAltState_.value_;
        data->ms = evtProcessor_.epmAltState_.value_;
        if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
        data->trp = scalersUpdates_;
        // monitorInfoSpace_->unlock();
        myProcess_->postSlave(slave_message_prr_,true);

        if(exitOnError_.value_)
        {
          // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
          if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
          {
            // Give the FSM up to ~6 seconds to leave "Enabled"; exit otherwise.
            bool running = true;
            int count = 0;
            while(running){
              int retval = pthread_mutex_lock(&stop_lock_);
              if(retval != 0) perror("error");
              running = fsm_.stateName()->toString()=="Enabled";
              if(count>5) _exit(-1);
              pthread_mutex_unlock(&stop_lock_);
              if(running) {::sleep(1); count++;}
            }
          }
        }
        // scalersUpdates_++;
        break;
      }

    case MSQM_MESSAGE_TYPE_WEB:
      {
        xgi::Input *in = 0;
        xgi::Output out;
        unsigned int bytesToSend = 0;
        MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);

        // The request text is "<method>[&<args>]".
        std::string query = slave_message_monitoring_->mtext;
        size_t pos = query.find_first_of("&");
        std::string method;
        std::string args;
        if(pos!=std::string::npos)
        {
          method = query.substr(0,pos);
          args = query.substr(pos+1,query.length()-pos-1);
        }
        else
          method=query;

        if(method=="Spotlight")
        {
          spotlightWebPage(in,&out);
        }
        else if(method=="procStat")
        {
          procStat(in,&out);
        }
        else if(method=="moduleWeb")
        {
          // args is a ';'-separated list of "key%value" pairs that become
          // the CGI environment of the moduleWeb call.
          internal::MyCgi mycgi;
          boost::char_separator<char> sep(";");
          boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
          for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
               tok_iter != tokens.end(); ++tok_iter){
            size_t sepPos = (*tok_iter).find_first_of("%");
            if(sepPos != std::string::npos){
              std::string first = (*tok_iter).substr(0 , sepPos);
              std::string second = (*tok_iter).substr(sepPos+1, (*tok_iter).length()-sepPos-1);
              mycgi.getEnvironment()[first]=second;
            }
          }
          moduleWeb(&mycgi,&out);
        }
        else if(method=="Default")
        {
          defaultWebPage(in,&out);
        }
        else
        {
          out << "Error 404!!!!!!!!" << std::endl;
        }

        // Copy the page once; out.str() would otherwise be re-copied for
        // every chunk written below.
        const std::string page = out.str();
        bytesToSend = page.size();
        unsigned int cycle = 0;
        if(bytesToSend==0)
        {
          // Empty page: announce zero bytes so the master is not left waiting.
          snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%u", bytesToSend);
          myProcess_->postSlave(msg1,true);
        }
        while(bytesToSend !=0){
          unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
          write(anonymousPipe_[PIPE_WRITE],
                page.c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
                msgSize);
          // Announce the size of the chunk just written.
          snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%u", msgSize);
          myProcess_->postSlave(msg1,true);
          bytesToSend -= msgSize;
          cycle++;
        }
        break;
      }

    case MSQM_MESSAGE_TYPE_TRP:
      {
        break;
      }
    }
  }
  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
  return true;
}
bool FUEventProcessor::restartForkInEDM | ( | unsigned int | slotId | ) | [private] |
Definition at line 1949 of file FUEventProcessor.cc.
References evf::moduleweb::ForkInfoObj::control_sem_, forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().
Referenced by supervisor().
{
  // Requests a re-fork of the sub-process in `slotId`: fills in the fork
  // parameters and posts on the control semaphore.
  // Returns true once the request has been posted, false when no fork info
  // object exists (halting() deletes it and resets the pointer to 0).
  if (forkInfoObj_ == 0) {
    LOG4CPLUS_ERROR(log_, " cannot restart subprocess in slot " << slotId
                    << ": fork info object is not available");
    return false;
  }

  //daqsource will keep this lock until master returns after fork
  //so that we don't do another EP restart in between
  forkInfoObj_->lock();
  forkInfoObj_->forkParams.slotId=slotId;
  forkInfoObj_->forkParams.restart=true;
  forkInfoObj_->forkParams.isMaster=1;
  forkInfoObj_->stopCondition=0;
  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
  sem_post(forkInfoObj_->control_sem_);
  forkInfoObj_->unlock();
  usleep(1000);
  return true;
}
bool FUEventProcessor::scalers | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1409 of file FUEventProcessor.cc.
References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), runTheMatrix::ret, scalersUpdates_, and wlScalersActive_.
Referenced by startScalersWorkLoop().
{
  // Workloop body of the "Scalers" loop. Returns false (and clears
  // wlScalersActive_) when there is no event processor or no trigger report
  // could be obtained; returns true otherwise.
  if (!evtProcessor_) {
    std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
    wlScalersActive_ = false;
    return false;
  }
  if (!evtProcessor_.getTriggerReport(true)) {
    wlScalersActive_ = false;
    return false;
  }

  // Without a sub-process entry the update is fired locally.
  if (!myProcess_) {
    evtProcessor_.fireScalersUpdate();
    return true;
  }

  // With a sub-process entry: post the packed report on the control queue.
  int postStatus = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
  if (postStatus != 0) {
    std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
  }
  scalersUpdates_++;
  return true;
}
void FUEventProcessor::scalersWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 882 of file FUEventProcessor.cc.
References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.
Referenced by FUEventProcessor().
{
  // Serves the packed trigger report as a raw binary HTTP response body of
  // sizeof(TriggerReportStatic) bytes. The headers are always set; the body
  // is only written when the event processor is available.
  out->getHTTPResponseHeader().addHeader( "Content-Type",
                                          "application/octet-stream" );
  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
                                          "binary" );

  if (evtProcessor_ != 0) {
    char *reportBytes = (char*)(evtProcessor_.getPackedTriggerReportAsStruct());
    out->write( reportBytes, sizeof(TriggerReportStatic) );
  }
}
void FUEventProcessor::sendMessageOverMonitorQueue | ( | MsgBuf & | buf | ) |
Definition at line 2286 of file FUEventProcessor.cc.
References myProcess_, and evf::SubProcess::postSlave().
{
  // Nothing to post to when no sub-process entry is set.
  if (!myProcess_) {
    return;
  }
  myProcess_->postSlave(buf,true);
}
void evf::FUEventProcessor::serviceWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | [inline] |
Definition at line 111 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::serviceWeb().
Referenced by FUEventProcessor().
{
  // Thin wrapper: pass the request and output stream straight through to
  // the event-processor wrapper's serviceWeb handler.
  evtProcessor_.serviceWeb(in,out);
}
void FUEventProcessor::setAttachDqmToShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 909 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), and cms::Exception::what().
{
  // Calls setAttachToShm() on the FUShmDQMOutputService when that service
  // is available. A cms::Exception raised on the way is re-raised as an
  // evf::Exception.
  std::string failure;
  try {
    edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
    if (edm::Service<FUShmDQMOutputService>().isAvailable()) {
      edm::Service<FUShmDQMOutputService>()->setAttachToShm();
    }
  }
  catch (cms::Exception& e) {
    failure = "Failed to set to attach DQM service to shared memory: " +
      (std::string)e.what();
  }

  // Raise outside the catch block, once the cms::Exception has been handled.
  if (!failure.empty()) {
    XCEPT_RAISE(evf::Exception,failure);
  }
}
void FUEventProcessor::spotlightWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 797 of file FUEventProcessor.cc.
References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{
  // Renders the "spotlight" HTML page: header with icons and FSM state, then
  // either the task page (child process or no sub-processes), the summary
  // page (master with sub-processes) or an "unconfigured" note, followed by
  // the configuration text in a textarea.
  std::string urn = getApplicationDescriptor()->getURN();

  // --- document head ---
  *out << "<!-- base href=\"/" << urn << "\"> -->" << std::endl;
  *out << "<html>" << std::endl
       << "<head>" << std::endl
       << "<link type=\"text/css\" rel=\"stylesheet\""
       << " href=\"/evf/html/styles.css\"/>" << std::endl;
  *out << "<title>" << getApplicationDescriptor()->getClassName()
       << getApplicationDescriptor()->getInstance()
       << " MAIN</title>" << std::endl;
  *out << "</head>" << std::endl
       << "<body>" << std::endl;

  // --- banner table: application icon, name and state ---
  *out << "<table border=\"0\" width=\"100%\">" << std::endl
       << "<tr>" << std::endl
       << " <td align=\"left\">" << std::endl
       << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/evf/images/spoticon.jpg\"" << std::endl
       << " alt=\"main\"" << std::endl
       << " width=\"64\"" << std::endl
       << " height=\"64\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " <b>" << std::endl;
  *out << getApplicationDescriptor()->getClassName()
       << getApplicationDescriptor()->getInstance() << std::endl;
  *out << " " << fsm_.stateName()->toString() << std::endl;
  *out << " </b>" << std::endl
       << " </td>" << std::endl;

  // --- link to HyperDAQ ---
  *out << " <td width=\"32\">" << std::endl
       << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl
       << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl
       << " alt=\"HyperDAQ\"" << std::endl
       << " width=\"32\"" << std::endl
       << " height=\"32\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " </a>" << std::endl
       << " </td>" << std::endl;

  // --- spacer cell ---
  *out << " <td width=\"32\">" << std::endl
       << " </td>" << std::endl;

  // --- link back to this application's main page ---
  *out << " <td width=\"32\">" << std::endl;
  *out << " <a href=\"/" << urn << "/\">" << std::endl;
  *out << " <img" << std::endl
       << " align=\"middle\"" << std::endl
       << " src=\"/evf/images/epicon.jpg\"" << std::endl
       << " alt=\"main\"" << std::endl
       << " width=\"32\"" << std::endl
       << " height=\"32\"" << std::endl
       << " border=\"\"/>" << std::endl
       << " </a>" << std::endl
       << " </td>" << std::endl
       << "</tr>" << std::endl
       << "</table>" << std::endl
       << "<hr/>" << std::endl;

  // URL prefix for module pages: via SubWeb in a child, direct otherwise.
  std::ostringstream modulePath;
  if (myProcess_) {
    modulePath << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
  }
  else {
    modulePath << "/moduleWeb?";
  }
  urn += modulePath.str();

  // --- page body ---
  if (evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0)) {
    evtProcessor_.taskWebPage(in,out,urn);
  }
  else if (evtProcessor_) {
    evtProcessor_.summaryWebPage(in,out,urn);
  }
  else {
    *out << "<td>HLT Unconfigured</td>" << std::endl;
  }
  *out << "</table>" << std::endl;

  // --- configuration dump and document tail ---
  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
  *out << configuration_ << std::endl;
  *out << "</textarea><P>" << std::endl
       << "</body>" << std::endl
       << "</html>" << std::endl;
}
void FUEventProcessor::startReceivingLoop | ( | ) | [private] |
Definition at line 1011 of file FUEventProcessor.cc.
References asReceiveMsgAndExecute_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receiving(), receiving_, and wlReceiving_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{
  // Look up the "Receiving" workloop (type "waiting"), activate it if it is
  // not yet active, and submit receiving() to it as the action to run.
  const std::string loopName("Receiving");
  try {
    wlReceiving_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(loopName, "waiting");
    if (!wlReceiving_->isActive()) {
      wlReceiving_->activate();
    }
    asReceiveMsgAndExecute_ =
      toolbox::task::bind(this, &FUEventProcessor::receiving, loopName);
    wlReceiving_->submit(asReceiveMsgAndExecute_);
    receiving_ = true;  // flagged only once the action has been submitted
  }
  catch (xcept::Exception& cause) {
    // Any xcept failure is re-raised as evf::Exception, handing the caught
    // exception to XCEPT_RETHROW.
    const std::string reason("Failed to start workloop 'Receiving'.");
    XCEPT_RETHROW(evf::Exception, reason, cause);
  }
}
void FUEventProcessor::startReceivingMonitorLoop | ( | ) | [private] |
Definition at line 1028 of file FUEventProcessor.cc.
References asReceiveMsgAndRead_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{
  // Look up the "ReceivingM" workloop (type "waiting"), activate it if
  // needed, and submit receivingAndMonitor() to it.
  const std::string loopName("ReceivingM");
  try {
    wlReceivingMonitor_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(loopName, "waiting");
    if (!wlReceivingMonitor_->isActive()) {
      wlReceivingMonitor_->activate();
    }
    asReceiveMsgAndRead_ =
      toolbox::task::bind(this, &FUEventProcessor::receivingAndMonitor, loopName);
    wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
    receivingM_ = true;  // flagged only once the action has been submitted
  }
  catch (xcept::Exception& cause) {
    // Re-raise as evf::Exception, handing the caught exception to the macro.
    const std::string reason("Failed to start workloop 'ReceivingM'.");
    XCEPT_RETHROW(evf::Exception, reason, cause);
  }
}
void FUEventProcessor::startScalersWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1366 of file FUEventProcessor.cc.
References asScalers_, alignCSCRings::e, Exception, lumiQueryAPI::msg, scalers(), wlScalers_, and wlScalersActive_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{
  // Look up the "Scalers" workloop (type "waiting"), activate it if needed,
  // and submit scalers() to it.
  const std::string loopName("Scalers");
  try {
    wlScalers_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(loopName, "waiting");
    if (!wlScalers_->isActive()) {
      wlScalers_->activate();
    }
    asScalers_ = toolbox::task::bind(this, &FUEventProcessor::scalers, loopName);
    wlScalers_->submit(asScalers_);
    wlScalersActive_ = true;  // flagged only once the action has been submitted
  }
  catch (xcept::Exception& cause) {
    // Re-raise as evf::Exception, handing the caught exception to the macro.
    const std::string reason("Failed to start workloop 'Scalers'.");
    XCEPT_RETHROW(evf::Exception, reason, cause);
  }
}
void FUEventProcessor::startSummarizeWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1387 of file FUEventProcessor.cc.
References asSummarize_, alignCSCRings::e, Exception, lumiQueryAPI::msg, summarize(), wlSummarize_, and wlSummarizeActive_.
Referenced by enabling().
{
  // Look up the "Summary" workloop (type "waiting"), activate it if it is not
  // yet active, and submit summarize() to it as the action to run.
  try {
    wlSummarize_ = toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary", "waiting");
    if (!wlSummarize_->isActive()) wlSummarize_->activate();
    asSummarize_ = toolbox::task::bind(this, &FUEventProcessor::summarize, "Summary");
    wlSummarize_->submit(asSummarize_);
    wlSummarizeActive_ = true;  // flagged only once the action has been submitted
  }
  catch (xcept::Exception& e) {
    // The message quotes the workloop under the name it is registered with
    // ("Summary"), as the other start*Loop functions do.
    std::string msg = "Failed to start workloop 'Summary'.";
    XCEPT_RETHROW(evf::Exception, msg, e);
  }
}
void FUEventProcessor::startSupervisorLoop | ( | ) | [private] |
Definition at line 993 of file FUEventProcessor.cc.
References asSupervisor_, alignCSCRings::e, Exception, lumiQueryAPI::msg, supervising_, supervisor(), and wlSupervising_.
Referenced by FUEventProcessor().
{
  // Look up the "Supervisor" workloop (type "waiting"), activate it if
  // needed, and submit supervisor() to it.
  const std::string loopName("Supervisor");
  try {
    wlSupervising_ = toolbox::task::getWorkLoopFactory()->getWorkLoop(loopName, "waiting");
    if (!wlSupervising_->isActive()) {
      wlSupervising_->activate();
    }
    asSupervisor_ = toolbox::task::bind(this, &FUEventProcessor::supervisor, loopName);
    wlSupervising_->submit(asSupervisor_);
    supervising_ = true;  // flagged only once the action has been submitted
  }
  catch (xcept::Exception& cause) {
    // Re-raise as evf::Exception, handing the caught exception to the macro.
    const std::string reason("Failed to start workloop 'Supervisor'.");
    XCEPT_RETHROW(evf::Exception, reason, cause);
  }
}
bool FUEventProcessor::stopClassic | ( | ) | [private] |
Definition at line 2022 of file FUEventProcessor.cc.
References detachDqmFromShm(), alignCSCRings::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, and evf::FWEPWrapper::stop().
Referenced by receiving(), and stopping().
{
  // Stops the event processor and reports the outcome to the state machine:
  // "StopDone" on success, fireFailed() with a reason otherwise. When
  // hasShMem_ is set, the DQM is detached from shared memory afterwards.
  // Always returns false.
  try {
    LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
    const edm::EventProcessor::StatusCode status = evtProcessor_.stop();
    if (status != edm::EventProcessor::epSuccess) {
      // (disabled in the original) epMState_ = evtProcessor_->currentStateName();
      if (status == edm::EventProcessor::epTimedOut) {
        reasonForFailedState_ = "EventProcessor stop timed out";
      }
      else {
        reasonForFailedState_ = "EventProcessor did not receive STOP event";
      }
      fsm_.fireFailed(reasonForFailedState_,this);
      localLog(reasonForFailedState_);
    }
    else {
      fsm_.fireEvent("StopDone",this);
    }
    if (hasShMem_) {
      detachDqmFromShm();
    }
  }
  catch (xcept::Exception &failure) {
    // xcept failures are logged locally first, then reported to the FSM.
    reasonForFailedState_ = "stopping FAILED: " + std::string(failure.what());
    localLog(reasonForFailedState_);
    fsm_.fireFailed(reasonForFailedState_,this);
  }
  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
  localLog("-I- Stop completed");
  return false;
}
bool FUEventProcessor::stopping | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 620 of file FUEventProcessor.cc.
References doEndRunInEDM(), forkInEDM_, hasShMem_, nbSubProcesses_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.
Referenced by supervisor().
{
  // Workloop action for the "stopping" transition.
  //  1. With subprocesses configured, stop the slaves and wait for their
  //     acknowledgement; in fork-in-EDM mode also run doEndRunInEDM().
  //  2. Stop the vulture, if one exists.
  //  3. Delegate to stopClassic() and return its result.
  // The wl argument is not used.
  if(nbSubProcesses_.value_!=0) {
    stopSlavesAndAcknowledge();
    if (forkInEDM_.value_) doEndRunInEDM();
  }

  // updater() treats vulture_==0 as a legitimate state, so guard the call
  // instead of dereferencing unconditionally.
  if(vulture_ != 0) vulture_->stop();

  if (forkInEDM_.value_) {
    // stopClassic() detaches the DQM from shared memory when hasShMem_ is
    // set; clear the flag for the duration of the call so that step is
    // skipped in fork-in-EDM mode, then restore it.
    const bool tmpHasShMem_ = hasShMem_;
    hasShMem_ = false;
    bool stop_status = false;
    try {
      stop_status = stopClassic();
    }
    catch(...) {
      // Do not leave the flag cleared if stopClassic() lets an exception out.
      hasShMem_ = tmpHasShMem_;
      throw;
    }
    hasShMem_ = tmpHasShMem_;
    return stop_status;
  }
  return stopClassic();
}
void FUEventProcessor::stopSlavesAndAcknowledge | ( | ) | [private] |
Definition at line 2051 of file FUEventProcessor.cc.
References alignCSCRings::e, i, localLog(), MAX_MSG_SIZE, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.
Referenced by halting(), and stopping().
{
  // Posts a STOP message to every slave that is alive, then, for each slave
  // that was sent one, receives its reply, waits until the slave is no longer
  // alive (only if the reply is of type MSQS_MESSAGE_TYPE_STOP) and
  // disconnects from it. Queue operations on subs_ are done under stop_lock_.
  MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
  MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);

  // One flag per slot. Both loops below are bounded by subs_.size(), so the
  // flag vector must cover that many entries even if it differs from the
  // configured number of subprocesses.
  const std::vector<bool>::size_type nSlots =
    subs_.size() > nbSubProcesses_.value_ ? subs_.size() : nbSubProcesses_.value_;
  std::vector<bool> processes_to_stop(nSlots,false);

  // Pass 1: send the stop request to every live slave.
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      pthread_mutex_lock(&stop_lock_);
      try {
        if(i < processes_to_stop.size() && subs_[i].alive()>0){
          processes_to_stop[i] = true;
          subs_[i].post(msg,false);
        }
      }
      catch(...) {
        // Propagate as before, but never with stop_lock_ still held.
        pthread_mutex_unlock(&stop_lock_);
        throw;
      }
      pthread_mutex_unlock(&stop_lock_);
    }

  // Pass 2: collect the acknowledgements.
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      pthread_mutex_lock(&stop_lock_);
      if(!(i < processes_to_stop.size() && processes_to_stop[i])) {
        // Nothing was sent to this slot.
        pthread_mutex_unlock(&stop_lock_);
        continue;
      }
      try{
        subs_[i].rcv(msg1,false);
      }
      catch(evf::Exception &e){
        std::ostringstream ost;
        ost << "failed to get STOP - errno ->" << errno << " " << e.what();
        reasonForFailedState_ = ost.str();
        LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
        // (disabled in the original) fsm_.fireFailed(reasonForFailedState_,this);
        localLog(reasonForFailedState_);
        pthread_mutex_unlock(&stop_lock_);
        continue;
      }
      pthread_mutex_unlock(&stop_lock_);

      // Poll every 10 ms, outside the lock, until the slave is gone.
      if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
        while(subs_[i].alive()>0) ::usleep(10000);
      subs_[i].disconnect();
    }
  // (disabled in the original) subs_.clear();
}
void FUEventProcessor::subWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 713 of file FUEventProcessor.cc.
References anonymousPipe_, gather_cfg::cout, generateEDF::done, asciidump::els, i, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), lumiQueryAPI::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.
Referenced by FUEventProcessor().
{
  // Relays a web request to one slave process and copies its reply to `out`.
  //
  // The form elements "method" and "process" select the method name and the
  // pid of the target slave; if either is missing nothing is written. The
  // request (method name followed by "&" and the CGI environment rendered as
  // "key%value;" pairs) is posted to the slave as MSQM_MESSAGE_TYPE_WEB. The
  // slave answers with MSQS_MESSAGE_TYPE_WEB messages whose text is the number
  // of bytes to read from anonymousPipe_[PIPE_READ]; a chunk shorter than
  // MAX_PIPE_BUFFER_SIZE is the last one.
  using namespace cgicc;
  pid_t pid = 0;
  std::ostringstream ost;
  ost << "&";
  Cgicc cgi(in);
  internal::MyCgi *mycgi = (internal::MyCgi*)in;
  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
        mycgi->getEnvironment().begin();
      mit != mycgi->getEnvironment().end();
      mit++)
    ost << mit->first << "%" << mit->second << ";";

  std::vector<FormEntry> el1;
  cgi.getElement("method",el1);
  std::vector<FormEntry> el2;
  cgi.getElement("process",el2);
  if(el1.size()==0 || el2.size()==0) return;

  const std::string meth = el1[0].getValue();
  const std::string mod = el2[0].getValue();
  pid = atoi(mod.c_str()); // get the process id to be polled

  unsigned int i = 0;
  for(; i < subs_.size(); i++)
    if(subs_[i].pid()==pid) break;
  if(i>=subs_.size()){ //process was not found, let the browser know
    *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
    return;
  }
  if(subs_[i].alive() != 1){
    *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
    return;
  }

  // Build the request text once instead of re-evaluating ost.str().
  const std::string env = ost.str();
  MsgBuf msg1(meth.length()+env.length()+1,MSQM_MESSAGE_TYPE_WEB);
  strncpy(msg1->mtext,meth.c_str(),meth.length());
  strncpy(msg1->mtext+meth.length(),env.c_str(),env.length());
  // The buffer was requested one byte larger than the text; terminate it
  // explicitly rather than relying on MsgBuf's initial contents.
  // NOTE(review): assumes MsgBuf's first argument is the size of mtext - confirm.
  msg1->mtext[meth.length()+env.length()] = '\0';
  subs_[i].post(msg1,true);

  // supervisor() sleeps for superSleepSec_ between cycles; stretch it tenfold
  // while the reply is being collected and restore it afterwards, also when
  // an exception escapes.
  const unsigned int keep_supersleep_original_value = superSleepSec_.value_;
  superSleepSec_.value_=10*keep_supersleep_original_value;

  std::string reply;
  try {
    MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
    bool done = false;
    while(!done){
      unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
      if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
        // No message yet - poll again in a second.
        ::sleep(1);
        continue;
      }
      // A negative announced length is treated as an empty chunk instead of
      // wrapping around to a huge unsigned size.
      const int announced = atoi(msg->mtext);
      const unsigned int nbytes = announced > 0 ? announced : 0;
      if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop

      // One extra zero-initialised byte guarantees NUL termination whatever
      // read() delivers (including nothing at all).
      std::vector<char> buf(nbytes+1,'\0');
      ssize_t retval = read(anonymousPipe_[PIPE_READ],&buf[0],nbytes);
      if(retval < 0 || static_cast<size_t>(retval) != nbytes)
        std::cout << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
      reply += &buf[0]; // chain the chunk, as a C string, onto the reply
    }
  }
  catch(...) {
    superSleepSec_.value_=keep_supersleep_original_value;
    throw;
  }
  superSleepSec_.value_=keep_supersleep_original_value;

  *out << reply;
}
bool FUEventProcessor::summarize | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1437 of file FUEventProcessor.cc.
References gather_cfg::cout, cpustat_, alignCSCRings::e, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), runTheMatrix::ret, evf::CPUStat::sendStat(), evf::RateStat::sendStat(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.
Referenced by startSummarizeWorkLoop().
{
  // Workloop action that collects one trigger report from every live slave
  // and sums them into the packed report of evtProcessor_.
  //
  // Per live slave: use a postponed report if one is available for the
  // current reference lumi section, otherwise receive messages until a
  // MSQS_MESSAGE_TYPE_TRR report with lumiSection >= the reference index
  // arrives. A report from a later lumi section is stored on the slave via
  // add_postponed_trigger_update(); a matching one is summed.
  //
  // Returns false (after clearing the activity flags) once the state machine
  // is no longer "Enabled", true otherwise. The wl argument is not used.
  evtProcessor_.resetPackedTriggerReport();
  bool atLeastOneProcessUpdatedSuccessfully = false;
  int msgCount = 0;
  for (unsigned int i = 0; i < subs_.size(); i++)
    {
      if(subs_[i].alive()>0)
        {
          int ret = 0;
          if(subs_[i].check_postponed_trigger_update(master_message_trr_,
                                                     evtProcessor_.getLumiSectionReferenceIndex()))
            {
              ret = MSQS_MESSAGE_TYPE_TRR;
              std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
            }
          else{
            bool insync = false;
            bool exception_caught = false;
            while(!insync){
              try{
                ret = subs_[i].rcv(master_message_trr_,false);
              }
              catch(evf::Exception &e)
                {
                  std::cout << "exception in msgrcv on " << i
                            << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
                  exception_caught = true;
                  break;
                  //do nothing special
                }
              if(ret==MSQS_MESSAGE_TYPE_TRR) {
                TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
                // Reports older than the reference lumi section are dropped
                // by simply receiving the next message.
                if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
                  insync = true;
                }
              }
            }
            if(exception_caught) continue; // skip this slave for this round
          }
          msgCount++;
          if(ret==MSQS_MESSAGE_TYPE_TRR) {
            TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
            if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
              std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
                        << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
              subs_[i].add_postponed_trigger_update(master_message_trr_);
            }else{
              atLeastOneProcessUpdatedSuccessfully = true;
              evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
            }
          }
          else std::cout << "msgrcv returned error " << errno << std::endl;
        }
    }
  if(atLeastOneProcessUpdatedSuccessfully){
    nbSubProcessesReporting_.value_ = msgCount;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
    evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
    evtProcessor_.updateRollingReport();
    evtProcessor_.fireScalersUpdate();
  }
  else{
    LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
    if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
    nbSubProcessesReporting_.value_ = 0;
    ::sleep(10);
  }
  if(fsm_.stateName()->toString()!="Enabled"){
    // This loop is about to end: clear the flag that startSummarizeWorkLoop()
    // set and that updater() reports for the master process.
    wlSummarizeActive_ = false;
    // Kept from the original implementation.
    wlScalersActive_ = false;
    return false;
  }
  // (disabled in the original) cpustat_->printStat();
  if(iDieStatisticsGathering_.value_){
    try{
      TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
      cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
      cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
      ratestat_->sendStat((unsigned char*)trsp,
                          sizeof(TriggerReportStatic),
                          evtProcessor_.getLumiSectionReferenceIndex());
    }catch(evf::Exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
                     << e.what());
    }
  }
  cpustat_->reset();
  return true;
}
bool FUEventProcessor::supervisor | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1087 of file FUEventProcessor.cc.
References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, alignCSCRings::e, edm_init_done_, enableMPEPSlave(), evtProcessor_, exception, Exception, spr::find(), evf::StateMachine::fireEvent(), forkInEDM_, fsm_, i, localLog(), log_, evf::prg::ls, python::rootplot::utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), restartForkInEDM(), findQualityFiles::rr, scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.
Referenced by startSupervisorLoop().
{
  // Workloop action supervising the slave processes. One cycle:
  //  1. keeps subs_ and the state vectors sized to nbSubProcesses_;
  //  2. reaps terminated slaves with waitpid(WNOHANG) and records why they
  //     ended;
  //  3. while "Enabled" with auto-restart on, re-forks dead slaves whose
  //     countdown reached 0 (not once restartCount() exceeds 2);
  //  4. while "Enabled", polls every live slave and accumulates its counters,
  //     force-stopping slaves that never reached the running state;
  //  5. fires the monitoring info space update and sleeps superSleepSec_.
  // Returns true to keep being scheduled; returns false only in a freshly
  // forked child, which must leave the supervisor loop. wl is not used.
  pthread_mutex_lock(&stop_lock_);
  if(subs_.size()!=nbSubProcesses_.value_)
    {
      pthread_mutex_lock(&pickup_lock_);
      // Re-checked now that pickup_lock_ is held.
      if(subs_.size()!=nbSubProcesses_.value_) {
        subs_.resize(nbSubProcesses_.value_);
        spMStates_.resize(nbSubProcesses_.value_);
        spmStates_.resize(nbSubProcesses_.value_);
        for(unsigned int i = 0; i < spMStates_.size(); i++)
          {
            spMStates_[i] = edm::event_processor::sInit;
            spmStates_[i] = 0;
          }
      }
      pthread_mutex_unlock(&pickup_lock_);
    }
  bool running = fsm_.stateName()->toString()=="Enabled";
  // Named so that it does not shadow the stopping() member function.
  bool isStopping = fsm_.stateName()->toString()=="stopping";
  for(unsigned int i = 0; i < subs_.size(); i++)
    {
      if(subs_[i].alive()==-1000) continue;
      int sl;
      pid_t sub_pid = subs_[i].pid();
      pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);

      if(killedOrNot && killedOrNot==sub_pid) {
        pthread_mutex_lock(&pickup_lock_);
        //check if out of range or recreated (enable can clear vector)
        if (i<subs_.size() && subs_[i].alive()!=-1000) {
          subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
          std::ostringstream ost;
          if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
          else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
          else ost << " process stopped ";
          subs_[i].countdown()=slaveRestartDelaySecs_.value_;
          subs_[i].setReasonForFailed(ost.str());
          spMStates_[i] = evtProcessor_.notstarted_state_code();
          spmStates_[i] = 0;
          std::ostringstream ost1;
          ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
          localLog(ost1.str());
          if(!autoRestartSlaves_.value_) subs_[i].disconnect();
        }
        pthread_mutex_unlock(&pickup_lock_);
      }
    }
  pthread_mutex_unlock(&stop_lock_);
  if(isStopping) return true; // if in stopping we are done

  if(running && edm_init_done_)
    {
      // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
      // this is only active while running, hence, the stop lock is acquired and only released at end of loop
      if(autoRestartSlaves_.value_){
        pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
        for(unsigned int i = 0; i < subs_.size(); i++)
          {
            if(subs_[i].alive() != 1){
              if(subs_[i].countdown() == 0)
                {
                  if(subs_[i].restartCount()>2){
                    LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
                                   << " - maximum restart count reached");
                    std::ostringstream ost1;
                    ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
                    localLog(ost1.str());
                    subs_[i].countdown()--;
                    XCEPT_DECLARE(evf::Exception,
                                  sentinelException, ost1.str());
                    notifyQualified("error",sentinelException);
                    subs_[i].disconnect();
                    continue;
                  }
                  subs_[i].restartCount()++;
                  if (forkInEDM_.value_) {
                    restartForkInEDM(i);
                  }
                  else {
                    pid_t rr = subs_[i].forkNew();
                    if(rr==0)
                      {
                        // Child process: become a slave.
                        myProcess_=&subs_[i];
                        scalersUpdates_ = 0;
                        // stop_lock_ is held at this point; the child
                        // re-creates the mutex rather than unlocking it.
                        int retval = pthread_mutex_destroy(&stop_lock_);
                        if(retval != 0) perror("error");
                        retval = pthread_mutex_init(&stop_lock_,0);
                        if(retval != 0) perror("error");
                        fsm_.disableRcmsStateNotification();
                        fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
                        fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
                        fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
                        try{
                          xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
                          if(lsid) {
                            ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
                          }
                        }
                        catch(...){
                          std::cout << "trouble with lsindex during restart" << std::endl;
                        }
                        try{
                          xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
                          if(lstb) {
                            ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
                          }
                        }
                        catch(...){
                          std::cout << "trouble with resetting flag for eol recovery " << std::endl;
                        }
                        evtProcessor_.adjustLsIndexForRestart();
                        evtProcessor_.resetPackedTriggerReport();
                        enableMPEPSlave();
                        return false; // exit the supervisor loop immediately in the child !!!
                      }
                    else
                      {
                        std::ostringstream ost1;
                        ost1 << "-I- New Process " << rr << " forked for slot " << i;
                        localLog(ost1.str());
                      }
                  }
                }
              if(subs_[i].countdown()>=0) subs_[i].countdown()--;
            }
          }
        pthread_mutex_unlock(&stop_lock_);
      } // finished handling replacement of dead slaves once they've been reaped
    }

  xdata::Serializable *lsid = 0;
  xdata::Serializable *psid = 0;
  xdata::Serializable *dqmp = 0;
  xdata::UnsignedInteger32 *dqm = 0;

  if(running && edm_init_done_){
    try{
      lsid = applicationInfoSpace_->find("lumiSectionIndex");
      psid = applicationInfoSpace_->find("prescaleSetIndex");
      nbProcessed = monitorInfoSpace_->find("nbProcessed");
      nbAccepted = monitorInfoSpace_->find("nbAccepted");
      dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
    }
    catch(xdata::exception::Exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
    }

    try{
      if(nbProcessed !=0 && nbAccepted !=0)
        {
          xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
          xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
          // ls and ps stay null when the lookups above failed; nbProcessed and
          // nbAccepted are members and may still hold values from an earlier
          // cycle, so ls/ps are checked before every use below.
          xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
          xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
          if(dqmp!=0)
            dqm = (xdata::UnsignedInteger32*)dqmp;
          if(dqm) dqm->value_ = 0;
          nbTotalDQM_ = 0;
          nbp->value_ = 0;
          nba->value_ = 0;
          nblive_ = 0;
          nbdead_ = 0;
          scalersUpdates_ = 0;

          for(unsigned int i = 0; i < subs_.size(); i++)
            {
              if(subs_[i].alive()>0)
                {
                  nblive_++;
                  try{
                    subs_[i].post(master_message_prg_,true);
                    unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
                    if(retval == (unsigned long) master_message_prr_->mtype){
                      prg* p = (struct prg*)(master_message_prr_->mtext);
                      subs_[i].setParams(p);
                      spMStates_[i] = p->Ms;
                      spmStates_[i] = p->ms;
                      cpustat_->addEntry(p->ms);
                      if(!subs_[i].inInconsistentState() &&
                         (p->Ms == edm::event_processor::sError
                          || p->Ms == edm::event_processor::sInvalid
                          || p->Ms == edm::event_processor::sStopping))
                        {
                          std::ostringstream ost;
                          ost << "edm::eventprocessor slot " << i << " process id "
                              << subs_[i].pid() << " not in Running state : Mstate="
                              << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
                              << evtProcessor_.moduleNameFromIndex(p->ms)
                              << " - Look into possible error messages from HLT process";
                          LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
                        }
                      nbp->value_ += subs_[i].params().nbp;
                      nba->value_ += subs_[i].params().nba;
                      if(dqm)dqm->value_ += p->dqm;
                      nbTotalDQM_ += p->dqm;
                      scalersUpdates_ += p->trp;
                      if(ls && p->ls > ls->value_) ls->value_ = p->ls;
                      if(ps && p->ps != ps->value_) ps->value_ = p->ps;
                    }
                    else{
                      // No fresh reply: fall back to the saved counters.
                      nbp->value_ += subs_[i].get_save_nbp();
                      nba->value_ += subs_[i].get_save_nba();
                    }
                  }
                  catch(evf::Exception &e){
                    LOG4CPLUS_INFO(getApplicationLogger(),
                                   "could not send/receive msg on slot "
                                   << i << " - " << e.what());
                  }
                }
              else
                {
                  nbp->value_ += subs_[i].get_save_nbp();
                  nba->value_ += subs_[i].get_save_nba();
                  nbdead_++;
                }
            }
          if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
            for(unsigned int i = 0; i < subs_.size(); i++)
              {
                if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
                  // check that the process is not stuck
                  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
                    {
                      subs_[i].found_invalid();//increase the "found_invalid" counter
                      if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
                        MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
                        subs_[i].post(msg3,false);
                        std::ostringstream ost1;
                        ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
                        localLog(ost1.str());
                        LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
                        XCEPT_DECLARE(evf::Exception,
                                      sentinelException, ost1.str());
                        notifyQualified("error",sentinelException);
                      }
                    }
                }
              }
          }
        }
    }
    catch(std::exception &e){
      LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
    }
    catch(...){
      LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
    }
  }
  else{
    // Not running: slots flagged -1000 are shown in their initial state.
    for(unsigned int i = 0; i < subs_.size(); i++)
      {
        if(subs_[i].alive()==-1000)
          {
            spMStates_[i] = edm::event_processor::sInit;
            spmStates_[i] = 0;
          }
      }
  }

  // Publish the monitored items. Track whether the info space is locked so it
  // is released again if fireItemGroupChanged() throws.
  bool infoSpaceLocked = false;
  try{
    monitorInfoSpace_->lock();
    infoSpaceLocked = true;
    monitorInfoSpace_->fireItemGroupChanged(names_,0);
    infoSpaceLocked = false;
    monitorInfoSpace_->unlock();
  }
  catch(xdata::exception::Exception &e)
    {
      LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
      // (disabled in the original) localLog(e.what());
      if(infoSpaceLocked) monitorInfoSpace_->unlock();
    }
  ::sleep(superSleepSec_.value_);
  return true;
}
void FUEventProcessor::updater | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) |
Definition at line 2208 of file FUEventProcessor.cc.
References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.
Referenced by FUEventProcessor().
{
  // Writes the periodically refreshed part of the status page: the static
  // block followed by one mDiv()/cDiv() section per monitored quantity.
  using namespace utils;

  *out << updaterStatic_;

  mDiv(out,"loads");
  uptime(out);
  cDiv(out);

  mDiv(out,"st",fsm_.stateName()->toString());
  mDiv(out,"ru",runNumber_.toString());
  mDiv(out,"nsl",nbSubProcesses_.value_);
  mDiv(out,"nsr",nbSubProcessesReporting_.value_);

  // Class name, suffixed with "MP " when subprocesses are configured.
  mDiv(out,"cl");
  *out << getApplicationDescriptor()->getClassName()
       << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
  cDiv(out);

  mDiv(out,"in",getApplicationDescriptor()->getInstance());

  // The configuration link is shown in every state except Halted/halting.
  if(fsm_.stateName()->toString() == "Halted" ||
     fsm_.stateName()->toString() == "halting")
    {
      mDiv(out,"hlt","Not yet...");
    }
  else
    {
      mDiv(out,"hlt");
      *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
      cDiv(out);
      *out << std::endl;
    }

  mDiv(out,"sq",squidPresent_.toString());
  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
  mDiv(out,"mwl",evtProcessor_.wlMonitoring());

  // Event counters; zeros until both counters are available.
  if(nbProcessed == 0 || nbAccepted == 0)
    {
      mDiv(out,"tt",0);
      mDiv(out,"ac",0);
    }
  else
    {
      mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
      mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
    }

  // Slaves (myProcess_ set) report the scalers loop, the master the
  // summarize loop.
  if(myProcess_)
    mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
  else
    mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));

  mDiv(out,"idi",iDieUrl_.value_);

  // Vulture status.
  if(vp_ == 0)
    {
      mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
    }
  else
    {
      mDiv(out,"vpi",(unsigned int) vp_);
      if(vulture_->hasStarted()>=0)
        mDiv(out,"vul","Prowling");
      else
        mDiv(out,"vul","Dead");
    }

  // Last lumi section as "ls,proc,acc".
  if(evtProcessor_)
    {
      mDiv(out,"ll");
      *out << evtProcessor_.lastLumi().ls
           << "," << evtProcessor_.lastLumi().proc
           << "," << evtProcessor_.lastLumi().acc;
      cDiv(out);
    }

  // Log ring: entries from the current index to the end first, then, if the
  // ring has wrapped, the entries before the index.
  mDiv(out,"lg");
  for(unsigned int tail = logRingIndex_; tail < logRingSize_; tail++)
    {
      *out << logRing_[tail] << std::endl;
    }
  if(logWrap_)
    {
      for(unsigned int head = 0; head < logRingIndex_; head++)
        {
          *out << logRing_[head] << std::endl;
        }
    }
  cDiv(out);

  // CPU statistics chart, when available.
  mDiv(out,"cha");
  if(cpustat_)
    {
      *out << cpustat_->getChart();
    }
  cDiv(out);
}
evf::FUEventProcessor::XDAQ_INSTANTIATOR | ( | ) |
int evf::FUEventProcessor::anonymousPipe_[2] [private] |
Definition at line 258 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().
xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_ [private] |
Definition at line 236 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), FUEventProcessor(), makeStaticInfo(), receivingAndMonitor(), and supervisor().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_ [private] |
Definition at line 224 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_ [private] |
Definition at line 227 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_ [private] |
Definition at line 250 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_ [private] |
Definition at line 256 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_ [private] |
Definition at line 231 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().
xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_ [private] |
Definition at line 180 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), microState(), and supervisor().
xdata::String evf::FUEventProcessor::class_ [private] |
Definition at line 171 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::String evf::FUEventProcessor::configString_ [private] |
Definition at line 175 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
std::string evf::FUEventProcessor::configuration_ [private] |
Definition at line 176 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and spotlightWebPage().
CPUStat* evf::FUEventProcessor::cpustat_ [private] |
Definition at line 266 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().
Css evf::FUEventProcessor::css_ [private] |
Definition at line 202 of file FUEventProcessor.h.
Referenced by css().
bool evf::FUEventProcessor::edm_init_done_ [private] |
Definition at line 279 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enabling(), forkProcessesFromEDM(), and supervisor().
xdata::Boolean evf::FUEventProcessor::epInitialized_ [private] |
Definition at line 174 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().
Definition at line 167 of file FUEventProcessor.h.
Referenced by actionPerformed(), attachDqmToShm(), configuring(), detachDqmFromShm(), doEndRunInEDM(), enableClassic(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), FUEventProcessor(), halting(), microState(), moduleWeb(), pathNames(), receivingAndMonitor(), scalers(), scalersWeb(), serviceWeb(), setAttachDqmToShm(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::exitOnError_ [private] |
Definition at line 199 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
xdata::UnsignedInteger32 evf::FUEventProcessor::forkInEDM_ [private] |
Definition at line 215 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), halting(), stopping(), and supervisor().
Definition at line 277 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enableForkInEDM(), forkProcessesFromEDM(), FUEventProcessor(), halting(), and restartForkInEDM().
pthread_mutex_t evf::FUEventProcessor::forkObjLock_ [private] |
Definition at line 278 of file FUEventProcessor.h.
Referenced by enableForkInEDM(), and FUEventProcessor().
evf::StateMachine evf::FUEventProcessor::fsm_ [private] |
Definition at line 161 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), fsmCallback(), FUEventProcessor(), halting(), microState(), receiving(), receivingAndMonitor(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_ [private] |
Definition at line 185 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasPrescaleService_ [private] |
Definition at line 184 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_ [private] |
Definition at line 186 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasShMem_ [private] |
Definition at line 183 of file FUEventProcessor.h.
Referenced by enableCommon(), forkProcessesFromEDM(), FUEventProcessor(), makeStaticInfo(), stopClassic(), and stopping().
xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_ [private] |
Definition at line 188 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().
xdata::String evf::FUEventProcessor::iDieUrl_ [private] |
Definition at line 263 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::instance_ [private] |
Definition at line 172 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_ [private] |
Definition at line 187 of file FUEventProcessor.h.
Referenced by enableCommon(), enableForkInEDM(), and FUEventProcessor().
Logger evf::FUEventProcessor::log_ [private] |
Definition at line 164 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enableForkInEDM(), restartForkInEDM(), and supervisor().
std::vector<std::string> evf::FUEventProcessor::logRing_ [private] |
Definition at line 208 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
unsigned int evf::FUEventProcessor::logRingIndex_ [private] |
Definition at line 209 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
const unsigned int evf::FUEventProcessor::logRingSize_ = 50 [static, private] |
Definition at line 210 of file FUEventProcessor.h.
Referenced by localLog(), and updater().
bool evf::FUEventProcessor::logWrap_ [private] |
Definition at line 211 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
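[Member declaration heading missing from this listing; see line 271 of FUEventProcessor.h.]
Definition at line 271 of file FUEventProcessor.h.
Referenced by supervisor().
[Member declaration heading missing from this listing; see line 272 of FUEventProcessor.h.]
Definition at line 272 of file FUEventProcessor.h.
Referenced by supervisor().
[Member declaration heading missing from this listing; see line 275 of FUEventProcessor.h.]
Definition at line 275 of file FUEventProcessor.h.
Referenced by summarize().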
xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_ [private] |
Definition at line 234 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_ [private] |
Definition at line 235 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
ModuleWebRegistry* evf::FUEventProcessor::mwrRef_ [private] |
Definition at line 269 of file FUEventProcessor.h.
Referenced by configuring(), enableForkInEDM(), and enabling().
SubProcess* evf::FUEventProcessor::myProcess_ [private] |
Definition at line 229 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), microState(), receiving(), receivingAndMonitor(), scalers(), sendMessageOverMonitorQueue(), spotlightWebPage(), supervisor(), and updater().
std::list<std::string> evf::FUEventProcessor::names_ [private] |
Definition at line 262 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbAccepted [private] |
Definition at line 242 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
unsigned int evf::FUEventProcessor::nbdead_ [private] |
Definition at line 218 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
unsigned int evf::FUEventProcessor::nblive_ [private] |
Definition at line 217 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbProcessed [private] |
Definition at line 241 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_ [private] |
Definition at line 213 of file FUEventProcessor.h.
Referenced by configuring(), defaultWebPage(), enabling(), forkProcessesFromEDM(), FUEventProcessor(), halting(), microState(), spotlightWebPage(), stopping(), stopSlavesAndAcknowledge(), summarize(), supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_ [private] |
Definition at line 214 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), summarize(), and updater().
unsigned int evf::FUEventProcessor::nbTotalDQM_ [private] |
Definition at line 220 of file FUEventProcessor.h.
Referenced by enabling(), microState(), and supervisor().
bool evf::FUEventProcessor::outprev_ [private] |
Definition at line 189 of file FUEventProcessor.h.
Referenced by actionPerformed().
xdata::Boolean evf::FUEventProcessor::outPut_ [private] |
Definition at line 178 of file FUEventProcessor.h.
Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().
pthread_mutex_t evf::FUEventProcessor::pickup_lock_ [private] |
Definition at line 239 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), microState(), and supervisor().
RateStat* evf::FUEventProcessor::ratestat_ [private] |
Definition at line 267 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and summarize().
std::string evf::FUEventProcessor::reasonForFailedState_ [private] |
Definition at line 205 of file FUEventProcessor.h.
Referenced by configuring(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), and stopSlavesAndAcknowledge().
bool evf::FUEventProcessor::receiving_ [private] |
Definition at line 225 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
bool evf::FUEventProcessor::receivingM_ [private] |
Definition at line 228 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_ [private] |
Definition at line 173 of file FUEventProcessor.h.
Referenced by enableCommon(), enableForkInEDM(), enabling(), FUEventProcessor(), and updater().
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_ [private] |
Definition at line 245 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_ [private] |
Definition at line 246 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().
unsigned int evf::FUEventProcessor::scalersUpdates_ [private] |
Definition at line 252 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), receivingAndMonitor(), scalers(), and supervisor().
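[Member declaration heading missing from this listing; see line 274 of FUEventProcessor.h.]
Definition at line 274 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
[Member declaration heading missing from this listing; see line 273 of FUEventProcessor.h.]
Definition at line 273 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().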
xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_ [private] |
Definition at line 181 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
[Member declaration heading missing from this listing; see line 270 of FUEventProcessor.h.]
Definition at line 270 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and forkProcessesFromEDM().
std::string evf::FUEventProcessor::sourceId_ [private] |
Definition at line 192 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_ [private] |
Definition at line 260 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_ [private] |
Definition at line 259 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
SquidNet evf::FUEventProcessor::squidnet_ [private] |
Definition at line 207 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::squidPresent_ [private] |
Definition at line 195 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and updater().
pthread_mutex_t evf::FUEventProcessor::start_lock_ [private] |
Definition at line 238 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), and microState().
pthread_mutex_t evf::FUEventProcessor::stop_lock_ [private] |
Definition at line 237 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), FUEventProcessor(), receiving(), receivingAndMonitor(), stopSlavesAndAcknowledge(), and supervisor().
std::vector<SubProcess> evf::FUEventProcessor::subs_ [private] |
Definition at line 216 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), microState(), stopSlavesAndAcknowledge(), subWeb(), summarize(), and supervisor().
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_ [private] |
Definition at line 261 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), subWeb(), and supervisor().
bool evf::FUEventProcessor::supervising_ [private] |
Definition at line 232 of file FUEventProcessor.h.
Referenced by startSupervisorLoop(), and updater().
std::string evf::FUEventProcessor::updaterStatic_ [private] |
Definition at line 240 of file FUEventProcessor.h.
Referenced by makeStaticInfo(), and updater().
xdata::String evf::FUEventProcessor::url_ [private] |
Definition at line 170 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
pid_t evf::FUEventProcessor::vp_ [private] |
Definition at line 265 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and updater().
Vulture* evf::FUEventProcessor::vulture_ [private] |
Definition at line 264 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), stopping(), updater(), and ~FUEventProcessor().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_ [private] |
Definition at line 223 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_ [private] |
Definition at line 226 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_ [private] |
Definition at line 249 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
bool evf::FUEventProcessor::wlScalersActive_ [private] |
Definition at line 251 of file FUEventProcessor.h.
Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_ [private] |
Definition at line 255 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
bool evf::FUEventProcessor::wlSummarizeActive_ [private] |
Definition at line 257 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_ [private] |
Definition at line 230 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().