00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034
00035 #include <boost/tokenizer.hpp>
00036
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043
00044
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054
00055
00056 using namespace evf;
00057 using namespace cgicc;
// Declaration of the XDAQ toolbox memory-pool mutex pointer, so this file can
// reach it. enabling() deletes the instance inherited across forkNew() in each
// slave and replaces it with a fresh toolbox::BSem (presumably because the
// parent's lock state is not meaningful in the child -- TODO confirm).
00058 namespace toolbox {
00059 namespace mem {
00060 extern toolbox::BSem * _s_mutex_ptr_;
00061 }
00062 }
00063
00064
00065 namespace evf {
00066 FUEventProcessor * FUInstancePtr_;
00067 int evfep_raised_signal;
00068 void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069 {
00070 evfep_raised_signal=sig;
00071 FUInstancePtr_->handleSignalSlave(sig, info, c);
00072 }
00073 void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074 {
00075 if (evfep_raised_signal) {
00076 signal(evfep_raised_signal,SIG_DFL);
00077 raise(evfep_raised_signal);
00078 }
00079 }
00080 }
00081
00083
00085
00086
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00088 : xdaq::Application(s)
00089 , fsm_(this)
00090 , log_(getApplicationLogger())
00091 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092 , runNumber_(0)
00093 , epInitialized_(false)
00094 , outPut_(true)
00095 , autoRestartSlaves_(false)
00096 , slaveRestartDelaySecs_(10)
00097 , hasShMem_(true)
00098 , hasPrescaleService_(true)
00099 , hasModuleWebRegistry_(true)
00100 , hasServiceWebRegistry_(true)
00101 , isRunNumberSetter_(true)
00102 , iDieStatisticsGathering_(false)
00103 , outprev_(true)
00104 , exitOnError_(true)
00105 , reasonForFailedState_()
00106 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107 , logRing_(logRingSize_)
00108 , logRingIndex_(logRingSize_)
00109 , logWrap_(false)
00110 , nbSubProcesses_(0)
00111 , nbSubProcessesReporting_(0)
00112 , forkInEDM_(true)
00113 , nblive_(0)
00114 , nbdead_(0)
00115 , nbTotalDQM_(0)
00116 , wlReceiving_(0)
00117 , asReceiveMsgAndExecute_(0)
00118 , receiving_(false)
00119 , wlReceivingMonitor_(0)
00120 , asReceiveMsgAndRead_(0)
00121 , receivingM_(false)
00122 , myProcess_(0)
00123 , wlSupervising_(0)
00124 , asSupervisor_(0)
00125 , supervising_(false)
00126 , wlSignalMonitor_(0)
00127 , asSignalMonitor_(0)
00128 , signalMonitorActive_(false)
00129 , monitorInfoSpace_(0)
00130 , monitorLegendaInfoSpace_(0)
00131 , applicationInfoSpace_(0)
00132 , nbProcessed(0)
00133 , nbAccepted(0)
00134 , scalersInfoSpace_(0)
00135 , scalersLegendaInfoSpace_(0)
00136 , wlScalers_(0)
00137 , asScalers_(0)
00138 , wlScalersActive_(false)
00139 , scalersUpdates_(0)
00140 , wlSummarize_(0)
00141 , asSummarize_(0)
00142 , wlSummarizeActive_(false)
00143 , superSleepSec_(1)
00144 , iDieUrl_("none")
00145 , vulture_(0)
00146 , vp_(0)
00147 , cpustat_(0)
00148 , ratestat_(0)
00149 , mwrRef_(nullptr)
00150 , sorRef_(nullptr)
00151 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155 , edm_init_done_(true)
00156 , crashesThisRun_(false)
00157 , rlimit_coresize_changed_(false)
00158 , crashesToDump_(2)
00159 , sigmon_sem_(0)
00160 {
00161 using namespace utils;
00162
00163 FUInstancePtr_=this;
00164
00165 names_.push_back("nbProcessed" );
00166 names_.push_back("nbAccepted" );
00167 names_.push_back("epMacroStateInt");
00168 names_.push_back("epMicroStateInt");
00169
00170 int retpipe = pipe(anonymousPipe_);
00171 if(retpipe != 0)
00172 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00173
00174 squidPresent_ = squidnet_.check();
00175
00176 evtProcessor_.setAppDesc(getApplicationDescriptor());
00177 evtProcessor_.setAppCtxt(getApplicationContext());
00178
00179 fsm_.initialize<evf::FUEventProcessor>(this);
00180
00181
00182 url_ =
00183 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00184 getApplicationDescriptor()->getURN();
00185 class_ =getApplicationDescriptor()->getClassName();
00186 instance_=getApplicationDescriptor()->getInstance();
00187 sourceId_=class_.toString()+"_"+instance_.toString();
00188 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00189 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00190
00191 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00192
00193 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00194 applicationInfoSpace_ = ispace;
00195
00196
00197 ispace->fireItemAvailable("parameterSet", &configString_ );
00198 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00199 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00200 ispace->fireItemAvailable("runNumber", &runNumber_ );
00201 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00202
00203 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00204 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00205 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00206 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00207 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00208 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00209 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00210 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00211 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00212 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00213 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00214 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00215 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00216 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00217 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00218 ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
00219
00220
00221 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00222 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00223
00224
00225 fsm_.findRcmsStateListener();
00226
00227
00228
00229 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00230 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00231 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00232
00233 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00234 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00235 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00236
00237
00238 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00239 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00240 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00241 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00242 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00243
00244 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00245
00246 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00247 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00248 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00249
00250 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00251 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00252 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00253
00254
00255
00256 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00257 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00258
00259 evtProcessor_.setApplicationInfoSpace(ispace);
00260 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00261 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00262
00263
00264 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00265 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00266
00267
00268 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00269 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00270 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00271 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00272 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00273 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00274 xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
00275 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00276 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00277 xgi::bind(this, &FUEventProcessor::microState, "microState");
00278 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00279 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00280
00281
00282
00283 edm::AssertHandler ah;
00284
00285
00286 try{
00287 LOG4CPLUS_DEBUG(getApplicationLogger(),
00288 "Trying to create message service presence ");
00289 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00290 if(pf != 0) {
00291 messageServicePresence_= pf->makePresence("MessageServicePresence");
00292 }
00293 else {
00294 LOG4CPLUS_ERROR(getApplicationLogger(),
00295 "Unable to create message service presence ");
00296 }
00297 }
00298 catch(...) {
00299 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00300 }
00301 ML::MLlog4cplus::setAppl(this);
00302
00303 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00304 typedef AppDescSet_t::iterator AppDescIter_t;
00305
00306 AppDescSet_t rcms=
00307 getApplicationContext()->getDefaultZone()->
00308 getApplicationDescriptors("RCMSStateListener");
00309 if(rcms.size()==0)
00310 {
00311 LOG4CPLUS_WARN(getApplicationLogger(),
00312 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00313
00314 }
00315 else
00316 {
00317 AppDescIter_t it = rcms.begin();
00318 evtProcessor_.setRcms(*it);
00319 }
00320 pthread_mutex_init(&start_lock_,0);
00321 pthread_mutex_init(&stop_lock_,0);
00322 pthread_mutex_init(&pickup_lock_,0);
00323
00324 forkInfoObj_=nullptr;
00325 pthread_mutex_init(&forkObjLock_,0);
00326 makeStaticInfo();
00327 startSupervisorLoop();
00328
00329 if(vulture_==0) vulture_ = new Vulture(true);
00330
00332
00333 AppDescSet_t setOfiDie=
00334 getApplicationContext()->getDefaultZone()->
00335 getApplicationDescriptors("evf::iDie");
00336
00337 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00338 if ((*it)->getInstance()==0)
00339 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00340
00341
00342 getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00343
00344
00345 #ifdef linux
00346 if (sigmon_sem_==0) {
00347 sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00348 PROT_READ | PROT_WRITE,
00349 MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00350 if (!sigmon_sem_) {
00351 perror("mmap error\n");
00352 std::cout << "mmap error"<<std::endl;
00353 }
00354 else
00355 sem_init(sigmon_sem_,true,0);
00356 }
00357 #endif
00358 }
00359
00360
00361
00362
00363 FUEventProcessor::~FUEventProcessor()
00364 {
00365
00366
00367
00368 if(vulture_ != 0) delete vulture_;
00369 }
00370
00371
00373
00375
00376
00377
00378 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00379 {
00380
00381
00382
00383
00384
00385 unsigned short smap
00386 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00387 + (((instance_.value_%80)==0) ? 0x8 : 0)
00388 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00389 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00390 + (hasPrescaleService_.value_ ? 0x1 : 0);
00391 if(nbSubProcesses_.value_==0)
00392 {
00393 spMStates_.setSize(1);
00394 spmStates_.setSize(1);
00395 }
00396 else
00397 {
00398 spMStates_.setSize(nbSubProcesses_.value_);
00399 spmStates_.setSize(nbSubProcesses_.value_);
00400 for(unsigned int i = 0; i < spMStates_.size(); i++)
00401 {
00402 spMStates_[i] = edm::event_processor::sInit;
00403 spmStates_[i] = 0;
00404 }
00405 }
00406 try {
00407 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00408 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00409 epInitialized_=true;
00410 if(evtProcessor_)
00411 {
00412
00413 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00414 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00415
00416 configuration_ = evtProcessor_.configuration();
00417 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00418 evtProcessor_->beginJob();
00419 if(cpustat_) {delete cpustat_; cpustat_=0;}
00420 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00421 nbSubProcesses_.value_,
00422 instance_.value_,
00423 iDieUrl_.value_);
00424 if(ratestat_) {delete ratestat_; ratestat_=0;}
00425 ratestat_ = new RateStat(iDieUrl_.value_);
00426 if(iDieStatisticsGathering_.value_){
00427 try{
00428 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00429 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00430 if(legenda !=0){
00431 std::string slegenda = ((xdata::String*)legenda)->value_;
00432 ratestat_->sendLegenda(slegenda);
00433 }
00434
00435 }
00436 catch(evf::Exception &e){
00437 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00438 << e.what());
00439 }
00440 }
00441
00442 fsm_.fireEvent("ConfigureDone",this);
00443 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00444 localLog("-I- Configuration completed");
00445
00446 }
00447 }
00448 catch (xcept::Exception &e) {
00449 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00450 fsm_.fireFailed(reasonForFailedState_,this);
00451 localLog(reasonForFailedState_);
00452 }
00453 catch(cms::Exception &e) {
00454 reasonForFailedState_ = e.explainSelf();
00455 fsm_.fireFailed(reasonForFailedState_,this);
00456 localLog(reasonForFailedState_);
00457 }
00458 catch(std::exception &e) {
00459 reasonForFailedState_ = e.what();
00460 fsm_.fireFailed(reasonForFailedState_,this);
00461 localLog(reasonForFailedState_);
00462 }
00463 catch(...) {
00464 fsm_.fireFailed("Unknown Exception",this);
00465 }
00466
00467 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00468
00469 return false;
00470 }
00471
00472
00473
00474
00475
00476 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00477 {
00478 nbTotalDQM_ = 0;
00479 scalersUpdates_ = 0;
00480 idleProcStats_ = 0;
00481 allProcStats_ = 0;
00482
00483
00484
00485
00486
00487 unsigned short smap
00488 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00489 + (((instance_.value_%80)==0) ? 0x8 : 0)
00490 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00491 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00492 + (hasPrescaleService_.value_ ? 0x1 : 0);
00493
00494 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00495
00496
00497 if (rlimit_coresize_changed_)
00498 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00499 rlimit_coresize_changed_=false;
00500 crashesThisRun_=0;
00501
00502 sem_destroy(sigmon_sem_);
00503 sem_init(sigmon_sem_,true,0);
00504
00505 if(!epInitialized_){
00506 evtProcessor_.forceInitEventProcessorMaybe();
00507 }
00508 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00509
00510 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00511 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00512
00513 if(!epInitialized_){
00514 evtProcessor_->beginJob();
00515 if(cpustat_) {delete cpustat_; cpustat_=0;}
00516 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00517 nbSubProcesses_.value_,
00518 instance_.value_,
00519 iDieUrl_.value_);
00520 if(ratestat_) {delete ratestat_; ratestat_=0;}
00521 ratestat_ = new RateStat(iDieUrl_.value_);
00522 if(iDieStatisticsGathering_.value_){
00523 try
00524 {
00525 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00526 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00527 if(legenda !=0)
00528 {
00529 std::string slegenda = ((xdata::String*)legenda)->value_;
00530 ratestat_->sendLegenda(slegenda);
00531 }
00532 }
00533 catch(evf::Exception &e)
00534 {
00535 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00536 << e.what());
00537 }
00538 catch (xcept::Exception& e) {
00539 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00540 << e.what());
00541 }
00542 }
00543 epInitialized_ = true;
00544 }
00545 configuration_ = evtProcessor_.configuration();
00546 evtProcessor_.resetLumiSectionReferenceIndex();
00547
00548 if(nbSubProcesses_.value_==0) return enableClassic();
00549
00550 pthread_mutex_lock(&start_lock_);
00551 pthread_mutex_lock(&pickup_lock_);
00552 subs_.clear();
00553 subs_.resize(nbSubProcesses_.value_);
00554 pid_t retval = -1;
00555
00556 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00557 {
00558 subs_[i]=SubProcess(i,retval);
00559 }
00560 pthread_mutex_unlock(&pickup_lock_);
00561
00562 pthread_mutex_unlock(&start_lock_);
00563
00564
00565 try {
00566 if (sorRef_) {
00567 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00568 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00569 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00570 for (unsigned int i=0;i<shmOutputs.size();i++) {
00571 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00572 }
00573 }
00574 }
00575 catch (...)
00576 {
00577 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00578 }
00579
00580
00581 edm_init_done_=true;
00582 if (forkInEDM_.value_) {
00583 edm_init_done_=false;
00584 enableForkInEDM();
00585 }
00586 else
00587 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00588 {
00589 retval = subs_[i].forkNew();
00590 if(retval==0)
00591 {
00592 myProcess_ = &subs_[i];
00593
00594 delete toolbox::mem::_s_mutex_ptr_;
00595 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00596 int retval = pthread_mutex_destroy(&stop_lock_);
00597 if(retval != 0) perror("error");
00598 retval = pthread_mutex_init(&stop_lock_,0);
00599 if(retval != 0) perror("error");
00600 fsm_.disableRcmsStateNotification();
00601
00602 return enableMPEPSlave();
00603
00604 }
00605 }
00606
00607 if (forkInEDM_.value_) {
00608
00609 edm::event_processor::State st;
00610 while (!edm_init_done_) {
00611 usleep(10000);
00612 st = evtProcessor_->getState();
00613 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00614 }
00615 st = evtProcessor_->getState();
00616
00617 if (st!=edm::event_processor::sRunning) {
00618 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00619 localLog(reasonForFailedState_);
00620 fsm_.fireFailed(reasonForFailedState_,this);
00621 return false;
00622 }
00623
00624 sleep(1);
00625 startSummarizeWorkLoop();
00626 startSignalMonitorWorkLoop();
00627 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00628
00629
00630 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00631 fsm_.fireEvent("EnableDone",this);
00632 localLog("-I- Start completed");
00633 return false;
00634 }
00635
00636 startSummarizeWorkLoop();
00637 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00638 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00639 fsm_.fireEvent("EnableDone",this);
00640 localLog("-I- Start completed");
00641 return false;
00642 }
00643
00644 bool FUEventProcessor::doEndRunInEDM() {
00645
00646 if (forkInfoObj_) {
00647
00648 int count = 30;
00649 bool waitedForEDM=false;
00650 while (!edm_init_done_ && count) {
00651 ::sleep(1);
00652 if (count%5==0)
00653 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00654 count--;
00655 waitedForEDM=true;
00656 }
00657
00658 if (waitedForEDM) sleep(5);
00659
00660
00661
00662 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00663 return true;
00664
00665 forkInfoObj_->lock();
00666 forkInfoObj_->stopCondition=true;
00667 sem_post(forkInfoObj_->control_sem_);
00668 forkInfoObj_->unlock();
00669
00670 count = 30;
00671
00672 edm::event_processor::State st;
00673 while(count--) {
00674 st = evtProcessor_->getState();
00675 if (st==edm::event_processor::sRunning) ::usleep(100000);
00676 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00677 break;
00678 }
00679 else {
00680 std::ostringstream ost;
00681 ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00682 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00683 fsm_.fireFailed(ost.str(),this);
00684 return false;
00685 }
00686 if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00687 forkInfoObj_->lock();
00688 forkInfoObj_->stopCondition=true;
00689 sem_post(forkInfoObj_->control_sem_);
00690 forkInfoObj_->unlock();
00691 LOG4CPLUS_WARN(getApplicationLogger(),
00692 "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00693 }
00694 }
00695 if (count<0) {
00696 std::ostringstream ost;
00697 if (!forkInfoObj_->receivedStop_)
00698 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00699 << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00700 else
00701 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00702 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00703 fsm_.fireFailed(ost.str(),this);
00704 return false;
00705 }
00706 }
00707 return true;
00708 }
00709
00710
00711 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00712 {
00713 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00714 if(nbSubProcesses_.value_!=0) {
00715 stopSlavesAndAcknowledge();
00716 if (forkInEDM_.value_) {
00717
00718 sem_post(sigmon_sem_);
00719 doEndRunInEDM();
00720 }
00721 }
00722 vulture_->stop();
00723
00724 if (forkInEDM_.value_) {
00725
00726 bool tmpHasShMem_=hasShMem_;
00727 hasShMem_=false;
00728 stopClassic();
00729 hasShMem_=tmpHasShMem_;
00730 return false;
00731 }
00732 stopClassic();
00733 return false;
00734 }
00735
00736
00737
00738 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00739 {
00740 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00741 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00742 if(nbSubProcesses_.value_!=0) {
00743 stopSlavesAndAcknowledge();
00744 if (forkInEDM_.value_) {
00745 sem_post(sigmon_sem_);
00746 doEndRunInEDM();
00747 }
00748 }
00749 try{
00750 evtProcessor_.stopAndHalt();
00751
00752 if (forkInfoObj_) {
00753 delete forkInfoObj_;
00754 forkInfoObj_=0;
00755 }
00756 }
00757 catch (evf::Exception &e) {
00758 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00759 localLog(reasonForFailedState_);
00760 fsm_.fireFailed(reasonForFailedState_,this);
00761 }
00762
00763
00764 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00765 fsm_.fireEvent("HaltDone",this);
00766
00767 localLog("-I- Halt completed");
00768 return false;
00769 }
00770
00771
00772
00773 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00774 throw (xoap::exception::Exception)
00775 {
00776 return fsm_.commandCallback(msg);
00777 }
00778
00779
00780
00781 void FUEventProcessor::actionPerformed(xdata::Event& e)
00782 {
00783
00784 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00785 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00786
00787 if ( item == "parameterSet") {
00788 LOG4CPLUS_WARN(getApplicationLogger(),
00789 "HLT Menu changed, force reinitialization of EventProcessor");
00790 epInitialized_ = false;
00791 }
00792 if ( item == "outputEnabled") {
00793 if(outprev_ != outPut_) {
00794 LOG4CPLUS_WARN(getApplicationLogger(),
00795 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00796 evtProcessor_->enableEndPaths(outPut_);
00797 outprev_ = outPut_;
00798 }
00799 }
00800 if (item == "globalInputPrescale") {
00801 LOG4CPLUS_WARN(getApplicationLogger(),
00802 "Setting global input prescale has no effect "
00803 <<"in this version of the code");
00804 }
00805 if ( item == "globalOutputPrescale") {
00806 LOG4CPLUS_WARN(getApplicationLogger(),
00807 "Setting global output prescale has no effect "
00808 <<"in this version of the code");
00809 }
00810 }
00811
00812 }
00813
00814
00815 void FUEventProcessor::getSlavePids(xgi::Input *in, xgi::Output *out)
00816 {
00817 for (unsigned int i=0;i<subs_.size();i++)
00818 {
00819 if (i!=0) *out << ",";
00820 *out << subs_[i].pid();
00821 }
00822 }
00823
00824 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00825 {
00826 using namespace cgicc;
00827 pid_t pid = 0;
00828 std::ostringstream ost;
00829 ost << "&";
00830
00831 Cgicc cgi(in);
00832 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00833 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00834 mycgi->getEnvironment().begin();
00835 mit != mycgi->getEnvironment().end(); mit++)
00836 ost << mit->first << "%" << mit->second << ";";
00837 std::vector<FormEntry> els = cgi.getElements() ;
00838 std::vector<FormEntry> el1;
00839 cgi.getElement("method",el1);
00840 std::vector<FormEntry> el2;
00841 cgi.getElement("process",el2);
00842 if(el1.size()!=0) {
00843 std::string meth = el1[0].getValue();
00844 if(el2.size()!=0) {
00845 unsigned int i = 0;
00846 std::string mod = el2[0].getValue();
00847 pid = atoi(mod.c_str());
00848 for(; i < subs_.size(); i++)
00849 if(subs_[i].pid()==pid) break;
00850 if(i>=subs_.size()){
00851 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00852 return;
00853 }
00854 if(subs_[i].alive() != 1){
00855 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00856 return;
00857 }
00858 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00859 strncpy(msg1->mtext,meth.c_str(),meth.length());
00860 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00861 subs_[i].post(msg1,true);
00862 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00863 superSleepSec_.value_=10*keep_supersleep_original_value;
00864 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00865 bool done = false;
00866 std::vector<char *>pieces;
00867 while(!done){
00868 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00869 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00870 ::sleep(1);
00871 continue;
00872 }
00873 unsigned int nbytes = atoi(msg->mtext);
00874 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00875 char *buf= new char[nbytes];
00876 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00877 if(retval!=nbytes) std::cout
00878 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00879 pieces.push_back(buf);
00880 }
00881 superSleepSec_.value_=keep_supersleep_original_value;
00882 for(unsigned int j = 0; j < pieces.size(); j++){
00883 *out<<pieces[j];
00884 delete[] pieces[j];
00885 }
00886 }
00887 }
00888 }
00889
00890
00891
00892 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00893 throw (xgi::exception::Exception)
00894 {
00895
00896
00897 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00898 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00899 << getApplicationDescriptor()->getInstance() << "</title>"
00900 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00901 << "</head></html>";
00902 }
00903
00904
00905
00906
00907
// HyperDAQ "Spotlight" page. Emits a header table (spotlight icon, class name,
// instance and current FSM state, links to HyperDAQ and to this application),
// then the event-processor page, then the current configuration in a textarea.
// The event-processor page is taskWebPage when running as a slave or in
// single-process mode, summaryWebPage for a multi-process master, or an
// "HLT Unconfigured" cell when no event processor exists.
00908 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00909 throw (xgi::exception::Exception)
00910 {
00911
00912 std::string urn = getApplicationDescriptor()->getURN();
00913
00914 *out << "<!-- base href=\"/" << urn
00915 << "\"> -->" << std::endl;
00916 *out << "<html>" << std::endl;
00917 *out << "<head>" << std::endl;
00918 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00919 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00920 *out << "<title>" << getApplicationDescriptor()->getClassName()
00921 << getApplicationDescriptor()->getInstance()
00922 << " MAIN</title>" << std::endl;
00923 *out << "</head>" << std::endl;
00924 *out << "<body>" << std::endl;
// Header table: identity/state cell plus icon links.
00925 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00926 *out << "<tr>" << std::endl;
00927 *out << " <td align=\"left\">" << std::endl;
00928 *out << " <img" << std::endl;
00929 *out << " align=\"middle\"" << std::endl;
00930 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00931 *out << " alt=\"main\"" << std::endl;
00932 *out << " width=\"64\"" << std::endl;
00933 *out << " height=\"64\"" << std::endl;
00934 *out << " border=\"\"/>" << std::endl;
00935 *out << " <b>" << std::endl;
00936 *out << getApplicationDescriptor()->getClassName()
00937 << getApplicationDescriptor()->getInstance() << std::endl;
00938 *out << " " << fsm_.stateName()->toString() << std::endl;
00939 *out << " </b>" << std::endl;
00940 *out << " </td>" << std::endl;
00941 *out << " <td width=\"32\">" << std::endl;
// NOTE(review): hard-coded lid=3 is assumed to be the HyperDAQ application; confirm.
00942 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00943 *out << " <img" << std::endl;
00944 *out << " align=\"middle\"" << std::endl;
00945 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00946 *out << " alt=\"HyperDAQ\"" << std::endl;
00947 *out << " width=\"32\"" << std::endl;
00948 *out << " height=\"32\"" << std::endl;
00949 *out << " border=\"\"/>" << std::endl;
00950 *out << " </a>" << std::endl;
00951 *out << " </td>" << std::endl;
00952 *out << " <td width=\"32\">" << std::endl;
00953 *out << " </td>" << std::endl;
00954 *out << " <td width=\"32\">" << std::endl;
00955 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00956 *out << " <img" << std::endl;
00957 *out << " align=\"middle\"" << std::endl;
00958 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00959 *out << " alt=\"main\"" << std::endl;
00960 *out << " width=\"32\"" << std::endl;
00961 *out << " height=\"32\"" << std::endl;
00962 *out << " border=\"\"/>" << std::endl;
00963 *out << " </a>" << std::endl;
00964 *out << " </td>" << std::endl;
00965 *out << "</tr>" << std::endl;
00966 *out << "</table>" << std::endl;
00967
00968 *out << "<hr/>" << std::endl;
00969
// Base URL for module pages. When myProcess_ is set (this process is a forked
// slave), requests are routed through SubWeb with this process's pid;
// otherwise moduleWeb is addressed directly.
00970 std::ostringstream ost;
00971 if(myProcess_)
00972 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00973 else
00974 ost << "/moduleWeb?";
00975 urn += ost.str();
00976 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00977 evtProcessor_.taskWebPage(in,out,urn);
00978 else if(evtProcessor_)
00979 evtProcessor_.summaryWebPage(in,out,urn);
00980 else
00981 *out << "<td>HLT Unconfigured</td>" << std::endl;
// NOTE(review): no <table> is opened in this function after the header table
// is closed; presumably this closes markup from taskWebPage/summaryWebPage -- confirm.
00982 *out << "</table>" << std::endl;
00983
// Current configuration text (configuration_) shown read-only.
00984 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00985 *out << configuration_ << std::endl;
00986 *out << "</textarea><P>" << std::endl;
00987
00988 *out << "</body>" << std::endl;
00989 *out << "</html>" << std::endl;
00990
00991
00992 }
00993 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00994 throw (xgi::exception::Exception)
00995 {
00996
00997 out->getHTTPResponseHeader().addHeader( "Content-Type",
00998 "application/octet-stream" );
00999 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01000 "binary" );
01001 if(evtProcessor_ != 0){
01002 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
01003 }
01004 }
01005
01006 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
01007 throw (xgi::exception::Exception)
01008 {
01009
01010 if(evtProcessor_ != 0){
01011 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01012 if(legenda !=0){
01013 std::string slegenda = ((xdata::String*)legenda)->value_;
01014 *out << slegenda << std::endl;
01015 }
01016 }
01017 }
01018
01019
01020 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
01021 {
01022 std::string errmsg;
01023 try {
01024 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01025 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01026 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01027 }
01028 catch (cms::Exception& e) {
01029 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01030 }
01031 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01032 }
01033
01034
01035 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
01036 {
01037 std::string errmsg;
01038 bool success = false;
01039 try {
01040 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01041 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01042 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01043 if (!success) errmsg = "Failed to attach DQM service to shared memory";
01044 }
01045 catch (cms::Exception& e) {
01046 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01047 }
01048 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01049 }
01050
01051
01052
01053 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01054 {
01055 std::string errmsg;
01056 bool success = false;
01057 try {
01058 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01059 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01060 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01061 if (!success) errmsg = "Failed to detach DQM service from shared memory";
01062 }
01063 catch (cms::Exception& e) {
01064 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01065 }
01066 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01067 }
01068
01069
01070 std::string FUEventProcessor::logsAsString()
01071 {
01072 std::ostringstream oss;
01073 if(logWrap_)
01074 {
01075 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01076 oss << logRing_[i] << std::endl;
01077 for(unsigned int i = 0; i < logRingIndex_; i++)
01078 oss << logRing_[i] << std::endl;
01079 }
01080 else
01081 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01082 oss << logRing_[i] << std::endl;
01083
01084 return oss.str();
01085 }
01086
01087 void FUEventProcessor::localLog(std::string m)
01088 {
01089 timeval tv;
01090
01091 gettimeofday(&tv,0);
01092 tm *uptm = localtime(&tv.tv_sec);
01093 char datestring[256];
01094 strftime(datestring, sizeof(datestring),"%c", uptm);
01095
01096 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01097 logRingIndex_--;
01098 std::ostringstream timestamp;
01099 timestamp << " at " << datestring;
01100 m += timestamp.str();
01101 logRing_[logRingIndex_] = m;
01102 }
01103
01104 void FUEventProcessor::startSupervisorLoop()
01105 {
01106 try {
01107 wlSupervising_=
01108 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01109 "waiting");
01110 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01111 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01112 "Supervisor");
01113 wlSupervising_->submit(asSupervisor_);
01114 supervising_ = true;
01115 }
01116 catch (xcept::Exception& e) {
01117 std::string msg = "Failed to start workloop 'Supervisor'.";
01118 XCEPT_RETHROW(evf::Exception,msg,e);
01119 }
01120 }
01121
01122 void FUEventProcessor::startReceivingLoop()
01123 {
01124 try {
01125 wlReceiving_=
01126 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01127 "waiting");
01128 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01129 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01130 "Receiving");
01131 wlReceiving_->submit(asReceiveMsgAndExecute_);
01132 receiving_ = true;
01133 }
01134 catch (xcept::Exception& e) {
01135 std::string msg = "Failed to start workloop 'Receiving'.";
01136 XCEPT_RETHROW(evf::Exception,msg,e);
01137 }
01138 }
01139 void FUEventProcessor::startReceivingMonitorLoop()
01140 {
01141 try {
01142 wlReceivingMonitor_=
01143 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01144 "waiting");
01145 if (!wlReceivingMonitor_->isActive())
01146 wlReceivingMonitor_->activate();
01147 asReceiveMsgAndRead_ =
01148 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01149 "ReceivingM");
01150 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01151 receivingM_ = true;
01152 }
01153 catch (xcept::Exception& e) {
01154 std::string msg = "Failed to start workloop 'ReceivingM'.";
01155 XCEPT_RETHROW(evf::Exception,msg,e);
01156 }
01157 }
01158
01159 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01160 {
01161 MsgBuf msg;
01162 try{
01163 myProcess_->rcvSlave(msg,false);
01164 if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01165 {
01166 rlimit rl;
01167 getrlimit(RLIMIT_CORE,&rl);
01168 rl.rlim_cur=0;
01169 setrlimit(RLIMIT_CORE,&rl);
01170 rlimit_coresize_changed_=true;
01171 }
01172 if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01173 {
01174
01175 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01176 rlimit_coresize_changed_=false;
01177 }
01178 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01179 {
01180 pthread_mutex_lock(&stop_lock_);
01181 try {
01182 fsm_.fireEvent("Stop",this);
01183 }
01184 catch (...) {
01185 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01186 << getpid() << " The state on Stop event was not consistent");
01187 }
01188
01189 try {
01190 stopClassic();
01191 }
01192 catch (...) {
01193 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01194 }
01195
01196
01197 try{
01198 messageServicePresence_.reset();
01199 }
01200 catch(...) {
01201 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01202 }
01203
01204 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01205 myProcess_->postSlave(msg1,false);
01206 pthread_mutex_unlock(&stop_lock_);
01207 fclose(stdout);
01208 fclose(stderr);
01209 _exit(EXIT_SUCCESS);
01210 }
01211 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01212 _exit(EXIT_SUCCESS);
01213 }
01214 catch(evf::Exception &e){
01215 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01216 }
01217 return true;
01218 }
01219
01220 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01221 {
01222 pthread_mutex_lock(&stop_lock_);
01223 if(subs_.size()!=nbSubProcesses_.value_)
01224 {
01225 pthread_mutex_lock(&pickup_lock_);
01226 if(subs_.size()!=nbSubProcesses_.value_) {
01227 subs_.resize(nbSubProcesses_.value_);
01228 spMStates_.resize(nbSubProcesses_.value_);
01229 spmStates_.resize(nbSubProcesses_.value_);
01230 for(unsigned int i = 0; i < spMStates_.size(); i++)
01231 {
01232 spMStates_[i] = edm::event_processor::sInit;
01233 spmStates_[i] = 0;
01234 }
01235 }
01236 pthread_mutex_unlock(&pickup_lock_);
01237 }
01238 bool running = fsm_.stateName()->toString()=="Enabled";
01239 bool stopping = fsm_.stateName()->toString()=="stopping";
01240 for(unsigned int i = 0; i < subs_.size(); i++)
01241 {
01242 if(subs_[i].alive()==-1000) continue;
01243 int sl;
01244 pid_t sub_pid = subs_[i].pid();
01245 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01246
01247 if(killedOrNot && killedOrNot==sub_pid) {
01248 pthread_mutex_lock(&pickup_lock_);
01249
01250 if (i<subs_.size() && subs_[i].alive()!=-1000) {
01251 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01252 std::ostringstream ost;
01253 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01254 else if(WIFSIGNALED(sl)!=0) {
01255 ost << " process terminated with signal " << WTERMSIG(sl);
01256 }
01257 else ost << " process stopped ";
01258
01259
01260
01261
01262 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01263 subs_[i].setReasonForFailed(ost.str());
01264 spMStates_[i] = evtProcessor_.notstarted_state_code();
01265 spmStates_[i] = 0;
01266 std::ostringstream ost1;
01267 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01268 localLog(ost1.str());
01269 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01270 }
01271 pthread_mutex_unlock(&pickup_lock_);
01272 }
01273 }
01274 pthread_mutex_unlock(&stop_lock_);
01275 if(stopping) return true;
01276
01277
01278 if (running && rlimit_coresize_changed_) {
01279 timeval newtv;
01280 gettimeofday(&newtv,0);
01281 int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01282 if (delta>60*15) {
01283 std::ostringstream ostr;
01284 ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01285 std::cout << ostr.str() << std::endl;
01286 LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01287 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01288 MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01289 for (unsigned int i = 0; i < subs_.size(); i++) {
01290 try {
01291 if (subs_[i].alive())
01292 subs_[i].post(master_message_rlr_,false);
01293 }
01294 catch (...) {}
01295 }
01296 rlimit_coresize_changed_=false;
01297 crashesThisRun_=0;
01298 }
01299 }
01300
01301 if(running && edm_init_done_)
01302 {
01303
01304
01305 if(autoRestartSlaves_.value_){
01306 pthread_mutex_lock(&stop_lock_);
01307 for(unsigned int i = 0; i < subs_.size(); i++)
01308 {
01309 if(subs_[i].alive() != 1){
01310 if(subs_[i].countdown() == 0)
01311 {
01312 if(subs_[i].restartCount()>2){
01313 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01314 << " - maximum restart count reached");
01315 std::ostringstream ost1;
01316 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01317 localLog(ost1.str());
01318 subs_[i].countdown()--;
01319 XCEPT_DECLARE(evf::Exception,
01320 sentinelException, ost1.str());
01321 notifyQualified("error",sentinelException);
01322 subs_[i].disconnect();
01323 continue;
01324 }
01325 subs_[i].restartCount()++;
01326 if (forkInEDM_.value_) {
01327 restartForkInEDM(i);
01328 }
01329 else {
01330 pid_t rr = subs_[i].forkNew();
01331 if(rr==0)
01332 {
01333 myProcess_=&subs_[i];
01334 scalersUpdates_ = 0;
01335 int retval = pthread_mutex_destroy(&stop_lock_);
01336 if(retval != 0) perror("error");
01337 retval = pthread_mutex_init(&stop_lock_,0);
01338 if(retval != 0) perror("error");
01339 fsm_.disableRcmsStateNotification();
01340 fsm_.fireEvent("Stop",this);
01341 fsm_.fireEvent("StopDone",this);
01342 fsm_.fireEvent("Enable",this);
01343 try{
01344 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01345 if(lsid) {
01346 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01347 }
01348 }
01349 catch(...){
01350 std::cout << "trouble with lsindex during restart" << std::endl;
01351 }
01352 try{
01353 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01354 if(lstb) {
01355 ((xdata::Boolean*)(lstb))->value_ = false;
01356 }
01357 }
01358 catch(...){
01359 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01360 }
01361
01362 evtProcessor_.adjustLsIndexForRestart();
01363 evtProcessor_.resetPackedTriggerReport();
01364 enableMPEPSlave();
01365 return false;
01366 }
01367 else
01368 {
01369 std::ostringstream ost1;
01370 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01371 localLog(ost1.str());
01372 }
01373 }
01374 }
01375 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01376 }
01377 }
01378 pthread_mutex_unlock(&stop_lock_);
01379 }
01380 }
01381 xdata::Serializable *lsid = 0;
01382 xdata::Serializable *psid = 0;
01383 xdata::Serializable *dqmp = 0;
01384 xdata::UnsignedInteger32 *dqm = 0;
01385
01386
01387
01388 if(running && edm_init_done_){
01389 try{
01390 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01391 psid = applicationInfoSpace_->find("prescaleSetIndex");
01392 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01393 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01394 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01395 }
01396 catch(xdata::exception::Exception e){
01397 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01398 }
01399
01400 try{
01401 if(nbProcessed !=0 && nbAccepted !=0)
01402 {
01403 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01404 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01405 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01406 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01407 if(dqmp!=0)
01408 dqm = (xdata::UnsignedInteger32*)dqmp;
01409 if(dqm) dqm->value_ = 0;
01410 nbTotalDQM_ = 0;
01411 nbp->value_ = 0;
01412 nba->value_ = 0;
01413 nblive_ = 0;
01414 nbdead_ = 0;
01415 scalersUpdates_ = 0;
01416
01417 for(unsigned int i = 0; i < subs_.size(); i++)
01418 {
01419 if(subs_[i].alive()>0)
01420 {
01421 nblive_++;
01422 try{
01423 subs_[i].post(master_message_prg_,true);
01424
01425 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01426 if(retval == (unsigned long) master_message_prr_->mtype){
01427 prg* p = (struct prg*)(master_message_prr_->mtext);
01428 subs_[i].setParams(p);
01429 spMStates_[i] = p->Ms;
01430 spmStates_[i] = p->ms;
01431 cpustat_->addEntry(p->ms);
01432 if(!subs_[i].inInconsistentState() &&
01433 (p->Ms == edm::event_processor::sError
01434 || p->Ms == edm::event_processor::sInvalid
01435 || p->Ms == edm::event_processor::sStopping))
01436 {
01437 std::ostringstream ost;
01438 ost << "edm::eventprocessor slot " << i << " process id "
01439 << subs_[i].pid() << " not in Running state : Mstate="
01440 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01441 << evtProcessor_.moduleNameFromIndex(p->ms)
01442 << " - Look into possible error messages from HLT process";
01443 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01444 }
01445 nbp->value_ += subs_[i].params().nbp;
01446 nba->value_ += subs_[i].params().nba;
01447 if(dqm)dqm->value_ += p->dqm;
01448 nbTotalDQM_ += p->dqm;
01449 scalersUpdates_ += p->trp;
01450 if(p->ls > ls->value_) ls->value_ = p->ls;
01451 if(p->ps != ps->value_) ps->value_ = p->ps;
01452 }
01453 else{
01454 nbp->value_ += subs_[i].get_save_nbp();
01455 nba->value_ += subs_[i].get_save_nba();
01456 }
01457 }
01458 catch(evf::Exception &e){
01459 LOG4CPLUS_INFO(getApplicationLogger(),
01460 "could not send/receive msg on slot "
01461 << i << " - " << e.what());
01462 }
01463
01464 }
01465 else
01466 {
01467 nbp->value_ += subs_[i].get_save_nbp();
01468 nba->value_ += subs_[i].get_save_nba();
01469 nbdead_++;
01470 }
01471 }
01472 if(nbp->value_>64){
01473 for(unsigned int i = 0; i < subs_.size(); i++)
01474 {
01475 if(subs_[i].params().nbp == 0){
01476
01477 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01478 {
01479 subs_[i].found_invalid();
01480 if(subs_[i].nfound_invalid() > 60){
01481 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01482 subs_[i].post(msg3,false);
01483 std::ostringstream ost1;
01484 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01485 localLog(ost1.str());
01486 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01487 XCEPT_DECLARE(evf::Exception,
01488 sentinelException, ost1.str());
01489 notifyQualified("error",sentinelException);
01490
01491 }
01492 }
01493 }
01494 }
01495 }
01496 }
01497 }
01498 catch(std::exception &e){
01499 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01500 }
01501 catch(...){
01502 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01503 }
01504 }
01505 else{
01506 for(unsigned int i = 0; i < subs_.size(); i++)
01507 {
01508 if(subs_[i].alive()==-1000)
01509 {
01510 spMStates_[i] = edm::event_processor::sInit;
01511 spmStates_[i] = 0;
01512 }
01513 }
01514 }
01515 try{
01516 monitorInfoSpace_->lock();
01517 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01518 monitorInfoSpace_->unlock();
01519 }
01520 catch(xdata::exception::Exception &e)
01521 {
01522 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01523
01524 }
01525 ::sleep(superSleepSec_.value_);
01526 return true;
01527 }
01528
01529 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01530 {
01531 try {
01532 wlScalers_=
01533 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01534 "waiting");
01535 if (!wlScalers_->isActive()) wlScalers_->activate();
01536 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01537 "Scalers");
01538
01539 wlScalers_->submit(asScalers_);
01540 wlScalersActive_ = true;
01541 }
01542 catch (xcept::Exception& e) {
01543 std::string msg = "Failed to start workloop 'Scalers'.";
01544 XCEPT_RETHROW(evf::Exception,msg,e);
01545 }
01546 }
01547
01548
01549
01550 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01551 {
01552 try {
01553 wlSummarize_=
01554 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01555 "waiting");
01556 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01557
01558 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01559 "Summary");
01560
01561 wlSummarize_->submit(asSummarize_);
01562 wlSummarizeActive_ = true;
01563 }
01564 catch (xcept::Exception& e) {
01565 std::string msg = "Failed to start workloop 'Summarize'.";
01566 XCEPT_RETHROW(evf::Exception,msg,e);
01567 }
01568 }
01569
01570
01571
01572 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01573 {
01574 if(evtProcessor_)
01575 {
01576 if(!evtProcessor_.getTriggerReport(true)) {
01577 wlScalersActive_ = false;
01578 return false;
01579 }
01580 }
01581 else
01582 {
01583 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01584 wlScalersActive_ = false;
01585 return false;
01586 }
01587 if(myProcess_)
01588 {
01589
01590 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01591 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01592 scalersUpdates_++;
01593 }
01594 else
01595 evtProcessor_.fireScalersUpdate();
01596 return true;
01597 }
01598
01599
01600 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01601 {
01602 evtProcessor_.resetPackedTriggerReport();
01603 bool atLeastOneProcessUpdatedSuccessfully = false;
01604 int msgCount = 0;
01605 for (unsigned int i = 0; i < subs_.size(); i++)
01606 {
01607 if(subs_[i].alive()>0)
01608 {
01609 int ret = 0;
01610 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01611 evtProcessor_.getLumiSectionReferenceIndex()))
01612 {
01613 ret = MSQS_MESSAGE_TYPE_TRR;
01614 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01615 }
01616 else{
01617 bool insync = false;
01618 bool exception_caught = false;
01619 while(!insync){
01620 try{
01621 ret = subs_[i].rcv(master_message_trr_,false);
01622 }
01623 catch(evf::Exception &e)
01624 {
01625 std::cout << "exception in msgrcv on " << i
01626 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01627 exception_caught = true;
01628 break;
01629
01630 }
01631 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01632 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01633 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01634 insync = true;
01635 }
01636 }
01637 }
01638 if(exception_caught) continue;
01639 }
01640 msgCount++;
01641 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01642 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01643 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01644 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01645 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01646 subs_[i].add_postponed_trigger_update(master_message_trr_);
01647 }else{
01648 atLeastOneProcessUpdatedSuccessfully = true;
01649 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01650 }
01651 }
01652 else std::cout << "msgrcv returned error " << errno << std::endl;
01653 }
01654 }
01655 if(atLeastOneProcessUpdatedSuccessfully){
01656 nbSubProcessesReporting_.value_ = msgCount;
01657 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01658 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01659 evtProcessor_.updateRollingReport();
01660 evtProcessor_.fireScalersUpdate();
01661 }
01662 else{
01663 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01664 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01665 nbSubProcessesReporting_.value_ = 0;
01666 ::sleep(10);
01667 }
01668 if(fsm_.stateName()->toString()!="Enabled"){
01669 wlScalersActive_ = false;
01670 return false;
01671 }
01672
01673 if(iDieStatisticsGathering_.value_){
01674 try{
01675 unsigned long long idleTmp=idleProcStats_;
01676 unsigned long long allPSTmp=allProcStats_;
01677 idleProcStats_=allProcStats_=0;
01678
01679 utils::procCpuStat(idleProcStats_,allProcStats_);
01680 timeval oldtime=lastProcReport_;
01681 gettimeofday(&lastProcReport_,0);
01682
01683 if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01684 cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01685 int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01686 + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01687 cpustat_->setElapsed(deltaTms);
01688 }
01689 else {
01690 cpustat_->setCPUStat(0);
01691 cpustat_->setElapsed(0);
01692 }
01693
01694 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01695 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01696 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01697 ratestat_->sendStat((unsigned char*)trsp,
01698 sizeof(TriggerReportStatic),
01699 evtProcessor_.getLumiSectionReferenceIndex());
01700 }catch(evf::Exception &e){
01701 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01702 << e.what());
01703 }
01704 }
01705 cpustat_->reset();
01706 return true;
01707 }
01708
01709
01710
01711 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01712 {
01713 try{
01714 myProcess_->rcvSlave(slave_message_monitoring_,true);
01715 switch(slave_message_monitoring_->mtype)
01716 {
01717 case MSQM_MESSAGE_TYPE_MCS:
01718 {
01719 xgi::Input *in = 0;
01720 xgi::Output out;
01721 evtProcessor_.microState(in,&out);
01722 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01723 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01724 myProcess_->postSlave(msg1,true);
01725 break;
01726 }
01727
01728 case MSQM_MESSAGE_TYPE_PRG:
01729 {
01730 xdata::Serializable *dqmp = 0;
01731 xdata::UnsignedInteger32 *dqm = 0;
01732 evtProcessor_.monitoring(0);
01733 try{
01734 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01735 } catch(xdata::exception::Exception e){}
01736 if(dqmp!=0)
01737 dqm = (xdata::UnsignedInteger32*)dqmp;
01738
01739
01740 prg * data = (prg*)slave_message_prr_->mtext;
01741 data->ls = evtProcessor_.lsid_;
01742 data->eols = evtProcessor_.lastLumiUsingEol_;
01743 data->ps = evtProcessor_.psid_;
01744 data->nbp = evtProcessor_->totalEvents();
01745 data->nba = evtProcessor_->totalEventsPassed();
01746 data->Ms = evtProcessor_.epMAltState_.value_;
01747 data->ms = evtProcessor_.epmAltState_.value_;
01748 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01749 data->trp = scalersUpdates_;
01750
01751 myProcess_->postSlave(slave_message_prr_,true);
01752 if(exitOnError_.value_)
01753 {
01754
01755
01756 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01757 {
01758 bool running = true;
01759 int count = 0;
01760 while(running){
01761 int retval = pthread_mutex_lock(&stop_lock_);
01762 if(retval != 0) perror("error");
01763 running = fsm_.stateName()->toString()=="Enabled";
01764 if(count>5) _exit(-1);
01765 pthread_mutex_unlock(&stop_lock_);
01766 if(running) {::sleep(1); count++;}
01767 }
01768 }
01769
01770 }
01771
01772 break;
01773 }
01774 case MSQM_MESSAGE_TYPE_WEB:
01775 {
01776 xgi::Input *in = 0;
01777 xgi::Output out;
01778 unsigned int bytesToSend = 0;
01779 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01780 std::string query = slave_message_monitoring_->mtext;
01781 size_t pos = query.find_first_of("&");
01782 std::string method;
01783 std::string args;
01784 if(pos!=std::string::npos)
01785 {
01786 method = query.substr(0,pos);
01787 args = query.substr(pos+1,query.length()-pos-1);
01788 }
01789 else
01790 method=query;
01791
01792 if(method=="Spotlight")
01793 {
01794 spotlightWebPage(in,&out);
01795 }
01796 else if(method=="procStat")
01797 {
01798 procStat(in,&out);
01799 }
01800 else if(method=="moduleWeb")
01801 {
01802 internal::MyCgi mycgi;
01803 boost::char_separator<char> sep(";");
01804 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01805 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01806 tok_iter != tokens.end(); ++tok_iter){
01807 size_t pos = (*tok_iter).find_first_of("%");
01808 if(pos != std::string::npos){
01809 std::string first = (*tok_iter).substr(0 , pos);
01810 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01811 mycgi.getEnvironment()[first]=second;
01812 }
01813 }
01814 moduleWeb(&mycgi,&out);
01815 }
01816 else if(method=="Default")
01817 {
01818 defaultWebPage(in,&out);
01819 }
01820 else
01821 {
01822 out << "Error 404!!!!!!!!" << std::endl;
01823 }
01824
01825
01826 bytesToSend = out.str().size();
01827 unsigned int cycle = 0;
01828 if(bytesToSend==0)
01829 {
01830 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01831 myProcess_->postSlave(msg1,true);
01832 }
01833 while(bytesToSend !=0){
01834 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01835 write(anonymousPipe_[PIPE_WRITE],
01836 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01837 msgSize);
01838 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01839 myProcess_->postSlave(msg1,true);
01840 bytesToSend -= msgSize;
01841 cycle++;
01842 }
01843 break;
01844 }
01845 case MSQM_MESSAGE_TYPE_TRP:
01846 {
01847 break;
01848 }
01849 }
01850 }
01851 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01852 return true;
01853 }
01854
01855 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01856 {
01857
01858
01859 try {
01860 wlSignalMonitor_=
01861 toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01862 "waiting");
01863
01864 if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01865 asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01866 "SignalMonitor");
01867 wlSignalMonitor_->submit(asSignalMonitor_);
01868 signalMonitorActive_ = true;
01869 }
01870 catch (xcept::Exception& e) {
01871 std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01872 std::cout << e.what() << std::endl;
01873 XCEPT_RETHROW(evf::Exception,msg,e);
01874 }
01875 }
01876
01877
01878 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01879 {
01880 while (1) {
01881 sem_wait(sigmon_sem_);
01882 std::cout << " received signal notification from slave!"<< std::endl;
01883
01884
01885 bool running = fsm_.stateName()->toString()=="Enabled";
01886 bool stopping = fsm_.stateName()->toString()=="stopping";
01887 bool enabling = fsm_.stateName()->toString()=="enabling";
01888 if (!running && !enabling) {
01889 signalMonitorActive_ = false;
01890 return false;
01891 }
01892
01893 crashesThisRun_++;
01894 gettimeofday(&lastCrashTime_,0);
01895
01896
01897 if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01898
01899 rlimit rlold;
01900 getrlimit(RLIMIT_CORE,&rlold);
01901 rlimit rlnew = rlold;
01902 rlnew.rlim_cur=0;
01903 setrlimit(RLIMIT_CORE,&rlnew);
01904 rlimit_coresize_changed_=true;
01905 MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01906
01907 unsigned int min=1;
01908 for (unsigned int i = min; i < subs_.size(); i++) {
01909 try {
01910 if (subs_[i].alive()) {
01911 subs_[i].post(master_message_rli_,false);
01912 }
01913 }
01914 catch (...) {}
01915 }
01916 std::ostringstream ostr;
01917 ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01918 << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01919 LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01920 }
01921 }
01922 signalMonitorActive_ = false;
01923 return false;
01924 }
01925
01926
01927 bool FUEventProcessor::enableCommon()
01928 {
01929 try {
01930 if(hasShMem_) attachDqmToShm();
01931 int sc = 0;
01932 evtProcessor_->clearCounters();
01933 if(isRunNumberSetter_)
01934 evtProcessor_->setRunNumber(runNumber_.value_);
01935 else
01936 evtProcessor_->declareRunNumber(runNumber_.value_);
01937
01938 try{
01939 ::sleep(1);
01940 evtProcessor_->runAsync();
01941 sc = evtProcessor_->statusAsync();
01942 }
01943 catch(cms::Exception &e) {
01944 reasonForFailedState_ = e.explainSelf();
01945 fsm_.fireFailed(reasonForFailedState_,this);
01946 localLog(reasonForFailedState_);
01947 return false;
01948 }
01949 catch(std::exception &e) {
01950 reasonForFailedState_ = e.what();
01951 fsm_.fireFailed(reasonForFailedState_,this);
01952 localLog(reasonForFailedState_);
01953 return false;
01954 }
01955 catch(...) {
01956 reasonForFailedState_ = "Unknown Exception";
01957 fsm_.fireFailed(reasonForFailedState_,this);
01958 localLog(reasonForFailedState_);
01959 return false;
01960 }
01961 if(sc != 0) {
01962 std::ostringstream oss;
01963 oss<<"EventProcessor::runAsync returned status code " << sc;
01964 reasonForFailedState_ = oss.str();
01965 fsm_.fireFailed(reasonForFailedState_,this);
01966 localLog(reasonForFailedState_);
01967 return false;
01968 }
01969 }
01970 catch (xcept::Exception &e) {
01971 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01972 fsm_.fireFailed(reasonForFailedState_,this);
01973 localLog(reasonForFailedState_);
01974 return false;
01975 }
01976 try{
01977 fsm_.fireEvent("EnableDone",this);
01978 }
01979 catch (xcept::Exception &e) {
01980 std::cout << "exception " << (std::string)e.what() << std::endl;
01981 throw;
01982 }
01983
01984 return false;
01985 }
01986
01987 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
01988 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
01989 }
01990
01991 void FUEventProcessor::forkProcessesFromEDM() {
01992
01993 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
01994 unsigned int forkFrom=0;
01995 unsigned int forkTo=nbSubProcesses_.value_;
01996 if (forkParams->slotId>=0) {
01997 forkFrom=forkParams->slotId;
01998 forkTo=forkParams->slotId+1;
01999 }
02000
02001
02002 try {
02003 if (sorRef_) {
02004 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02005 for (unsigned int i=0;i<shmOutputs.size();i++) {
02006
02007 shmOutputs[i]->unregisterFromShm();
02008
02009 shmOutputs[i]->stop();
02010 }
02011 }
02012 }
02013 catch (std::exception &e)
02014 {
02015 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02016 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02017 fsm_.fireFailed(reasonForFailedState_,this);
02018 localLog(reasonForFailedState_);
02019 }
02020 catch (...) {
02021 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02022 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02023 fsm_.fireFailed(reasonForFailedState_,this);
02024 localLog(reasonForFailedState_);
02025 }
02026
02027 std::string currentState = fsm_.stateName()->toString();
02028
02029
02030 if (currentState!="stopping") {
02031 try {
02032 messageServicePresence_.reset();
02033 }
02034 catch (...) {
02035 LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02036 }
02037 }
02038
02039 if (currentState=="stopping") {
02040 LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02041 forkParams->isMaster=1;
02042 forkInfoObj_->forkParams.slotId=-1;
02043 forkInfoObj_->forkParams.restart=0;
02044 }
02045
02046 else for(unsigned int i=forkFrom; i<forkTo; i++)
02047 {
02048 int retval = subs_[i].forkNew();
02049 if(retval==0)
02050 {
02051 forkParams->isMaster=0;
02052 myProcess_ = &subs_[i];
02053
02054 delete toolbox::mem::_s_mutex_ptr_;
02055 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02056 int retval = pthread_mutex_destroy(&stop_lock_);
02057 if(retval != 0) perror("error");
02058 retval = pthread_mutex_init(&stop_lock_,0);
02059 if(retval != 0) perror("error");
02060 fsm_.disableRcmsStateNotification();
02061
02062
02063 try{
02064 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02065 if(pf != 0) {
02066 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02067 }
02068 else {
02069 LOG4CPLUS_ERROR(getApplicationLogger(),
02070 "SLAVE: Unable to create message service presence. pid:"<<getpid());
02071 }
02072 }
02073 catch(...) {
02074 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02075 }
02076
02077 ML::MLlog4cplus::setAppl(this);
02078
02079
02080 try {
02081 if (sorRef_) {
02082 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02083 for (unsigned int i=0;i<shmOutputs.size();i++)
02084 shmOutputs[i]->start();
02085 }
02086 }
02087 catch (...)
02088 {
02089 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02090 }
02091
02092 if (forkParams->restart) {
02093
02094 scalersUpdates_ = 0;
02095 try {
02096 fsm_.fireEvent("Stop",this);
02097 fsm_.fireEvent("StopDone",this);
02098 fsm_.fireEvent("Enable",this);
02099 } catch (...) {
02100 LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02101 }
02102 try{
02103 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02104 if(lsid) {
02105 ((xdata::UnsignedInteger32*)(lsid))->value_--;
02106 }
02107 }
02108 catch(...){
02109 std::cout << "trouble with lsindex during restart" << std::endl;
02110 }
02111 try{
02112 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02113 if(lstb) {
02114 ((xdata::Boolean*)(lstb))->value_ = false;
02115 }
02116 }
02117 catch(...){
02118 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02119 }
02120 evtProcessor_.adjustLsIndexForRestart();
02121 evtProcessor_.resetPackedTriggerReport();
02122 }
02123
02124
02125 startReceivingLoop();
02126 startReceivingMonitorLoop();
02127 startScalersWorkLoop();
02128 while(!evtProcessor_.isWaitingForLs())
02129 ::usleep(100000);
02130
02131
02132 if(hasShMem_) attachDqmToShm();
02133
02134
02135 try {
02136 fsm_.fireEvent("EnableDone",this);
02137 }
02138 catch (...) {}
02139
02140
02141 while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02142
02143
02144 sigset_t tmpset_thread;
02145 sigemptyset(&tmpset_thread);
02146 sigaddset(&tmpset_thread, SIGQUIT);
02147 sigaddset(&tmpset_thread, SIGILL);
02148 sigaddset(&tmpset_thread, SIGABRT);
02149 sigaddset(&tmpset_thread, SIGFPE);
02150 sigaddset(&tmpset_thread, SIGSEGV);
02151 sigaddset(&tmpset_thread, SIGALRM);
02152
02153 pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02154
02155
02156 struct sigaction sa;
02157 sigset_t tmpset;
02158 memset(&tmpset,0,sizeof(tmpset));
02159 sigemptyset(&tmpset);
02160 sa.sa_mask=tmpset;
02161 sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02162 sa.sa_handler=0;
02163 sa.sa_sigaction=evfep_sighandler;
02164
02165 sigaction(SIGQUIT,&sa,0);
02166 sigaction(SIGILL,&sa,0);
02167 sigaction(SIGABRT,&sa,0);
02168 sigaction(SIGFPE,&sa,0);
02169 sigaction(SIGSEGV,&sa,0);
02170 sa.sa_sigaction=evfep_alarmhandler;
02171 sigaction(SIGALRM,&sa,0);
02172
02173
02174 return ;
02175 }
02176 else {
02177
02178 forkParams->isMaster=1;
02179 forkInfoObj_->forkParams.slotId=-1;
02180 if (forkParams->restart) {
02181 std::ostringstream ost1;
02182 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02183 localLog(ost1.str());
02184 }
02185 forkInfoObj_->forkParams.restart=0;
02186
02187 }
02188 }
02189
02190
02191 try{
02192
02193 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02194 if(pf != 0) {
02195 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02196 }
02197 else {
02198 LOG4CPLUS_ERROR(getApplicationLogger(),
02199 "Unable to recreate message service presence ");
02200 }
02201 }
02202 catch(...) {
02203 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02204 }
02205
02206 edm_init_done_=true;
02207 }
02208
02209 bool FUEventProcessor::enableForkInEDM()
02210 {
02211 evtProcessor_.resetWaiting();
02212 try {
02213
02214
02215
02216 int sc = 0;
02217
02218 evtProcessor_->clearCounters();
02219 if(isRunNumberSetter_)
02220 evtProcessor_->setRunNumber(runNumber_.value_);
02221 else
02222 evtProcessor_->declareRunNumber(runNumber_.value_);
02223
02224
02225 pthread_mutex_destroy(&forkObjLock_);
02226 pthread_mutex_init(&forkObjLock_,0);
02227 if (forkInfoObj_) delete forkInfoObj_;
02228 forkInfoObj_ = new moduleweb::ForkInfoObj();
02229 forkInfoObj_->mst_lock_=&forkObjLock_;
02230 forkInfoObj_->fuAddr=(void*)this;
02231 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02232 forkInfoObj_->forkParams.slotId=-1;
02233 forkInfoObj_->forkParams.restart=0;
02234 forkInfoObj_->forkParams.isMaster=-1;
02235 forkInfoObj_->stopCondition=0;
02236 if (mwrRef_)
02237 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02238
02239 evtProcessor_->runAsync();
02240 sc = evtProcessor_->statusAsync();
02241
02242 if(sc != 0) {
02243 std::ostringstream oss;
02244 oss<<"EventProcessor::runAsync returned status code " << sc;
02245 reasonForFailedState_ = oss.str();
02246 fsm_.fireFailed(reasonForFailedState_,this);
02247 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02248 return false;
02249 }
02250 }
02251
02252 catch(cms::Exception &e) {
02253 reasonForFailedState_ = e.explainSelf();
02254 fsm_.fireFailed(reasonForFailedState_,this);
02255 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02256 return false;
02257 }
02258 catch(std::exception &e) {
02259 reasonForFailedState_ = e.what();
02260 fsm_.fireFailed(reasonForFailedState_,this);
02261 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02262 return false;
02263 }
02264 catch(...) {
02265 reasonForFailedState_ = "Unknown Exception";
02266 fsm_.fireFailed(reasonForFailedState_,this);
02267 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02268 return false;
02269 }
02270 return true;
02271 }
02272
02273 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02274
02275
02276 forkInfoObj_->lock();
02277 forkInfoObj_->forkParams.slotId=slotId;
02278 forkInfoObj_->forkParams.restart=true;
02279 forkInfoObj_->forkParams.isMaster=1;
02280 forkInfoObj_->stopCondition=0;
02281 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02282 sem_post(forkInfoObj_->control_sem_);
02283 forkInfoObj_->unlock();
02284 usleep(1000);
02285 return true;
02286 }
02287
02288 bool FUEventProcessor::enableClassic()
02289 {
02290 bool retval = enableCommon();
02291 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02292 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02293 ::sleep(1);
02294 }
02295
02296
02297
02298 localLog("-I- Start completed");
02299 return retval;
02300 }
02301 bool FUEventProcessor::enableMPEPSlave()
02302 {
02303
02304
02305 startReceivingLoop();
02306 startReceivingMonitorLoop();
02307 evtProcessor_.resetWaiting();
02308 startScalersWorkLoop();
02309 while(!evtProcessor_.isWaitingForLs())
02310 ::sleep(1);
02311
02312
02313
02314 try{
02315
02316 try{
02317 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02318 if(pf != 0) {
02319 pf->makePresence("MessageServicePresence").release();
02320 }
02321 else {
02322 LOG4CPLUS_ERROR(getApplicationLogger(),
02323 "Unable to create message service presence ");
02324 }
02325 }
02326 catch(...) {
02327 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02328 }
02329 ML::MLlog4cplus::setAppl(this);
02330 }
02331 catch (xcept::Exception &e) {
02332 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02333 fsm_.fireFailed(reasonForFailedState_,this);
02334 localLog(reasonForFailedState_);
02335 }
02336 bool retval = enableCommon();
02337
02338
02339
02340
02341 return retval;
02342 }
02343
02344 bool FUEventProcessor::stopClassic()
02345 {
02346 bool failed=false;
02347 try {
02348 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02349 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02350 if(rc == edm::EventProcessor::epSuccess)
02351 fsm_.fireEvent("StopDone",this);
02352 else
02353 {
02354 failed=true;
02355
02356 if(rc == edm::EventProcessor::epTimedOut)
02357 reasonForFailedState_ = "EventProcessor stop timed out";
02358 else
02359 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02360 }
02361 }
02362 catch (xcept::Exception &e) {
02363 failed=true;
02364 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02365 }
02366 catch (edm::Exception &e) {
02367 failed=true;
02368 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02369 }
02370 catch (...) {
02371 failed=true;
02372 reasonForFailedState_= "Stopping FAILED: unknown exception";
02373 }
02374 try {
02375 if (hasShMem_) {
02376 detachDqmFromShm();
02377 if (failed)
02378 LOG4CPLUS_WARN(getApplicationLogger(),
02379 "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02380 }
02381 }
02382 catch (cms::Exception & e) {
02383 failed=true;
02384 reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02385 }
02386 catch (...) {
02387 failed=true;
02388 reasonForFailedState_= "DQM detach failed: Unknown exception";
02389 }
02390
02391 if (failed) {
02392 LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02393 << reasonForFailedState_ << " (pid:" << getpid()<<")");
02394 localLog(reasonForFailedState_);
02395 fsm_.fireFailed(reasonForFailedState_,this);
02396 }
02397
02398 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02399 localLog("-I- Stop completed");
02400 return false;
02401 }
02402
02403 void FUEventProcessor::stopSlavesAndAcknowledge()
02404 {
02405 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02406 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02407
02408 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02409 for(unsigned int i = 0; i < subs_.size(); i++)
02410 {
02411 pthread_mutex_lock(&stop_lock_);
02412 if(subs_[i].alive()>0){
02413 processes_to_stop[i] = true;
02414 subs_[i].post(msg,false);
02415 }
02416 pthread_mutex_unlock(&stop_lock_);
02417 }
02418 for(unsigned int i = 0; i < subs_.size(); i++)
02419 {
02420 pthread_mutex_lock(&stop_lock_);
02421 if(processes_to_stop[i]){
02422 try{
02423 subs_[i].rcv(msg1,false);
02424 }
02425 catch(evf::Exception &e){
02426 std::ostringstream ost;
02427 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02428 reasonForFailedState_ = ost.str();
02429 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02430
02431 localLog(reasonForFailedState_);
02432 pthread_mutex_unlock(&stop_lock_);
02433 continue;
02434 }
02435 }
02436 else {
02437 pthread_mutex_unlock(&stop_lock_);
02438 continue;
02439 }
02440 pthread_mutex_unlock(&stop_lock_);
02441 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02442 while(subs_[i].alive()>0) ::usleep(10000);
02443 subs_[i].disconnect();
02444 }
02445
02446
02447 }
02448
02449 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02450 {
02451 std::string urn = getApplicationDescriptor()->getURN();
02452 try{
02453 evtProcessor_.stateNameFromIndex(0);
02454 evtProcessor_.moduleNameFromIndex(0);
02455 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02456 *out << "<tr><td>" << fsm_.stateName()->toString()
02457 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02458 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02459 evtProcessor_.microState(in,out);
02460 *out << "<td></td><td>" << nbTotalDQM_
02461 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02462 if(nbSubProcesses_.value_!=0 && !myProcess_)
02463 {
02464 pthread_mutex_lock(&start_lock_);
02465 for(unsigned int i = 0; i < subs_.size(); i++)
02466 {
02467 try{
02468 if(subs_[i].alive()>0)
02469 {
02470 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02471 << i << "\">""Alive</td><td>S</td><td>"
02472 << subs_[i].queueId() << "<td>"
02473 << subs_[i].queueStatus()<< "/"
02474 << subs_[i].queueOccupancy() << "/"
02475 << subs_[i].queuePidOfLastSend() << "/"
02476 << subs_[i].queuePidOfLastReceive()
02477 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02478 << subs_[i].pid() << "&method=procStat\">"
02479 << subs_[i].pid()<<"</a></td>"
02480 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02481 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02482 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02483 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02484 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02485 << "</td><td>" << subs_[i].params().ps
02486 << "</td><td"
02487 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02488 << subs_[i].params().eols
02489 << "</td><td>" << subs_[i].params().dqm
02490 << "</td><td>" << subs_[i].params().trp << "</td>";
02491 }
02492 else
02493 {
02494 pthread_mutex_lock(&pickup_lock_);
02495 *out << "<tr><td id=\"a"<< i << "\" ";
02496 if(subs_[i].alive()==-1000)
02497 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02498 else
02499 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02500 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02501 << subs_[i].queueStatus() << "/"
02502 << subs_[i].queueOccupancy() << "/"
02503 << subs_[i].queuePidOfLastSend() << "/"
02504 << subs_[i].queuePidOfLastReceive()
02505 << "</td><td id=\"p"<< i << "\">"
02506 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02507 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02508 {
02509 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02510 *out << " will restart in " << subs_[i].countdown() << " s";
02511 else if(autoRestartSlaves_)
02512 *out << " reached maximum restart count";
02513 else *out << " autoRestart is disabled ";
02514 }
02515 *out << "</td><td"
02516 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02517 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02518 << ">"
02519 << subs_[i].params().eols
02520 << "</td><td>" << subs_[i].params().dqm
02521 << "</td><td>" << subs_[i].params().trp << "</td>";
02522 pthread_mutex_unlock(&pickup_lock_);
02523 }
02524 *out << "</tr>";
02525 }
02526 catch(evf::Exception &e){
02527 *out << "<tr><td id=\"a"<< i << "\" "
02528 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02529 << subs_[i].queueStatus() << "/"
02530 << subs_[i].queueOccupancy() << "/"
02531 << subs_[i].queuePidOfLastSend() << "/"
02532 << subs_[i].queuePidOfLastReceive()
02533 << "</td><td id=\"p"<< i << "\">"
02534 <<subs_[i].pid()<<"</td>";
02535 }
02536 }
02537 pthread_mutex_unlock(&start_lock_);
02538 }
02539 }
02540 catch(evf::Exception &e)
02541 {
02542 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02543 }
02544 catch(cms::Exception &e)
02545 {
02546 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02547 }
02548 catch(std::exception &e)
02549 {
02550 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02551 }
02552 catch(...)
02553 {
02554 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02555 }
02556
02557 }
02558
02559
// Web callback that writes the dynamic status fragment. It starts with the
// cached static part (updaterStatic_), then a sequence of divs written with
// the utils mDiv/cDiv helpers:
//  - host loads/uptime;
//  - FSM state, run number, configured and reporting slave counts;
//  - class name ("MP " suffix when sub-processes are configured), instance;
//  - HLT config link, or "Not yet..." while Halted/halting;
//  - squid flag and work-loop status;
//  - processed/accepted counters, iDie URL and vulture status;
//  - last lumi summary, the local log ring buffer, and the CPU chart.
// The request input `in` is not used.
void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
{
  using namespace utils;

  *out << updaterStatic_;
  mDiv(out,"loads");
  uptime(out);
  cDiv(out);
  mDiv(out,"st",fsm_.stateName()->toString());
  mDiv(out,"ru",runNumber_.toString());
  mDiv(out,"nsl",nbSubProcesses_.value_);
  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
  mDiv(out,"cl");
  *out << getApplicationDescriptor()->getClassName()
       << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
  cDiv(out);
  mDiv(out,"in",getApplicationDescriptor()->getInstance());
  // The HLT configuration link is only shown outside Halted/halting.
  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
    mDiv(out,"hlt");
    *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
    cDiv(out);
    *out << std::endl;
  }
  else
    mDiv(out,"hlt","Not yet...");

  mDiv(out,"sq",squidPresent_.toString());
  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
  mDiv(out,"mwl",evtProcessor_.wlMonitoring());
  // Counters are reported as 0 unless both nbProcessed and nbAccepted exist.
  if(nbProcessed != 0 && nbAccepted != 0)
    {
      mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
      mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
    }
  else
    {
      mDiv(out,"tt",0);
      mDiv(out,"ac",0);
    }
  // Master (no myProcess_) reports the summarize work loop, a slave reports
  // the scalers work loop.
  if(!myProcess_)
    mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
  else
    mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));

  mDiv(out,"idi",iDieUrl_.value_);
  // Vulture status: with a vulture pid, "Prowling" or "Dead" depending on
  // hasStarted(); without one, "Nope" (no vulture object) or "Hatching".
  if(vp_!=0){
    mDiv(out,"vpi",(unsigned int) vp_);
    if(vulture_->hasStarted()>=0)
      mDiv(out,"vul","Prowling");
    else
      mDiv(out,"vul","Dead");
  }
  else{
    mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
  }
  if(evtProcessor_){
    mDiv(out,"ll");
    *out << evtProcessor_.lastLumi().ls
         << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
    cDiv(out);
  }
  // Log ring: entries [logRingIndex_, logRingSize_) first, then, once the
  // ring has wrapped, entries [0, logRingIndex_).
  // NOTE(review): if localLog() fills the ring upward from 0, the unwrapped
  // case prints only the unused tail and skips the written entries. Verify
  // against localLog().
  mDiv(out,"lg");
  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
    *out << logRing_[i] << std::endl;
  if(logWrap_)
    for(unsigned int i = 0; i<logRingIndex_; i++)
      *out << logRing_[i] << std::endl;
  cDiv(out);
  mDiv(out,"cha");
  if(cpustat_) *out << cpustat_->getChart();
  cDiv(out);
}
02632
02633 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02634 {
02635 evf::utils::procStat(out);
02636 }
02637
02638 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02639 {
02640 if(myProcess_) myProcess_->postSlave(buf,true);
02641 }
02642
02643 void FUEventProcessor::makeStaticInfo()
02644 {
02645 using namespace utils;
02646 std::ostringstream ost;
02647 mDiv(&ost,"ve");
02648 ost<< "$Revision: 1.155 $ (" << edm::getReleaseVersion() <<")";
02649 cDiv(&ost);
02650 mDiv(&ost,"ou",outPut_.toString());
02651 mDiv(&ost,"sh",hasShMem_.toString());
02652 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02653 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02654
02655 xdata::Serializable *monsleep = 0;
02656 xdata::Serializable *lstimeout = 0;
02657 try{
02658 monsleep = applicationInfoSpace_->find("monSleepSec");
02659 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02660 }
02661 catch(xdata::exception::Exception e){
02662 }
02663
02664 if(monsleep!=0)
02665 mDiv(&ost,"ms",monsleep->toString());
02666 if(lstimeout!=0)
02667 mDiv(&ost,"lst",lstimeout->toString());
02668 char cbuf[sizeof(struct utsname)];
02669 struct utsname* buf = (struct utsname*)cbuf;
02670 uname(buf);
02671 mDiv(&ost,"sysinfo");
02672 ost << buf->sysname << " " << buf->nodename
02673 << " " << buf->release << " " << buf->version << " " << buf->machine;
02674 cDiv(&ost);
02675 updaterStatic_ = ost.str();
02676 }
02677
02678 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02679 {
02680
02681 sem_post(sigmon_sem_);
02682
02683
02684 sleep(2);
02685
02686
02687 alarm(5);
02688
02689 std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02690 std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02691 std::cout << "--- Stacktrace follows --" << std::endl;
02692 std::ostringstream stacktr;
02693 toolbox::stacktrace(20,stacktr);
02694 std::cout << stacktr.str();
02695 if (!rlimit_coresize_changed_)
02696 std::cout << "--- Dumping core." << " --- " << std::endl;
02697 else
02698 std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02699
02700 std::string hasdump = "";
02701 if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02702
02703 LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
02704 << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02705 << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02706 << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02707 );
02708
02709
02710 raise(sig);
02711 }
02712
02713
// XDAQ instantiator for evf::FUEventProcessor (presumably what lets the XDAQ
// executive create this application class by name).
XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)