#include <FUEventProcessor.h>
Public Member Functions | |
void | actionPerformed (xdata::Event &e) |
bool | configuring (toolbox::task::WorkLoop *wl) |
void | css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
bool | enabling (toolbox::task::WorkLoop *wl) |
xoap::MessageReference | fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception) |
FUEventProcessor (xdaq::ApplicationStub *s) | |
void | getSlavePids (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
bool | halting (toolbox::task::WorkLoop *wl) |
void | handleSignalSlave (int sig, siginfo_t *info, void *c) |
void | microState (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | moduleWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | procStat (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
void | sendMessageOverMonitorQueue (MsgBuf &) |
void | serviceWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception) |
bool | stopping (toolbox::task::WorkLoop *wl) |
void | subWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
void | updater (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception) |
XDAQ_INSTANTIATOR () | |
virtual | ~FUEventProcessor () |
Static Public Member Functions | |
static void | forkProcessFromEDM_helper (void *addr) |
Private Member Functions | |
void | attachDqmToShm () throw (evf::Exception) |
void | detachDqmFromShm () throw (evf::Exception) |
bool | doEndRunInEDM () |
bool | enableClassic () |
bool | enableCommon () |
bool | enableForkInEDM () |
bool | enableMPEPSlave () |
void | forkProcessesFromEDM () |
void | localLog (std::string) |
std::string | logsAsString () |
void | makeStaticInfo () |
bool | receiving (toolbox::task::WorkLoop *wl) |
bool | receivingAndMonitor (toolbox::task::WorkLoop *wl) |
bool | restartForkInEDM (unsigned int slotId) |
bool | scalers (toolbox::task::WorkLoop *wl) |
void | setAttachDqmToShm () throw (evf::Exception) |
bool | sigmon (toolbox::task::WorkLoop *wl) |
void | startReceivingLoop () |
void | startReceivingMonitorLoop () |
void | startScalersWorkLoop () throw (evf::Exception) |
void | startSignalMonitorWorkLoop () throw (evf::Exception) |
void | startSummarizeWorkLoop () throw (evf::Exception) |
void | startSupervisorLoop () |
bool | stopClassic () |
void | stopSlavesAndAcknowledge () |
bool | summarize (toolbox::task::WorkLoop *wl) |
bool | supervisor (toolbox::task::WorkLoop *wl) |
Private Attributes | |
unsigned long long | allProcStats_ |
int | anonymousPipe_ [2] |
xdata::InfoSpace * | applicationInfoSpace_ |
toolbox::task::ActionSignature * | asReceiveMsgAndExecute_ |
toolbox::task::ActionSignature * | asReceiveMsgAndRead_ |
toolbox::task::ActionSignature * | asScalers_ |
toolbox::task::ActionSignature * | asSignalMonitor_ |
toolbox::task::ActionSignature * | asSummarize_ |
toolbox::task::ActionSignature * | asSupervisor_ |
xdata::Boolean | autoRestartSlaves_ |
xdata::String | class_ |
xdata::String | configString_ |
std::string | configuration_ |
CPUStat * | cpustat_ |
unsigned int | crashesThisRun_ |
xdata::UnsignedInteger32 | crashesToDump_ |
Css | css_ |
xdata::Boolean | datasetCounting_ |
bool | edm_init_done_ |
xdata::Boolean | epInitialized_ |
FWEPWrapper | evtProcessor_ |
xdata::Boolean | exitOnError_ |
xdata::UnsignedInteger32 | forkInEDM_ |
moduleweb::ForkInfoObj * | forkInfoObj_ |
pthread_mutex_t | forkObjLock_ |
evf::StateMachine | fsm_ |
xdata::Boolean | hasModuleWebRegistry_ |
xdata::Boolean | hasPrescaleService_ |
xdata::Boolean | hasServiceWebRegistry_ |
xdata::Boolean | hasShMem_ |
xdata::Boolean | iDieStatisticsGathering_ |
xdata::String | iDieUrl_ |
unsigned long long | idleProcStats_ |
xdata::UnsignedInteger32 | instance_ |
xdata::Boolean | isRunNumberSetter_ |
timeval | lastCrashTime_ |
timeval | lastProcReport_ |
Logger | log_ |
std::vector< std::string > | logRing_ |
unsigned int | logRingIndex_ |
bool | logWrap_ |
MsgBuf | master_message_prg_ |
MsgBuf | master_message_prr_ |
MsgBuf | master_message_trr_ |
std::auto_ptr< edm::Presence > | messageServicePresence_ |
xdata::InfoSpace * | monitorInfoSpace_ |
xdata::InfoSpace * | monitorLegendaInfoSpace_ |
ModuleWebRegistry * | mwrRef_ |
SubProcess * | myProcess_ |
std::list< std::string > | names_ |
xdata::Serializable * | nbAccepted |
unsigned int | nbdead_ |
unsigned int | nblive_ |
xdata::Serializable * | nbProcessed |
xdata::UnsignedInteger32 | nbSubProcesses_ |
xdata::UnsignedInteger32 | nbSubProcessesReporting_ |
unsigned int | nbTotalDQM_ |
bool | outprev_ |
xdata::Boolean | outPut_ |
pthread_mutex_t | pickup_lock_ |
RateStat * | ratestat_ |
std::string | reasonForFailedState_ |
bool | receiving_ |
bool | receivingM_ |
bool | restart_in_progress_ |
bool | rlimit_coresize_changed_ |
rlimit | rlimit_coresize_default_ |
xdata::UnsignedInteger32 | runNumber_ |
xdata::InfoSpace * | scalersInfoSpace_ |
xdata::InfoSpace * | scalersLegendaInfoSpace_ |
unsigned int | scalersUpdates_ |
sem_t * | sigmon_sem_ |
bool | signalMonitorActive_ |
MsgBuf | slave_message_monitoring_ |
MsgBuf | slave_message_prr_ |
xdata::UnsignedInteger32 | slaveRestartDelaySecs_ |
ShmOutputModuleRegistry * | sorRef_ |
std::string | sourceId_ |
xdata::Vector< xdata::Integer > | spmStates_ |
xdata::Vector< xdata::Integer > | spMStates_ |
SquidNet | squidnet_ |
xdata::Boolean | squidPresent_ |
pthread_mutex_t | start_lock_ |
pthread_mutex_t | stop_lock_ |
std::vector< SubProcess > | subs_ |
xdata::UnsignedInteger32 | superSleepSec_ |
bool | supervising_ |
std::string | updaterStatic_ |
xdata::String | url_ |
pid_t | vp_ |
Vulture * | vulture_ |
toolbox::task::WorkLoop * | wlReceiving_ |
toolbox::task::WorkLoop * | wlReceivingMonitor_ |
toolbox::task::WorkLoop * | wlScalers_ |
bool | wlScalersActive_ |
toolbox::task::WorkLoop * | wlSignalMonitor_ |
toolbox::task::WorkLoop * | wlSummarize_ |
bool | wlSummarizeActive_ |
toolbox::task::WorkLoop * | wlSupervising_ |
Static Private Attributes | |
static const unsigned int | logRingSize_ = 50 |
Definition at line 64 of file FUEventProcessor.h.
FUEventProcessor::FUEventProcessor | ( | xdaq::ApplicationStub * | s | ) |
Definition at line 87 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, gather_cfg::cout, crashesToDump_, css(), datasetCounting_, defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), forkInEDM_, forkInfoObj_, forkObjLock_, evf::StateMachine::foundRcmsStateListener(), fsm_, evf::FUInstancePtr_, reco::get(), getSlavePids(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), messageServicePresence_, microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, NULL, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), rlimit_coresize_default_, runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), sigmon_sem_, slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subWeb(), superSleepSec_, updater(), url_, and vulture_.
: xdaq::Application(s) , fsm_(this) , log_(getApplicationLogger()) , evtProcessor_(log_, getApplicationDescriptor()->getInstance()) , runNumber_(0) , epInitialized_(false) , outPut_(true) , autoRestartSlaves_(false) , slaveRestartDelaySecs_(10) , hasShMem_(true) , hasPrescaleService_(true) , hasModuleWebRegistry_(true) , hasServiceWebRegistry_(true) , isRunNumberSetter_(true) , iDieStatisticsGathering_(false) , outprev_(true) , exitOnError_(true) , reasonForFailedState_() , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt") , logRing_(logRingSize_) , logRingIndex_(logRingSize_) , logWrap_(false) , nbSubProcesses_(0) , nbSubProcessesReporting_(0) , forkInEDM_(true) , nblive_(0) , nbdead_(0) , nbTotalDQM_(0) , wlReceiving_(0) , asReceiveMsgAndExecute_(0) , receiving_(false) , wlReceivingMonitor_(0) , asReceiveMsgAndRead_(0) , receivingM_(false) , myProcess_(0) , wlSupervising_(0) , asSupervisor_(0) , supervising_(false) , wlSignalMonitor_(0) , asSignalMonitor_(0) , signalMonitorActive_(false) , monitorInfoSpace_(0) , monitorLegendaInfoSpace_(0) , applicationInfoSpace_(0) , nbProcessed(0) , nbAccepted(0) , scalersInfoSpace_(0) , scalersLegendaInfoSpace_(0) , wlScalers_(0) , asScalers_(0) , wlScalersActive_(false) , scalersUpdates_(0) , wlSummarize_(0) , asSummarize_(0) , wlSummarizeActive_(false) , superSleepSec_(1) , iDieUrl_("none") , vulture_(0) , vp_(0) , cpustat_(0) , ratestat_(0) , mwrRef_(nullptr) , sorRef_(nullptr) , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG) , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR) , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR) , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR) , edm_init_done_(true) , crashesThisRun_(false) , rlimit_coresize_changed_(false) , crashesToDump_(2) , sigmon_sem_(0) , datasetCounting_(true) { using namespace utils; FUInstancePtr_=this; names_.push_back("nbProcessed" ); names_.push_back("nbAccepted" ); names_.push_back("epMacroStateInt"); names_.push_back("epMicroStateInt"); // create pipe for web communication int retpipe = pipe(anonymousPipe_); if(retpipe != 0) LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe"); // check squid presence squidPresent_ = squidnet_.check(); //pass application parameters to FWEPWrapper evtProcessor_.setAppDesc(getApplicationDescriptor()); evtProcessor_.setAppCtxt(getApplicationContext()); // bind relevant callbacks to finite state machine fsm_.initialize<evf::FUEventProcessor>(this); //set sourceId_ url_ = getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+ getApplicationDescriptor()->getURN(); class_ =getApplicationDescriptor()->getClassName(); instance_=getApplicationDescriptor()->getInstance(); sourceId_=class_.toString()+"_"+instance_.toString(); LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" ); LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE")); getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg"); xdata::InfoSpace *ispace = getApplicationInfoSpace(); applicationInfoSpace_ = ispace; // default configuration ispace->fireItemAvailable("parameterSet", &configString_ ); ispace->fireItemAvailable("epInitialized", &epInitialized_ ); ispace->fireItemAvailable("stateName", fsm_.stateName() ); ispace->fireItemAvailable("runNumber", &runNumber_ ); ispace->fireItemAvailable("outputEnabled", &outPut_ ); ispace->fireItemAvailable("hasSharedMemory", &hasShMem_); ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ ); ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ ); ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ ); ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ ); ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_); ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() ); ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener()); ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ ); ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ ); ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ ); ispace->fireItemAvailable("superSleepSec", &superSleepSec_ ); ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ ); ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ ); ispace->fireItemAvailable("iDieUrl", &iDieUrl_ ); ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ ); ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ ); // Add infospace listeners for exporting data values getApplicationInfoSpace()->addItemChangedListener("parameterSet", this); getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this); // findRcmsStateListener fsm_.findRcmsStateListener(); // initialize monitoring infospace std::string monInfoSpaceName="evf-eventprocessor-status-monitor"; toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName); monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString()); std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda"; urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName); monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString()); monitorInfoSpace_->fireItemAvailable("url", &url_ ); monitorInfoSpace_->fireItemAvailable("class", &class_ ); monitorInfoSpace_->fireItemAvailable("instance", &instance_ ); monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ ); monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName()); monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ ); std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor"; urn = this->createQualifiedInfoSpace(scalersInfoSpaceName); scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString()); std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda"; urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName); scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString()); evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_); scalersInfoSpace_->fireItemAvailable("instance", &instance_); evtProcessor_.setApplicationInfoSpace(ispace); evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_); evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0); //subprocess state vectors for MP monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_); monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_); // Bind web interface xgi::bind(this, &FUEventProcessor::css, "styles.css"); xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" ); xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" ); xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb"); xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" ); xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" ); xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids"); xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" ); xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb"); xgi::bind(this, &FUEventProcessor::microState, "microState"); xgi::bind(this, &FUEventProcessor::updater, "updater" ); xgi::bind(this, &FUEventProcessor::procStat, "procStat" ); // instantiate the plugin manager, not referenced here after! edm::AssertHandler ah; //create Message Service thread and pass ownership to auto_ptr that we destroy before fork try{ LOG4CPLUS_DEBUG(getApplicationLogger(), "Trying to create message service presence "); edm::PresenceFactory *pf = edm::PresenceFactory::get(); if(pf != 0) { messageServicePresence_= pf->makePresence("MessageServicePresence"); } else { LOG4CPLUS_ERROR(getApplicationLogger(), "Unable to create message service presence "); } } catch(...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception"); } ML::MLlog4cplus::setAppl(this); typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t; typedef AppDescSet_t::iterator AppDescIter_t; AppDescSet_t rcms= getApplicationContext()->getDefaultZone()-> getApplicationDescriptors("RCMSStateListener"); if(rcms.size()==0) { LOG4CPLUS_WARN(getApplicationLogger(), "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!"); // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!"); } else { AppDescIter_t it = rcms.begin(); evtProcessor_.setRcms(*it); } pthread_mutex_init(&start_lock_,0); pthread_mutex_init(&stop_lock_,0); pthread_mutex_init(&pickup_lock_,0); forkInfoObj_=nullptr; pthread_mutex_init(&forkObjLock_,0); makeStaticInfo(); startSupervisorLoop(); if(vulture_==0) vulture_ = new Vulture(true); AppDescSet_t setOfiDie= getApplicationContext()->getDefaultZone()-> getApplicationDescriptors("evf::iDie"); for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it) if ((*it)->getInstance()==0) // there has to be only one instance of iDie iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN(); //save default core file size getrlimit(RLIMIT_CORE,&rlimit_coresize_default_); //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves #ifdef linux if (sigmon_sem_==0) { sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0); if (!sigmon_sem_) { perror("mmap error\n"); std::cout << "mmap error"<<std::endl; } else sem_init(sigmon_sem_,true,0); } #endif }
FUEventProcessor::~FUEventProcessor | ( | ) | [virtual] |
Definition at line 365 of file FUEventProcessor.cc.
References vulture_.
void FUEventProcessor::actionPerformed | ( | xdata::Event & | e | ) |
Definition at line 810 of file FUEventProcessor.cc.
References alignCSCRings::e, epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, evf::StateMachine::stateName(), and AlCaHLTBitMon_QueryRunRegistry::string.
{ if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") { std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName(); if ( item == "parameterSet") { LOG4CPLUS_WARN(getApplicationLogger(), "HLT Menu changed, force reinitialization of EventProcessor"); epInitialized_ = false; } if ( item == "outputEnabled") { if(outprev_ != outPut_) { LOG4CPLUS_WARN(getApplicationLogger(), (outprev_ ? "Disabling " : "Enabling ")<<"global output"); evtProcessor_->enableEndPaths(outPut_); outprev_ = outPut_; } } if (item == "globalInputPrescale") { LOG4CPLUS_WARN(getApplicationLogger(), "Setting global input prescale has no effect " <<"in this version of the code"); } if ( item == "globalOutputPrescale") { LOG4CPLUS_WARN(getApplicationLogger(), "Setting global output prescale has no effect " <<"in this version of the code"); } } }
void FUEventProcessor::attachDqmToShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 1068 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by enableCommon(), and forkProcessesFromEDM().
{ std::string errmsg; bool success = false; try { edm::ServiceRegistry::Operate operate(evtProcessor_->getToken()); if(edm::Service<FUShmDQMOutputService>().isAvailable()) success = edm::Service<FUShmDQMOutputService>()->attachToShm(); if (!success) errmsg = "Failed to attach DQM service to shared memory"; } catch (cms::Exception& e) { errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what(); } if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg); }
bool FUEventProcessor::configuring | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 380 of file FUEventProcessor.cc.
References configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, datasetCounting_, alignCSCRings::e, epInitialized_, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), mwrRef_, nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::RateStat::sendAuxLegenda(), evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), evf::FWEPWrapper::setupFastTimerService(), edm::event_processor::sInit, sorRef_, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), AlCaHLTBitMon_QueryRunRegistry::string, vp_, and vulture_.
{ // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " " // << ((instance_.value_==0) ? 0x8 : 0) << " " // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " " // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " " // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl; unsigned short smap = (datasetCounting_.value_ ? 0x20 : 0 ) + ((nbSubProcesses_.value_!=0) ? 0x10 : 0) + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice + (hasServiceWebRegistry_.value_ ? 0x4 : 0) + (hasModuleWebRegistry_.value_ ? 0x2 : 0) + (hasPrescaleService_.value_ ? 0x1 : 0); if(nbSubProcesses_.value_==0) { spMStates_.setSize(1); spmStates_.setSize(1); } else { spMStates_.setSize(nbSubProcesses_.value_); spmStates_.setSize(nbSubProcesses_.value_); for(unsigned int i = 0; i < spMStates_.size(); i++) { spMStates_[i] = edm::event_processor::sInit; spmStates_[i] = 0; } } try { LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ..."); std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg); epInitialized_=true; if(evtProcessor_) { //get ref of mwr mwrRef_ = evtProcessor_.getModuleWebRegistry(); sorRef_ = evtProcessor_.getShmOutputModuleRegistry(); // moved to wrapper class configuration_ = evtProcessor_.configuration(); if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop(); evtProcessor_->beginJob(); evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1); if(cpustat_) {delete cpustat_; cpustat_=0;} cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(), nbSubProcesses_.value_, instance_.value_, iDieUrl_.value_); if(ratestat_) {delete ratestat_; ratestat_=0;} ratestat_ = new RateStat(iDieUrl_.value_); if(iDieStatisticsGathering_.value_) { try { cpustat_->sendLegenda(evtProcessor_.getmicromap()); xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda"); if(legenda !=0) { std::string slegenda = ((xdata::String*)legenda)->value_; ratestat_->sendLegenda(slegenda); } if (sorRef_ && datasetCounting_.value_) { xdata::String dsLegenda = sorRef_->getDatasetCSV(); if (dsLegenda.value_.size()) ratestat_->sendAuxLegenda(dsLegenda); } } catch(evf::Exception &e) { LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda" << e.what()); } catch (xcept::Exception& e) { LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda." << e.what()); } } fsm_.fireEvent("ConfigureDone",this); LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!"); localLog("-I- Configuration completed"); } } catch (xcept::Exception &e) { reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } catch(cms::Exception &e) { reasonForFailedState_ = e.explainSelf(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } catch(std::exception &e) { reasonForFailedState_ = e.what(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } catch(...) { fsm_.fireFailed("Unknown Exception",this); } if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess(); return false; }
void evf::FUEventProcessor::css | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [inline] |
Definition at line 107 of file FUEventProcessor.h.
References evf::Css::css(), and css_.
Referenced by FUEventProcessor().
void FUEventProcessor::defaultWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 925 of file FUEventProcessor.cc.
References nbSubProcesses_, and dbtoconf::out.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{ *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">" << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ") << getApplicationDescriptor()->getInstance() << "</title>" << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">" << "</head></html>"; }
void FUEventProcessor::detachDqmFromShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 1086 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and cms::Exception::what().
Referenced by stopClassic().
{ std::string errmsg; bool success = false; try { edm::ServiceRegistry::Operate operate(evtProcessor_->getToken()); if(edm::Service<FUShmDQMOutputService>().isAvailable()) success = edm::Service<FUShmDQMOutputService>()->detachFromShm(); if (!success) errmsg = "Failed to detach DQM service from shared memory"; } catch (cms::Exception& e) { errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what(); } if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg); }
bool FUEventProcessor::doEndRunInEDM | ( | ) | [private] |
Definition at line 670 of file FUEventProcessor.cc.
References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, edm_init_done_, evtProcessor_, evf::StateMachine::fireFailed(), forkInfoObj_, fsm_, evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkInfoObj::receivedStop_, edm::event_processor::sJobReady, stor::utils::sleep(), edm::event_processor::sRunning, edm::event_processor::sStopping, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().
Referenced by halting(), and stopping().
{ //taking care that EP in master is in stoppable state if (forkInfoObj_) { int count = 30; bool waitedForEDM=false; while (!edm_init_done_ && count) { ::sleep(1); if (count%5==0) LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec"); count--; waitedForEDM=true; } //sleep a few more seconds it was early stop if (waitedForEDM) sleep(5); //if (count==0) fsm_.fireFailed("failed to stop Master EP",this); if (evtProcessor_->getState()==edm::event_processor::sJobReady) return true;//we are already done forkInfoObj_->lock(); forkInfoObj_->stopCondition=true; sem_post(forkInfoObj_->control_sem_); forkInfoObj_->unlock(); count = 30; edm::event_processor::State st; while(count--) { st = evtProcessor_->getState(); if (st==edm::event_processor::sRunning) ::usleep(100000); else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) { break; } else { std::ostringstream ost; ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping"; LOG4CPLUS_ERROR(getApplicationLogger(),ost.str()); fsm_.fireFailed(ost.str(),this); return false; } if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) { forkInfoObj_->lock(); forkInfoObj_->stopCondition=true; sem_post(forkInfoObj_->control_sem_); forkInfoObj_->unlock(); LOG4CPLUS_WARN(getApplicationLogger(), "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" ); } } if (count<0) { std::ostringstream ost; if (!forkInfoObj_->receivedStop_) ost << "Timeout waiting for Master edm::EventProcessor to go stopping state " << evtProcessor_->stateName(st) << ": input source did not receive stop signal!"; else ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st); LOG4CPLUS_ERROR(getApplicationLogger(),ost.str()); fsm_.fireFailed(ost.str(),this); return false; } } return true; }
bool FUEventProcessor::enableClassic | ( | ) | [private] |
Definition at line 2323 of file FUEventProcessor.cc.
References enableCommon(), evtProcessor_, localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.
Referenced by enabling().
{ bool retval = enableCommon(); while(evtProcessor_->getState()!= edm::event_processor::sRunning){ LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog"); ::sleep(1); } // implementation moved to EPWrapper // startScalersWorkLoop(); // this is now not done any longer localLog("-I- Start completed"); return retval; }
bool FUEventProcessor::enableCommon | ( | ) | [private] |
Definition at line 1958 of file FUEventProcessor.cc.
References attachDqmToShm(), gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, runNumber_, stor::utils::sleep(), and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by enableClassic(), and enableMPEPSlave().
{ try { if(hasShMem_) attachDqmToShm(); int sc = 0; evtProcessor_->clearCounters(); if(isRunNumberSetter_) evtProcessor_->setRunNumber(runNumber_.value_); else evtProcessor_->declareRunNumber(runNumber_.value_); try{ ::sleep(1); evtProcessor_->runAsync(); sc = evtProcessor_->statusAsync(); } catch(cms::Exception &e) { reasonForFailedState_ = e.explainSelf(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); return false; } catch(std::exception &e) { reasonForFailedState_ = e.what(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); return false; } catch(...) { reasonForFailedState_ = "Unknown Exception"; fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); return false; } if(sc != 0) { std::ostringstream oss; oss<<"EventProcessor::runAsync returned status code " << sc; reasonForFailedState_ = oss.str(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); return false; } } catch (xcept::Exception &e) { reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); return false; } try{ fsm_.fireEvent("EnableDone",this); } catch (xcept::Exception &e) { std::cout << "exception " << (std::string)e.what() << std::endl; throw; } return false; }
bool FUEventProcessor::enableForkInEDM | ( | ) | [private] |
Definition at line 2240 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, exception, cms::Exception::explainSelf(), evf::StateMachine::fireFailed(), evf::moduleweb::ForkInfoObj::forkHandler, forkInfoObj_, forkObjLock_, evf::moduleweb::ForkInfoObj::forkParams, forkProcessFromEDM_helper(), fsm_, evf::moduleweb::ForkInfoObj::fuAddr, isRunNumberSetter_, log_, evf::moduleweb::ForkInfoObj::mst_lock_, mwrRef_, evf::ModuleWebRegistry::publishForkInfo(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), runNumber_, evf::moduleweb::ForkInfoObj::stopCondition, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by enabling().
{ evtProcessor_.resetWaiting(); try { //set to connect to Shm later //if(hasShMem_) setAttachDqmToShm(); int sc = 0; //maybe not needed in MP mode evtProcessor_->clearCounters(); if(isRunNumberSetter_) evtProcessor_->setRunNumber(runNumber_.value_); else evtProcessor_->declareRunNumber(runNumber_.value_); //prepare object used to communicate with DaqSource pthread_mutex_destroy(&forkObjLock_); pthread_mutex_init(&forkObjLock_,0); if (forkInfoObj_) delete forkInfoObj_; forkInfoObj_ = new moduleweb::ForkInfoObj(); forkInfoObj_->mst_lock_=&forkObjLock_; forkInfoObj_->fuAddr=(void*)this; forkInfoObj_->forkHandler = forkProcessFromEDM_helper; forkInfoObj_->forkParams.slotId=-1; forkInfoObj_->forkParams.restart=0; forkInfoObj_->forkParams.isMaster=-1; forkInfoObj_->stopCondition=0; if (mwrRef_) mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_); evtProcessor_->runAsync(); sc = evtProcessor_->statusAsync(); if(sc != 0) { std::ostringstream oss; oss<<"EventProcessor::runAsync returned status code " << sc; reasonForFailedState_ = oss.str(); fsm_.fireFailed(reasonForFailedState_,this); LOG4CPLUS_FATAL(log_,reasonForFailedState_); return false; } } //catch exceptions on master side catch(cms::Exception &e) { reasonForFailedState_ = e.explainSelf(); fsm_.fireFailed(reasonForFailedState_,this); LOG4CPLUS_FATAL(log_,reasonForFailedState_); return false; } catch(std::exception &e) { reasonForFailedState_ = e.what(); fsm_.fireFailed(reasonForFailedState_,this); LOG4CPLUS_FATAL(log_,reasonForFailedState_); return false; } catch(...) { reasonForFailedState_ = "Unknown Exception"; fsm_.fireFailed(reasonForFailedState_,this); LOG4CPLUS_FATAL(log_,reasonForFailedState_); return false; } return true; }
bool FUEventProcessor::enableMPEPSlave | ( | ) | [private] |
Definition at line 2336 of file FUEventProcessor.cc.
References alignCSCRings::e, enableCommon(), evtProcessor_, Exception, evf::StateMachine::fireFailed(), fsm_, reco::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by enabling(), and supervisor().
{ //all this happens only in the child process startReceivingLoop(); startReceivingMonitorLoop(); evtProcessor_.resetWaiting(); startScalersWorkLoop(); while(!evtProcessor_.isWaitingForLs()) ::sleep(1); // @EM test do not run monitor loop in slave, only receiving&Monitor // evtProcessor_.startMonitoringWorkLoop(); try{ // evtProcessor_.makeServicesOnly(); try{ edm::PresenceFactory *pf = edm::PresenceFactory::get(); if(pf != 0) { pf->makePresence("MessageServicePresence").release(); } else { LOG4CPLUS_ERROR(getApplicationLogger(), "Unable to create message service presence "); } } catch(...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception"); } ML::MLlog4cplus::setAppl(this); } catch (xcept::Exception &e) { reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what(); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } bool retval = enableCommon(); // while(evtProcessor_->getState()!= edm::event_processor::sRunning){ // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog"); // ::sleep(1); // } return retval; }
bool FUEventProcessor::enabling | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 493 of file FUEventProcessor.cc.
References toolbox::mem::_s_mutex_ptr_, allProcStats_, configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, crashesThisRun_, datasetCounting_, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, enableClassic(), enableForkInEDM(), enableMPEPSlave(), epInitialized_, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), forkInEDM_, fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, idleProcStats_, evf::FWEPWrapper::init(), instance_, localLog(), mwrRef_, myProcess_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, ratestat_, reasonForFailedState_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), rlimit_coresize_changed_, rlimit_coresize_default_, runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::RateStat::sendAuxLegenda(), evf::CPUStat::sendLegenda(), evf::RateStat::sendLegenda(), edm::event_processor::sError, evf::FWEPWrapper::setupFastTimerService(), sigmon_sem_, edm::event_processor::sInvalid, stor::utils::sleep(), sorRef_, edm::event_processor::sRunning, evf::Vulture::start(), start_lock_, startSignalMonitorWorkLoop(), startSummarizeWorkLoop(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subs_, vp_, and vulture_.
Referenced by sigmon().
{ nbTotalDQM_ = 0; scalersUpdates_ = 0; idleProcStats_ = 0; allProcStats_ = 0; // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " " // << ((instance_.value_==0) ? 0x8 : 0) << " " // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " " // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " " // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl; unsigned short smap = (datasetCounting_.value_ ? 0x20 : 0 ) + ((nbSubProcesses_.value_!=0) ? 0x10 : 0) + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice + (hasServiceWebRegistry_.value_ ? 0x4 : 0) + (hasModuleWebRegistry_.value_ ? 0x2 : 0) + (hasPrescaleService_.value_ ? 0x1 : 0); LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling..."); //reset core limit size if (rlimit_coresize_changed_) setrlimit(RLIMIT_CORE,&rlimit_coresize_default_); rlimit_coresize_changed_=false; crashesThisRun_=0; //recreate signal monitor sem sem_destroy(sigmon_sem_); sem_init(sigmon_sem_,true,0); if(!epInitialized_){ evtProcessor_.forceInitEventProcessorMaybe(); } std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg); mwrRef_ = evtProcessor_.getModuleWebRegistry(); sorRef_ = evtProcessor_.getShmOutputModuleRegistry(); if(!epInitialized_){ evtProcessor_->beginJob(); evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1); if(cpustat_) {delete cpustat_; cpustat_=0;} cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(), nbSubProcesses_.value_, instance_.value_, iDieUrl_.value_); if(ratestat_) {delete ratestat_; ratestat_=0;} ratestat_ = new RateStat(iDieUrl_.value_); if(iDieStatisticsGathering_.value_) { try { cpustat_->sendLegenda(evtProcessor_.getmicromap()); xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda"); if(legenda !=0) { std::string slegenda = ((xdata::String*)legenda)->value_; ratestat_->sendLegenda(slegenda); } if (sorRef_ && datasetCounting_.value_) { xdata::String dsLegenda = sorRef_->getDatasetCSV(); if (dsLegenda.value_.size()) ratestat_->sendAuxLegenda(dsLegenda); } } catch(evf::Exception &e) { LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda" << e.what()); } catch (xcept::Exception& e) { LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda." << e.what()); } } epInitialized_ = true; } configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out... evtProcessor_.resetLumiSectionReferenceIndex(); //classic appl will return here if(nbSubProcesses_.value_==0) return enableClassic(); //protect manipulation of subprocess array pthread_mutex_lock(&start_lock_); pthread_mutex_lock(&pickup_lock_); subs_.clear(); subs_.resize(nbSubProcesses_.value_); // this should not be necessary pid_t retval = -1; for(unsigned int i=0; i<nbSubProcesses_.value_; i++) { subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables } pthread_mutex_unlock(&pickup_lock_); pthread_mutex_unlock(&start_lock_); //set expected number of child EP's for the Init message(s) sent to the SM try { if (sorRef_) { unsigned int nbExpectedEPWriters = nbSubProcesses_.value_; if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules(); for (unsigned int i=0;i<shmOutputs.size();i++) { shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters); } } } catch (...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs"); } //use new method if configured edm_init_done_=true; if (forkInEDM_.value_) { edm_init_done_=false; enableForkInEDM(); } else for(unsigned int i=0; i<nbSubProcesses_.value_; i++) { retval = subs_[i].forkNew(); if(retval==0) { myProcess_ = &subs_[i]; // dirty hack: delete/recreate global binary semaphore for later use in child delete toolbox::mem::_s_mutex_ptr_; toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true); int retval = pthread_mutex_destroy(&stop_lock_); if(retval != 0) perror("error"); retval = pthread_mutex_init(&stop_lock_,0); if(retval != 0) perror("error"); fsm_.disableRcmsStateNotification(); return enableMPEPSlave(); // the loop is broken in the child } } if (forkInEDM_.value_) { edm::event_processor::State st; while (!edm_init_done_) { usleep(10000); st = evtProcessor_->getState(); if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break; } st = evtProcessor_->getState(); //error handling: EP must fork during sRunning if (st!=edm::event_processor::sRunning) { reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st); localLog(reasonForFailedState_); fsm_.fireFailed(reasonForFailedState_,this); return false; } sleep(1); startSummarizeWorkLoop(); startSignalMonitorWorkLoop();//only with new forking vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_); //enable after we are done with conditions loading and forking LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!"); fsm_.fireEvent("EnableDone",this); localLog("-I- Start completed"); return false; } startSummarizeWorkLoop(); vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_); LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!"); fsm_.fireEvent("EnableDone",this); localLog("-I- Start completed"); return false; }
void FUEventProcessor::forkProcessesFromEDM | ( | ) | [private] |
Definition at line 2022 of file FUEventProcessor.cc.
References toolbox::mem::_s_mutex_ptr_, evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, attachDqmToShm(), gather_cfg::cout, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, evf::evfep_alarmhandler(), evf::evfep_sighandler(), evtProcessor_, exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, fsm_, reco::get(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasShMem_, i, evf::moduleweb::ForkParams::isMaster, evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), messageServicePresence_, myProcess_, nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::resetPackedTriggerReport(), evf::moduleweb::ForkParams::restart, restart_in_progress_, scalersUpdates_, ML::MLlog4cplus::setAppl(), evf::moduleweb::ForkParams::slotId, sorRef_, dqm_diff::start, startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subs_, wlReceiving_, and wlReceivingMonitor_.
{ moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams); unsigned int forkFrom=0; unsigned int forkTo=nbSubProcesses_.value_; if (forkParams->slotId>=0) { forkFrom=forkParams->slotId; forkTo=forkParams->slotId+1; } //before fork, make sure to disconnect output modules from Shm try { if (sorRef_) { std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules(); for (unsigned int i=0;i<shmOutputs.size();i++) { //unregister PID from ShmBuffer/RB shmOutputs[i]->unregisterFromShm(); //disconnect from Shm shmOutputs[i]->stop(); } } } catch (std::exception &e) { reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what(); LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } catch (...) { reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: "; LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_); fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); } std::string currentState = fsm_.stateName()->toString(); //destroy MessageServicePresence thread before fork if (currentState!="stopping") { try { messageServicePresence_.reset(); } catch (...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!"); } } if (currentState=="stopping") { LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString()); forkParams->isMaster=1; forkInfoObj_->forkParams.slotId=-1; forkInfoObj_->forkParams.restart=0; } //fork loop else for(unsigned int i=forkFrom; i<forkTo; i++) { int retval = subs_[i].forkNew(); if(retval==0) { forkParams->isMaster=0; myProcess_ = &subs_[i]; // dirty hack: delete/recreate global binary semaphore for later use in child delete toolbox::mem::_s_mutex_ptr_; toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true); int retval = pthread_mutex_destroy(&stop_lock_); if(retval != 0) perror("error"); retval = pthread_mutex_init(&stop_lock_,0); if(retval != 0) perror("error"); fsm_.disableRcmsStateNotification(); //recreate MessageLogger thread in slave after fork try{ edm::PresenceFactory *pf = edm::PresenceFactory::get(); if(pf != 0) { messageServicePresence_ = pf->makePresence("MessageServicePresence"); } else { LOG4CPLUS_ERROR(getApplicationLogger(), "SLAVE: Unable to create message service presence. pid:"<<getpid()); } } catch(...) { LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid()); } ML::MLlog4cplus::setAppl(this); //reconnect to Shm from output modules try { if (sorRef_) { std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules(); for (unsigned int i=0;i<shmOutputs.size();i++) shmOutputs[i]->start(); } } catch (...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")"); } if (forkParams->restart) { //do restart things scalersUpdates_ = 0; try { fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition } catch (...) { LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP"); } try{ xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex"); if(lsid) { ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died } } catch(...){ std::cout << "trouble with lsindex during restart" << std::endl; } try{ xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered"); if(lstb) { ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting } } catch(...){ std::cout << "trouble with resetting flag for eol recovery " << std::endl; } evtProcessor_.adjustLsIndexForRestart(); evtProcessor_.resetPackedTriggerReport(); } //start other threads startReceivingLoop(); startReceivingMonitorLoop(); startScalersWorkLoop(); while(!evtProcessor_.isWaitingForLs()) ::usleep(100000);//wait for scalers loop to start //connect DQMShmOutputModule if(hasShMem_) attachDqmToShm(); //catch transition error if we are already Enabled try { fsm_.fireEvent("EnableDone",this); } catch (...) {} //make sure workloops are started while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000); //unmask signals sigset_t tmpset_thread; sigemptyset(&tmpset_thread); sigaddset(&tmpset_thread, SIGQUIT); sigaddset(&tmpset_thread, SIGILL); sigaddset(&tmpset_thread, SIGABRT); sigaddset(&tmpset_thread, SIGFPE); sigaddset(&tmpset_thread, SIGSEGV); sigaddset(&tmpset_thread, SIGALRM); //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0); pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0); //set signal handlers struct sigaction sa; sigset_t tmpset; memset(&tmpset,0,sizeof(tmpset)); sigemptyset(&tmpset); sa.sa_mask=tmpset; sa.sa_flags=SA_RESETHAND | SA_SIGINFO; sa.sa_handler=0; sa.sa_sigaction=evfep_sighandler; sigaction(SIGQUIT,&sa,0); sigaction(SIGILL,&sa,0); sigaction(SIGABRT,&sa,0); sigaction(SIGFPE,&sa,0); sigaction(SIGSEGV,&sa,0); sa.sa_sigaction=evfep_alarmhandler; sigaction(SIGALRM,&sa,0); //child return to DaqSource return ; } else { forkParams->isMaster=1; forkInfoObj_->forkParams.slotId=-1; if (forkParams->restart) { std::ostringstream ost1; ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId; localLog(ost1.str()); } forkInfoObj_->forkParams.restart=0; //start "crash" receiver workloop } } //recreate MessageLogger thread after fork try{ //release the presense factory in master edm::PresenceFactory *pf = edm::PresenceFactory::get(); if(pf != 0) { messageServicePresence_ = pf->makePresence("MessageServicePresence"); } else { LOG4CPLUS_ERROR(getApplicationLogger(), "Unable to recreate message service presence "); } } catch(...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence"); } restart_in_progress_=false; edm_init_done_=true; }
void FUEventProcessor::forkProcessFromEDM_helper | ( | void * | addr | ) | [static] |
Definition at line 2018 of file FUEventProcessor.cc.
Referenced by enableForkInEDM().
{ ((FUEventProcessor*)addr)->forkProcessesFromEDM(); }
xoap::MessageReference FUEventProcessor::fsmCallback | ( | xoap::MessageReference | msg | ) | throw (xoap::exception::Exception) |
Definition at line 802 of file FUEventProcessor.cc.
References evf::StateMachine::commandCallback(), fsm_, and lumiQueryAPI::msg.
{ return fsm_.commandCallback(msg); }
void FUEventProcessor::getSlavePids | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 844 of file FUEventProcessor.cc.
References i, dbtoconf::out, and subs_.
Referenced by FUEventProcessor().
bool FUEventProcessor::halting | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 765 of file FUEventProcessor.cc.
References doEndRunInEDM(), alignCSCRings::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInEDM_, forkInfoObj_, fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, rlimit_coresize_default_, sigmon_sem_, evf::FWEPWrapper::stopAndHalt(), stopSlavesAndAcknowledge(), and AlCaHLTBitMon_QueryRunRegistry::string.
{ LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ..."); setrlimit(RLIMIT_CORE,&rlimit_coresize_default_); if(nbSubProcesses_.value_!=0) { stopSlavesAndAcknowledge(); if (forkInEDM_.value_) { sem_post(sigmon_sem_); if (!doEndRunInEDM()) return false; } } try{ evtProcessor_.stopAndHalt(); //cleanup forking variables if (forkInfoObj_) { delete forkInfoObj_; forkInfoObj_=0; } } catch (evf::Exception &e) { reasonForFailedState_ = "halting FAILED: " + (std::string)e.what(); localLog(reasonForFailedState_); fsm_.fireFailed(reasonForFailedState_,this); } // if(hasShMem_) detachDqmFromShm(); LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!"); fsm_.fireEvent("HaltDone",this); localLog("-I- Halt completed"); return false; }
void FUEventProcessor::handleSignalSlave | ( | int | sig, |
siginfo_t * | info, | ||
void * | c | ||
) |
Definition at line 2713 of file FUEventProcessor.cc.
References gather_cfg::cout, rlimit_coresize_changed_, sigmon_sem_, stor::utils::sleep(), and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by evf::evfep_sighandler().
{ //notify master sem_post(sigmon_sem_); //sleep while master takes action sleep(2); //set up alarm if handler deadlocks on unsafe actions alarm(5); std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl; std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl; std::cout << "--- Stacktrace follows --" << std::endl; std::ostringstream stacktr; toolbox::stacktrace(20,stacktr); std::cout << stacktr.str(); if (!rlimit_coresize_changed_) std::cout << "--- Dumping core." << " --- " << std::endl; else std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl; std::string hasdump = ""; if (rlimit_coresize_changed_) hasdump = " (core dump disabled) "; LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid() << " on node " << toolbox::net::getHostName() << " ---" << std::endl << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str() ); //re-raise signal with default handler (will cause core dump if enabled) raise(sig); }
void FUEventProcessor::localLog | ( | std::string | m | ) | [private] |
Definition at line 1120 of file FUEventProcessor.cc.
References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.
Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().
{ timeval tv; gettimeofday(&tv,0); tm *uptm = localtime(&tv.tv_sec); char datestring[256]; strftime(datestring, sizeof(datestring),"%c", uptm); if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;} logRingIndex_--; std::ostringstream timestamp; timestamp << " at " << datestring; m += timestamp.str(); logRing_[logRingIndex_] = m; }
std::string FUEventProcessor::logsAsString | ( | ) | [private] |
Definition at line 1103 of file FUEventProcessor.cc.
References i, logRing_, logRingIndex_, and logWrap_.
{ std::ostringstream oss; if(logWrap_) { for(unsigned int i = logRingIndex_; i < logRing_.size(); i++) oss << logRing_[i] << std::endl; for(unsigned int i = 0; i < logRingIndex_; i++) oss << logRing_[i] << std::endl; } else for(unsigned int i = logRingIndex_; i < logRing_.size(); i++) oss << logRing_[i] << std::endl; return oss.str(); }
void FUEventProcessor::makeStaticInfo | ( | ) | [private] |
Definition at line 2678 of file FUEventProcessor.cc.
References applicationInfoSpace_, evf::utils::cDiv(), alignCSCRings::e, Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.
Referenced by FUEventProcessor().
{ using namespace utils; std::ostringstream ost; mDiv(&ost,"ve"); ost<< "$Revision: 1.164 $ (" << edm::getReleaseVersion() <<")"; cDiv(&ost); mDiv(&ost,"ou",outPut_.toString()); mDiv(&ost,"sh",hasShMem_.toString()); mDiv(&ost,"mw",hasModuleWebRegistry_.toString()); mDiv(&ost,"sw",hasServiceWebRegistry_.toString()); xdata::Serializable *monsleep = 0; xdata::Serializable *lstimeout = 0; try{ monsleep = applicationInfoSpace_->find("monSleepSec"); lstimeout = applicationInfoSpace_->find("lsTimeOut"); } catch(xdata::exception::Exception e){ } if(monsleep!=0) mDiv(&ost,"ms",monsleep->toString()); if(lstimeout!=0) mDiv(&ost,"lst",lstimeout->toString()); char cbuf[sizeof(struct utsname)]; struct utsname* buf = (struct utsname*)cbuf; uname(buf); mDiv(&ost,"sysinfo"); ost << buf->sysname << " " << buf->nodename << " " << buf->release << " " << buf->version << " " << buf->machine; cDiv(&ost); updaterStatic_ = ost.str(); }
void FUEventProcessor::microState | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 2484 of file FUEventProcessor.cc.
References autoRestartSlaves_, gather_cfg::cout, alignCSCRings::e, evtProcessor_, exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, recoMuon::in, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, dbtoconf::out, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), AlCaHLTBitMon_QueryRunRegistry::string, subs_, and cms::Exception::what().
Referenced by FUEventProcessor().
{ std::string urn = getApplicationDescriptor()->getURN(); try{ evtProcessor_.stateNameFromIndex(0); evtProcessor_.moduleNameFromIndex(0); if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;} *out << "<tr><td>" << fsm_.stateName()->toString() << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>" << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>"; evtProcessor_.microState(in,out); *out << "<td></td><td>" << nbTotalDQM_ << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>"; if(nbSubProcesses_.value_!=0 && !myProcess_) { pthread_mutex_lock(&start_lock_); for(unsigned int i = 0; i < subs_.size(); i++) { try{ if(subs_[i].alive()>0) { *out << "<tr><td bgcolor=\"#00FF00\" id=\"a" << i << "\">""Alive</td><td>S</td><td>" << subs_[i].queueId() << "<td>" << subs_[i].queueStatus()<< "/" << subs_[i].queueOccupancy() << "/" << subs_[i].queuePidOfLastSend() << "/" << subs_[i].queuePidOfLastReceive() << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process=" << subs_[i].pid() << "&method=procStat\">" << subs_[i].pid()<<"</a></td>" //<< msg->mtext; << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>" << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>" << subs_[i].params().nba << "/" << subs_[i].params().nbp << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)" << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls << "</td><td>" << subs_[i].params().ps << "</td><td" << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">" << subs_[i].params().eols << "</td><td>" << subs_[i].params().dqm << "</td><td>" << subs_[i].params().trp << "</td>"; } else { pthread_mutex_lock(&pickup_lock_); *out << "<tr><td id=\"a"<< i << "\" "; if(subs_[i].alive()==-1000) *out << " bgcolor=\"#bbaabb\">NotInitialized"; else *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead"); *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" << subs_[i].queueStatus() << "/" << subs_[i].queueOccupancy() << "/" << subs_[i].queuePidOfLastSend() << "/" << subs_[i].queuePidOfLastReceive() << "</td><td id=\"p"<< i << "\">" <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed(); if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000) { if(autoRestartSlaves_ && subs_[i].restartCount()<=2) *out << " will restart in " << subs_[i].countdown() << " s"; else if(autoRestartSlaves_) *out << " reached maximum restart count"; else *out << " autoRestart is disabled "; } *out << "</td><td" << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">" << subs_[i].params().eols << "</td><td>" << subs_[i].params().dqm << "</td><td>" << subs_[i].params().trp << "</td>"; pthread_mutex_unlock(&pickup_lock_); } *out << "</tr>"; } catch(evf::Exception &e){ *out << "<tr><td id=\"a"<< i << "\" " <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>" << subs_[i].queueStatus() << "/" << subs_[i].queueOccupancy() << "/" << subs_[i].queuePidOfLastSend() << "/" << subs_[i].queuePidOfLastReceive() << "</td><td id=\"p"<< i << "\">" <<subs_[i].pid()<<"</td>"; } } pthread_mutex_unlock(&start_lock_); } } catch(evf::Exception &e) { LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what()); } catch(cms::Exception &e) { LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what()); } catch(std::exception &e) { LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what()); } catch(...) { LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - "); } }
void evf::FUEventProcessor::moduleWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [inline] |
Definition at line 114 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::moduleWeb().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{evtProcessor_.moduleWeb(in,out);}
void FUEventProcessor::pathNames | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 1039 of file FUEventProcessor.cc.
References evtProcessor_, dbtoconf::out, scalersLegendaInfoSpace_, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by FUEventProcessor().
{ if(evtProcessor_ != 0){ xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda"); if(legenda !=0){ std::string slegenda = ((xdata::String*)legenda)->value_; *out << slegenda << std::endl; } } }
void FUEventProcessor::procStat | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 2668 of file FUEventProcessor.cc.
References dbtoconf::out.
Referenced by FUEventProcessor(), and receivingAndMonitor().
{ evf::utils::procStat(out); }
bool FUEventProcessor::receiving | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1192 of file FUEventProcessor.cc.
References alignCSCRings::e, evf::StateMachine::fireEvent(), fsm_, messageServicePresence_, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLI, MSQM_MESSAGE_TYPE_RLR, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), rlimit_coresize_changed_, rlimit_coresize_default_, stop_lock_, and stopClassic().
Referenced by startReceivingLoop().
{ MsgBuf msg; try{ myProcess_->rcvSlave(msg,false); //will receive only messages from Master if(msg->mtype==MSQM_MESSAGE_TYPE_RLI) { rlimit rl; getrlimit(RLIMIT_CORE,&rl); rl.rlim_cur=0; setrlimit(RLIMIT_CORE,&rl); rlimit_coresize_changed_=true; } if (msg->mtype==MSQM_MESSAGE_TYPE_RLR) { //reset coresize limit setrlimit(RLIMIT_CORE,&rlimit_coresize_default_); rlimit_coresize_changed_=false; } if(msg->mtype==MSQM_MESSAGE_TYPE_STOP) { pthread_mutex_lock(&stop_lock_); try { fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition } catch (...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid " << getpid() << " The state on Stop event was not consistent"); } try { stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM } catch (...) { LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid()); } //destroy MessageService thread before exit try{ messageServicePresence_.reset(); } catch(...) { LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() ); } MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP); myProcess_->postSlave(msg1,false); pthread_mutex_unlock(&stop_lock_); fclose(stdout); fclose(stderr); _exit(EXIT_SUCCESS); } if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP) _exit(EXIT_SUCCESS); } catch(evf::Exception &e){ LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what()); } return true; }
bool FUEventProcessor::receivingAndMonitor | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1744 of file FUEventProcessor.cc.
References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, data, defaultWebPage(), evf::prg::dqm, alignCSCRings::e, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, PFRecoTauDiscriminationAgainstElectronMVA2_cfi::method, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, evf::prg::trp, and TablePrint::write.
Referenced by startReceivingMonitorLoop().
{ try{ myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master switch(slave_message_monitoring_->mtype) { case MSQM_MESSAGE_TYPE_MCS: { xgi::Input *in = 0; xgi::Output out; evtProcessor_.microState(in,&out); MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR); strncpy(msg1->mtext,out.str().c_str(),out.str().size()); myProcess_->postSlave(msg1,true); break; } case MSQM_MESSAGE_TYPE_PRG: { xdata::Serializable *dqmp = 0; xdata::UnsignedInteger32 *dqm = 0; evtProcessor_.monitoring(0); try{ dqmp = applicationInfoSpace_-> find("nbDqmUpdates"); } catch(xdata::exception::Exception e){} if(dqmp!=0) dqm = (xdata::UnsignedInteger32*)dqmp; // monitorInfoSpace_->lock(); prg * data = (prg*)slave_message_prr_->mtext; data->ls = evtProcessor_.lsid_; data->eols = evtProcessor_.lastLumiUsingEol_; data->ps = evtProcessor_.psid_; data->nbp = evtProcessor_->totalEvents(); data->nba = evtProcessor_->totalEventsPassed(); data->Ms = evtProcessor_.epMAltState_.value_; data->ms = evtProcessor_.epmAltState_.value_; if(dqm) data->dqm = dqm->value_; else data->dqm = 0; data->trp = scalersUpdates_; // monitorInfoSpace_->unlock(); myProcess_->postSlave(slave_message_prr_,true); if(exitOnError_.value_) { // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl; if(data->Ms == edm::event_processor::sError) { bool running = true; int count = 0; while(running){ int retval = pthread_mutex_lock(&stop_lock_); if(retval != 0) perror("error"); running = fsm_.stateName()->toString()=="Enabled"; if(count>5) _exit(-1); pthread_mutex_unlock(&stop_lock_); if(running) {::sleep(1); count++;} } } } break; } case MSQM_MESSAGE_TYPE_WEB: { xgi::Input *in = 0; xgi::Output out; unsigned int bytesToSend = 0; MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB); std::string query = slave_message_monitoring_->mtext; size_t pos = query.find_first_of("&"); std::string method; std::string args; if(pos!=std::string::npos) { method = query.substr(0,pos); args = query.substr(pos+1,query.length()-pos-1); } else method=query; if(method=="Spotlight") { spotlightWebPage(in,&out); } else if(method=="procStat") { procStat(in,&out); } else if(method=="moduleWeb") { internal::MyCgi mycgi; boost::char_separator<char> sep(";"); boost::tokenizer<boost::char_separator<char> > tokens(args, sep); for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin(); tok_iter != tokens.end(); ++tok_iter){ size_t pos = (*tok_iter).find_first_of("%"); if(pos != std::string::npos){ std::string first = (*tok_iter).substr(0 , pos); std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1); mycgi.getEnvironment()[first]=second; } } moduleWeb(&mycgi,&out); } else if(method=="Default") { defaultWebPage(in,&out); } else { out << "Error 404!!!!!!!!" << std::endl; } bytesToSend = out.str().size(); unsigned int cycle = 0; if(bytesToSend==0) { snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend); myProcess_->postSlave(msg1,true); } while(bytesToSend !=0){ unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend; write(anonymousPipe_[PIPE_WRITE], out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle, msgSize); snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize); myProcess_->postSlave(msg1,true); bytesToSend -= msgSize; cycle++; } break; } case MSQM_MESSAGE_TYPE_TRP: { break; } } } catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;} return true; }
bool FUEventProcessor::restartForkInEDM | ( | unsigned int | slotId | ) | [private] |
Definition at line 2304 of file FUEventProcessor.cc.
References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkInfoObj::lock(), log_, restart_in_progress_, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().
Referenced by supervisor().
{ //daqsource will keep this lock until master returns after fork //so that we don't do another EP restart in between forkInfoObj_->lock(); forkInfoObj_->forkParams.slotId=slotId; forkInfoObj_->forkParams.restart=true; forkInfoObj_->forkParams.isMaster=1; forkInfoObj_->stopCondition=0; LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore"); sem_post(forkInfoObj_->control_sem_); forkInfoObj_->unlock(); usleep(1000); //sleep until fork is performed int count=50; restart_in_progress_=true; while (restart_in_progress_ && count--) usleep(20000); return true; }
bool FUEventProcessor::scalers | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1605 of file FUEventProcessor.cc.
References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), run_regression::ret, scalersUpdates_, and wlScalersActive_.
Referenced by startScalersWorkLoop().
{ if(evtProcessor_) { if(!evtProcessor_.getTriggerReport(true)) { wlScalersActive_ = false; return false; } } else { std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl; wlScalersActive_ = false; return false; } if(myProcess_) { // std::cout << getpid() << "going to post on control queue from scalers" << std::endl; int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false); if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl; scalersUpdates_++; } else evtProcessor_.fireScalersUpdate(); return true; }
void FUEventProcessor::scalersWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 1026 of file FUEventProcessor.cc.
References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.
Referenced by FUEventProcessor().
{ out->getHTTPResponseHeader().addHeader( "Content-Type", "application/octet-stream" ); out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding", "binary" ); if(evtProcessor_ != 0){ out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic)); } }
void FUEventProcessor::sendMessageOverMonitorQueue | ( | MsgBuf & | buf | ) |
Definition at line 2673 of file FUEventProcessor.cc.
References myProcess_, and evf::SubProcess::postSlave().
{ if(myProcess_) myProcess_->postSlave(buf,true); }
void evf::FUEventProcessor::serviceWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) [inline] |
Definition at line 115 of file FUEventProcessor.h.
References evtProcessor_, and evf::FWEPWrapper::serviceWeb().
Referenced by FUEventProcessor().
{evtProcessor_.serviceWeb(in,out);}
void FUEventProcessor::setAttachDqmToShm | ( | ) | throw (evf::Exception) [private] |
Definition at line 1053 of file FUEventProcessor.cc.
References alignCSCRings::e, evtProcessor_, edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, and cms::Exception::what().
{ std::string errmsg; try { edm::ServiceRegistry::Operate operate(evtProcessor_->getToken()); if(edm::Service<FUShmDQMOutputService>().isAvailable()) edm::Service<FUShmDQMOutputService>()->setAttachToShm(); } catch (cms::Exception& e) { errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what(); } if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg); }
bool FUEventProcessor::sigmon | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1909 of file FUEventProcessor.cc.
References gather_cfg::cout, crashesThisRun_, crashesToDump_, enabling(), fsm_, i, lastCrashTime_, min, MSQM_MESSAGE_TYPE_RLI, NUMERIC_MESSAGE_SIZE, rlimit_coresize_changed_, sigmon_sem_, signalMonitorActive_, evf::StateMachine::stateName(), stopping(), and subs_.
Referenced by startSignalMonitorWorkLoop().
{ while (1) { sem_wait(sigmon_sem_); std::cout << " received signal notification from slave!"<< std::endl; //check if shutdown time bool running = fsm_.stateName()->toString()=="Enabled"; bool stopping = fsm_.stateName()->toString()=="stopping"; bool enabling = fsm_.stateName()->toString()=="enabling"; if (!running && !enabling) { signalMonitorActive_ = false; return false; } crashesThisRun_++; gettimeofday(&lastCrashTime_,0); //set core size limit to 0 in master and slaves if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) { rlimit rlold; getrlimit(RLIMIT_CORE,&rlold); rlimit rlnew = rlold; rlnew.rlim_cur=0; setrlimit(RLIMIT_CORE,&rlnew); rlimit_coresize_changed_=true; MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI); //in case of frequent crashes, allow first slot to dump (until restart) unsigned int min=1; for (unsigned int i = min; i < subs_.size(); i++) { try { if (subs_[i].alive()) { subs_[i].post(master_message_rli_,false); } } catch (...) {} } std::ostringstream ostr; ostr << "Number of recent slave crashes reaches " << crashesThisRun_ << ". Disabling core dumps for next 15 minutes in this FilterUnit"; LOG4CPLUS_WARN(getApplicationLogger(),ostr.str()); } }//end while loop signalMonitorActive_ = false; return false; }
void FUEventProcessor::spotlightWebPage | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 941 of file FUEventProcessor.cc.
References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), AlCaHLTBitMon_QueryRunRegistry::string, evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().
Referenced by FUEventProcessor(), and receivingAndMonitor().
{ std::string urn = getApplicationDescriptor()->getURN(); *out << "<!-- base href=\"/" << urn << "\"> -->" << std::endl; *out << "<html>" << std::endl; *out << "<head>" << std::endl; *out << "<link type=\"text/css\" rel=\"stylesheet\""; *out << " href=\"/evf/html/styles.css\"/>" << std::endl; *out << "<title>" << getApplicationDescriptor()->getClassName() << getApplicationDescriptor()->getInstance() << " MAIN</title>" << std::endl; *out << "</head>" << std::endl; *out << "<body>" << std::endl; *out << "<table border=\"0\" width=\"100%\">" << std::endl; *out << "<tr>" << std::endl; *out << " <td align=\"left\">" << std::endl; *out << " <img" << std::endl; *out << " align=\"middle\"" << std::endl; *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl; *out << " alt=\"main\"" << std::endl; *out << " width=\"64\"" << std::endl; *out << " height=\"64\"" << std::endl; *out << " border=\"\"/>" << std::endl; *out << " <b>" << std::endl; *out << getApplicationDescriptor()->getClassName() << getApplicationDescriptor()->getInstance() << std::endl; *out << " " << fsm_.stateName()->toString() << std::endl; *out << " </b>" << std::endl; *out << " </td>" << std::endl; *out << " <td width=\"32\">" << std::endl; *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl; *out << " <img" << std::endl; *out << " align=\"middle\"" << std::endl; *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl; *out << " alt=\"HyperDAQ\"" << std::endl; *out << " width=\"32\"" << std::endl; *out << " height=\"32\"" << std::endl; *out << " border=\"\"/>" << std::endl; *out << " </a>" << std::endl; *out << " </td>" << std::endl; *out << " <td width=\"32\">" << std::endl; *out << " </td>" << std::endl; *out << " <td width=\"32\">" << std::endl; *out << " <a href=\"/" << urn << "/\">" << std::endl; *out << " <img" << std::endl; *out << " align=\"middle\"" << std::endl; *out << " src=\"/evf/images/epicon.jpg\"" << std::endl; *out << " alt=\"main\"" << std::endl; *out << " width=\"32\"" << std::endl; *out << " height=\"32\"" << std::endl; *out << " border=\"\"/>" << std::endl; *out << " </a>" << std::endl; *out << " </td>" << std::endl; *out << "</tr>" << std::endl; *out << "</table>" << std::endl; *out << "<hr/>" << std::endl; std::ostringstream ost; if(myProcess_) ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&"; else ost << "/moduleWeb?"; urn += ost.str(); if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0)) evtProcessor_.taskWebPage(in,out,urn); else if(evtProcessor_) evtProcessor_.summaryWebPage(in,out,urn); else *out << "<td>HLT Unconfigured</td>" << std::endl; *out << "</table>" << std::endl; *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl; *out << configuration_ << std::endl; *out << "</textarea><P>" << std::endl; *out << "</body>" << std::endl; *out << "</html>" << std::endl; }
void FUEventProcessor::startReceivingLoop | ( | ) | [private] |
Definition at line 1155 of file FUEventProcessor.cc.
References asReceiveMsgAndExecute_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receiving(), receiving_, AlCaHLTBitMon_QueryRunRegistry::string, and wlReceiving_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{ try { wlReceiving_= toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving", "waiting"); if (!wlReceiving_->isActive()) wlReceiving_->activate(); asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving, "Receiving"); wlReceiving_->submit(asReceiveMsgAndExecute_); receiving_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'Receiving'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void FUEventProcessor::startReceivingMonitorLoop | ( | ) | [private] |
Definition at line 1172 of file FUEventProcessor.cc.
References asReceiveMsgAndRead_, alignCSCRings::e, Exception, lumiQueryAPI::msg, receivingAndMonitor(), receivingM_, AlCaHLTBitMon_QueryRunRegistry::string, and wlReceivingMonitor_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{ try { wlReceivingMonitor_= toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM", "waiting"); if (!wlReceivingMonitor_->isActive()) wlReceivingMonitor_->activate(); asReceiveMsgAndRead_ = toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor, "ReceivingM"); wlReceivingMonitor_->submit(asReceiveMsgAndRead_); receivingM_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'ReceivingM'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void FUEventProcessor::startScalersWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1562 of file FUEventProcessor.cc.
References asScalers_, alignCSCRings::e, Exception, lumiQueryAPI::msg, scalers(), AlCaHLTBitMon_QueryRunRegistry::string, wlScalers_, and wlScalersActive_.
Referenced by enableMPEPSlave(), and forkProcessesFromEDM().
{ try { wlScalers_= toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers", "waiting"); if (!wlScalers_->isActive()) wlScalers_->activate(); asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers, "Scalers"); wlScalers_->submit(asScalers_); wlScalersActive_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'Scalers'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void FUEventProcessor::startSignalMonitorWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1886 of file FUEventProcessor.cc.
References asSignalMonitor_, gather_cfg::cout, alignCSCRings::e, Exception, lumiQueryAPI::msg, sigmon(), signalMonitorActive_, AlCaHLTBitMon_QueryRunRegistry::string, and wlSignalMonitor_.
Referenced by enabling().
{ //todo rewind/check semaphore //start workloop try { wlSignalMonitor_= toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor", "waiting"); if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate(); asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon, "SignalMonitor"); wlSignalMonitor_->submit(asSignalMonitor_); signalMonitorActive_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'SignalMonitor'. (3)"; std::cout << e.what() << std::endl; XCEPT_RETHROW(evf::Exception,msg,e); } }
void FUEventProcessor::startSummarizeWorkLoop | ( | ) | throw (evf::Exception) [private] |
Definition at line 1583 of file FUEventProcessor.cc.
References asSummarize_, alignCSCRings::e, Exception, lumiQueryAPI::msg, AlCaHLTBitMon_QueryRunRegistry::string, summarize(), wlSummarize_, and wlSummarizeActive_.
Referenced by enabling().
{ try { wlSummarize_= toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary", "waiting"); if (!wlSummarize_->isActive()) wlSummarize_->activate(); asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize, "Summary"); wlSummarize_->submit(asSummarize_); wlSummarizeActive_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'Summarize'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
void FUEventProcessor::startSupervisorLoop | ( | ) | [private] |
Definition at line 1137 of file FUEventProcessor.cc.
References asSupervisor_, alignCSCRings::e, Exception, lumiQueryAPI::msg, AlCaHLTBitMon_QueryRunRegistry::string, supervising_, supervisor(), and wlSupervising_.
Referenced by FUEventProcessor().
{ try { wlSupervising_= toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor", "waiting"); if (!wlSupervising_->isActive()) wlSupervising_->activate(); asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor, "Supervisor"); wlSupervising_->submit(asSupervisor_); supervising_ = true; } catch (xcept::Exception& e) { std::string msg = "Failed to start workloop 'Supervisor'."; XCEPT_RETHROW(evf::Exception,msg,e); } }
bool FUEventProcessor::stopClassic | ( | ) | [private] |
Definition at line 2379 of file FUEventProcessor.cc.
References detachDqmFromShm(), alignCSCRings::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, evf::FWEPWrapper::stop(), AlCaHLTBitMon_QueryRunRegistry::string, and cms::Exception::what().
Referenced by receiving(), and stopping().
{ bool failed=false; try { LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ..."); edm::EventProcessor::StatusCode rc = evtProcessor_.stop(); if(rc == edm::EventProcessor::epSuccess) fsm_.fireEvent("StopDone",this); else { failed=true; // epMState_ = evtProcessor_->currentStateName(); if(rc == edm::EventProcessor::epTimedOut) reasonForFailedState_ = "EventProcessor stop timed out"; else reasonForFailedState_ = "EventProcessor did not receive STOP event"; } } catch (xcept::Exception &e) { failed=true; reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what(); } catch (edm::Exception &e) { failed=true; reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what(); } catch (...) { failed=true; reasonForFailedState_= "Stopping FAILED: unknown exception"; } try { if (hasShMem_) { detachDqmFromShm(); if (failed) LOG4CPLUS_WARN(getApplicationLogger(), "In failed STOP - success detaching DQM from Shm. pid:" << getpid()); } } catch (cms::Exception & e) { failed=true; reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what(); } catch (...) { failed=true; reasonForFailedState_= "DQM detach failed: Unknown exception"; } if (failed) { LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: " << reasonForFailedState_ << " (pid:" << getpid()<<")"); localLog(reasonForFailedState_); fsm_.fireFailed(reasonForFailedState_,this); } LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!"); localLog("-I- Stop completed"); return false; }
bool FUEventProcessor::stopping | ( | toolbox::task::WorkLoop * | wl | ) |
Definition at line 737 of file FUEventProcessor.cc.
References doEndRunInEDM(), forkInEDM_, hasShMem_, nbSubProcesses_, rlimit_coresize_default_, sigmon_sem_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.
Referenced by sigmon(), and supervisor().
{ setrlimit(RLIMIT_CORE,&rlimit_coresize_default_); if(nbSubProcesses_.value_!=0) { stopSlavesAndAcknowledge(); if (forkInEDM_.value_) { //only in new forking for now sem_post(sigmon_sem_); if (!doEndRunInEDM()) return false; } } vulture_->stop(); if (forkInEDM_.value_) { //shared memory was already disconnected in master bool tmpHasShMem_=hasShMem_; hasShMem_=false; stopClassic(); hasShMem_=tmpHasShMem_; return false; } stopClassic(); return false; }
void FUEventProcessor::stopSlavesAndAcknowledge | ( | ) | [private] |
Definition at line 2438 of file FUEventProcessor.cc.
References alignCSCRings::e, i, localLog(), MAX_MSG_SIZE, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.
Referenced by halting(), and stopping().
{ MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP); MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP); std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false); for(unsigned int i = 0; i < subs_.size(); i++) { pthread_mutex_lock(&stop_lock_); if(subs_[i].alive()>0){ processes_to_stop[i] = true; subs_[i].post(msg,false); } pthread_mutex_unlock(&stop_lock_); } for(unsigned int i = 0; i < subs_.size(); i++) { pthread_mutex_lock(&stop_lock_); if(processes_to_stop[i]){ try{ subs_[i].rcv(msg1,false); } catch(evf::Exception &e){ std::ostringstream ost; ost << "failed to get STOP - errno ->" << errno << " " << e.what(); reasonForFailedState_ = ost.str(); LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_); // fsm_.fireFailed(reasonForFailedState_,this); localLog(reasonForFailedState_); pthread_mutex_unlock(&stop_lock_); continue; } } else { pthread_mutex_unlock(&stop_lock_); continue; } pthread_mutex_unlock(&stop_lock_); if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP) while(subs_[i].alive()>0) ::usleep(10000); subs_[i].disconnect(); } // subs_.clear(); }
void FUEventProcessor::subWeb | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 853 of file FUEventProcessor.cc.
References anonymousPipe_, gather_cfg::cout, run_regression::done, asciidump::els, i, recoMuon::in, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), lumiQueryAPI::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, dbtoconf::out, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), AlCaHLTBitMon_QueryRunRegistry::string, subs_, and superSleepSec_.
Referenced by FUEventProcessor().
{ using namespace cgicc; pid_t pid = 0; std::ostringstream ost; ost << "&"; Cgicc cgi(in); internal::MyCgi *mycgi = (internal::MyCgi*)in; for(std::map<std::string, std::string, std::less<std::string> >::iterator mit = mycgi->getEnvironment().begin(); mit != mycgi->getEnvironment().end(); mit++) ost << mit->first << "%" << mit->second << ";"; std::vector<FormEntry> els = cgi.getElements() ; std::vector<FormEntry> el1; cgi.getElement("method",el1); std::vector<FormEntry> el2; cgi.getElement("process",el2); if(el1.size()!=0) { std::string meth = el1[0].getValue(); if(el2.size()!=0) { unsigned int i = 0; std::string mod = el2[0].getValue(); pid = atoi(mod.c_str()); // get the process id to be polled for(; i < subs_.size(); i++) if(subs_[i].pid()==pid) break; if(i>=subs_.size()){ //process was not found, let the browser know *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl; return; } if(subs_[i].alive() != 1){ *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl; return; } MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB); strncpy(msg1->mtext,meth.c_str(),meth.length()); strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length()); subs_[i].post(msg1,true); unsigned int keep_supersleep_original_value = superSleepSec_.value_; superSleepSec_.value_=10*keep_supersleep_original_value; MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB); bool done = false; std::vector<char *>pieces; while(!done){ unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true); if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){ ::sleep(1); continue; } unsigned int nbytes = atoi(msg->mtext); if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop char *buf= new char[nbytes]; ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes); if(retval<0){ std::cout << "Failed to read from pipe." << std::endl; continue; } if(static_cast<unsigned int>(retval) != nbytes) std::cout << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl; pieces.push_back(buf); } superSleepSec_.value_=keep_supersleep_original_value; for(unsigned int j = 0; j < pieces.size(); j++){ *out<<pieces[j]; // chain the buffers into the output strstream delete[] pieces[j]; //make sure to release all buffers used for reading the pipe } } } }
bool FUEventProcessor::summarize | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1633 of file FUEventProcessor.cc.
References allProcStats_, gather_cfg::cout, cpustat_, alignCSCRings::e, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, idleProcStats_, lastProcReport_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, evf::utils::procCpuStat(), ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), run_regression::ret, evf::CPUStat::sendStat(), evf::RateStat::sendStat(), evf::CPUStat::setCPUStat(), evf::CPUStat::setElapsed(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.
Referenced by startSummarizeWorkLoop().
{ evtProcessor_.resetPackedTriggerReport(); bool atLeastOneProcessUpdatedSuccessfully = false; int msgCount = 0; for (unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].alive()>0) { int ret = 0; if(subs_[i].check_postponed_trigger_update(master_message_trr_, evtProcessor_.getLumiSectionReferenceIndex())) { ret = MSQS_MESSAGE_TYPE_TRR; std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl; } else{ bool insync = false; bool exception_caught = false; while(!insync){ try{ ret = subs_[i].rcv(master_message_trr_,false); } catch(evf::Exception &e) { std::cout << "exception in msgrcv on " << i << " " << subs_[i].alive() << " " << strerror(errno) << std::endl; exception_caught = true; break; //do nothing special } if(ret==MSQS_MESSAGE_TYPE_TRR) { TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext; if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){ insync = true; } } } if(exception_caught) continue; } msgCount++; if(ret==MSQS_MESSAGE_TYPE_TRR) { TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext; if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){ std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl; subs_[i].add_postponed_trigger_update(master_message_trr_); }else{ atLeastOneProcessUpdatedSuccessfully = true; evtProcessor_.sumAndPackTriggerReport(master_message_trr_); } } else std::cout << "msgrcv returned error " << errno << std::endl; } } if(atLeastOneProcessUpdatedSuccessfully){ nbSubProcessesReporting_.value_ = msgCount; evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_; evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_; evtProcessor_.updateRollingReport(); evtProcessor_.fireScalersUpdate(); } else{ LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again"); if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement(); nbSubProcessesReporting_.value_ = 0; ::sleep(10); } if(fsm_.stateName()->toString()!="Enabled"){ wlScalersActive_ = false; return false; } // cpustat_->printStat(); if(iDieStatisticsGathering_.value_){ try{ unsigned long long idleTmp=idleProcStats_; unsigned long long allPSTmp=allProcStats_; idleProcStats_=allProcStats_=0; utils::procCpuStat(idleProcStats_,allProcStats_); timeval oldtime=lastProcReport_; gettimeofday(&lastProcReport_,0); if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) { cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp)); int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec) + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000; cpustat_->setElapsed(deltaTms); } else { cpustat_->setCPUStat(0); cpustat_->setElapsed(0); } TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct(); cpustat_ ->setNproc(trsp->eventSummary.totalEvents); cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex()); ratestat_->sendStat((unsigned char*)trsp, sizeof(TriggerReportStatic), evtProcessor_.getLumiSectionReferenceIndex()); }catch(evf::Exception &e){ LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics" << e.what()); } } cpustat_->reset(); return true; }
bool FUEventProcessor::supervisor | ( | toolbox::task::WorkLoop * | wl | ) | [private] |
Definition at line 1253 of file FUEventProcessor.cc.
References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, crashesThisRun_, delta, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, alignCSCRings::e, edm_init_done_, enableMPEPSlave(), evtProcessor_, exception, Exception, spr::find(), evf::StateMachine::fireEvent(), forkInEDM_, fsm_, i, lastCrashTime_, localLog(), log_, evf::prg::ls, python::rootplot::utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLR, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), restartForkInEDM(), rlimit_coresize_changed_, rlimit_coresize_default_, findQualityFiles::rr, scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.
Referenced by startSupervisorLoop().
{ pthread_mutex_lock(&stop_lock_); if(subs_.size()!=nbSubProcesses_.value_) { pthread_mutex_lock(&pickup_lock_); if(subs_.size()!=nbSubProcesses_.value_) { subs_.resize(nbSubProcesses_.value_); spMStates_.resize(nbSubProcesses_.value_); spmStates_.resize(nbSubProcesses_.value_); for(unsigned int i = 0; i < spMStates_.size(); i++) { spMStates_[i] = edm::event_processor::sInit; spmStates_[i] = 0; } } pthread_mutex_unlock(&pickup_lock_); } bool running = fsm_.stateName()->toString()=="Enabled"; bool stopping = fsm_.stateName()->toString()=="stopping"; for(unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].alive()==-1000) continue; int sl; pid_t sub_pid = subs_[i].pid(); pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG); if(killedOrNot && killedOrNot==sub_pid) { pthread_mutex_lock(&pickup_lock_); //check if out of range or recreated (enable can clear vector) if (i<subs_.size() && subs_[i].alive()!=-1000) { subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1)); std::ostringstream ost; if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl); else if(WIFSIGNALED(sl)!=0) { ost << " process terminated with signal " << WTERMSIG(sl); } else ost << " process stopped "; //report unexpected slave exit in stop //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) { // LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid()); //} subs_[i].countdown()=slaveRestartDelaySecs_.value_; subs_[i].setReasonForFailed(ost.str()); spMStates_[i] = evtProcessor_.notstarted_state_code(); spmStates_[i] = 0; std::ostringstream ost1; ost1 << "-E- Slave " << subs_[i].pid() << ost.str(); localLog(ost1.str()); if(!autoRestartSlaves_.value_) subs_[i].disconnect(); } pthread_mutex_unlock(&pickup_lock_); } } pthread_mutex_unlock(&stop_lock_); if(stopping) return true; // if in stopping we are done // check if we need to reset core dumps (15 min after last one) if (running && rlimit_coresize_changed_) { timeval newtv; gettimeofday(&newtv,0); int delta = newtv.tv_sec-lastCrashTime_.tv_sec; if (delta>60*15) { std::ostringstream ostr; ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits"; std::cout << ostr.str() << std::endl; LOG4CPLUS_INFO(getApplicationLogger(),ostr.str()); setrlimit(RLIMIT_CORE,&rlimit_coresize_default_); MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR); for (unsigned int i = 0; i < subs_.size(); i++) { try { if (subs_[i].alive()) subs_[i].post(master_message_rlr_,false); } catch (...) {} } rlimit_coresize_changed_=false; crashesThisRun_=0; } } if(running && edm_init_done_) { // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them // this is only active while running, hence, the stop lock is acquired and only released at end of loop if(autoRestartSlaves_.value_){ pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts for(unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].alive() != 1){ if(subs_[i].countdown() == 0) { if(subs_[i].restartCount()>2){ LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i << " - maximum restart count reached"); std::ostringstream ost1; ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count"; localLog(ost1.str()); subs_[i].countdown()--; XCEPT_DECLARE(evf::Exception, sentinelException, ost1.str()); notifyQualified("error",sentinelException); subs_[i].disconnect(); continue; } subs_[i].restartCount()++; if (forkInEDM_.value_) { restartForkInEDM(i); } else { pid_t rr = subs_[i].forkNew(); if(rr==0) { myProcess_=&subs_[i]; scalersUpdates_ = 0; int retval = pthread_mutex_destroy(&stop_lock_); if(retval != 0) perror("error"); retval = pthread_mutex_init(&stop_lock_,0); if(retval != 0) perror("error"); fsm_.disableRcmsStateNotification(); fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition try{ xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex"); if(lsid) { ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died } } catch(...){ std::cout << "trouble with lsindex during restart" << std::endl; } try{ xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered"); if(lstb) { ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting } } catch(...){ std::cout << "trouble with resetting flag for eol recovery " << std::endl; } evtProcessor_.adjustLsIndexForRestart(); evtProcessor_.resetPackedTriggerReport(); enableMPEPSlave(); return false; // exit the supervisor loop immediately in the child !!! } else { std::ostringstream ost1; ost1 << "-I- New Process " << rr << " forked for slot " << i; localLog(ost1.str()); } } } if(subs_[i].countdown()>=0) subs_[i].countdown()--; } } pthread_mutex_unlock(&stop_lock_); } // finished handling replacement of dead slaves once they've been reaped } xdata::Serializable *lsid = 0; xdata::Serializable *psid = 0; xdata::Serializable *dqmp = 0; xdata::UnsignedInteger32 *dqm = 0; if(running && edm_init_done_){ try{ lsid = applicationInfoSpace_->find("lumiSectionIndex"); psid = applicationInfoSpace_->find("prescaleSetIndex"); nbProcessed = monitorInfoSpace_->find("nbProcessed"); nbAccepted = monitorInfoSpace_->find("nbAccepted"); dqmp = applicationInfoSpace_-> find("nbDqmUpdates"); } catch(xdata::exception::Exception e){ LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what()); } try{ if(nbProcessed !=0 && nbAccepted !=0) { xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed); xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted); xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid); xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid); if(dqmp!=0) dqm = (xdata::UnsignedInteger32*)dqmp; if(dqm) dqm->value_ = 0; nbTotalDQM_ = 0; nbp->value_ = 0; nba->value_ = 0; nblive_ = 0; nbdead_ = 0; scalersUpdates_ = 0; for(unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].alive()>0) { nblive_++; try{ subs_[i].post(master_message_prg_,true); unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true); if(retval == (unsigned long) master_message_prr_->mtype){ prg* p = (struct prg*)(master_message_prr_->mtext); subs_[i].setParams(p); spMStates_[i] = p->Ms; spmStates_[i] = p->ms; cpustat_->addEntry(p->ms); if(!subs_[i].inInconsistentState() && (p->Ms == edm::event_processor::sError || p->Ms == edm::event_processor::sInvalid || p->Ms == edm::event_processor::sStopping)) { std::ostringstream ost; ost << "edm::eventprocessor slot " << i << " process id " << subs_[i].pid() << " not in Running state : Mstate=" << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate=" << evtProcessor_.moduleNameFromIndex(p->ms) << " - Look into possible error messages from HLT process"; LOG4CPLUS_WARN(getApplicationLogger(),ost.str()); } nbp->value_ += subs_[i].params().nbp; nba->value_ += subs_[i].params().nba; if(dqm)dqm->value_ += p->dqm; nbTotalDQM_ += p->dqm; scalersUpdates_ += p->trp; if(p->ls > ls->value_) ls->value_ = p->ls; if(p->ps != ps->value_) ps->value_ = p->ps; } else{ nbp->value_ += subs_[i].get_save_nbp(); nba->value_ += subs_[i].get_save_nba(); } } catch(evf::Exception &e){ LOG4CPLUS_INFO(getApplicationLogger(), "could not send/receive msg on slot " << i << " - " << e.what()); } } else { nbp->value_ += subs_[i].get_save_nbp(); nba->value_ += subs_[i].get_save_nba(); nbdead_++; } } if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells) for(unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].params().nbp == 0){ // a slave has processed 0 events // check that the process is not stuck if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0) { subs_[i].found_invalid();//increase the "found_invalid" counter if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal subs_[i].post(msg3,false); std::ostringstream ost1; ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it"; localLog(ost1.str()); LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str()); XCEPT_DECLARE(evf::Exception, sentinelException, ost1.str()); notifyQualified("error",sentinelException); } } } } } } } catch(std::exception &e){ LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what()); } catch(...){ LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception "); } } else{ for(unsigned int i = 0; i < subs_.size(); i++) { if(subs_[i].alive()==-1000) { spMStates_[i] = edm::event_processor::sInit; spmStates_[i] = 0; } } } try{ monitorInfoSpace_->lock(); monitorInfoSpace_->fireItemGroupChanged(names_,0); monitorInfoSpace_->unlock(); } catch(xdata::exception::Exception &e) { LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what()); // localLog(e.what()); } ::sleep(superSleepSec_.value_); return true; }
void FUEventProcessor::updater | ( | xgi::Input * | in, |
xgi::Output * | out | ||
) | throw (xgi::exception::Exception) |
Definition at line 2595 of file FUEventProcessor.cc.
References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, dbtoconf::out, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.
Referenced by FUEventProcessor().
{ using namespace utils; *out << updaterStatic_; mDiv(out,"loads"); uptime(out); cDiv(out); mDiv(out,"st",fsm_.stateName()->toString()); mDiv(out,"ru",runNumber_.toString()); mDiv(out,"nsl",nbSubProcesses_.value_); mDiv(out,"nsr",nbSubProcessesReporting_.value_); mDiv(out,"cl"); *out << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " "); cDiv(out); mDiv(out,"in",getApplicationDescriptor()->getInstance()); if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){ mDiv(out,"hlt"); *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>"; cDiv(out); *out << std::endl; } else mDiv(out,"hlt","Not yet..."); mDiv(out,"sq",squidPresent_.toString()); mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized")); mDiv(out,"mwl",evtProcessor_.wlMonitoring()); if(nbProcessed != 0 && nbAccepted != 0) { mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_); mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_); } else { mDiv(out,"tt",0); mDiv(out,"ac",0); } if(!myProcess_) mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive")); else mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive")); mDiv(out,"idi",iDieUrl_.value_); if(vp_!=0){ mDiv(out,"vpi",(unsigned int) vp_); if(vulture_->hasStarted()>=0) mDiv(out,"vul","Prowling"); else mDiv(out,"vul","Dead"); } else{ mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching")); } if(evtProcessor_){ mDiv(out,"ll"); *out << evtProcessor_.lastLumi().ls << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc; cDiv(out); } mDiv(out,"lg"); for(unsigned int i = logRingIndex_; i<logRingSize_; i++) *out << logRing_[i] << std::endl; if(logWrap_) for(unsigned int i = 0; i<logRingIndex_; i++) *out << logRing_[i] << std::endl; cDiv(out); mDiv(out,"cha"); if(cpustat_) *out << cpustat_->getChart(); cDiv(out); }
evf::FUEventProcessor::XDAQ_INSTANTIATOR | ( | ) |
unsigned long long evf::FUEventProcessor::allProcStats_ [private] |
Definition at line 302 of file FUEventProcessor.h.
Referenced by enabling(), and summarize().
int evf::FUEventProcessor::anonymousPipe_[2] [private] |
Definition at line 270 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().
xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_ [private] |
Definition at line 248 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), FUEventProcessor(), makeStaticInfo(), receivingAndMonitor(), and supervisor().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_ [private] |
Definition at line 232 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_ [private] |
Definition at line 235 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_ [private] |
Definition at line 262 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSignalMonitor_ [private] |
Definition at line 242 of file FUEventProcessor.h.
Referenced by startSignalMonitorWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_ [private] |
Definition at line 268 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_ [private] |
Definition at line 239 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().
xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_ [private] |
Definition at line 188 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), microState(), and supervisor().
xdata::String evf::FUEventProcessor::class_ [private] |
Definition at line 179 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::String evf::FUEventProcessor::configString_ [private] |
Definition at line 183 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
std::string evf::FUEventProcessor::configuration_ [private] |
Definition at line 184 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and spotlightWebPage().
CPUStat* evf::FUEventProcessor::cpustat_ [private] |
Definition at line 278 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().
unsigned int evf::FUEventProcessor::crashesThisRun_ [private] |
Definition at line 294 of file FUEventProcessor.h.
Referenced by enabling(), sigmon(), and supervisor().
xdata::UnsignedInteger32 evf::FUEventProcessor::crashesToDump_ [private] |
Definition at line 297 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and sigmon().
Css evf::FUEventProcessor::css_ [private] |
Definition at line 210 of file FUEventProcessor.h.
Referenced by css().
xdata::Boolean evf::FUEventProcessor::datasetCounting_ [private] |
Definition at line 306 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
bool evf::FUEventProcessor::edm_init_done_ [private] |
Definition at line 292 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enabling(), forkProcessesFromEDM(), and supervisor().
xdata::Boolean evf::FUEventProcessor::epInitialized_ [private] |
Definition at line 182 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().
Definition at line 175 of file FUEventProcessor.h.
Referenced by actionPerformed(), attachDqmToShm(), configuring(), detachDqmFromShm(), doEndRunInEDM(), enableClassic(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), FUEventProcessor(), halting(), microState(), moduleWeb(), pathNames(), receivingAndMonitor(), scalers(), scalersWeb(), serviceWeb(), setAttachDqmToShm(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::exitOnError_ [private] |
Definition at line 207 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
xdata::UnsignedInteger32 evf::FUEventProcessor::forkInEDM_ [private] |
Definition at line 223 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), halting(), stopping(), and supervisor().
Definition at line 289 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enableForkInEDM(), forkProcessesFromEDM(), FUEventProcessor(), halting(), and restartForkInEDM().
pthread_mutex_t evf::FUEventProcessor::forkObjLock_ [private] |
Definition at line 290 of file FUEventProcessor.h.
Referenced by enableForkInEDM(), and FUEventProcessor().
evf::StateMachine evf::FUEventProcessor::fsm_ [private] |
Definition at line 169 of file FUEventProcessor.h.
Referenced by actionPerformed(), configuring(), doEndRunInEDM(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), fsmCallback(), FUEventProcessor(), halting(), microState(), receiving(), receivingAndMonitor(), sigmon(), spotlightWebPage(), stopClassic(), summarize(), supervisor(), and updater().
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_ [private] |
Definition at line 193 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasPrescaleService_ [private] |
Definition at line 192 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_ [private] |
Definition at line 194 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().
xdata::Boolean evf::FUEventProcessor::hasShMem_ [private] |
Definition at line 191 of file FUEventProcessor.h.
Referenced by enableCommon(), forkProcessesFromEDM(), FUEventProcessor(), makeStaticInfo(), stopClassic(), and stopping().
xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_ [private] |
Definition at line 196 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().
xdata::String evf::FUEventProcessor::iDieUrl_ [private] |
Definition at line 275 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and updater().
unsigned long long evf::FUEventProcessor::idleProcStats_ [private] |
Definition at line 301 of file FUEventProcessor.h.
Referenced by enabling(), and summarize().
xdata::UnsignedInteger32 evf::FUEventProcessor::instance_ [private] |
Definition at line 180 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_ [private] |
Definition at line 195 of file FUEventProcessor.h.
Referenced by enableCommon(), enableForkInEDM(), and FUEventProcessor().
timeval evf::FUEventProcessor::lastCrashTime_ [private] |
Definition at line 299 of file FUEventProcessor.h.
Referenced by sigmon(), and supervisor().
timeval evf::FUEventProcessor::lastProcReport_ [private] |
Definition at line 303 of file FUEventProcessor.h.
Referenced by summarize().
Logger evf::FUEventProcessor::log_ [private] |
Definition at line 172 of file FUEventProcessor.h.
Referenced by doEndRunInEDM(), enableForkInEDM(), restartForkInEDM(), and supervisor().
std::vector<std::string> evf::FUEventProcessor::logRing_ [private] |
Definition at line 216 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
unsigned int evf::FUEventProcessor::logRingIndex_ [private] |
Definition at line 217 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
const unsigned int evf::FUEventProcessor::logRingSize_ = 50 [static, private] |
Definition at line 218 of file FUEventProcessor.h.
Referenced by localLog(), and updater().
bool evf::FUEventProcessor::logWrap_ [private] |
Definition at line 219 of file FUEventProcessor.h.
Referenced by localLog(), logsAsString(), and updater().
Definition at line 283 of file FUEventProcessor.h.
Referenced by supervisor().
Definition at line 284 of file FUEventProcessor.h.
Referenced by supervisor().
Definition at line 287 of file FUEventProcessor.h.
Referenced by summarize().
std::auto_ptr<edm::Presence> evf::FUEventProcessor::messageServicePresence_ [private] |
Definition at line 305 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), FUEventProcessor(), and receiving().
xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_ [private] |
Definition at line 246 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_ [private] |
Definition at line 247 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
ModuleWebRegistry* evf::FUEventProcessor::mwrRef_ [private] |
Definition at line 281 of file FUEventProcessor.h.
Referenced by configuring(), enableForkInEDM(), and enabling().
SubProcess* evf::FUEventProcessor::myProcess_ [private] |
Definition at line 237 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), microState(), receiving(), receivingAndMonitor(), scalers(), sendMessageOverMonitorQueue(), spotlightWebPage(), supervisor(), and updater().
std::list<std::string> evf::FUEventProcessor::names_ [private] |
Definition at line 274 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbAccepted [private] |
Definition at line 254 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
unsigned int evf::FUEventProcessor::nbdead_ [private] |
Definition at line 226 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
unsigned int evf::FUEventProcessor::nblive_ [private] |
Definition at line 225 of file FUEventProcessor.h.
Referenced by microState(), and supervisor().
xdata::Serializable* evf::FUEventProcessor::nbProcessed [private] |
Definition at line 253 of file FUEventProcessor.h.
Referenced by supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_ [private] |
Definition at line 221 of file FUEventProcessor.h.
Referenced by configuring(), defaultWebPage(), enabling(), forkProcessesFromEDM(), FUEventProcessor(), halting(), microState(), spotlightWebPage(), stopping(), stopSlavesAndAcknowledge(), summarize(), supervisor(), and updater().
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_ [private] |
Definition at line 222 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), summarize(), and updater().
unsigned int evf::FUEventProcessor::nbTotalDQM_ [private] |
Definition at line 228 of file FUEventProcessor.h.
Referenced by enabling(), microState(), and supervisor().
bool evf::FUEventProcessor::outprev_ [private] |
Definition at line 197 of file FUEventProcessor.h.
Referenced by actionPerformed().
xdata::Boolean evf::FUEventProcessor::outPut_ [private] |
Definition at line 186 of file FUEventProcessor.h.
Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().
pthread_mutex_t evf::FUEventProcessor::pickup_lock_ [private] |
Definition at line 251 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), microState(), and supervisor().
RateStat* evf::FUEventProcessor::ratestat_ [private] |
Definition at line 279 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and summarize().
std::string evf::FUEventProcessor::reasonForFailedState_ [private] |
Definition at line 213 of file FUEventProcessor.h.
Referenced by configuring(), enableCommon(), enableForkInEDM(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), and stopSlavesAndAcknowledge().
bool evf::FUEventProcessor::receiving_ [private] |
Definition at line 233 of file FUEventProcessor.h.
Referenced by startReceivingLoop().
bool evf::FUEventProcessor::receivingM_ [private] |
Definition at line 236 of file FUEventProcessor.h.
Referenced by startReceivingMonitorLoop().
bool evf::FUEventProcessor::restart_in_progress_ [private] |
Definition at line 291 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), and restartForkInEDM().
bool evf::FUEventProcessor::rlimit_coresize_changed_ [private] |
Definition at line 295 of file FUEventProcessor.h.
Referenced by enabling(), handleSignalSlave(), receiving(), sigmon(), and supervisor().
rlimit evf::FUEventProcessor::rlimit_coresize_default_ [private] |
Definition at line 296 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), halting(), receiving(), stopping(), and supervisor().
xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_ [private] |
Definition at line 181 of file FUEventProcessor.h.
Referenced by enableCommon(), enableForkInEDM(), enabling(), FUEventProcessor(), and updater().
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_ [private] |
Definition at line 257 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_ [private] |
Definition at line 258 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().
unsigned int evf::FUEventProcessor::scalersUpdates_ [private] |
Definition at line 264 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), receivingAndMonitor(), scalers(), and supervisor().
sem_t* evf::FUEventProcessor::sigmon_sem_ [private] |
Definition at line 298 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), halting(), handleSignalSlave(), sigmon(), and stopping().
bool evf::FUEventProcessor::signalMonitorActive_ [private] |
Definition at line 243 of file FUEventProcessor.h.
Referenced by sigmon(), and startSignalMonitorWorkLoop().
Definition at line 286 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
Definition at line 285 of file FUEventProcessor.h.
Referenced by receivingAndMonitor().
xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_ [private] |
Definition at line 189 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and supervisor().
Definition at line 282 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and forkProcessesFromEDM().
std::string evf::FUEventProcessor::sourceId_ [private] |
Definition at line 200 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_ [private] |
Definition at line 271 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_ [private] |
Definition at line 272 of file FUEventProcessor.h.
Referenced by configuring(), FUEventProcessor(), and supervisor().
SquidNet evf::FUEventProcessor::squidnet_ [private] |
Definition at line 215 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
xdata::Boolean evf::FUEventProcessor::squidPresent_ [private] |
Definition at line 203 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), and updater().
pthread_mutex_t evf::FUEventProcessor::start_lock_ [private] |
Definition at line 250 of file FUEventProcessor.h.
Referenced by enabling(), FUEventProcessor(), and microState().
pthread_mutex_t evf::FUEventProcessor::stop_lock_ [private] |
Definition at line 249 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), FUEventProcessor(), receiving(), receivingAndMonitor(), stopSlavesAndAcknowledge(), and supervisor().
std::vector<SubProcess> evf::FUEventProcessor::subs_ [private] |
Definition at line 224 of file FUEventProcessor.h.
Referenced by enabling(), forkProcessesFromEDM(), getSlavePids(), microState(), sigmon(), stopSlavesAndAcknowledge(), subWeb(), summarize(), and supervisor().
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_ [private] |
Definition at line 273 of file FUEventProcessor.h.
Referenced by FUEventProcessor(), subWeb(), and supervisor().
bool evf::FUEventProcessor::supervising_ [private] |
Definition at line 240 of file FUEventProcessor.h.
Referenced by startSupervisorLoop(), and updater().
std::string evf::FUEventProcessor::updaterStatic_ [private] |
Definition at line 252 of file FUEventProcessor.h.
Referenced by makeStaticInfo(), and updater().
xdata::String evf::FUEventProcessor::url_ [private] |
Definition at line 178 of file FUEventProcessor.h.
Referenced by FUEventProcessor().
pid_t evf::FUEventProcessor::vp_ [private] |
Definition at line 277 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), and updater().
Vulture* evf::FUEventProcessor::vulture_ [private] |
Definition at line 276 of file FUEventProcessor.h.
Referenced by configuring(), enabling(), FUEventProcessor(), stopping(), updater(), and ~FUEventProcessor().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_ [private] |
Definition at line 231 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), and startReceivingLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_ [private] |
Definition at line 234 of file FUEventProcessor.h.
Referenced by forkProcessesFromEDM(), and startReceivingMonitorLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_ [private] |
Definition at line 261 of file FUEventProcessor.h.
Referenced by startScalersWorkLoop().
bool evf::FUEventProcessor::wlScalersActive_ [private] |
Definition at line 263 of file FUEventProcessor.h.
Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSignalMonitor_ [private] |
Definition at line 241 of file FUEventProcessor.h.
Referenced by startSignalMonitorWorkLoop().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_ [private] |
Definition at line 267 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop().
bool evf::FUEventProcessor::wlSummarizeActive_ [private] |
Definition at line 269 of file FUEventProcessor.h.
Referenced by startSummarizeWorkLoop(), and updater().
toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_ [private] |
Definition at line 238 of file FUEventProcessor.h.
Referenced by startSupervisorLoop().