00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034
00035 #include <boost/tokenizer.hpp>
00036
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043
00044
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054
00055
00056 using namespace evf;
00057 using namespace cgicc;
// Declaration of a toolbox::BSem pointer that is defined outside this file.
// FUEventProcessor::enabling() deletes the semaphore behind this pointer and
// allocates a fresh one in every child it forks itself.
// NOTE(review): presumably this guards the toolbox memory pools - confirm
// against the toolbox sources.
00058 namespace toolbox {
00059 namespace mem {
00060 extern toolbox::BSem * _s_mutex_ptr_;
00061 }
00062 }
00063
00064
00065 namespace evf {
00066 FUEventProcessor * FUInstancePtr_;
00067 int evfep_raised_signal;
00068 void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069 {
00070 evfep_raised_signal=sig;
00071 FUInstancePtr_->handleSignalSlave(sig, info, c);
00072 }
00073 void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074 {
00075 if (evfep_raised_signal) {
00076 signal(evfep_raised_signal,SIG_DFL);
00077 raise(evfep_raised_signal);
00078 }
00079 }
00080 }
00081
00083
00085
00086
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00088 : xdaq::Application(s)
00089 , fsm_(this)
00090 , log_(getApplicationLogger())
00091 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092 , runNumber_(0)
00093 , epInitialized_(false)
00094 , outPut_(true)
00095 , autoRestartSlaves_(false)
00096 , slaveRestartDelaySecs_(10)
00097 , hasShMem_(true)
00098 , hasPrescaleService_(true)
00099 , hasModuleWebRegistry_(true)
00100 , hasServiceWebRegistry_(true)
00101 , isRunNumberSetter_(true)
00102 , iDieStatisticsGathering_(false)
00103 , outprev_(true)
00104 , exitOnError_(true)
00105 , reasonForFailedState_()
00106 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107 , logRing_(logRingSize_)
00108 , logRingIndex_(logRingSize_)
00109 , logWrap_(false)
00110 , nbSubProcesses_(0)
00111 , nbSubProcessesReporting_(0)
00112 , forkInEDM_(true)
00113 , nblive_(0)
00114 , nbdead_(0)
00115 , nbTotalDQM_(0)
00116 , wlReceiving_(0)
00117 , asReceiveMsgAndExecute_(0)
00118 , receiving_(false)
00119 , wlReceivingMonitor_(0)
00120 , asReceiveMsgAndRead_(0)
00121 , receivingM_(false)
00122 , myProcess_(0)
00123 , wlSupervising_(0)
00124 , asSupervisor_(0)
00125 , supervising_(false)
00126 , wlSignalMonitor_(0)
00127 , asSignalMonitor_(0)
00128 , signalMonitorActive_(false)
00129 , monitorInfoSpace_(0)
00130 , monitorLegendaInfoSpace_(0)
00131 , applicationInfoSpace_(0)
00132 , nbProcessed(0)
00133 , nbAccepted(0)
00134 , scalersInfoSpace_(0)
00135 , scalersLegendaInfoSpace_(0)
00136 , wlScalers_(0)
00137 , asScalers_(0)
00138 , wlScalersActive_(false)
00139 , scalersUpdates_(0)
00140 , wlSummarize_(0)
00141 , asSummarize_(0)
00142 , wlSummarizeActive_(false)
00143 , superSleepSec_(1)
00144 , iDieUrl_("none")
00145 , vulture_(0)
00146 , vp_(0)
00147 , cpustat_(0)
00148 , ratestat_(0)
00149 , mwrRef_(nullptr)
00150 , sorRef_(nullptr)
00151 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155 , edm_init_done_(true)
00156 , crashesThisRun_(false)
00157 , rlimit_coresize_changed_(false)
00158 , crashesToDump_(2)
00159 , sigmon_sem_(0)
00160 {
00161 using namespace utils;
00162
00163 FUInstancePtr_=this;
00164
00165 names_.push_back("nbProcessed" );
00166 names_.push_back("nbAccepted" );
00167 names_.push_back("epMacroStateInt");
00168 names_.push_back("epMicroStateInt");
00169
00170 int retpipe = pipe(anonymousPipe_);
00171 if(retpipe != 0)
00172 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00173
00174 squidPresent_ = squidnet_.check();
00175
00176 evtProcessor_.setAppDesc(getApplicationDescriptor());
00177 evtProcessor_.setAppCtxt(getApplicationContext());
00178
00179 fsm_.initialize<evf::FUEventProcessor>(this);
00180
00181
00182 url_ =
00183 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00184 getApplicationDescriptor()->getURN();
00185 class_ =getApplicationDescriptor()->getClassName();
00186 instance_=getApplicationDescriptor()->getInstance();
00187 sourceId_=class_.toString()+"_"+instance_.toString();
00188 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00189 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00190
00191 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00192
00193 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00194 applicationInfoSpace_ = ispace;
00195
00196
00197 ispace->fireItemAvailable("parameterSet", &configString_ );
00198 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00199 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00200 ispace->fireItemAvailable("runNumber", &runNumber_ );
00201 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00202
00203 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00204 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00205 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00206 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00207 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00208 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00209 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00210 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00211 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00212 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00213 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00214 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00215 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00216 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00217 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00218 ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
00219
00220
00221 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00222 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00223
00224
00225 fsm_.findRcmsStateListener();
00226
00227
00228
00229 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00230 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00231 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00232
00233 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00234 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00235 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00236
00237
00238 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00239 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00240 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00241 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00242 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00243
00244 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00245
00246 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00247 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00248 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00249
00250 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00251 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00252 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00253
00254
00255
00256 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00257 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00258
00259 evtProcessor_.setApplicationInfoSpace(ispace);
00260 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00261 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00262
00263
00264 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00265 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00266
00267
00268 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00269 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00270 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00271 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00272 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00273 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00274 xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
00275 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00276 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00277 xgi::bind(this, &FUEventProcessor::microState, "microState");
00278 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00279 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00280
00281
00282
00283 edm::AssertHandler ah;
00284
00285 try{
00286 LOG4CPLUS_DEBUG(getApplicationLogger(),
00287 "Trying to create message service presence ");
00288 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00289 if(pf != 0) {
00290 pf->makePresence("MessageServicePresence").release();
00291 }
00292 else {
00293 LOG4CPLUS_ERROR(getApplicationLogger(),
00294 "Unable to create message service presence ");
00295 }
00296 }
00297 catch(...) {
00298 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00299 }
00300 ML::MLlog4cplus::setAppl(this);
00301
00302 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00303 typedef AppDescSet_t::iterator AppDescIter_t;
00304
00305 AppDescSet_t rcms=
00306 getApplicationContext()->getDefaultZone()->
00307 getApplicationDescriptors("RCMSStateListener");
00308 if(rcms.size()==0)
00309 {
00310 LOG4CPLUS_WARN(getApplicationLogger(),
00311 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00312
00313 }
00314 else
00315 {
00316 AppDescIter_t it = rcms.begin();
00317 evtProcessor_.setRcms(*it);
00318 }
00319 pthread_mutex_init(&start_lock_,0);
00320 pthread_mutex_init(&stop_lock_,0);
00321 pthread_mutex_init(&pickup_lock_,0);
00322
00323 forkInfoObj_=nullptr;
00324 pthread_mutex_init(&forkObjLock_,0);
00325 makeStaticInfo();
00326 startSupervisorLoop();
00327
00328 if(vulture_==0) vulture_ = new Vulture(true);
00329
00331
00332 AppDescSet_t setOfiDie=
00333 getApplicationContext()->getDefaultZone()->
00334 getApplicationDescriptors("evf::iDie");
00335
00336 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00337 if ((*it)->getInstance()==0)
00338 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00339
00340
00341 getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00342
00343
00344 #ifdef linux
00345 if (sigmon_sem_==0) {
00346 sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00347 PROT_READ | PROT_WRITE,
00348 MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00349 if (!sigmon_sem_) {
00350 perror("mmap error\n");
00351 std::cout << "mmap error"<<std::endl;
00352 }
00353 else
00354 sem_init(sigmon_sem_,true,0);
00355 }
00356 #endif
00357 }
00358
00359
00360
00361
00362 FUEventProcessor::~FUEventProcessor()
00363 {
00364
00365
00366
00367 if(vulture_ != 0) delete vulture_;
00368 }
00369
00370
00372
00374
00375
00376
00377 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00378 {
00379
00380
00381
00382
00383
00384 unsigned short smap
00385 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00386 + (((instance_.value_%80)==0) ? 0x8 : 0)
00387 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00388 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00389 + (hasPrescaleService_.value_ ? 0x1 : 0);
00390 if(nbSubProcesses_.value_==0)
00391 {
00392 spMStates_.setSize(1);
00393 spmStates_.setSize(1);
00394 }
00395 else
00396 {
00397 spMStates_.setSize(nbSubProcesses_.value_);
00398 spmStates_.setSize(nbSubProcesses_.value_);
00399 for(unsigned int i = 0; i < spMStates_.size(); i++)
00400 {
00401 spMStates_[i] = edm::event_processor::sInit;
00402 spmStates_[i] = 0;
00403 }
00404 }
00405 try {
00406 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00407 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00408 epInitialized_=true;
00409 if(evtProcessor_)
00410 {
00411
00412 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00413 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00414
00415 configuration_ = evtProcessor_.configuration();
00416 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00417 evtProcessor_->beginJob();
00418 if(cpustat_) {delete cpustat_; cpustat_=0;}
00419 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00420 nbSubProcesses_.value_,
00421 instance_.value_,
00422 iDieUrl_.value_);
00423 if(ratestat_) {delete ratestat_; ratestat_=0;}
00424 ratestat_ = new RateStat(iDieUrl_.value_);
00425 if(iDieStatisticsGathering_.value_){
00426 try{
00427 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00428 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00429 if(legenda !=0){
00430 std::string slegenda = ((xdata::String*)legenda)->value_;
00431 ratestat_->sendLegenda(slegenda);
00432 }
00433
00434 }
00435 catch(evf::Exception &e){
00436 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00437 << e.what());
00438 }
00439 }
00440
00441 fsm_.fireEvent("ConfigureDone",this);
00442 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00443 localLog("-I- Configuration completed");
00444
00445 }
00446 }
00447 catch (xcept::Exception &e) {
00448 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00449 fsm_.fireFailed(reasonForFailedState_,this);
00450 localLog(reasonForFailedState_);
00451 }
00452 catch(cms::Exception &e) {
00453 reasonForFailedState_ = e.explainSelf();
00454 fsm_.fireFailed(reasonForFailedState_,this);
00455 localLog(reasonForFailedState_);
00456 }
00457 catch(std::exception &e) {
00458 reasonForFailedState_ = e.what();
00459 fsm_.fireFailed(reasonForFailedState_,this);
00460 localLog(reasonForFailedState_);
00461 }
00462 catch(...) {
00463 fsm_.fireFailed("Unknown Exception",this);
00464 }
00465
00466 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00467
00468 return false;
00469 }
00470
00471
00472
00473
00474
00475 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00476 {
00477 nbTotalDQM_ = 0;
00478 scalersUpdates_ = 0;
00479 idleProcStats_ = 0;
00480 allProcStats_ = 0;
00481
00482
00483
00484
00485
00486 unsigned short smap
00487 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00488 + (((instance_.value_%80)==0) ? 0x8 : 0)
00489 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00490 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00491 + (hasPrescaleService_.value_ ? 0x1 : 0);
00492
00493 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00494
00495
00496 if (rlimit_coresize_changed_)
00497 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00498 rlimit_coresize_changed_=false;
00499 crashesThisRun_=0;
00500
00501 sem_destroy(sigmon_sem_);
00502 sem_init(sigmon_sem_,true,0);
00503
00504 if(!epInitialized_){
00505 evtProcessor_.forceInitEventProcessorMaybe();
00506 }
00507 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00508
00509 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00510 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00511
00512 if(!epInitialized_){
00513 evtProcessor_->beginJob();
00514 if(cpustat_) {delete cpustat_; cpustat_=0;}
00515 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00516 nbSubProcesses_.value_,
00517 instance_.value_,
00518 iDieUrl_.value_);
00519 if(ratestat_) {delete ratestat_; ratestat_=0;}
00520 ratestat_ = new RateStat(iDieUrl_.value_);
00521 if(iDieStatisticsGathering_.value_){
00522 try
00523 {
00524 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00525 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00526 if(legenda !=0)
00527 {
00528 std::string slegenda = ((xdata::String*)legenda)->value_;
00529 ratestat_->sendLegenda(slegenda);
00530 }
00531 }
00532 catch(evf::Exception &e)
00533 {
00534 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00535 << e.what());
00536 }
00537 catch (xcept::Exception& e) {
00538 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00539 << e.what());
00540 }
00541 }
00542 epInitialized_ = true;
00543 }
00544 configuration_ = evtProcessor_.configuration();
00545 evtProcessor_.resetLumiSectionReferenceIndex();
00546
00547 if(nbSubProcesses_.value_==0) return enableClassic();
00548
00549 pthread_mutex_lock(&start_lock_);
00550 pthread_mutex_lock(&pickup_lock_);
00551 subs_.clear();
00552 subs_.resize(nbSubProcesses_.value_);
00553 pid_t retval = -1;
00554
00555 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00556 {
00557 subs_[i]=SubProcess(i,retval);
00558 }
00559 pthread_mutex_unlock(&pickup_lock_);
00560
00561 pthread_mutex_unlock(&start_lock_);
00562
00563
00564 try {
00565 if (sorRef_) {
00566 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00567 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00568 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00569 for (unsigned int i=0;i<shmOutputs.size();i++) {
00570 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00571 }
00572 }
00573 }
00574 catch (...)
00575 {
00576 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00577 }
00578
00579
00580 edm_init_done_=true;
00581 if (forkInEDM_.value_) {
00582 edm_init_done_=false;
00583 enableForkInEDM();
00584 }
00585 else
00586 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00587 {
00588 retval = subs_[i].forkNew();
00589 if(retval==0)
00590 {
00591 myProcess_ = &subs_[i];
00592
00593 delete toolbox::mem::_s_mutex_ptr_;
00594 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00595 int retval = pthread_mutex_destroy(&stop_lock_);
00596 if(retval != 0) perror("error");
00597 retval = pthread_mutex_init(&stop_lock_,0);
00598 if(retval != 0) perror("error");
00599 fsm_.disableRcmsStateNotification();
00600
00601 return enableMPEPSlave();
00602
00603 }
00604 }
00605
00606 if (forkInEDM_.value_) {
00607
00608 edm::event_processor::State st;
00609 while (!edm_init_done_) {
00610 usleep(10000);
00611 st = evtProcessor_->getState();
00612 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00613 }
00614 st = evtProcessor_->getState();
00615
00616 if (st!=edm::event_processor::sRunning) {
00617 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00618 localLog(reasonForFailedState_);
00619 fsm_.fireFailed(reasonForFailedState_,this);
00620 return false;
00621 }
00622
00623 sleep(1);
00624 startSummarizeWorkLoop();
00625 startSignalMonitorWorkLoop();
00626 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00627
00628
00629 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00630 fsm_.fireEvent("EnableDone",this);
00631 localLog("-I- Start completed");
00632 return false;
00633 }
00634
00635 startSummarizeWorkLoop();
00636 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00637 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00638 fsm_.fireEvent("EnableDone",this);
00639 localLog("-I- Start completed");
00640 return false;
00641 }
00642
00643 bool FUEventProcessor::doEndRunInEDM() {
00644
00645 if (forkInfoObj_) {
00646
00647 int count = 30;
00648 bool waitedForEDM=false;
00649 while (!edm_init_done_ && count) {
00650 ::sleep(1);
00651 if (count%5==0)
00652 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00653 count--;
00654 waitedForEDM=true;
00655 }
00656
00657 if (waitedForEDM) sleep(5);
00658
00659
00660
00661 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00662 return true;
00663
00664 forkInfoObj_->lock();
00665 forkInfoObj_->stopCondition=true;
00666 sem_post(forkInfoObj_->control_sem_);
00667 forkInfoObj_->unlock();
00668
00669 count = 30;
00670
00671 edm::event_processor::State st;
00672 while(count--) {
00673 st = evtProcessor_->getState();
00674 if (st==edm::event_processor::sRunning) ::usleep(100000);
00675 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00676 break;
00677 }
00678 else {
00679 std::ostringstream ost;
00680 ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00681 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00682 fsm_.fireFailed(ost.str(),this);
00683 return false;
00684 }
00685 if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00686 forkInfoObj_->lock();
00687 forkInfoObj_->stopCondition=true;
00688 sem_post(forkInfoObj_->control_sem_);
00689 forkInfoObj_->unlock();
00690 LOG4CPLUS_WARN(getApplicationLogger(),
00691 "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00692 }
00693 }
00694 if (count<0) {
00695 std::ostringstream ost;
00696 if (!forkInfoObj_->receivedStop_)
00697 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00698 else
00699 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00700 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00701 fsm_.fireFailed(ost.str(),this);
00702 return false;
00703 }
00704 }
00705 return true;
00706 }
00707
00708
00709 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00710 {
00711 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00712 if(nbSubProcesses_.value_!=0) {
00713 stopSlavesAndAcknowledge();
00714 if (forkInEDM_.value_) {
00715
00716 sem_post(sigmon_sem_);
00717 doEndRunInEDM();
00718 }
00719 }
00720 vulture_->stop();
00721
00722 if (forkInEDM_.value_) {
00723
00724 bool tmpHasShMem_=hasShMem_;
00725 hasShMem_=false;
00726 stopClassic();
00727 hasShMem_=tmpHasShMem_;
00728 return false;
00729 }
00730 stopClassic();
00731 return false;
00732 }
00733
00734
00735
00736 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00737 {
00738 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00739 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00740 if(nbSubProcesses_.value_!=0) {
00741 stopSlavesAndAcknowledge();
00742 if (forkInEDM_.value_) {
00743 sem_post(sigmon_sem_);
00744 doEndRunInEDM();
00745 }
00746 }
00747 try{
00748 evtProcessor_.stopAndHalt();
00749
00750 if (forkInfoObj_) {
00751 delete forkInfoObj_;
00752 forkInfoObj_=0;
00753 }
00754 }
00755 catch (evf::Exception &e) {
00756 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00757 localLog(reasonForFailedState_);
00758 fsm_.fireFailed(reasonForFailedState_,this);
00759 }
00760
00761
00762 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00763 fsm_.fireEvent("HaltDone",this);
00764
00765 localLog("-I- Halt completed");
00766 return false;
00767 }
00768
00769
00770
00771 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00772 throw (xoap::exception::Exception)
00773 {
00774 return fsm_.commandCallback(msg);
00775 }
00776
00777
00778
00779 void FUEventProcessor::actionPerformed(xdata::Event& e)
00780 {
00781
00782 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00783 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00784
00785 if ( item == "parameterSet") {
00786 LOG4CPLUS_WARN(getApplicationLogger(),
00787 "HLT Menu changed, force reinitialization of EventProcessor");
00788 epInitialized_ = false;
00789 }
00790 if ( item == "outputEnabled") {
00791 if(outprev_ != outPut_) {
00792 LOG4CPLUS_WARN(getApplicationLogger(),
00793 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00794 evtProcessor_->enableEndPaths(outPut_);
00795 outprev_ = outPut_;
00796 }
00797 }
00798 if (item == "globalInputPrescale") {
00799 LOG4CPLUS_WARN(getApplicationLogger(),
00800 "Setting global input prescale has no effect "
00801 <<"in this version of the code");
00802 }
00803 if ( item == "globalOutputPrescale") {
00804 LOG4CPLUS_WARN(getApplicationLogger(),
00805 "Setting global output prescale has no effect "
00806 <<"in this version of the code");
00807 }
00808 }
00809
00810 }
00811
00812
00813 void FUEventProcessor::getSlavePids(xgi::Input *in, xgi::Output *out)
00814 {
00815 for (unsigned int i=0;i<subs_.size();i++)
00816 {
00817 if (i!=0) *out << ",";
00818 *out << subs_[i].pid();
00819 }
00820 }
00821
00822 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00823 {
00824 using namespace cgicc;
00825 pid_t pid = 0;
00826 std::ostringstream ost;
00827 ost << "&";
00828
00829 Cgicc cgi(in);
00830 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00831 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00832 mycgi->getEnvironment().begin();
00833 mit != mycgi->getEnvironment().end(); mit++)
00834 ost << mit->first << "%" << mit->second << ";";
00835 std::vector<FormEntry> els = cgi.getElements() ;
00836 std::vector<FormEntry> el1;
00837 cgi.getElement("method",el1);
00838 std::vector<FormEntry> el2;
00839 cgi.getElement("process",el2);
00840 if(el1.size()!=0) {
00841 std::string meth = el1[0].getValue();
00842 if(el2.size()!=0) {
00843 unsigned int i = 0;
00844 std::string mod = el2[0].getValue();
00845 pid = atoi(mod.c_str());
00846 for(; i < subs_.size(); i++)
00847 if(subs_[i].pid()==pid) break;
00848 if(i>=subs_.size()){
00849 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00850 return;
00851 }
00852 if(subs_[i].alive() != 1){
00853 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00854 return;
00855 }
00856 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00857 strncpy(msg1->mtext,meth.c_str(),meth.length());
00858 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00859 subs_[i].post(msg1,true);
00860 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00861 superSleepSec_.value_=10*keep_supersleep_original_value;
00862 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00863 bool done = false;
00864 std::vector<char *>pieces;
00865 while(!done){
00866 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00867 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00868 ::sleep(1);
00869 continue;
00870 }
00871 unsigned int nbytes = atoi(msg->mtext);
00872 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00873 char *buf= new char[nbytes];
00874 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00875 if(retval!=nbytes) std::cout
00876 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00877 pieces.push_back(buf);
00878 }
00879 superSleepSec_.value_=keep_supersleep_original_value;
00880 for(unsigned int j = 0; j < pieces.size(); j++){
00881 *out<<pieces[j];
00882 delete[] pieces[j];
00883 }
00884 }
00885 }
00886 }
00887
00888
00889
00890 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00891 throw (xgi::exception::Exception)
00892 {
00893
00894
00895 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00896 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00897 << getApplicationDescriptor()->getInstance() << "</title>"
00898 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00899 << "</head></html>";
00900 }
00901
00902
00903
00904
00905
00906 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00907 throw (xgi::exception::Exception)
00908 {
00909
00910 std::string urn = getApplicationDescriptor()->getURN();
00911
00912 *out << "<!-- base href=\"/" << urn
00913 << "\"> -->" << std::endl;
00914 *out << "<html>" << std::endl;
00915 *out << "<head>" << std::endl;
00916 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00917 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00918 *out << "<title>" << getApplicationDescriptor()->getClassName()
00919 << getApplicationDescriptor()->getInstance()
00920 << " MAIN</title>" << std::endl;
00921 *out << "</head>" << std::endl;
00922 *out << "<body>" << std::endl;
00923 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00924 *out << "<tr>" << std::endl;
00925 *out << " <td align=\"left\">" << std::endl;
00926 *out << " <img" << std::endl;
00927 *out << " align=\"middle\"" << std::endl;
00928 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00929 *out << " alt=\"main\"" << std::endl;
00930 *out << " width=\"64\"" << std::endl;
00931 *out << " height=\"64\"" << std::endl;
00932 *out << " border=\"\"/>" << std::endl;
00933 *out << " <b>" << std::endl;
00934 *out << getApplicationDescriptor()->getClassName()
00935 << getApplicationDescriptor()->getInstance() << std::endl;
00936 *out << " " << fsm_.stateName()->toString() << std::endl;
00937 *out << " </b>" << std::endl;
00938 *out << " </td>" << std::endl;
00939 *out << " <td width=\"32\">" << std::endl;
00940 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00941 *out << " <img" << std::endl;
00942 *out << " align=\"middle\"" << std::endl;
00943 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00944 *out << " alt=\"HyperDAQ\"" << std::endl;
00945 *out << " width=\"32\"" << std::endl;
00946 *out << " height=\"32\"" << std::endl;
00947 *out << " border=\"\"/>" << std::endl;
00948 *out << " </a>" << std::endl;
00949 *out << " </td>" << std::endl;
00950 *out << " <td width=\"32\">" << std::endl;
00951 *out << " </td>" << std::endl;
00952 *out << " <td width=\"32\">" << std::endl;
00953 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00954 *out << " <img" << std::endl;
00955 *out << " align=\"middle\"" << std::endl;
00956 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00957 *out << " alt=\"main\"" << std::endl;
00958 *out << " width=\"32\"" << std::endl;
00959 *out << " height=\"32\"" << std::endl;
00960 *out << " border=\"\"/>" << std::endl;
00961 *out << " </a>" << std::endl;
00962 *out << " </td>" << std::endl;
00963 *out << "</tr>" << std::endl;
00964 *out << "</table>" << std::endl;
00965
00966 *out << "<hr/>" << std::endl;
00967
00968 std::ostringstream ost;
00969 if(myProcess_)
00970 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00971 else
00972 ost << "/moduleWeb?";
00973 urn += ost.str();
00974 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00975 evtProcessor_.taskWebPage(in,out,urn);
00976 else if(evtProcessor_)
00977 evtProcessor_.summaryWebPage(in,out,urn);
00978 else
00979 *out << "<td>HLT Unconfigured</td>" << std::endl;
00980 *out << "</table>" << std::endl;
00981
00982 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00983 *out << configuration_ << std::endl;
00984 *out << "</textarea><P>" << std::endl;
00985
00986 *out << "</body>" << std::endl;
00987 *out << "</html>" << std::endl;
00988
00989
00990 }
00991 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00992 throw (xgi::exception::Exception)
00993 {
00994
00995 out->getHTTPResponseHeader().addHeader( "Content-Type",
00996 "application/octet-stream" );
00997 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00998 "binary" );
00999 if(evtProcessor_ != 0){
01000 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
01001 }
01002 }
01003
01004 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
01005 throw (xgi::exception::Exception)
01006 {
01007
01008 if(evtProcessor_ != 0){
01009 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01010 if(legenda !=0){
01011 std::string slegenda = ((xdata::String*)legenda)->value_;
01012 *out << slegenda << std::endl;
01013 }
01014 }
01015 }
01016
01017
01018 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
01019 {
01020 std::string errmsg;
01021 try {
01022 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01023 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01024 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01025 }
01026 catch (cms::Exception& e) {
01027 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01028 }
01029 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01030 }
01031
01032
01033 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
01034 {
01035 std::string errmsg;
01036 bool success = false;
01037 try {
01038 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01039 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01040 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01041 if (!success) errmsg = "Failed to attach DQM service to shared memory";
01042 }
01043 catch (cms::Exception& e) {
01044 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01045 }
01046 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01047 }
01048
01049
01050
01051 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01052 {
01053 std::string errmsg;
01054 bool success = false;
01055 try {
01056 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01057 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01058 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01059 if (!success) errmsg = "Failed to detach DQM service from shared memory";
01060 }
01061 catch (cms::Exception& e) {
01062 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01063 }
01064 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01065 }
01066
01067
01068 std::string FUEventProcessor::logsAsString()
01069 {
01070 std::ostringstream oss;
01071 if(logWrap_)
01072 {
01073 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01074 oss << logRing_[i] << std::endl;
01075 for(unsigned int i = 0; i < logRingIndex_; i++)
01076 oss << logRing_[i] << std::endl;
01077 }
01078 else
01079 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01080 oss << logRing_[i] << std::endl;
01081
01082 return oss.str();
01083 }
01084
01085 void FUEventProcessor::localLog(std::string m)
01086 {
01087 timeval tv;
01088
01089 gettimeofday(&tv,0);
01090 tm *uptm = localtime(&tv.tv_sec);
01091 char datestring[256];
01092 strftime(datestring, sizeof(datestring),"%c", uptm);
01093
01094 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01095 logRingIndex_--;
01096 std::ostringstream timestamp;
01097 timestamp << " at " << datestring;
01098 m += timestamp.str();
01099 logRing_[logRingIndex_] = m;
01100 }
01101
01102 void FUEventProcessor::startSupervisorLoop()
01103 {
01104 try {
01105 wlSupervising_=
01106 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01107 "waiting");
01108 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01109 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01110 "Supervisor");
01111 wlSupervising_->submit(asSupervisor_);
01112 supervising_ = true;
01113 }
01114 catch (xcept::Exception& e) {
01115 std::string msg = "Failed to start workloop 'Supervisor'.";
01116 XCEPT_RETHROW(evf::Exception,msg,e);
01117 }
01118 }
01119
01120 void FUEventProcessor::startReceivingLoop()
01121 {
01122 try {
01123 wlReceiving_=
01124 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01125 "waiting");
01126 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01127 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01128 "Receiving");
01129 wlReceiving_->submit(asReceiveMsgAndExecute_);
01130 receiving_ = true;
01131 }
01132 catch (xcept::Exception& e) {
01133 std::string msg = "Failed to start workloop 'Receiving'.";
01134 XCEPT_RETHROW(evf::Exception,msg,e);
01135 }
01136 }
01137 void FUEventProcessor::startReceivingMonitorLoop()
01138 {
01139 try {
01140 wlReceivingMonitor_=
01141 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01142 "waiting");
01143 if (!wlReceivingMonitor_->isActive())
01144 wlReceivingMonitor_->activate();
01145 asReceiveMsgAndRead_ =
01146 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01147 "ReceivingM");
01148 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01149 receivingM_ = true;
01150 }
01151 catch (xcept::Exception& e) {
01152 std::string msg = "Failed to start workloop 'ReceivingM'.";
01153 XCEPT_RETHROW(evf::Exception,msg,e);
01154 }
01155 }
01156
01157 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01158 {
01159 MsgBuf msg;
01160 try{
01161 myProcess_->rcvSlave(msg,false);
01162 if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01163 {
01164 rlimit rl;
01165 getrlimit(RLIMIT_CORE,&rl);
01166 rl.rlim_cur=0;
01167 setrlimit(RLIMIT_CORE,&rl);
01168 rlimit_coresize_changed_=true;
01169 }
01170 if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01171 {
01172
01173 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01174 rlimit_coresize_changed_=false;
01175 }
01176 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01177 {
01178 pthread_mutex_lock(&stop_lock_);
01179 try {
01180 fsm_.fireEvent("Stop",this);
01181 }
01182 catch (...) {
01183 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01184 << getpid() << " The state on Stop event was not consistent");
01185 }
01186 try{
01187 LOG4CPLUS_DEBUG(getApplicationLogger(),
01188 "Trying to create message service presence ");
01189 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01190 if(pf != 0) {
01191 pf->makePresence("MessageServicePresence").release();
01192 }
01193 else {
01194 LOG4CPLUS_ERROR(getApplicationLogger(),
01195 "Unable to create message service presence ");
01196 }
01197 }
01198 catch(...) {
01199 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01200 }
01201 try {
01202 stopClassic();
01203 }
01204 catch (...) {
01205 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP " << getpid() << " stop failed. Process will now exit");
01206 }
01207
01208 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01209 myProcess_->postSlave(msg1,false);
01210 pthread_mutex_unlock(&stop_lock_);
01211 fclose(stdout);
01212 fclose(stderr);
01213 _exit(EXIT_SUCCESS);
01214 }
01215 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01216 _exit(EXIT_SUCCESS);
01217 }
01218 catch(evf::Exception &e){}
01219 return true;
01220 }
01221
01222 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01223 {
01224 pthread_mutex_lock(&stop_lock_);
01225 if(subs_.size()!=nbSubProcesses_.value_)
01226 {
01227 pthread_mutex_lock(&pickup_lock_);
01228 if(subs_.size()!=nbSubProcesses_.value_) {
01229 subs_.resize(nbSubProcesses_.value_);
01230 spMStates_.resize(nbSubProcesses_.value_);
01231 spmStates_.resize(nbSubProcesses_.value_);
01232 for(unsigned int i = 0; i < spMStates_.size(); i++)
01233 {
01234 spMStates_[i] = edm::event_processor::sInit;
01235 spmStates_[i] = 0;
01236 }
01237 }
01238 pthread_mutex_unlock(&pickup_lock_);
01239 }
01240 bool running = fsm_.stateName()->toString()=="Enabled";
01241 bool stopping = fsm_.stateName()->toString()=="stopping";
01242 for(unsigned int i = 0; i < subs_.size(); i++)
01243 {
01244 if(subs_[i].alive()==-1000) continue;
01245 int sl;
01246 pid_t sub_pid = subs_[i].pid();
01247 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01248
01249 if(killedOrNot && killedOrNot==sub_pid) {
01250 pthread_mutex_lock(&pickup_lock_);
01251
01252 if (i<subs_.size() && subs_[i].alive()!=-1000) {
01253 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01254 std::ostringstream ost;
01255 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01256 else if(WIFSIGNALED(sl)!=0) {
01257 ost << " process terminated with signal " << WTERMSIG(sl);
01258 }
01259 else ost << " process stopped ";
01260 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01261 subs_[i].setReasonForFailed(ost.str());
01262 spMStates_[i] = evtProcessor_.notstarted_state_code();
01263 spmStates_[i] = 0;
01264 std::ostringstream ost1;
01265 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01266 localLog(ost1.str());
01267 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01268 }
01269 pthread_mutex_unlock(&pickup_lock_);
01270 }
01271 }
01272 pthread_mutex_unlock(&stop_lock_);
01273 if(stopping) return true;
01274
01275
01276 if (running && rlimit_coresize_changed_) {
01277 timeval newtv;
01278 gettimeofday(&newtv,0);
01279 int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01280 if (delta>60*15) {
01281 std::ostringstream ostr;
01282 ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01283 std::cout << ostr.str() << std::endl;
01284 LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01285 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01286 MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01287 for (unsigned int i = 0; i < subs_.size(); i++) {
01288 try {
01289 if (subs_[i].alive())
01290 subs_[i].post(master_message_rlr_,false);
01291 }
01292 catch (...) {}
01293 }
01294 rlimit_coresize_changed_=false;
01295 crashesThisRun_=0;
01296 }
01297 }
01298
01299 if(running && edm_init_done_)
01300 {
01301
01302
01303 if(autoRestartSlaves_.value_){
01304 pthread_mutex_lock(&stop_lock_);
01305 for(unsigned int i = 0; i < subs_.size(); i++)
01306 {
01307 if(subs_[i].alive() != 1){
01308 if(subs_[i].countdown() == 0)
01309 {
01310 if(subs_[i].restartCount()>2){
01311 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01312 << " - maximum restart count reached");
01313 std::ostringstream ost1;
01314 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01315 localLog(ost1.str());
01316 subs_[i].countdown()--;
01317 XCEPT_DECLARE(evf::Exception,
01318 sentinelException, ost1.str());
01319 notifyQualified("error",sentinelException);
01320 subs_[i].disconnect();
01321 continue;
01322 }
01323 subs_[i].restartCount()++;
01324 if (forkInEDM_.value_) {
01325 restartForkInEDM(i);
01326 }
01327 else {
01328 pid_t rr = subs_[i].forkNew();
01329 if(rr==0)
01330 {
01331 myProcess_=&subs_[i];
01332 scalersUpdates_ = 0;
01333 int retval = pthread_mutex_destroy(&stop_lock_);
01334 if(retval != 0) perror("error");
01335 retval = pthread_mutex_init(&stop_lock_,0);
01336 if(retval != 0) perror("error");
01337 fsm_.disableRcmsStateNotification();
01338 fsm_.fireEvent("Stop",this);
01339 fsm_.fireEvent("StopDone",this);
01340 fsm_.fireEvent("Enable",this);
01341 try{
01342 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01343 if(lsid) {
01344 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01345 }
01346 }
01347 catch(...){
01348 std::cout << "trouble with lsindex during restart" << std::endl;
01349 }
01350 try{
01351 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01352 if(lstb) {
01353 ((xdata::Boolean*)(lstb))->value_ = false;
01354 }
01355 }
01356 catch(...){
01357 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01358 }
01359
01360 evtProcessor_.adjustLsIndexForRestart();
01361 evtProcessor_.resetPackedTriggerReport();
01362 enableMPEPSlave();
01363 return false;
01364 }
01365 else
01366 {
01367 std::ostringstream ost1;
01368 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01369 localLog(ost1.str());
01370 }
01371 }
01372 }
01373 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01374 }
01375 }
01376 pthread_mutex_unlock(&stop_lock_);
01377 }
01378 }
01379 xdata::Serializable *lsid = 0;
01380 xdata::Serializable *psid = 0;
01381 xdata::Serializable *dqmp = 0;
01382 xdata::UnsignedInteger32 *dqm = 0;
01383
01384
01385
01386 if(running && edm_init_done_){
01387 try{
01388 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01389 psid = applicationInfoSpace_->find("prescaleSetIndex");
01390 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01391 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01392 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01393 }
01394 catch(xdata::exception::Exception e){
01395 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01396 }
01397
01398 try{
01399 if(nbProcessed !=0 && nbAccepted !=0)
01400 {
01401 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01402 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01403 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01404 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01405 if(dqmp!=0)
01406 dqm = (xdata::UnsignedInteger32*)dqmp;
01407 if(dqm) dqm->value_ = 0;
01408 nbTotalDQM_ = 0;
01409 nbp->value_ = 0;
01410 nba->value_ = 0;
01411 nblive_ = 0;
01412 nbdead_ = 0;
01413 scalersUpdates_ = 0;
01414
01415 for(unsigned int i = 0; i < subs_.size(); i++)
01416 {
01417 if(subs_[i].alive()>0)
01418 {
01419 nblive_++;
01420 try{
01421 subs_[i].post(master_message_prg_,true);
01422
01423 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01424 if(retval == (unsigned long) master_message_prr_->mtype){
01425 prg* p = (struct prg*)(master_message_prr_->mtext);
01426 subs_[i].setParams(p);
01427 spMStates_[i] = p->Ms;
01428 spmStates_[i] = p->ms;
01429 cpustat_->addEntry(p->ms);
01430 if(!subs_[i].inInconsistentState() &&
01431 (p->Ms == edm::event_processor::sError
01432 || p->Ms == edm::event_processor::sInvalid
01433 || p->Ms == edm::event_processor::sStopping))
01434 {
01435 std::ostringstream ost;
01436 ost << "edm::eventprocessor slot " << i << " process id "
01437 << subs_[i].pid() << " not in Running state : Mstate="
01438 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01439 << evtProcessor_.moduleNameFromIndex(p->ms)
01440 << " - Look into possible error messages from HLT process";
01441 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01442 }
01443 nbp->value_ += subs_[i].params().nbp;
01444 nba->value_ += subs_[i].params().nba;
01445 if(dqm)dqm->value_ += p->dqm;
01446 nbTotalDQM_ += p->dqm;
01447 scalersUpdates_ += p->trp;
01448 if(p->ls > ls->value_) ls->value_ = p->ls;
01449 if(p->ps != ps->value_) ps->value_ = p->ps;
01450 }
01451 else{
01452 nbp->value_ += subs_[i].get_save_nbp();
01453 nba->value_ += subs_[i].get_save_nba();
01454 }
01455 }
01456 catch(evf::Exception &e){
01457 LOG4CPLUS_INFO(getApplicationLogger(),
01458 "could not send/receive msg on slot "
01459 << i << " - " << e.what());
01460 }
01461
01462 }
01463 else
01464 {
01465 nbp->value_ += subs_[i].get_save_nbp();
01466 nba->value_ += subs_[i].get_save_nba();
01467 nbdead_++;
01468 }
01469 }
01470 if(nbp->value_>64){
01471 for(unsigned int i = 0; i < subs_.size(); i++)
01472 {
01473 if(subs_[i].params().nbp == 0){
01474
01475 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01476 {
01477 subs_[i].found_invalid();
01478 if(subs_[i].nfound_invalid() > 60){
01479 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01480 subs_[i].post(msg3,false);
01481 std::ostringstream ost1;
01482 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01483 localLog(ost1.str());
01484 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01485 XCEPT_DECLARE(evf::Exception,
01486 sentinelException, ost1.str());
01487 notifyQualified("error",sentinelException);
01488
01489 }
01490 }
01491 }
01492 }
01493 }
01494 }
01495 }
01496 catch(std::exception &e){
01497 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01498 }
01499 catch(...){
01500 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01501 }
01502 }
01503 else{
01504 for(unsigned int i = 0; i < subs_.size(); i++)
01505 {
01506 if(subs_[i].alive()==-1000)
01507 {
01508 spMStates_[i] = edm::event_processor::sInit;
01509 spmStates_[i] = 0;
01510 }
01511 }
01512 }
01513 try{
01514 monitorInfoSpace_->lock();
01515 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01516 monitorInfoSpace_->unlock();
01517 }
01518 catch(xdata::exception::Exception &e)
01519 {
01520 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01521
01522 }
01523 ::sleep(superSleepSec_.value_);
01524 return true;
01525 }
01526
01527 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01528 {
01529 try {
01530 wlScalers_=
01531 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01532 "waiting");
01533 if (!wlScalers_->isActive()) wlScalers_->activate();
01534 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01535 "Scalers");
01536
01537 wlScalers_->submit(asScalers_);
01538 wlScalersActive_ = true;
01539 }
01540 catch (xcept::Exception& e) {
01541 std::string msg = "Failed to start workloop 'Scalers'.";
01542 XCEPT_RETHROW(evf::Exception,msg,e);
01543 }
01544 }
01545
01546
01547
01548 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01549 {
01550 try {
01551 wlSummarize_=
01552 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01553 "waiting");
01554 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01555
01556 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01557 "Summary");
01558
01559 wlSummarize_->submit(asSummarize_);
01560 wlSummarizeActive_ = true;
01561 }
01562 catch (xcept::Exception& e) {
01563 std::string msg = "Failed to start workloop 'Summarize'.";
01564 XCEPT_RETHROW(evf::Exception,msg,e);
01565 }
01566 }
01567
01568
01569
01570 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01571 {
01572 if(evtProcessor_)
01573 {
01574 if(!evtProcessor_.getTriggerReport(true)) {
01575 wlScalersActive_ = false;
01576 return false;
01577 }
01578 }
01579 else
01580 {
01581 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01582 wlScalersActive_ = false;
01583 return false;
01584 }
01585 if(myProcess_)
01586 {
01587
01588 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01589 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01590 scalersUpdates_++;
01591 }
01592 else
01593 evtProcessor_.fireScalersUpdate();
01594 return true;
01595 }
01596
01597
01598 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01599 {
01600 evtProcessor_.resetPackedTriggerReport();
01601 bool atLeastOneProcessUpdatedSuccessfully = false;
01602 int msgCount = 0;
01603 for (unsigned int i = 0; i < subs_.size(); i++)
01604 {
01605 if(subs_[i].alive()>0)
01606 {
01607 int ret = 0;
01608 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01609 evtProcessor_.getLumiSectionReferenceIndex()))
01610 {
01611 ret = MSQS_MESSAGE_TYPE_TRR;
01612 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01613 }
01614 else{
01615 bool insync = false;
01616 bool exception_caught = false;
01617 while(!insync){
01618 try{
01619 ret = subs_[i].rcv(master_message_trr_,false);
01620 }
01621 catch(evf::Exception &e)
01622 {
01623 std::cout << "exception in msgrcv on " << i
01624 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01625 exception_caught = true;
01626 break;
01627
01628 }
01629 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01630 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01631 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01632 insync = true;
01633 }
01634 }
01635 }
01636 if(exception_caught) continue;
01637 }
01638 msgCount++;
01639 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01640 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01641 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01642 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01643 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01644 subs_[i].add_postponed_trigger_update(master_message_trr_);
01645 }else{
01646 atLeastOneProcessUpdatedSuccessfully = true;
01647 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01648 }
01649 }
01650 else std::cout << "msgrcv returned error " << errno << std::endl;
01651 }
01652 }
01653 if(atLeastOneProcessUpdatedSuccessfully){
01654 nbSubProcessesReporting_.value_ = msgCount;
01655 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01656 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01657 evtProcessor_.updateRollingReport();
01658 evtProcessor_.fireScalersUpdate();
01659 }
01660 else{
01661 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01662 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01663 nbSubProcessesReporting_.value_ = 0;
01664 ::sleep(10);
01665 }
01666 if(fsm_.stateName()->toString()!="Enabled"){
01667 wlScalersActive_ = false;
01668 return false;
01669 }
01670
01671 if(iDieStatisticsGathering_.value_){
01672 try{
01673 unsigned long long idleTmp=idleProcStats_;
01674 unsigned long long allPSTmp=allProcStats_;
01675 idleProcStats_=allProcStats_=0;
01676
01677 utils::procCpuStat(idleProcStats_,allProcStats_);
01678 if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01679 cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01680
01681 }
01682 else cpustat_->setCPUStat(0);
01683
01684 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01685 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01686 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01687 ratestat_->sendStat((unsigned char*)trsp,
01688 sizeof(TriggerReportStatic),
01689 evtProcessor_.getLumiSectionReferenceIndex());
01690 }catch(evf::Exception &e){
01691 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01692 << e.what());
01693 }
01694 }
01695 cpustat_->reset();
01696 return true;
01697 }
01698
01699
01700
01701 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01702 {
01703 try{
01704 myProcess_->rcvSlave(slave_message_monitoring_,true);
01705 switch(slave_message_monitoring_->mtype)
01706 {
01707 case MSQM_MESSAGE_TYPE_MCS:
01708 {
01709 xgi::Input *in = 0;
01710 xgi::Output out;
01711 evtProcessor_.microState(in,&out);
01712 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01713 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01714 myProcess_->postSlave(msg1,true);
01715 break;
01716 }
01717
01718 case MSQM_MESSAGE_TYPE_PRG:
01719 {
01720 xdata::Serializable *dqmp = 0;
01721 xdata::UnsignedInteger32 *dqm = 0;
01722 evtProcessor_.monitoring(0);
01723 try{
01724 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01725 } catch(xdata::exception::Exception e){}
01726 if(dqmp!=0)
01727 dqm = (xdata::UnsignedInteger32*)dqmp;
01728
01729
01730 prg * data = (prg*)slave_message_prr_->mtext;
01731 data->ls = evtProcessor_.lsid_;
01732 data->eols = evtProcessor_.lastLumiUsingEol_;
01733 data->ps = evtProcessor_.psid_;
01734 data->nbp = evtProcessor_->totalEvents();
01735 data->nba = evtProcessor_->totalEventsPassed();
01736 data->Ms = evtProcessor_.epMAltState_.value_;
01737 data->ms = evtProcessor_.epmAltState_.value_;
01738 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01739 data->trp = scalersUpdates_;
01740
01741 myProcess_->postSlave(slave_message_prr_,true);
01742 if(exitOnError_.value_)
01743 {
01744
01745
01746 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01747 {
01748 bool running = true;
01749 int count = 0;
01750 while(running){
01751 int retval = pthread_mutex_lock(&stop_lock_);
01752 if(retval != 0) perror("error");
01753 running = fsm_.stateName()->toString()=="Enabled";
01754 if(count>5) _exit(-1);
01755 pthread_mutex_unlock(&stop_lock_);
01756 if(running) {::sleep(1); count++;}
01757 }
01758 }
01759
01760 }
01761
01762 break;
01763 }
01764 case MSQM_MESSAGE_TYPE_WEB:
01765 {
01766 xgi::Input *in = 0;
01767 xgi::Output out;
01768 unsigned int bytesToSend = 0;
01769 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01770 std::string query = slave_message_monitoring_->mtext;
01771 size_t pos = query.find_first_of("&");
01772 std::string method;
01773 std::string args;
01774 if(pos!=std::string::npos)
01775 {
01776 method = query.substr(0,pos);
01777 args = query.substr(pos+1,query.length()-pos-1);
01778 }
01779 else
01780 method=query;
01781
01782 if(method=="Spotlight")
01783 {
01784 spotlightWebPage(in,&out);
01785 }
01786 else if(method=="procStat")
01787 {
01788 procStat(in,&out);
01789 }
01790 else if(method=="moduleWeb")
01791 {
01792 internal::MyCgi mycgi;
01793 boost::char_separator<char> sep(";");
01794 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01795 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01796 tok_iter != tokens.end(); ++tok_iter){
01797 size_t pos = (*tok_iter).find_first_of("%");
01798 if(pos != std::string::npos){
01799 std::string first = (*tok_iter).substr(0 , pos);
01800 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01801 mycgi.getEnvironment()[first]=second;
01802 }
01803 }
01804 moduleWeb(&mycgi,&out);
01805 }
01806 else if(method=="Default")
01807 {
01808 defaultWebPage(in,&out);
01809 }
01810 else
01811 {
01812 out << "Error 404!!!!!!!!" << std::endl;
01813 }
01814
01815
01816 bytesToSend = out.str().size();
01817 unsigned int cycle = 0;
01818 if(bytesToSend==0)
01819 {
01820 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01821 myProcess_->postSlave(msg1,true);
01822 }
01823 while(bytesToSend !=0){
01824 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01825 write(anonymousPipe_[PIPE_WRITE],
01826 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01827 msgSize);
01828 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01829 myProcess_->postSlave(msg1,true);
01830 bytesToSend -= msgSize;
01831 cycle++;
01832 }
01833 break;
01834 }
01835 case MSQM_MESSAGE_TYPE_TRP:
01836 {
01837 break;
01838 }
01839 }
01840 }
01841 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01842 return true;
01843 }
01844
01845 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01846 {
01847
01848
01849 try {
01850 wlSignalMonitor_=
01851 toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01852 "waiting");
01853
01854 if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01855 asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01856 "SignalMonitor");
01857 wlSignalMonitor_->submit(asSignalMonitor_);
01858 signalMonitorActive_ = true;
01859 }
01860 catch (xcept::Exception& e) {
01861 std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01862 std::cout << e.what() << std::endl;
01863 XCEPT_RETHROW(evf::Exception,msg,e);
01864 }
01865 }
01866
01867
01868 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01869 {
01870 while (1) {
01871 sem_wait(sigmon_sem_);
01872 std::cout << " received signal notification from slave!"<< std::endl;
01873
01874
01875 bool running = fsm_.stateName()->toString()=="Enabled";
01876 bool stopping = fsm_.stateName()->toString()=="stopping";
01877 bool enabling = fsm_.stateName()->toString()=="enabling";
01878 if (!running && !enabling) {
01879 signalMonitorActive_ = false;
01880 return false;
01881 }
01882
01883 crashesThisRun_++;
01884 gettimeofday(&lastCrashTime_,0);
01885
01886
01887 if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01888
01889 rlimit rlold;
01890 getrlimit(RLIMIT_CORE,&rlold);
01891 rlimit rlnew = rlold;
01892 rlnew.rlim_cur=0;
01893 setrlimit(RLIMIT_CORE,&rlnew);
01894 rlimit_coresize_changed_=true;
01895 MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01896
01897 unsigned int min=1;
01898 for (unsigned int i = min; i < subs_.size(); i++) {
01899 try {
01900 if (subs_[i].alive()) {
01901 subs_[i].post(master_message_rli_,false);
01902 }
01903 }
01904 catch (...) {}
01905 }
01906 std::ostringstream ostr;
01907 ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01908 << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01909 LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01910 }
01911 }
01912 signalMonitorActive_ = false;
01913 return false;
01914 }
01915
01916
01917 bool FUEventProcessor::enableCommon()
01918 {
01919 try {
01920 if(hasShMem_) attachDqmToShm();
01921 int sc = 0;
01922 evtProcessor_->clearCounters();
01923 if(isRunNumberSetter_)
01924 evtProcessor_->setRunNumber(runNumber_.value_);
01925 else
01926 evtProcessor_->declareRunNumber(runNumber_.value_);
01927
01928 try{
01929 ::sleep(1);
01930 evtProcessor_->runAsync();
01931 sc = evtProcessor_->statusAsync();
01932 }
01933 catch(cms::Exception &e) {
01934 reasonForFailedState_ = e.explainSelf();
01935 fsm_.fireFailed(reasonForFailedState_,this);
01936 localLog(reasonForFailedState_);
01937 return false;
01938 }
01939 catch(std::exception &e) {
01940 reasonForFailedState_ = e.what();
01941 fsm_.fireFailed(reasonForFailedState_,this);
01942 localLog(reasonForFailedState_);
01943 return false;
01944 }
01945 catch(...) {
01946 reasonForFailedState_ = "Unknown Exception";
01947 fsm_.fireFailed(reasonForFailedState_,this);
01948 localLog(reasonForFailedState_);
01949 return false;
01950 }
01951 if(sc != 0) {
01952 std::ostringstream oss;
01953 oss<<"EventProcessor::runAsync returned status code " << sc;
01954 reasonForFailedState_ = oss.str();
01955 fsm_.fireFailed(reasonForFailedState_,this);
01956 localLog(reasonForFailedState_);
01957 return false;
01958 }
01959 }
01960 catch (xcept::Exception &e) {
01961 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01962 fsm_.fireFailed(reasonForFailedState_,this);
01963 localLog(reasonForFailedState_);
01964 return false;
01965 }
01966 try{
01967 fsm_.fireEvent("EnableDone",this);
01968 }
01969 catch (xcept::Exception &e) {
01970 std::cout << "exception " << (std::string)e.what() << std::endl;
01971 throw;
01972 }
01973
01974 return false;
01975 }
01976
01977 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01978 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01979 }
01980
01981 void FUEventProcessor::forkProcessesFromEDM() {
01982
01983 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01984 unsigned int forkFrom=0;
01985 unsigned int forkTo=nbSubProcesses_.value_;
01986 if (forkParams->slotId>=0) {
01987 forkFrom=forkParams->slotId;
01988 forkTo=forkParams->slotId+1;
01989 }
01990
01991
01992 try {
01993 if (sorRef_) {
01994 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
01995 for (unsigned int i=0;i<shmOutputs.size();i++) {
01996
01997 shmOutputs[i]->unregisterFromShm();
01998
01999 shmOutputs[i]->stop();
02000 }
02001 }
02002 }
02003 catch (std::exception &e)
02004 {
02005 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02006 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02007 fsm_.fireFailed(reasonForFailedState_,this);
02008 localLog(reasonForFailedState_);
02009 }
02010 catch (...) {
02011 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02012 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02013 fsm_.fireFailed(reasonForFailedState_,this);
02014 localLog(reasonForFailedState_);
02015 }
02016
02017
02018 if (fsm_.stateName()->toString()=="stopping") {
02019 LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02020 forkParams->isMaster=1;
02021 forkInfoObj_->forkParams.slotId=-1;
02022 forkInfoObj_->forkParams.restart=0;
02023 }
02024 else for(unsigned int i=forkFrom; i<forkTo; i++)
02025 {
02026
02027 int retval = subs_[i].forkNew();
02028 if(retval==0)
02029 {
02030
02031 forkParams->isMaster=0;
02032 myProcess_ = &subs_[i];
02033
02034 delete toolbox::mem::_s_mutex_ptr_;
02035 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02036 int retval = pthread_mutex_destroy(&stop_lock_);
02037 if(retval != 0) perror("error");
02038 retval = pthread_mutex_init(&stop_lock_,0);
02039 if(retval != 0) perror("error");
02040 fsm_.disableRcmsStateNotification();
02041
02042 try{
02043 LOG4CPLUS_DEBUG(getApplicationLogger(),
02044 "Trying to create message service presence ");
02045
02046 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02047 if(pf != 0) {
02048 pf->makePresence("MessageServicePresence").release();
02049 }
02050 else {
02051 LOG4CPLUS_ERROR(getApplicationLogger(),
02052 "Unable to create message service presence ");
02053 }
02054 }
02055 catch(...) {
02056 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02057 }
02058
02059 ML::MLlog4cplus::setAppl(this);
02060
02061
02062 try {
02063 if (sorRef_) {
02064 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02065 for (unsigned int i=0;i<shmOutputs.size();i++)
02066 shmOutputs[i]->start();
02067 }
02068 }
02069 catch (...)
02070 {
02071 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02072 }
02073
02074 if (forkParams->restart) {
02075
02076 scalersUpdates_ = 0;
02077 try {
02078 fsm_.fireEvent("Stop",this);
02079 fsm_.fireEvent("StopDone",this);
02080 fsm_.fireEvent("Enable",this);
02081 } catch (...) {
02082 LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02083 }
02084 try{
02085 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02086 if(lsid) {
02087 ((xdata::UnsignedInteger32*)(lsid))->value_--;
02088 }
02089 }
02090 catch(...){
02091 std::cout << "trouble with lsindex during restart" << std::endl;
02092 }
02093 try{
02094 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02095 if(lstb) {
02096 ((xdata::Boolean*)(lstb))->value_ = false;
02097 }
02098 }
02099 catch(...){
02100 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02101 }
02102 evtProcessor_.adjustLsIndexForRestart();
02103 evtProcessor_.resetPackedTriggerReport();
02104 }
02105
02106
02107 startReceivingLoop();
02108 startReceivingMonitorLoop();
02109 startScalersWorkLoop();
02110 while(!evtProcessor_.isWaitingForLs())
02111 ::usleep(100000);
02112
02113
02114 if(hasShMem_) attachDqmToShm();
02115
02116
02117 try {
02118 fsm_.fireEvent("EnableDone",this);
02119 }
02120 catch (...) {}
02121
02122
02123 while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02124
02125
02126 sigset_t tmpset_thread;
02127 sigemptyset(&tmpset_thread);
02128 sigaddset(&tmpset_thread, SIGQUIT);
02129 sigaddset(&tmpset_thread, SIGILL);
02130 sigaddset(&tmpset_thread, SIGABRT);
02131 sigaddset(&tmpset_thread, SIGFPE);
02132 sigaddset(&tmpset_thread, SIGSEGV);
02133 sigaddset(&tmpset_thread, SIGALRM);
02134
02135 pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02136
02137
02138 struct sigaction sa;
02139 sigset_t tmpset;
02140 memset(&tmpset,0,sizeof(tmpset));
02141 sigemptyset(&tmpset);
02142 sa.sa_mask=tmpset;
02143 sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02144 sa.sa_handler=0;
02145 sa.sa_sigaction=evfep_sighandler;
02146
02147 sigaction(SIGQUIT,&sa,0);
02148 sigaction(SIGILL,&sa,0);
02149 sigaction(SIGABRT,&sa,0);
02150 sigaction(SIGFPE,&sa,0);
02151 sigaction(SIGSEGV,&sa,0);
02152 sa.sa_sigaction=evfep_alarmhandler;
02153 sigaction(SIGALRM,&sa,0);
02154
02155
02156 return ;
02157 }
02158 else {
02159 forkParams->isMaster=1;
02160 forkInfoObj_->forkParams.slotId=-1;
02161 forkInfoObj_->forkParams.restart=0;
02162 if (forkParams->restart) {
02163 std::ostringstream ost1;
02164 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02165 localLog(ost1.str());
02166 }
02167
02168 }
02169 }
02170 edm_init_done_=true;
02171 }
02172
02173 bool FUEventProcessor::enableForkInEDM()
02174 {
02175 evtProcessor_.resetWaiting();
02176 try {
02177
02178
02179
02180 int sc = 0;
02181
02182 evtProcessor_->clearCounters();
02183 if(isRunNumberSetter_)
02184 evtProcessor_->setRunNumber(runNumber_.value_);
02185 else
02186 evtProcessor_->declareRunNumber(runNumber_.value_);
02187
02188
02189 pthread_mutex_destroy(&forkObjLock_);
02190 pthread_mutex_init(&forkObjLock_,0);
02191 if (forkInfoObj_) delete forkInfoObj_;
02192 forkInfoObj_ = new moduleweb::ForkInfoObj();
02193 forkInfoObj_->mst_lock_=&forkObjLock_;
02194 forkInfoObj_->fuAddr=(void*)this;
02195 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02196 forkInfoObj_->forkParams.slotId=-1;
02197 forkInfoObj_->forkParams.restart=0;
02198 forkInfoObj_->forkParams.isMaster=-1;
02199 forkInfoObj_->stopCondition=0;
02200 if (mwrRef_)
02201 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02202
02203 evtProcessor_->runAsync();
02204 sc = evtProcessor_->statusAsync();
02205
02206 if(sc != 0) {
02207 std::ostringstream oss;
02208 oss<<"EventProcessor::runAsync returned status code " << sc;
02209 reasonForFailedState_ = oss.str();
02210 fsm_.fireFailed(reasonForFailedState_,this);
02211 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02212 return false;
02213 }
02214 }
02215
02216 catch(cms::Exception &e) {
02217 reasonForFailedState_ = e.explainSelf();
02218 fsm_.fireFailed(reasonForFailedState_,this);
02219 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02220 return false;
02221 }
02222 catch(std::exception &e) {
02223 reasonForFailedState_ = e.what();
02224 fsm_.fireFailed(reasonForFailedState_,this);
02225 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02226 return false;
02227 }
02228 catch(...) {
02229 reasonForFailedState_ = "Unknown Exception";
02230 fsm_.fireFailed(reasonForFailedState_,this);
02231 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02232 return false;
02233 }
02234 return true;
02235 }
02236
02237 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02238
02239
02240 forkInfoObj_->lock();
02241 forkInfoObj_->forkParams.slotId=slotId;
02242 forkInfoObj_->forkParams.restart=true;
02243 forkInfoObj_->forkParams.isMaster=1;
02244 forkInfoObj_->stopCondition=0;
02245 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02246 sem_post(forkInfoObj_->control_sem_);
02247 forkInfoObj_->unlock();
02248 usleep(1000);
02249 return true;
02250 }
02251
02252 bool FUEventProcessor::enableClassic()
02253 {
02254 bool retval = enableCommon();
02255 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02256 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02257 ::sleep(1);
02258 }
02259
02260
02261
02262 localLog("-I- Start completed");
02263 return retval;
02264 }
02265 bool FUEventProcessor::enableMPEPSlave()
02266 {
02267
02268
02269 startReceivingLoop();
02270 startReceivingMonitorLoop();
02271 evtProcessor_.resetWaiting();
02272 startScalersWorkLoop();
02273 while(!evtProcessor_.isWaitingForLs())
02274 ::sleep(1);
02275
02276
02277
02278 try{
02279
02280 try{
02281 LOG4CPLUS_DEBUG(getApplicationLogger(),
02282 "Trying to create message service presence ");
02283 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02284 if(pf != 0) {
02285 pf->makePresence("MessageServicePresence").release();
02286 }
02287 else {
02288 LOG4CPLUS_ERROR(getApplicationLogger(),
02289 "Unable to create message service presence ");
02290 }
02291 }
02292 catch(...) {
02293 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02294 }
02295 ML::MLlog4cplus::setAppl(this);
02296 }
02297 catch (xcept::Exception &e) {
02298 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02299 fsm_.fireFailed(reasonForFailedState_,this);
02300 localLog(reasonForFailedState_);
02301 }
02302 bool retval = enableCommon();
02303
02304
02305
02306
02307 return retval;
02308 }
02309
02310 bool FUEventProcessor::stopClassic()
02311 {
02312 bool failed=false;
02313 try {
02314 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02315 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02316 if(rc == edm::EventProcessor::epSuccess)
02317 fsm_.fireEvent("StopDone",this);
02318 else
02319 {
02320 failed=true;
02321
02322 if(rc == edm::EventProcessor::epTimedOut)
02323 reasonForFailedState_ = "EventProcessor stop timed out";
02324 else
02325 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02326 fsm_.fireFailed(reasonForFailedState_,this);
02327 localLog(reasonForFailedState_);
02328 }
02329
02330 if (failed) LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: try detaching from Shm");
02331
02332
02333 if(hasShMem_) detachDqmFromShm();
02334
02335 if (failed) LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: detached from Shm");
02336 }
02337 catch (xcept::Exception &e) {
02338 failed=true;
02339 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02340 }
02341 catch (edm::Exception &e) {
02342 failed=true;
02343 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
02344 }
02345 catch (...) {
02346 failed=true;
02347 reasonForFailedState_= "STOP failed: unknown exception";
02348 }
02349
02350 if (failed) {
02351 LOG4CPLUS_WARN(getApplicationLogger(),"STOP failed: return point of the stopping call reached. pid:" << getpid());
02352 localLog(reasonForFailedState_);
02353 fsm_.fireFailed(reasonForFailedState_,this);
02354 }
02355
02356 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02357 localLog("-I- Stop completed");
02358 return false;
02359 }
02360
02361 void FUEventProcessor::stopSlavesAndAcknowledge()
02362 {
02363 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02364 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02365
02366 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02367 for(unsigned int i = 0; i < subs_.size(); i++)
02368 {
02369 pthread_mutex_lock(&stop_lock_);
02370 if(subs_[i].alive()>0){
02371 processes_to_stop[i] = true;
02372 subs_[i].post(msg,false);
02373 }
02374 pthread_mutex_unlock(&stop_lock_);
02375 }
02376 for(unsigned int i = 0; i < subs_.size(); i++)
02377 {
02378 pthread_mutex_lock(&stop_lock_);
02379 if(processes_to_stop[i]){
02380 try{
02381 subs_[i].rcv(msg1,false);
02382 }
02383 catch(evf::Exception &e){
02384 std::ostringstream ost;
02385 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02386 reasonForFailedState_ = ost.str();
02387 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02388
02389 localLog(reasonForFailedState_);
02390 pthread_mutex_unlock(&stop_lock_);
02391 continue;
02392 }
02393 }
02394 else {
02395 pthread_mutex_unlock(&stop_lock_);
02396 continue;
02397 }
02398 pthread_mutex_unlock(&stop_lock_);
02399 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02400 while(subs_[i].alive()>0) ::usleep(10000);
02401 subs_[i].disconnect();
02402 }
02403
02404
02405 }
02406
02407 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02408 {
02409 std::string urn = getApplicationDescriptor()->getURN();
02410 try{
02411 evtProcessor_.stateNameFromIndex(0);
02412 evtProcessor_.moduleNameFromIndex(0);
02413 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02414 *out << "<tr><td>" << fsm_.stateName()->toString()
02415 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02416 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02417 evtProcessor_.microState(in,out);
02418 *out << "<td></td><td>" << nbTotalDQM_
02419 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02420 if(nbSubProcesses_.value_!=0 && !myProcess_)
02421 {
02422 pthread_mutex_lock(&start_lock_);
02423 for(unsigned int i = 0; i < subs_.size(); i++)
02424 {
02425 try{
02426 if(subs_[i].alive()>0)
02427 {
02428 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02429 << i << "\">""Alive</td><td>S</td><td>"
02430 << subs_[i].queueId() << "<td>"
02431 << subs_[i].queueStatus()<< "/"
02432 << subs_[i].queueOccupancy() << "/"
02433 << subs_[i].queuePidOfLastSend() << "/"
02434 << subs_[i].queuePidOfLastReceive()
02435 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02436 << subs_[i].pid() << "&method=procStat\">"
02437 << subs_[i].pid()<<"</a></td>"
02438 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02439 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02440 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02441 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02442 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02443 << "</td><td>" << subs_[i].params().ps
02444 << "</td><td"
02445 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02446 << subs_[i].params().eols
02447 << "</td><td>" << subs_[i].params().dqm
02448 << "</td><td>" << subs_[i].params().trp << "</td>";
02449 }
02450 else
02451 {
02452 pthread_mutex_lock(&pickup_lock_);
02453 *out << "<tr><td id=\"a"<< i << "\" ";
02454 if(subs_[i].alive()==-1000)
02455 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02456 else
02457 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02458 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02459 << subs_[i].queueStatus() << "/"
02460 << subs_[i].queueOccupancy() << "/"
02461 << subs_[i].queuePidOfLastSend() << "/"
02462 << subs_[i].queuePidOfLastReceive()
02463 << "</td><td id=\"p"<< i << "\">"
02464 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02465 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02466 {
02467 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02468 *out << " will restart in " << subs_[i].countdown() << " s";
02469 else if(autoRestartSlaves_)
02470 *out << " reached maximum restart count";
02471 else *out << " autoRestart is disabled ";
02472 }
02473 *out << "</td><td"
02474 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02475 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02476 << ">"
02477 << subs_[i].params().eols
02478 << "</td><td>" << subs_[i].params().dqm
02479 << "</td><td>" << subs_[i].params().trp << "</td>";
02480 pthread_mutex_unlock(&pickup_lock_);
02481 }
02482 *out << "</tr>";
02483 }
02484 catch(evf::Exception &e){
02485 *out << "<tr><td id=\"a"<< i << "\" "
02486 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02487 << subs_[i].queueStatus() << "/"
02488 << subs_[i].queueOccupancy() << "/"
02489 << subs_[i].queuePidOfLastSend() << "/"
02490 << subs_[i].queuePidOfLastReceive()
02491 << "</td><td id=\"p"<< i << "\">"
02492 <<subs_[i].pid()<<"</td>";
02493 }
02494 }
02495 pthread_mutex_unlock(&start_lock_);
02496 }
02497 }
02498 catch(evf::Exception &e)
02499 {
02500 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02501 }
02502 catch(cms::Exception &e)
02503 {
02504 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02505 }
02506 catch(std::exception &e)
02507 {
02508 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02509 }
02510 catch(...)
02511 {
02512 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02513 }
02514
02515 }
02516
02517
02518 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02519 {
02520 using namespace utils;
02521
02522 *out << updaterStatic_;
02523 mDiv(out,"loads");
02524 uptime(out);
02525 cDiv(out);
02526 mDiv(out,"st",fsm_.stateName()->toString());
02527 mDiv(out,"ru",runNumber_.toString());
02528 mDiv(out,"nsl",nbSubProcesses_.value_);
02529 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02530 mDiv(out,"cl");
02531 *out << getApplicationDescriptor()->getClassName()
02532 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02533 cDiv(out);
02534 mDiv(out,"in",getApplicationDescriptor()->getInstance());
02535 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02536 mDiv(out,"hlt");
02537 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02538 cDiv(out);
02539 *out << std::endl;
02540 }
02541 else
02542 mDiv(out,"hlt","Not yet...");
02543
02544 mDiv(out,"sq",squidPresent_.toString());
02545 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02546 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02547 if(nbProcessed != 0 && nbAccepted != 0)
02548 {
02549 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02550 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02551 }
02552 else
02553 {
02554 mDiv(out,"tt",0);
02555 mDiv(out,"ac",0);
02556 }
02557 if(!myProcess_)
02558 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02559 else
02560 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02561
02562 mDiv(out,"idi",iDieUrl_.value_);
02563 if(vp_!=0){
02564 mDiv(out,"vpi",(unsigned int) vp_);
02565 if(vulture_->hasStarted()>=0)
02566 mDiv(out,"vul","Prowling");
02567 else
02568 mDiv(out,"vul","Dead");
02569 }
02570 else{
02571 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02572 }
02573 if(evtProcessor_){
02574 mDiv(out,"ll");
02575 *out << evtProcessor_.lastLumi().ls
02576 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02577 cDiv(out);
02578 }
02579 mDiv(out,"lg");
02580 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02581 *out << logRing_[i] << std::endl;
02582 if(logWrap_)
02583 for(unsigned int i = 0; i<logRingIndex_; i++)
02584 *out << logRing_[i] << std::endl;
02585 cDiv(out);
02586 mDiv(out,"cha");
02587 if(cpustat_) *out << cpustat_->getChart();
02588 cDiv(out);
02589 }
02590
02591 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02592 {
02593 evf::utils::procStat(out);
02594 }
02595
02596 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02597 {
02598 if(myProcess_) myProcess_->postSlave(buf,true);
02599 }
02600
02601 void FUEventProcessor::makeStaticInfo()
02602 {
02603 using namespace utils;
02604 std::ostringstream ost;
02605 mDiv(&ost,"ve");
02606 ost<< "$Revision: 1.151 $ (" << edm::getReleaseVersion() <<")";
02607 cDiv(&ost);
02608 mDiv(&ost,"ou",outPut_.toString());
02609 mDiv(&ost,"sh",hasShMem_.toString());
02610 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02611 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02612
02613 xdata::Serializable *monsleep = 0;
02614 xdata::Serializable *lstimeout = 0;
02615 try{
02616 monsleep = applicationInfoSpace_->find("monSleepSec");
02617 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02618 }
02619 catch(xdata::exception::Exception e){
02620 }
02621
02622 if(monsleep!=0)
02623 mDiv(&ost,"ms",monsleep->toString());
02624 if(lstimeout!=0)
02625 mDiv(&ost,"lst",lstimeout->toString());
02626 char cbuf[sizeof(struct utsname)];
02627 struct utsname* buf = (struct utsname*)cbuf;
02628 uname(buf);
02629 mDiv(&ost,"sysinfo");
02630 ost << buf->sysname << " " << buf->nodename
02631 << " " << buf->release << " " << buf->version << " " << buf->machine;
02632 cDiv(&ost);
02633 updaterStatic_ = ost.str();
02634 }
02635
02636 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02637 {
02638
02639 sem_post(sigmon_sem_);
02640
02641
02642 sleep(2);
02643
02644
02645 alarm(5);
02646
02647 std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02648 std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02649 std::cout << "--- Stacktrace follows --" << std::endl;
02650 std::ostringstream stacktr;
02651 toolbox::stacktrace(20,stacktr);
02652 std::cout << stacktr.str();
02653 if (!rlimit_coresize_changed_)
02654 std::cout << "--- Dumping core." << " --- " << std::endl;
02655 else
02656 std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02657
02658 std::string hasdump = "";
02659 if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02660
02661 LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
02662 << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02663 << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02664 << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02665 );
02666
02667
02668 raise(sig);
02669 }
02670
02671
02672 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)