00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034
00035 #include <boost/tokenizer.hpp>
00036
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043
00044
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054
00055
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059 namespace mem {
00060 extern toolbox::BSem * _s_mutex_ptr_;
00061 }
00062 }
00063
00064
00065 namespace evf {
00066 FUEventProcessor * FUInstancePtr_;
00067 int evfep_raised_signal;
00068 void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069 {
00070 evfep_raised_signal=sig;
00071 FUInstancePtr_->handleSignalSlave(sig, info, c);
00072 }
00073 void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074 {
00075 if (evfep_raised_signal) {
00076 signal(evfep_raised_signal,SIG_DFL);
00077 raise(evfep_raised_signal);
00078 }
00079 }
00080 }
00081
00083
00085
00086
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00088 : xdaq::Application(s)
00089 , fsm_(this)
00090 , log_(getApplicationLogger())
00091 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092 , runNumber_(0)
00093 , epInitialized_(false)
00094 , outPut_(true)
00095 , autoRestartSlaves_(false)
00096 , slaveRestartDelaySecs_(10)
00097 , hasShMem_(true)
00098 , hasPrescaleService_(true)
00099 , hasModuleWebRegistry_(true)
00100 , hasServiceWebRegistry_(true)
00101 , isRunNumberSetter_(true)
00102 , iDieStatisticsGathering_(false)
00103 , outprev_(true)
00104 , exitOnError_(true)
00105 , reasonForFailedState_()
00106 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00107 , logRing_(logRingSize_)
00108 , logRingIndex_(logRingSize_)
00109 , logWrap_(false)
00110 , nbSubProcesses_(0)
00111 , nbSubProcessesReporting_(0)
00112 , forkInEDM_(true)
00113 , nblive_(0)
00114 , nbdead_(0)
00115 , nbTotalDQM_(0)
00116 , wlReceiving_(0)
00117 , asReceiveMsgAndExecute_(0)
00118 , receiving_(false)
00119 , wlReceivingMonitor_(0)
00120 , asReceiveMsgAndRead_(0)
00121 , receivingM_(false)
00122 , myProcess_(0)
00123 , wlSupervising_(0)
00124 , asSupervisor_(0)
00125 , supervising_(false)
00126 , wlSignalMonitor_(0)
00127 , asSignalMonitor_(0)
00128 , signalMonitorActive_(false)
00129 , monitorInfoSpace_(0)
00130 , monitorLegendaInfoSpace_(0)
00131 , applicationInfoSpace_(0)
00132 , nbProcessed(0)
00133 , nbAccepted(0)
00134 , scalersInfoSpace_(0)
00135 , scalersLegendaInfoSpace_(0)
00136 , wlScalers_(0)
00137 , asScalers_(0)
00138 , wlScalersActive_(false)
00139 , scalersUpdates_(0)
00140 , wlSummarize_(0)
00141 , asSummarize_(0)
00142 , wlSummarizeActive_(false)
00143 , superSleepSec_(1)
00144 , iDieUrl_("none")
00145 , vulture_(0)
00146 , vp_(0)
00147 , cpustat_(0)
00148 , ratestat_(0)
00149 , mwrRef_(nullptr)
00150 , sorRef_(nullptr)
00151 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155 , edm_init_done_(true)
00156 , crashesThisRun_(false)
00157 , rlimit_coresize_changed_(false)
00158 , crashesToDump_(2)
00159 , sigmon_sem_(0)
00160 , datasetCounting_(true)
00161 {
00162 using namespace utils;
00163
00164 FUInstancePtr_=this;
00165
00166 names_.push_back("nbProcessed" );
00167 names_.push_back("nbAccepted" );
00168 names_.push_back("epMacroStateInt");
00169 names_.push_back("epMicroStateInt");
00170
00171 int retpipe = pipe(anonymousPipe_);
00172 if(retpipe != 0)
00173 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00174
00175 squidPresent_ = squidnet_.check();
00176
00177 evtProcessor_.setAppDesc(getApplicationDescriptor());
00178 evtProcessor_.setAppCtxt(getApplicationContext());
00179
00180 fsm_.initialize<evf::FUEventProcessor>(this);
00181
00182
00183 url_ =
00184 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00185 getApplicationDescriptor()->getURN();
00186 class_ =getApplicationDescriptor()->getClassName();
00187 instance_=getApplicationDescriptor()->getInstance();
00188 sourceId_=class_.toString()+"_"+instance_.toString();
00189 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00190 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00191
00192 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00193
00194 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00195 applicationInfoSpace_ = ispace;
00196
00197
00198 ispace->fireItemAvailable("parameterSet", &configString_ );
00199 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00200 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00201 ispace->fireItemAvailable("runNumber", &runNumber_ );
00202 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00203
00204 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00205 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00206 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00207 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00208 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00209 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00210 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00211 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00212 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00213 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00214 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00215 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00216 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00217 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00218 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00219 ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
00220 ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
00221
00222
00223 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00224 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00225
00226
00227 fsm_.findRcmsStateListener();
00228
00229
00230
00231 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00232 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00233 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00234
00235 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00236 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00237 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00238
00239
00240 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00241 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00242 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00243 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00244 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00245
00246 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00247
00248 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00249 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00250 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00251
00252 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00253 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00254 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00255
00256
00257
00258 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00259 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00260
00261 evtProcessor_.setApplicationInfoSpace(ispace);
00262 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00263 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00264
00265
00266 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00267 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00268
00269
00270 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00271 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00272 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00273 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00274 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00275 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00276 xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
00277 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00278 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00279 xgi::bind(this, &FUEventProcessor::microState, "microState");
00280 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00281 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00282
00283
00284
00285 edm::AssertHandler ah;
00286
00287
00288 try{
00289 LOG4CPLUS_DEBUG(getApplicationLogger(),
00290 "Trying to create message service presence ");
00291 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00292 if(pf != 0) {
00293 messageServicePresence_= pf->makePresence("MessageServicePresence");
00294 }
00295 else {
00296 LOG4CPLUS_ERROR(getApplicationLogger(),
00297 "Unable to create message service presence ");
00298 }
00299 }
00300 catch(...) {
00301 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00302 }
00303 ML::MLlog4cplus::setAppl(this);
00304
00305 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00306 typedef AppDescSet_t::iterator AppDescIter_t;
00307
00308 AppDescSet_t rcms=
00309 getApplicationContext()->getDefaultZone()->
00310 getApplicationDescriptors("RCMSStateListener");
00311 if(rcms.size()==0)
00312 {
00313 LOG4CPLUS_WARN(getApplicationLogger(),
00314 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00315
00316 }
00317 else
00318 {
00319 AppDescIter_t it = rcms.begin();
00320 evtProcessor_.setRcms(*it);
00321 }
00322 pthread_mutex_init(&start_lock_,0);
00323 pthread_mutex_init(&stop_lock_,0);
00324 pthread_mutex_init(&pickup_lock_,0);
00325
00326 forkInfoObj_=nullptr;
00327 pthread_mutex_init(&forkObjLock_,0);
00328 makeStaticInfo();
00329 startSupervisorLoop();
00330
00331 if(vulture_==0) vulture_ = new Vulture(true);
00332
00334
00335 AppDescSet_t setOfiDie=
00336 getApplicationContext()->getDefaultZone()->
00337 getApplicationDescriptors("evf::iDie");
00338
00339 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00340 if ((*it)->getInstance()==0)
00341 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00342
00343
00344 getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00345
00346
00347 #ifdef linux
00348 if (sigmon_sem_==0) {
00349 sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00350 PROT_READ | PROT_WRITE,
00351 MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00352 if (!sigmon_sem_) {
00353 perror("mmap error\n");
00354 std::cout << "mmap error"<<std::endl;
00355 }
00356 else
00357 sem_init(sigmon_sem_,true,0);
00358 }
00359 #endif
00360 }
00361
00362
00363
00364
00365 FUEventProcessor::~FUEventProcessor()
00366 {
00367
00368
00369
00370 if(vulture_ != 0) delete vulture_;
00371 }
00372
00373
00375
00377
00378
00379
00380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00381 {
00382
00383
00384
00385
00386
00387 unsigned short smap
00388 = (datasetCounting_.value_ ? 0x20 : 0 )
00389 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00390 + (((instance_.value_%80)==0) ? 0x8 : 0)
00391 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00392 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00393 + (hasPrescaleService_.value_ ? 0x1 : 0);
00394 if(nbSubProcesses_.value_==0)
00395 {
00396 spMStates_.setSize(1);
00397 spmStates_.setSize(1);
00398 }
00399 else
00400 {
00401 spMStates_.setSize(nbSubProcesses_.value_);
00402 spmStates_.setSize(nbSubProcesses_.value_);
00403 for(unsigned int i = 0; i < spMStates_.size(); i++)
00404 {
00405 spMStates_[i] = edm::event_processor::sInit;
00406 spmStates_[i] = 0;
00407 }
00408 }
00409 try {
00410 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00411 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00412 epInitialized_=true;
00413 if(evtProcessor_)
00414 {
00415
00416 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00417 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00418
00419 configuration_ = evtProcessor_.configuration();
00420 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00421 evtProcessor_->beginJob();
00422 if(cpustat_) {delete cpustat_; cpustat_=0;}
00423 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00424 nbSubProcesses_.value_,
00425 instance_.value_,
00426 iDieUrl_.value_);
00427 if(ratestat_) {delete ratestat_; ratestat_=0;}
00428 ratestat_ = new RateStat(iDieUrl_.value_);
00429 if(iDieStatisticsGathering_.value_)
00430 {
00431 try
00432 {
00433 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00434 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00435 if(legenda !=0)
00436 {
00437 std::string slegenda = ((xdata::String*)legenda)->value_;
00438 ratestat_->sendLegenda(slegenda);
00439 }
00440 if (sorRef_ && datasetCounting_.value_)
00441 {
00442 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00443 if (dsLegenda.value_.size())
00444 ratestat_->sendAuxLegenda(dsLegenda);
00445 }
00446 }
00447 catch(evf::Exception &e)
00448 {
00449 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00450 << e.what());
00451 }
00452 catch (xcept::Exception& e) {
00453 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00454 << e.what());
00455 }
00456 }
00457
00458 fsm_.fireEvent("ConfigureDone",this);
00459 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00460 localLog("-I- Configuration completed");
00461
00462 }
00463 }
00464 catch (xcept::Exception &e) {
00465 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00466 fsm_.fireFailed(reasonForFailedState_,this);
00467 localLog(reasonForFailedState_);
00468 }
00469 catch(cms::Exception &e) {
00470 reasonForFailedState_ = e.explainSelf();
00471 fsm_.fireFailed(reasonForFailedState_,this);
00472 localLog(reasonForFailedState_);
00473 }
00474 catch(std::exception &e) {
00475 reasonForFailedState_ = e.what();
00476 fsm_.fireFailed(reasonForFailedState_,this);
00477 localLog(reasonForFailedState_);
00478 }
00479 catch(...) {
00480 fsm_.fireFailed("Unknown Exception",this);
00481 }
00482
00483 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00484
00485 return false;
00486 }
00487
00488
00489
00490
00491
00492 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00493 {
00494 nbTotalDQM_ = 0;
00495 scalersUpdates_ = 0;
00496 idleProcStats_ = 0;
00497 allProcStats_ = 0;
00498
00499
00500
00501
00502
00503 unsigned short smap
00504 = (datasetCounting_.value_ ? 0x20 : 0 )
00505 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00506 + (((instance_.value_%80)==0) ? 0x8 : 0)
00507 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00508 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00509 + (hasPrescaleService_.value_ ? 0x1 : 0);
00510
00511 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00512
00513
00514 if (rlimit_coresize_changed_)
00515 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00516 rlimit_coresize_changed_=false;
00517 crashesThisRun_=0;
00518
00519 sem_destroy(sigmon_sem_);
00520 sem_init(sigmon_sem_,true,0);
00521
00522 if(!epInitialized_){
00523 evtProcessor_.forceInitEventProcessorMaybe();
00524 }
00525 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00526
00527 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00528 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00529
00530 if(!epInitialized_){
00531 evtProcessor_->beginJob();
00532 if(cpustat_) {delete cpustat_; cpustat_=0;}
00533 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00534 nbSubProcesses_.value_,
00535 instance_.value_,
00536 iDieUrl_.value_);
00537 if(ratestat_) {delete ratestat_; ratestat_=0;}
00538 ratestat_ = new RateStat(iDieUrl_.value_);
00539 if(iDieStatisticsGathering_.value_)
00540 {
00541 try
00542 {
00543 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00544 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00545 if(legenda !=0)
00546 {
00547 std::string slegenda = ((xdata::String*)legenda)->value_;
00548 ratestat_->sendLegenda(slegenda);
00549 }
00550 if (sorRef_ && datasetCounting_.value_)
00551 {
00552 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00553 if (dsLegenda.value_.size())
00554 ratestat_->sendAuxLegenda(dsLegenda);
00555 }
00556 }
00557 catch(evf::Exception &e)
00558 {
00559 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00560 << e.what());
00561 }
00562 catch (xcept::Exception& e) {
00563 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00564 << e.what());
00565 }
00566 }
00567 epInitialized_ = true;
00568 }
00569 configuration_ = evtProcessor_.configuration();
00570 evtProcessor_.resetLumiSectionReferenceIndex();
00571
00572 if(nbSubProcesses_.value_==0) return enableClassic();
00573
00574 pthread_mutex_lock(&start_lock_);
00575 pthread_mutex_lock(&pickup_lock_);
00576 subs_.clear();
00577 subs_.resize(nbSubProcesses_.value_);
00578 pid_t retval = -1;
00579
00580 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00581 {
00582 subs_[i]=SubProcess(i,retval);
00583 }
00584 pthread_mutex_unlock(&pickup_lock_);
00585
00586 pthread_mutex_unlock(&start_lock_);
00587
00588
00589 try {
00590 if (sorRef_) {
00591 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00592 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00593 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00594 for (unsigned int i=0;i<shmOutputs.size();i++) {
00595 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00596 }
00597 }
00598 }
00599 catch (...)
00600 {
00601 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00602 }
00603
00604
00605 edm_init_done_=true;
00606 if (forkInEDM_.value_) {
00607 edm_init_done_=false;
00608 enableForkInEDM();
00609 }
00610 else
00611 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00612 {
00613 retval = subs_[i].forkNew();
00614 if(retval==0)
00615 {
00616 myProcess_ = &subs_[i];
00617
00618 delete toolbox::mem::_s_mutex_ptr_;
00619 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00620 int retval = pthread_mutex_destroy(&stop_lock_);
00621 if(retval != 0) perror("error");
00622 retval = pthread_mutex_init(&stop_lock_,0);
00623 if(retval != 0) perror("error");
00624 fsm_.disableRcmsStateNotification();
00625
00626 return enableMPEPSlave();
00627
00628 }
00629 }
00630
00631 if (forkInEDM_.value_) {
00632
00633 edm::event_processor::State st;
00634 while (!edm_init_done_) {
00635 usleep(10000);
00636 st = evtProcessor_->getState();
00637 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00638 }
00639 st = evtProcessor_->getState();
00640
00641 if (st!=edm::event_processor::sRunning) {
00642 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00643 localLog(reasonForFailedState_);
00644 fsm_.fireFailed(reasonForFailedState_,this);
00645 return false;
00646 }
00647
00648 sleep(1);
00649 startSummarizeWorkLoop();
00650 startSignalMonitorWorkLoop();
00651 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00652
00653
00654 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00655 fsm_.fireEvent("EnableDone",this);
00656 localLog("-I- Start completed");
00657 return false;
00658 }
00659
00660 startSummarizeWorkLoop();
00661 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00662 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00663 fsm_.fireEvent("EnableDone",this);
00664 localLog("-I- Start completed");
00665 return false;
00666 }
00667
00668 bool FUEventProcessor::doEndRunInEDM() {
00669
00670 if (forkInfoObj_) {
00671
00672 int count = 30;
00673 bool waitedForEDM=false;
00674 while (!edm_init_done_ && count) {
00675 ::sleep(1);
00676 if (count%5==0)
00677 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00678 count--;
00679 waitedForEDM=true;
00680 }
00681
00682 if (waitedForEDM) sleep(5);
00683
00684
00685
00686 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00687 return true;
00688
00689 forkInfoObj_->lock();
00690 forkInfoObj_->stopCondition=true;
00691 sem_post(forkInfoObj_->control_sem_);
00692 forkInfoObj_->unlock();
00693
00694 count = 30;
00695
00696 edm::event_processor::State st;
00697 while(count--) {
00698 st = evtProcessor_->getState();
00699 if (st==edm::event_processor::sRunning) ::usleep(100000);
00700 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00701 break;
00702 }
00703 else {
00704 std::ostringstream ost;
00705 ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00706 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00707 fsm_.fireFailed(ost.str(),this);
00708 return false;
00709 }
00710 if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00711 forkInfoObj_->lock();
00712 forkInfoObj_->stopCondition=true;
00713 sem_post(forkInfoObj_->control_sem_);
00714 forkInfoObj_->unlock();
00715 LOG4CPLUS_WARN(getApplicationLogger(),
00716 "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00717 }
00718 }
00719 if (count<0) {
00720 std::ostringstream ost;
00721 if (!forkInfoObj_->receivedStop_)
00722 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00723 << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00724 else
00725 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00726 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00727 fsm_.fireFailed(ost.str(),this);
00728 return false;
00729 }
00730 }
00731 return true;
00732 }
00733
00734
00735 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00736 {
00737 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00738 if(nbSubProcesses_.value_!=0) {
00739 stopSlavesAndAcknowledge();
00740 if (forkInEDM_.value_) {
00741
00742 sem_post(sigmon_sem_);
00743 doEndRunInEDM();
00744 }
00745 }
00746 vulture_->stop();
00747
00748 if (forkInEDM_.value_) {
00749
00750 bool tmpHasShMem_=hasShMem_;
00751 hasShMem_=false;
00752 stopClassic();
00753 hasShMem_=tmpHasShMem_;
00754 return false;
00755 }
00756 stopClassic();
00757 return false;
00758 }
00759
00760
00761
00762 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00763 {
00764 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00765 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00766 if(nbSubProcesses_.value_!=0) {
00767 stopSlavesAndAcknowledge();
00768 if (forkInEDM_.value_) {
00769 sem_post(sigmon_sem_);
00770 doEndRunInEDM();
00771 }
00772 }
00773 try{
00774 evtProcessor_.stopAndHalt();
00775
00776 if (forkInfoObj_) {
00777 delete forkInfoObj_;
00778 forkInfoObj_=0;
00779 }
00780 }
00781 catch (evf::Exception &e) {
00782 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00783 localLog(reasonForFailedState_);
00784 fsm_.fireFailed(reasonForFailedState_,this);
00785 }
00786
00787
00788 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00789 fsm_.fireEvent("HaltDone",this);
00790
00791 localLog("-I- Halt completed");
00792 return false;
00793 }
00794
00795
00796
00797 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00798 throw (xoap::exception::Exception)
00799 {
00800 return fsm_.commandCallback(msg);
00801 }
00802
00803
00804
00805 void FUEventProcessor::actionPerformed(xdata::Event& e)
00806 {
00807
00808 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00809 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00810
00811 if ( item == "parameterSet") {
00812 LOG4CPLUS_WARN(getApplicationLogger(),
00813 "HLT Menu changed, force reinitialization of EventProcessor");
00814 epInitialized_ = false;
00815 }
00816 if ( item == "outputEnabled") {
00817 if(outprev_ != outPut_) {
00818 LOG4CPLUS_WARN(getApplicationLogger(),
00819 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00820 evtProcessor_->enableEndPaths(outPut_);
00821 outprev_ = outPut_;
00822 }
00823 }
00824 if (item == "globalInputPrescale") {
00825 LOG4CPLUS_WARN(getApplicationLogger(),
00826 "Setting global input prescale has no effect "
00827 <<"in this version of the code");
00828 }
00829 if ( item == "globalOutputPrescale") {
00830 LOG4CPLUS_WARN(getApplicationLogger(),
00831 "Setting global output prescale has no effect "
00832 <<"in this version of the code");
00833 }
00834 }
00835
00836 }
00837
00838
00839 void FUEventProcessor::getSlavePids(xgi::Input *in, xgi::Output *out)
00840 {
00841 for (unsigned int i=0;i<subs_.size();i++)
00842 {
00843 if (i!=0) *out << ",";
00844 *out << subs_[i].pid();
00845 }
00846 }
00847
00848 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00849 {
00850 using namespace cgicc;
00851 pid_t pid = 0;
00852 std::ostringstream ost;
00853 ost << "&";
00854
00855 Cgicc cgi(in);
00856 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00857 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00858 mycgi->getEnvironment().begin();
00859 mit != mycgi->getEnvironment().end(); mit++)
00860 ost << mit->first << "%" << mit->second << ";";
00861 std::vector<FormEntry> els = cgi.getElements() ;
00862 std::vector<FormEntry> el1;
00863 cgi.getElement("method",el1);
00864 std::vector<FormEntry> el2;
00865 cgi.getElement("process",el2);
00866 if(el1.size()!=0) {
00867 std::string meth = el1[0].getValue();
00868 if(el2.size()!=0) {
00869 unsigned int i = 0;
00870 std::string mod = el2[0].getValue();
00871 pid = atoi(mod.c_str());
00872 for(; i < subs_.size(); i++)
00873 if(subs_[i].pid()==pid) break;
00874 if(i>=subs_.size()){
00875 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00876 return;
00877 }
00878 if(subs_[i].alive() != 1){
00879 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00880 return;
00881 }
00882 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00883 strncpy(msg1->mtext,meth.c_str(),meth.length());
00884 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00885 subs_[i].post(msg1,true);
00886 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00887 superSleepSec_.value_=10*keep_supersleep_original_value;
00888 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00889 bool done = false;
00890 std::vector<char *>pieces;
00891 while(!done){
00892 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00893 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00894 ::sleep(1);
00895 continue;
00896 }
00897 unsigned int nbytes = atoi(msg->mtext);
00898 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00899 char *buf= new char[nbytes];
00900 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00901 if(retval!=nbytes) std::cout
00902 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00903 pieces.push_back(buf);
00904 }
00905 superSleepSec_.value_=keep_supersleep_original_value;
00906 for(unsigned int j = 0; j < pieces.size(); j++){
00907 *out<<pieces[j];
00908 delete[] pieces[j];
00909 }
00910 }
00911 }
00912 }
00913
00914
00915
00916 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00917 throw (xgi::exception::Exception)
00918 {
00919
00920
00921 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00922 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00923 << getApplicationDescriptor()->getInstance() << "</title>"
00924 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00925 << "</head></html>";
00926 }
00927
00928
00929
00930
00931
00932 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00933 throw (xgi::exception::Exception)
00934 {
00935
00936 std::string urn = getApplicationDescriptor()->getURN();
00937
00938 *out << "<!-- base href=\"/" << urn
00939 << "\"> -->" << std::endl;
00940 *out << "<html>" << std::endl;
00941 *out << "<head>" << std::endl;
00942 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00943 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00944 *out << "<title>" << getApplicationDescriptor()->getClassName()
00945 << getApplicationDescriptor()->getInstance()
00946 << " MAIN</title>" << std::endl;
00947 *out << "</head>" << std::endl;
00948 *out << "<body>" << std::endl;
00949 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00950 *out << "<tr>" << std::endl;
00951 *out << " <td align=\"left\">" << std::endl;
00952 *out << " <img" << std::endl;
00953 *out << " align=\"middle\"" << std::endl;
00954 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00955 *out << " alt=\"main\"" << std::endl;
00956 *out << " width=\"64\"" << std::endl;
00957 *out << " height=\"64\"" << std::endl;
00958 *out << " border=\"\"/>" << std::endl;
00959 *out << " <b>" << std::endl;
00960 *out << getApplicationDescriptor()->getClassName()
00961 << getApplicationDescriptor()->getInstance() << std::endl;
00962 *out << " " << fsm_.stateName()->toString() << std::endl;
00963 *out << " </b>" << std::endl;
00964 *out << " </td>" << std::endl;
00965 *out << " <td width=\"32\">" << std::endl;
00966 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00967 *out << " <img" << std::endl;
00968 *out << " align=\"middle\"" << std::endl;
00969 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00970 *out << " alt=\"HyperDAQ\"" << std::endl;
00971 *out << " width=\"32\"" << std::endl;
00972 *out << " height=\"32\"" << std::endl;
00973 *out << " border=\"\"/>" << std::endl;
00974 *out << " </a>" << std::endl;
00975 *out << " </td>" << std::endl;
00976 *out << " <td width=\"32\">" << std::endl;
00977 *out << " </td>" << std::endl;
00978 *out << " <td width=\"32\">" << std::endl;
00979 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00980 *out << " <img" << std::endl;
00981 *out << " align=\"middle\"" << std::endl;
00982 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00983 *out << " alt=\"main\"" << std::endl;
00984 *out << " width=\"32\"" << std::endl;
00985 *out << " height=\"32\"" << std::endl;
00986 *out << " border=\"\"/>" << std::endl;
00987 *out << " </a>" << std::endl;
00988 *out << " </td>" << std::endl;
00989 *out << "</tr>" << std::endl;
00990 *out << "</table>" << std::endl;
00991
00992 *out << "<hr/>" << std::endl;
00993
00994 std::ostringstream ost;
00995 if(myProcess_)
00996 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00997 else
00998 ost << "/moduleWeb?";
00999 urn += ost.str();
01000 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
01001 evtProcessor_.taskWebPage(in,out,urn);
01002 else if(evtProcessor_)
01003 evtProcessor_.summaryWebPage(in,out,urn);
01004 else
01005 *out << "<td>HLT Unconfigured</td>" << std::endl;
01006 *out << "</table>" << std::endl;
01007
01008 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
01009 *out << configuration_ << std::endl;
01010 *out << "</textarea><P>" << std::endl;
01011
01012 *out << "</body>" << std::endl;
01013 *out << "</html>" << std::endl;
01014
01015
01016 }
01017 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
01018 throw (xgi::exception::Exception)
01019 {
01020
01021 out->getHTTPResponseHeader().addHeader( "Content-Type",
01022 "application/octet-stream" );
01023 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01024 "binary" );
01025 if(evtProcessor_ != 0){
01026 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic));
01027 }
01028 }
01029
01030 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
01031 throw (xgi::exception::Exception)
01032 {
01033
01034 if(evtProcessor_ != 0){
01035 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01036 if(legenda !=0){
01037 std::string slegenda = ((xdata::String*)legenda)->value_;
01038 *out << slegenda << std::endl;
01039 }
01040 }
01041 }
01042
01043
01044 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
01045 {
01046 std::string errmsg;
01047 try {
01048 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01049 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01050 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01051 }
01052 catch (cms::Exception& e) {
01053 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01054 }
01055 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01056 }
01057
01058
01059 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
01060 {
01061 std::string errmsg;
01062 bool success = false;
01063 try {
01064 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01065 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01066 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01067 if (!success) errmsg = "Failed to attach DQM service to shared memory";
01068 }
01069 catch (cms::Exception& e) {
01070 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01071 }
01072 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01073 }
01074
01075
01076
01077 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01078 {
01079 std::string errmsg;
01080 bool success = false;
01081 try {
01082 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01083 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01084 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01085 if (!success) errmsg = "Failed to detach DQM service from shared memory";
01086 }
01087 catch (cms::Exception& e) {
01088 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01089 }
01090 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01091 }
01092
01093
01094 std::string FUEventProcessor::logsAsString()
01095 {
01096 std::ostringstream oss;
01097 if(logWrap_)
01098 {
01099 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01100 oss << logRing_[i] << std::endl;
01101 for(unsigned int i = 0; i < logRingIndex_; i++)
01102 oss << logRing_[i] << std::endl;
01103 }
01104 else
01105 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01106 oss << logRing_[i] << std::endl;
01107
01108 return oss.str();
01109 }
01110
01111 void FUEventProcessor::localLog(std::string m)
01112 {
01113 timeval tv;
01114
01115 gettimeofday(&tv,0);
01116 tm *uptm = localtime(&tv.tv_sec);
01117 char datestring[256];
01118 strftime(datestring, sizeof(datestring),"%c", uptm);
01119
01120 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01121 logRingIndex_--;
01122 std::ostringstream timestamp;
01123 timestamp << " at " << datestring;
01124 m += timestamp.str();
01125 logRing_[logRingIndex_] = m;
01126 }
01127
01128 void FUEventProcessor::startSupervisorLoop()
01129 {
01130 try {
01131 wlSupervising_=
01132 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01133 "waiting");
01134 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01135 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01136 "Supervisor");
01137 wlSupervising_->submit(asSupervisor_);
01138 supervising_ = true;
01139 }
01140 catch (xcept::Exception& e) {
01141 std::string msg = "Failed to start workloop 'Supervisor'.";
01142 XCEPT_RETHROW(evf::Exception,msg,e);
01143 }
01144 }
01145
01146 void FUEventProcessor::startReceivingLoop()
01147 {
01148 try {
01149 wlReceiving_=
01150 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01151 "waiting");
01152 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01153 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01154 "Receiving");
01155 wlReceiving_->submit(asReceiveMsgAndExecute_);
01156 receiving_ = true;
01157 }
01158 catch (xcept::Exception& e) {
01159 std::string msg = "Failed to start workloop 'Receiving'.";
01160 XCEPT_RETHROW(evf::Exception,msg,e);
01161 }
01162 }
01163 void FUEventProcessor::startReceivingMonitorLoop()
01164 {
01165 try {
01166 wlReceivingMonitor_=
01167 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01168 "waiting");
01169 if (!wlReceivingMonitor_->isActive())
01170 wlReceivingMonitor_->activate();
01171 asReceiveMsgAndRead_ =
01172 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01173 "ReceivingM");
01174 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01175 receivingM_ = true;
01176 }
01177 catch (xcept::Exception& e) {
01178 std::string msg = "Failed to start workloop 'ReceivingM'.";
01179 XCEPT_RETHROW(evf::Exception,msg,e);
01180 }
01181 }
01182
01183 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01184 {
01185 MsgBuf msg;
01186 try{
01187 myProcess_->rcvSlave(msg,false);
01188 if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01189 {
01190 rlimit rl;
01191 getrlimit(RLIMIT_CORE,&rl);
01192 rl.rlim_cur=0;
01193 setrlimit(RLIMIT_CORE,&rl);
01194 rlimit_coresize_changed_=true;
01195 }
01196 if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01197 {
01198
01199 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01200 rlimit_coresize_changed_=false;
01201 }
01202 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01203 {
01204 pthread_mutex_lock(&stop_lock_);
01205 try {
01206 fsm_.fireEvent("Stop",this);
01207 }
01208 catch (...) {
01209 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01210 << getpid() << " The state on Stop event was not consistent");
01211 }
01212
01213 try {
01214 stopClassic();
01215 }
01216 catch (...) {
01217 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01218 }
01219
01220
01221 try{
01222 messageServicePresence_.reset();
01223 }
01224 catch(...) {
01225 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01226 }
01227
01228 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01229 myProcess_->postSlave(msg1,false);
01230 pthread_mutex_unlock(&stop_lock_);
01231 fclose(stdout);
01232 fclose(stderr);
01233 _exit(EXIT_SUCCESS);
01234 }
01235 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01236 _exit(EXIT_SUCCESS);
01237 }
01238 catch(evf::Exception &e){
01239 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01240 }
01241 return true;
01242 }
01243
01244 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01245 {
01246 pthread_mutex_lock(&stop_lock_);
01247 if(subs_.size()!=nbSubProcesses_.value_)
01248 {
01249 pthread_mutex_lock(&pickup_lock_);
01250 if(subs_.size()!=nbSubProcesses_.value_) {
01251 subs_.resize(nbSubProcesses_.value_);
01252 spMStates_.resize(nbSubProcesses_.value_);
01253 spmStates_.resize(nbSubProcesses_.value_);
01254 for(unsigned int i = 0; i < spMStates_.size(); i++)
01255 {
01256 spMStates_[i] = edm::event_processor::sInit;
01257 spmStates_[i] = 0;
01258 }
01259 }
01260 pthread_mutex_unlock(&pickup_lock_);
01261 }
01262 bool running = fsm_.stateName()->toString()=="Enabled";
01263 bool stopping = fsm_.stateName()->toString()=="stopping";
01264 for(unsigned int i = 0; i < subs_.size(); i++)
01265 {
01266 if(subs_[i].alive()==-1000) continue;
01267 int sl;
01268 pid_t sub_pid = subs_[i].pid();
01269 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01270
01271 if(killedOrNot && killedOrNot==sub_pid) {
01272 pthread_mutex_lock(&pickup_lock_);
01273
01274 if (i<subs_.size() && subs_[i].alive()!=-1000) {
01275 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01276 std::ostringstream ost;
01277 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01278 else if(WIFSIGNALED(sl)!=0) {
01279 ost << " process terminated with signal " << WTERMSIG(sl);
01280 }
01281 else ost << " process stopped ";
01282
01283
01284
01285
01286 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01287 subs_[i].setReasonForFailed(ost.str());
01288 spMStates_[i] = evtProcessor_.notstarted_state_code();
01289 spmStates_[i] = 0;
01290 std::ostringstream ost1;
01291 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01292 localLog(ost1.str());
01293 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01294 }
01295 pthread_mutex_unlock(&pickup_lock_);
01296 }
01297 }
01298 pthread_mutex_unlock(&stop_lock_);
01299 if(stopping) return true;
01300
01301
01302 if (running && rlimit_coresize_changed_) {
01303 timeval newtv;
01304 gettimeofday(&newtv,0);
01305 int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01306 if (delta>60*15) {
01307 std::ostringstream ostr;
01308 ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01309 std::cout << ostr.str() << std::endl;
01310 LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01311 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01312 MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01313 for (unsigned int i = 0; i < subs_.size(); i++) {
01314 try {
01315 if (subs_[i].alive())
01316 subs_[i].post(master_message_rlr_,false);
01317 }
01318 catch (...) {}
01319 }
01320 rlimit_coresize_changed_=false;
01321 crashesThisRun_=0;
01322 }
01323 }
01324
01325 if(running && edm_init_done_)
01326 {
01327
01328
01329 if(autoRestartSlaves_.value_){
01330 pthread_mutex_lock(&stop_lock_);
01331 for(unsigned int i = 0; i < subs_.size(); i++)
01332 {
01333 if(subs_[i].alive() != 1){
01334 if(subs_[i].countdown() == 0)
01335 {
01336 if(subs_[i].restartCount()>2){
01337 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01338 << " - maximum restart count reached");
01339 std::ostringstream ost1;
01340 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01341 localLog(ost1.str());
01342 subs_[i].countdown()--;
01343 XCEPT_DECLARE(evf::Exception,
01344 sentinelException, ost1.str());
01345 notifyQualified("error",sentinelException);
01346 subs_[i].disconnect();
01347 continue;
01348 }
01349 subs_[i].restartCount()++;
01350 if (forkInEDM_.value_) {
01351 restartForkInEDM(i);
01352 }
01353 else {
01354 pid_t rr = subs_[i].forkNew();
01355 if(rr==0)
01356 {
01357 myProcess_=&subs_[i];
01358 scalersUpdates_ = 0;
01359 int retval = pthread_mutex_destroy(&stop_lock_);
01360 if(retval != 0) perror("error");
01361 retval = pthread_mutex_init(&stop_lock_,0);
01362 if(retval != 0) perror("error");
01363 fsm_.disableRcmsStateNotification();
01364 fsm_.fireEvent("Stop",this);
01365 fsm_.fireEvent("StopDone",this);
01366 fsm_.fireEvent("Enable",this);
01367 try{
01368 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01369 if(lsid) {
01370 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01371 }
01372 }
01373 catch(...){
01374 std::cout << "trouble with lsindex during restart" << std::endl;
01375 }
01376 try{
01377 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01378 if(lstb) {
01379 ((xdata::Boolean*)(lstb))->value_ = false;
01380 }
01381 }
01382 catch(...){
01383 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01384 }
01385
01386 evtProcessor_.adjustLsIndexForRestart();
01387 evtProcessor_.resetPackedTriggerReport();
01388 enableMPEPSlave();
01389 return false;
01390 }
01391 else
01392 {
01393 std::ostringstream ost1;
01394 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01395 localLog(ost1.str());
01396 }
01397 }
01398 }
01399 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01400 }
01401 }
01402 pthread_mutex_unlock(&stop_lock_);
01403 }
01404 }
01405 xdata::Serializable *lsid = 0;
01406 xdata::Serializable *psid = 0;
01407 xdata::Serializable *dqmp = 0;
01408 xdata::UnsignedInteger32 *dqm = 0;
01409
01410
01411
01412 if(running && edm_init_done_){
01413 try{
01414 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01415 psid = applicationInfoSpace_->find("prescaleSetIndex");
01416 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01417 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01418 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01419 }
01420 catch(xdata::exception::Exception e){
01421 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01422 }
01423
01424 try{
01425 if(nbProcessed !=0 && nbAccepted !=0)
01426 {
01427 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01428 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01429 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01430 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01431 if(dqmp!=0)
01432 dqm = (xdata::UnsignedInteger32*)dqmp;
01433 if(dqm) dqm->value_ = 0;
01434 nbTotalDQM_ = 0;
01435 nbp->value_ = 0;
01436 nba->value_ = 0;
01437 nblive_ = 0;
01438 nbdead_ = 0;
01439 scalersUpdates_ = 0;
01440
01441 for(unsigned int i = 0; i < subs_.size(); i++)
01442 {
01443 if(subs_[i].alive()>0)
01444 {
01445 nblive_++;
01446 try{
01447 subs_[i].post(master_message_prg_,true);
01448
01449 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01450 if(retval == (unsigned long) master_message_prr_->mtype){
01451 prg* p = (struct prg*)(master_message_prr_->mtext);
01452 subs_[i].setParams(p);
01453 spMStates_[i] = p->Ms;
01454 spmStates_[i] = p->ms;
01455 cpustat_->addEntry(p->ms);
01456 if(!subs_[i].inInconsistentState() &&
01457 (p->Ms == edm::event_processor::sError
01458 || p->Ms == edm::event_processor::sInvalid
01459 || p->Ms == edm::event_processor::sStopping))
01460 {
01461 std::ostringstream ost;
01462 ost << "edm::eventprocessor slot " << i << " process id "
01463 << subs_[i].pid() << " not in Running state : Mstate="
01464 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01465 << evtProcessor_.moduleNameFromIndex(p->ms)
01466 << " - Look into possible error messages from HLT process";
01467 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01468 }
01469 nbp->value_ += subs_[i].params().nbp;
01470 nba->value_ += subs_[i].params().nba;
01471 if(dqm)dqm->value_ += p->dqm;
01472 nbTotalDQM_ += p->dqm;
01473 scalersUpdates_ += p->trp;
01474 if(p->ls > ls->value_) ls->value_ = p->ls;
01475 if(p->ps != ps->value_) ps->value_ = p->ps;
01476 }
01477 else{
01478 nbp->value_ += subs_[i].get_save_nbp();
01479 nba->value_ += subs_[i].get_save_nba();
01480 }
01481 }
01482 catch(evf::Exception &e){
01483 LOG4CPLUS_INFO(getApplicationLogger(),
01484 "could not send/receive msg on slot "
01485 << i << " - " << e.what());
01486 }
01487
01488 }
01489 else
01490 {
01491 nbp->value_ += subs_[i].get_save_nbp();
01492 nba->value_ += subs_[i].get_save_nba();
01493 nbdead_++;
01494 }
01495 }
01496 if(nbp->value_>64){
01497 for(unsigned int i = 0; i < subs_.size(); i++)
01498 {
01499 if(subs_[i].params().nbp == 0){
01500
01501 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01502 {
01503 subs_[i].found_invalid();
01504 if(subs_[i].nfound_invalid() > 60){
01505 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01506 subs_[i].post(msg3,false);
01507 std::ostringstream ost1;
01508 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01509 localLog(ost1.str());
01510 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01511 XCEPT_DECLARE(evf::Exception,
01512 sentinelException, ost1.str());
01513 notifyQualified("error",sentinelException);
01514
01515 }
01516 }
01517 }
01518 }
01519 }
01520 }
01521 }
01522 catch(std::exception &e){
01523 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01524 }
01525 catch(...){
01526 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01527 }
01528 }
01529 else{
01530 for(unsigned int i = 0; i < subs_.size(); i++)
01531 {
01532 if(subs_[i].alive()==-1000)
01533 {
01534 spMStates_[i] = edm::event_processor::sInit;
01535 spmStates_[i] = 0;
01536 }
01537 }
01538 }
01539 try{
01540 monitorInfoSpace_->lock();
01541 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01542 monitorInfoSpace_->unlock();
01543 }
01544 catch(xdata::exception::Exception &e)
01545 {
01546 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01547
01548 }
01549 ::sleep(superSleepSec_.value_);
01550 return true;
01551 }
01552
01553 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01554 {
01555 try {
01556 wlScalers_=
01557 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01558 "waiting");
01559 if (!wlScalers_->isActive()) wlScalers_->activate();
01560 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01561 "Scalers");
01562
01563 wlScalers_->submit(asScalers_);
01564 wlScalersActive_ = true;
01565 }
01566 catch (xcept::Exception& e) {
01567 std::string msg = "Failed to start workloop 'Scalers'.";
01568 XCEPT_RETHROW(evf::Exception,msg,e);
01569 }
01570 }
01571
01572
01573
01574 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01575 {
01576 try {
01577 wlSummarize_=
01578 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01579 "waiting");
01580 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01581
01582 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01583 "Summary");
01584
01585 wlSummarize_->submit(asSummarize_);
01586 wlSummarizeActive_ = true;
01587 }
01588 catch (xcept::Exception& e) {
01589 std::string msg = "Failed to start workloop 'Summarize'.";
01590 XCEPT_RETHROW(evf::Exception,msg,e);
01591 }
01592 }
01593
01594
01595
01596 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01597 {
01598 if(evtProcessor_)
01599 {
01600 if(!evtProcessor_.getTriggerReport(true)) {
01601 wlScalersActive_ = false;
01602 return false;
01603 }
01604 }
01605 else
01606 {
01607 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01608 wlScalersActive_ = false;
01609 return false;
01610 }
01611 if(myProcess_)
01612 {
01613
01614 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01615 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01616 scalersUpdates_++;
01617 }
01618 else
01619 evtProcessor_.fireScalersUpdate();
01620 return true;
01621 }
01622
01623
01624 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01625 {
01626 evtProcessor_.resetPackedTriggerReport();
01627 bool atLeastOneProcessUpdatedSuccessfully = false;
01628 int msgCount = 0;
01629 for (unsigned int i = 0; i < subs_.size(); i++)
01630 {
01631 if(subs_[i].alive()>0)
01632 {
01633 int ret = 0;
01634 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01635 evtProcessor_.getLumiSectionReferenceIndex()))
01636 {
01637 ret = MSQS_MESSAGE_TYPE_TRR;
01638 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01639 }
01640 else{
01641 bool insync = false;
01642 bool exception_caught = false;
01643 while(!insync){
01644 try{
01645 ret = subs_[i].rcv(master_message_trr_,false);
01646 }
01647 catch(evf::Exception &e)
01648 {
01649 std::cout << "exception in msgrcv on " << i
01650 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01651 exception_caught = true;
01652 break;
01653
01654 }
01655 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01656 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01657 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01658 insync = true;
01659 }
01660 }
01661 }
01662 if(exception_caught) continue;
01663 }
01664 msgCount++;
01665 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01666 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01667 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01668 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01669 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01670 subs_[i].add_postponed_trigger_update(master_message_trr_);
01671 }else{
01672 atLeastOneProcessUpdatedSuccessfully = true;
01673 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01674 }
01675 }
01676 else std::cout << "msgrcv returned error " << errno << std::endl;
01677 }
01678 }
01679 if(atLeastOneProcessUpdatedSuccessfully){
01680 nbSubProcessesReporting_.value_ = msgCount;
01681 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01682 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01683 evtProcessor_.updateRollingReport();
01684 evtProcessor_.fireScalersUpdate();
01685 }
01686 else{
01687 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01688 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01689 nbSubProcessesReporting_.value_ = 0;
01690 ::sleep(10);
01691 }
01692 if(fsm_.stateName()->toString()!="Enabled"){
01693 wlScalersActive_ = false;
01694 return false;
01695 }
01696
01697 if(iDieStatisticsGathering_.value_){
01698 try{
01699 unsigned long long idleTmp=idleProcStats_;
01700 unsigned long long allPSTmp=allProcStats_;
01701 idleProcStats_=allProcStats_=0;
01702
01703 utils::procCpuStat(idleProcStats_,allProcStats_);
01704 timeval oldtime=lastProcReport_;
01705 gettimeofday(&lastProcReport_,0);
01706
01707 if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01708 cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01709 int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01710 + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01711 cpustat_->setElapsed(deltaTms);
01712 }
01713 else {
01714 cpustat_->setCPUStat(0);
01715 cpustat_->setElapsed(0);
01716 }
01717
01718 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01719 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01720 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01721 ratestat_->sendStat((unsigned char*)trsp,
01722 sizeof(TriggerReportStatic),
01723 evtProcessor_.getLumiSectionReferenceIndex());
01724 }catch(evf::Exception &e){
01725 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01726 << e.what());
01727 }
01728 }
01729 cpustat_->reset();
01730 return true;
01731 }
01732
01733
01734
01735 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01736 {
01737 try{
01738 myProcess_->rcvSlave(slave_message_monitoring_,true);
01739 switch(slave_message_monitoring_->mtype)
01740 {
01741 case MSQM_MESSAGE_TYPE_MCS:
01742 {
01743 xgi::Input *in = 0;
01744 xgi::Output out;
01745 evtProcessor_.microState(in,&out);
01746 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01747 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01748 myProcess_->postSlave(msg1,true);
01749 break;
01750 }
01751
01752 case MSQM_MESSAGE_TYPE_PRG:
01753 {
01754 xdata::Serializable *dqmp = 0;
01755 xdata::UnsignedInteger32 *dqm = 0;
01756 evtProcessor_.monitoring(0);
01757 try{
01758 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01759 } catch(xdata::exception::Exception e){}
01760 if(dqmp!=0)
01761 dqm = (xdata::UnsignedInteger32*)dqmp;
01762
01763
01764 prg * data = (prg*)slave_message_prr_->mtext;
01765 data->ls = evtProcessor_.lsid_;
01766 data->eols = evtProcessor_.lastLumiUsingEol_;
01767 data->ps = evtProcessor_.psid_;
01768 data->nbp = evtProcessor_->totalEvents();
01769 data->nba = evtProcessor_->totalEventsPassed();
01770 data->Ms = evtProcessor_.epMAltState_.value_;
01771 data->ms = evtProcessor_.epmAltState_.value_;
01772 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01773 data->trp = scalersUpdates_;
01774
01775 myProcess_->postSlave(slave_message_prr_,true);
01776 if(exitOnError_.value_)
01777 {
01778
01779
01780 if(data->Ms == edm::event_processor::sError)
01781 {
01782 bool running = true;
01783 int count = 0;
01784 while(running){
01785 int retval = pthread_mutex_lock(&stop_lock_);
01786 if(retval != 0) perror("error");
01787 running = fsm_.stateName()->toString()=="Enabled";
01788 if(count>5) _exit(-1);
01789 pthread_mutex_unlock(&stop_lock_);
01790 if(running) {::sleep(1); count++;}
01791 }
01792 }
01793 }
01794 break;
01795 }
01796 case MSQM_MESSAGE_TYPE_WEB:
01797 {
01798 xgi::Input *in = 0;
01799 xgi::Output out;
01800 unsigned int bytesToSend = 0;
01801 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01802 std::string query = slave_message_monitoring_->mtext;
01803 size_t pos = query.find_first_of("&");
01804 std::string method;
01805 std::string args;
01806 if(pos!=std::string::npos)
01807 {
01808 method = query.substr(0,pos);
01809 args = query.substr(pos+1,query.length()-pos-1);
01810 }
01811 else
01812 method=query;
01813
01814 if(method=="Spotlight")
01815 {
01816 spotlightWebPage(in,&out);
01817 }
01818 else if(method=="procStat")
01819 {
01820 procStat(in,&out);
01821 }
01822 else if(method=="moduleWeb")
01823 {
01824 internal::MyCgi mycgi;
01825 boost::char_separator<char> sep(";");
01826 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01827 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01828 tok_iter != tokens.end(); ++tok_iter){
01829 size_t pos = (*tok_iter).find_first_of("%");
01830 if(pos != std::string::npos){
01831 std::string first = (*tok_iter).substr(0 , pos);
01832 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01833 mycgi.getEnvironment()[first]=second;
01834 }
01835 }
01836 moduleWeb(&mycgi,&out);
01837 }
01838 else if(method=="Default")
01839 {
01840 defaultWebPage(in,&out);
01841 }
01842 else
01843 {
01844 out << "Error 404!!!!!!!!" << std::endl;
01845 }
01846
01847
01848 bytesToSend = out.str().size();
01849 unsigned int cycle = 0;
01850 if(bytesToSend==0)
01851 {
01852 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01853 myProcess_->postSlave(msg1,true);
01854 }
01855 while(bytesToSend !=0){
01856 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01857 write(anonymousPipe_[PIPE_WRITE],
01858 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01859 msgSize);
01860 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01861 myProcess_->postSlave(msg1,true);
01862 bytesToSend -= msgSize;
01863 cycle++;
01864 }
01865 break;
01866 }
01867 case MSQM_MESSAGE_TYPE_TRP:
01868 {
01869 break;
01870 }
01871 }
01872 }
01873 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01874 return true;
01875 }
01876
01877 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01878 {
01879
01880
01881 try {
01882 wlSignalMonitor_=
01883 toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01884 "waiting");
01885
01886 if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01887 asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01888 "SignalMonitor");
01889 wlSignalMonitor_->submit(asSignalMonitor_);
01890 signalMonitorActive_ = true;
01891 }
01892 catch (xcept::Exception& e) {
01893 std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01894 std::cout << e.what() << std::endl;
01895 XCEPT_RETHROW(evf::Exception,msg,e);
01896 }
01897 }
01898
01899
01900 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01901 {
01902 while (1) {
01903 sem_wait(sigmon_sem_);
01904 std::cout << " received signal notification from slave!"<< std::endl;
01905
01906
01907 bool running = fsm_.stateName()->toString()=="Enabled";
01908 bool stopping = fsm_.stateName()->toString()=="stopping";
01909 bool enabling = fsm_.stateName()->toString()=="enabling";
01910 if (!running && !enabling) {
01911 signalMonitorActive_ = false;
01912 return false;
01913 }
01914
01915 crashesThisRun_++;
01916 gettimeofday(&lastCrashTime_,0);
01917
01918
01919 if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01920
01921 rlimit rlold;
01922 getrlimit(RLIMIT_CORE,&rlold);
01923 rlimit rlnew = rlold;
01924 rlnew.rlim_cur=0;
01925 setrlimit(RLIMIT_CORE,&rlnew);
01926 rlimit_coresize_changed_=true;
01927 MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01928
01929 unsigned int min=1;
01930 for (unsigned int i = min; i < subs_.size(); i++) {
01931 try {
01932 if (subs_[i].alive()) {
01933 subs_[i].post(master_message_rli_,false);
01934 }
01935 }
01936 catch (...) {}
01937 }
01938 std::ostringstream ostr;
01939 ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01940 << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01941 LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01942 }
01943 }
01944 signalMonitorActive_ = false;
01945 return false;
01946 }
01947
01948
01949 bool FUEventProcessor::enableCommon()
01950 {
01951 try {
01952 if(hasShMem_) attachDqmToShm();
01953 int sc = 0;
01954 evtProcessor_->clearCounters();
01955 if(isRunNumberSetter_)
01956 evtProcessor_->setRunNumber(runNumber_.value_);
01957 else
01958 evtProcessor_->declareRunNumber(runNumber_.value_);
01959
01960 try{
01961 ::sleep(1);
01962 evtProcessor_->runAsync();
01963 sc = evtProcessor_->statusAsync();
01964 }
01965 catch(cms::Exception &e) {
01966 reasonForFailedState_ = e.explainSelf();
01967 fsm_.fireFailed(reasonForFailedState_,this);
01968 localLog(reasonForFailedState_);
01969 return false;
01970 }
01971 catch(std::exception &e) {
01972 reasonForFailedState_ = e.what();
01973 fsm_.fireFailed(reasonForFailedState_,this);
01974 localLog(reasonForFailedState_);
01975 return false;
01976 }
01977 catch(...) {
01978 reasonForFailedState_ = "Unknown Exception";
01979 fsm_.fireFailed(reasonForFailedState_,this);
01980 localLog(reasonForFailedState_);
01981 return false;
01982 }
01983 if(sc != 0) {
01984 std::ostringstream oss;
01985 oss<<"EventProcessor::runAsync returned status code " << sc;
01986 reasonForFailedState_ = oss.str();
01987 fsm_.fireFailed(reasonForFailedState_,this);
01988 localLog(reasonForFailedState_);
01989 return false;
01990 }
01991 }
01992 catch (xcept::Exception &e) {
01993 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01994 fsm_.fireFailed(reasonForFailedState_,this);
01995 localLog(reasonForFailedState_);
01996 return false;
01997 }
01998 try{
01999 fsm_.fireEvent("EnableDone",this);
02000 }
02001 catch (xcept::Exception &e) {
02002 std::cout << "exception " << (std::string)e.what() << std::endl;
02003 throw;
02004 }
02005
02006 return false;
02007 }
02008
02009 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
02010 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
02011 }
02012
02013 void FUEventProcessor::forkProcessesFromEDM() {
02014
02015 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
02016 unsigned int forkFrom=0;
02017 unsigned int forkTo=nbSubProcesses_.value_;
02018 if (forkParams->slotId>=0) {
02019 forkFrom=forkParams->slotId;
02020 forkTo=forkParams->slotId+1;
02021 }
02022
02023
02024 try {
02025 if (sorRef_) {
02026 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02027 for (unsigned int i=0;i<shmOutputs.size();i++) {
02028
02029 shmOutputs[i]->unregisterFromShm();
02030
02031 shmOutputs[i]->stop();
02032 }
02033 }
02034 }
02035 catch (std::exception &e)
02036 {
02037 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02038 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02039 fsm_.fireFailed(reasonForFailedState_,this);
02040 localLog(reasonForFailedState_);
02041 }
02042 catch (...) {
02043 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02044 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02045 fsm_.fireFailed(reasonForFailedState_,this);
02046 localLog(reasonForFailedState_);
02047 }
02048
02049 std::string currentState = fsm_.stateName()->toString();
02050
02051
02052 if (currentState!="stopping") {
02053 try {
02054 messageServicePresence_.reset();
02055 }
02056 catch (...) {
02057 LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02058 }
02059 }
02060
02061 if (currentState=="stopping") {
02062 LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02063 forkParams->isMaster=1;
02064 forkInfoObj_->forkParams.slotId=-1;
02065 forkInfoObj_->forkParams.restart=0;
02066 }
02067
02068 else for(unsigned int i=forkFrom; i<forkTo; i++)
02069 {
02070 int retval = subs_[i].forkNew();
02071 if(retval==0)
02072 {
02073 forkParams->isMaster=0;
02074 myProcess_ = &subs_[i];
02075
02076 delete toolbox::mem::_s_mutex_ptr_;
02077 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02078 int retval = pthread_mutex_destroy(&stop_lock_);
02079 if(retval != 0) perror("error");
02080 retval = pthread_mutex_init(&stop_lock_,0);
02081 if(retval != 0) perror("error");
02082 fsm_.disableRcmsStateNotification();
02083
02084
02085 try{
02086 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02087 if(pf != 0) {
02088 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02089 }
02090 else {
02091 LOG4CPLUS_ERROR(getApplicationLogger(),
02092 "SLAVE: Unable to create message service presence. pid:"<<getpid());
02093 }
02094 }
02095 catch(...) {
02096 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02097 }
02098
02099 ML::MLlog4cplus::setAppl(this);
02100
02101
02102 try {
02103 if (sorRef_) {
02104 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02105 for (unsigned int i=0;i<shmOutputs.size();i++)
02106 shmOutputs[i]->start();
02107 }
02108 }
02109 catch (...)
02110 {
02111 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02112 }
02113
02114 if (forkParams->restart) {
02115
02116 scalersUpdates_ = 0;
02117 try {
02118 fsm_.fireEvent("Stop",this);
02119 fsm_.fireEvent("StopDone",this);
02120 fsm_.fireEvent("Enable",this);
02121 } catch (...) {
02122 LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02123 }
02124 try{
02125 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02126 if(lsid) {
02127 ((xdata::UnsignedInteger32*)(lsid))->value_--;
02128 }
02129 }
02130 catch(...){
02131 std::cout << "trouble with lsindex during restart" << std::endl;
02132 }
02133 try{
02134 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02135 if(lstb) {
02136 ((xdata::Boolean*)(lstb))->value_ = false;
02137 }
02138 }
02139 catch(...){
02140 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02141 }
02142 evtProcessor_.adjustLsIndexForRestart();
02143 evtProcessor_.resetPackedTriggerReport();
02144 }
02145
02146
02147 startReceivingLoop();
02148 startReceivingMonitorLoop();
02149 startScalersWorkLoop();
02150 while(!evtProcessor_.isWaitingForLs())
02151 ::usleep(100000);
02152
02153
02154 if(hasShMem_) attachDqmToShm();
02155
02156
02157 try {
02158 fsm_.fireEvent("EnableDone",this);
02159 }
02160 catch (...) {}
02161
02162
02163 while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02164
02165
02166 sigset_t tmpset_thread;
02167 sigemptyset(&tmpset_thread);
02168 sigaddset(&tmpset_thread, SIGQUIT);
02169 sigaddset(&tmpset_thread, SIGILL);
02170 sigaddset(&tmpset_thread, SIGABRT);
02171 sigaddset(&tmpset_thread, SIGFPE);
02172 sigaddset(&tmpset_thread, SIGSEGV);
02173 sigaddset(&tmpset_thread, SIGALRM);
02174
02175 pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02176
02177
02178 struct sigaction sa;
02179 sigset_t tmpset;
02180 memset(&tmpset,0,sizeof(tmpset));
02181 sigemptyset(&tmpset);
02182 sa.sa_mask=tmpset;
02183 sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02184 sa.sa_handler=0;
02185 sa.sa_sigaction=evfep_sighandler;
02186
02187 sigaction(SIGQUIT,&sa,0);
02188 sigaction(SIGILL,&sa,0);
02189 sigaction(SIGABRT,&sa,0);
02190 sigaction(SIGFPE,&sa,0);
02191 sigaction(SIGSEGV,&sa,0);
02192 sa.sa_sigaction=evfep_alarmhandler;
02193 sigaction(SIGALRM,&sa,0);
02194
02195
02196 return ;
02197 }
02198 else {
02199
02200 forkParams->isMaster=1;
02201 forkInfoObj_->forkParams.slotId=-1;
02202 if (forkParams->restart) {
02203 std::ostringstream ost1;
02204 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02205 localLog(ost1.str());
02206 }
02207 forkInfoObj_->forkParams.restart=0;
02208
02209 }
02210 }
02211
02212
02213 try{
02214
02215 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02216 if(pf != 0) {
02217 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02218 }
02219 else {
02220 LOG4CPLUS_ERROR(getApplicationLogger(),
02221 "Unable to recreate message service presence ");
02222 }
02223 }
02224 catch(...) {
02225 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02226 }
02227 restart_in_progress_=false;
02228 edm_init_done_=true;
02229 }
02230
02231 bool FUEventProcessor::enableForkInEDM()
02232 {
02233 evtProcessor_.resetWaiting();
02234 try {
02235
02236
02237
02238 int sc = 0;
02239
02240 evtProcessor_->clearCounters();
02241 if(isRunNumberSetter_)
02242 evtProcessor_->setRunNumber(runNumber_.value_);
02243 else
02244 evtProcessor_->declareRunNumber(runNumber_.value_);
02245
02246
02247 pthread_mutex_destroy(&forkObjLock_);
02248 pthread_mutex_init(&forkObjLock_,0);
02249 if (forkInfoObj_) delete forkInfoObj_;
02250 forkInfoObj_ = new moduleweb::ForkInfoObj();
02251 forkInfoObj_->mst_lock_=&forkObjLock_;
02252 forkInfoObj_->fuAddr=(void*)this;
02253 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02254 forkInfoObj_->forkParams.slotId=-1;
02255 forkInfoObj_->forkParams.restart=0;
02256 forkInfoObj_->forkParams.isMaster=-1;
02257 forkInfoObj_->stopCondition=0;
02258 if (mwrRef_)
02259 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02260
02261 evtProcessor_->runAsync();
02262 sc = evtProcessor_->statusAsync();
02263
02264 if(sc != 0) {
02265 std::ostringstream oss;
02266 oss<<"EventProcessor::runAsync returned status code " << sc;
02267 reasonForFailedState_ = oss.str();
02268 fsm_.fireFailed(reasonForFailedState_,this);
02269 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02270 return false;
02271 }
02272 }
02273
02274 catch(cms::Exception &e) {
02275 reasonForFailedState_ = e.explainSelf();
02276 fsm_.fireFailed(reasonForFailedState_,this);
02277 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02278 return false;
02279 }
02280 catch(std::exception &e) {
02281 reasonForFailedState_ = e.what();
02282 fsm_.fireFailed(reasonForFailedState_,this);
02283 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02284 return false;
02285 }
02286 catch(...) {
02287 reasonForFailedState_ = "Unknown Exception";
02288 fsm_.fireFailed(reasonForFailedState_,this);
02289 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02290 return false;
02291 }
02292 return true;
02293 }
02294
02295 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02296
02297
02298 forkInfoObj_->lock();
02299 forkInfoObj_->forkParams.slotId=slotId;
02300 forkInfoObj_->forkParams.restart=true;
02301 forkInfoObj_->forkParams.isMaster=1;
02302 forkInfoObj_->stopCondition=0;
02303 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02304 sem_post(forkInfoObj_->control_sem_);
02305 forkInfoObj_->unlock();
02306 usleep(1000);
02307
02308 int count=50;
02309 restart_in_progress_=true;
02310 while (restart_in_progress_ && count--) usleep(20000);
02311 return true;
02312 }
02313
02314 bool FUEventProcessor::enableClassic()
02315 {
02316 bool retval = enableCommon();
02317 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02318 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02319 ::sleep(1);
02320 }
02321
02322
02323
02324 localLog("-I- Start completed");
02325 return retval;
02326 }
02327 bool FUEventProcessor::enableMPEPSlave()
02328 {
02329
02330
02331 startReceivingLoop();
02332 startReceivingMonitorLoop();
02333 evtProcessor_.resetWaiting();
02334 startScalersWorkLoop();
02335 while(!evtProcessor_.isWaitingForLs())
02336 ::sleep(1);
02337
02338
02339
02340 try{
02341
02342 try{
02343 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02344 if(pf != 0) {
02345 pf->makePresence("MessageServicePresence").release();
02346 }
02347 else {
02348 LOG4CPLUS_ERROR(getApplicationLogger(),
02349 "Unable to create message service presence ");
02350 }
02351 }
02352 catch(...) {
02353 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02354 }
02355 ML::MLlog4cplus::setAppl(this);
02356 }
02357 catch (xcept::Exception &e) {
02358 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02359 fsm_.fireFailed(reasonForFailedState_,this);
02360 localLog(reasonForFailedState_);
02361 }
02362 bool retval = enableCommon();
02363
02364
02365
02366
02367 return retval;
02368 }
02369
02370 bool FUEventProcessor::stopClassic()
02371 {
02372 bool failed=false;
02373 try {
02374 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02375 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02376 if(rc == edm::EventProcessor::epSuccess)
02377 fsm_.fireEvent("StopDone",this);
02378 else
02379 {
02380 failed=true;
02381
02382 if(rc == edm::EventProcessor::epTimedOut)
02383 reasonForFailedState_ = "EventProcessor stop timed out";
02384 else
02385 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02386 }
02387 }
02388 catch (xcept::Exception &e) {
02389 failed=true;
02390 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02391 }
02392 catch (edm::Exception &e) {
02393 failed=true;
02394 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02395 }
02396 catch (...) {
02397 failed=true;
02398 reasonForFailedState_= "Stopping FAILED: unknown exception";
02399 }
02400 try {
02401 if (hasShMem_) {
02402 detachDqmFromShm();
02403 if (failed)
02404 LOG4CPLUS_WARN(getApplicationLogger(),
02405 "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02406 }
02407 }
02408 catch (cms::Exception & e) {
02409 failed=true;
02410 reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02411 }
02412 catch (...) {
02413 failed=true;
02414 reasonForFailedState_= "DQM detach failed: Unknown exception";
02415 }
02416
02417 if (failed) {
02418 LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02419 << reasonForFailedState_ << " (pid:" << getpid()<<")");
02420 localLog(reasonForFailedState_);
02421 fsm_.fireFailed(reasonForFailedState_,this);
02422 }
02423
02424 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02425 localLog("-I- Stop completed");
02426 return false;
02427 }
02428
02429 void FUEventProcessor::stopSlavesAndAcknowledge()
02430 {
02431 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02432 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02433
02434 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02435 for(unsigned int i = 0; i < subs_.size(); i++)
02436 {
02437 pthread_mutex_lock(&stop_lock_);
02438 if(subs_[i].alive()>0){
02439 processes_to_stop[i] = true;
02440 subs_[i].post(msg,false);
02441 }
02442 pthread_mutex_unlock(&stop_lock_);
02443 }
02444 for(unsigned int i = 0; i < subs_.size(); i++)
02445 {
02446 pthread_mutex_lock(&stop_lock_);
02447 if(processes_to_stop[i]){
02448 try{
02449 subs_[i].rcv(msg1,false);
02450 }
02451 catch(evf::Exception &e){
02452 std::ostringstream ost;
02453 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02454 reasonForFailedState_ = ost.str();
02455 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02456
02457 localLog(reasonForFailedState_);
02458 pthread_mutex_unlock(&stop_lock_);
02459 continue;
02460 }
02461 }
02462 else {
02463 pthread_mutex_unlock(&stop_lock_);
02464 continue;
02465 }
02466 pthread_mutex_unlock(&stop_lock_);
02467 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02468 while(subs_[i].alive()>0) ::usleep(10000);
02469 subs_[i].disconnect();
02470 }
02471
02472
02473 }
02474
02475 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02476 {
02477 std::string urn = getApplicationDescriptor()->getURN();
02478 try{
02479 evtProcessor_.stateNameFromIndex(0);
02480 evtProcessor_.moduleNameFromIndex(0);
02481 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02482 *out << "<tr><td>" << fsm_.stateName()->toString()
02483 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02484 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02485 evtProcessor_.microState(in,out);
02486 *out << "<td></td><td>" << nbTotalDQM_
02487 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02488 if(nbSubProcesses_.value_!=0 && !myProcess_)
02489 {
02490 pthread_mutex_lock(&start_lock_);
02491 for(unsigned int i = 0; i < subs_.size(); i++)
02492 {
02493 try{
02494 if(subs_[i].alive()>0)
02495 {
02496 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02497 << i << "\">""Alive</td><td>S</td><td>"
02498 << subs_[i].queueId() << "<td>"
02499 << subs_[i].queueStatus()<< "/"
02500 << subs_[i].queueOccupancy() << "/"
02501 << subs_[i].queuePidOfLastSend() << "/"
02502 << subs_[i].queuePidOfLastReceive()
02503 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02504 << subs_[i].pid() << "&method=procStat\">"
02505 << subs_[i].pid()<<"</a></td>"
02506 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02507 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02508 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02509 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02510 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02511 << "</td><td>" << subs_[i].params().ps
02512 << "</td><td"
02513 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02514 << subs_[i].params().eols
02515 << "</td><td>" << subs_[i].params().dqm
02516 << "</td><td>" << subs_[i].params().trp << "</td>";
02517 }
02518 else
02519 {
02520 pthread_mutex_lock(&pickup_lock_);
02521 *out << "<tr><td id=\"a"<< i << "\" ";
02522 if(subs_[i].alive()==-1000)
02523 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02524 else
02525 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02526 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02527 << subs_[i].queueStatus() << "/"
02528 << subs_[i].queueOccupancy() << "/"
02529 << subs_[i].queuePidOfLastSend() << "/"
02530 << subs_[i].queuePidOfLastReceive()
02531 << "</td><td id=\"p"<< i << "\">"
02532 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02533 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02534 {
02535 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02536 *out << " will restart in " << subs_[i].countdown() << " s";
02537 else if(autoRestartSlaves_)
02538 *out << " reached maximum restart count";
02539 else *out << " autoRestart is disabled ";
02540 }
02541 *out << "</td><td"
02542 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02543 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02544 << ">"
02545 << subs_[i].params().eols
02546 << "</td><td>" << subs_[i].params().dqm
02547 << "</td><td>" << subs_[i].params().trp << "</td>";
02548 pthread_mutex_unlock(&pickup_lock_);
02549 }
02550 *out << "</tr>";
02551 }
02552 catch(evf::Exception &e){
02553 *out << "<tr><td id=\"a"<< i << "\" "
02554 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02555 << subs_[i].queueStatus() << "/"
02556 << subs_[i].queueOccupancy() << "/"
02557 << subs_[i].queuePidOfLastSend() << "/"
02558 << subs_[i].queuePidOfLastReceive()
02559 << "</td><td id=\"p"<< i << "\">"
02560 <<subs_[i].pid()<<"</td>";
02561 }
02562 }
02563 pthread_mutex_unlock(&start_lock_);
02564 }
02565 }
02566 catch(evf::Exception &e)
02567 {
02568 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02569 }
02570 catch(cms::Exception &e)
02571 {
02572 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02573 }
02574 catch(std::exception &e)
02575 {
02576 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02577 }
02578 catch(...)
02579 {
02580 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02581 }
02582
02583 }
02584
02585
02586 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02587 {
02588 using namespace utils;
02589
02590 *out << updaterStatic_;
02591 mDiv(out,"loads");
02592 uptime(out);
02593 cDiv(out);
02594 mDiv(out,"st",fsm_.stateName()->toString());
02595 mDiv(out,"ru",runNumber_.toString());
02596 mDiv(out,"nsl",nbSubProcesses_.value_);
02597 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02598 mDiv(out,"cl");
02599 *out << getApplicationDescriptor()->getClassName()
02600 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02601 cDiv(out);
02602 mDiv(out,"in",getApplicationDescriptor()->getInstance());
02603 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02604 mDiv(out,"hlt");
02605 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02606 cDiv(out);
02607 *out << std::endl;
02608 }
02609 else
02610 mDiv(out,"hlt","Not yet...");
02611
02612 mDiv(out,"sq",squidPresent_.toString());
02613 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02614 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02615 if(nbProcessed != 0 && nbAccepted != 0)
02616 {
02617 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02618 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02619 }
02620 else
02621 {
02622 mDiv(out,"tt",0);
02623 mDiv(out,"ac",0);
02624 }
02625 if(!myProcess_)
02626 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02627 else
02628 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02629
02630 mDiv(out,"idi",iDieUrl_.value_);
02631 if(vp_!=0){
02632 mDiv(out,"vpi",(unsigned int) vp_);
02633 if(vulture_->hasStarted()>=0)
02634 mDiv(out,"vul","Prowling");
02635 else
02636 mDiv(out,"vul","Dead");
02637 }
02638 else{
02639 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02640 }
02641 if(evtProcessor_){
02642 mDiv(out,"ll");
02643 *out << evtProcessor_.lastLumi().ls
02644 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02645 cDiv(out);
02646 }
02647 mDiv(out,"lg");
02648 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02649 *out << logRing_[i] << std::endl;
02650 if(logWrap_)
02651 for(unsigned int i = 0; i<logRingIndex_; i++)
02652 *out << logRing_[i] << std::endl;
02653 cDiv(out);
02654 mDiv(out,"cha");
02655 if(cpustat_) *out << cpustat_->getChart();
02656 cDiv(out);
02657 }
02658
02659 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02660 {
02661 evf::utils::procStat(out);
02662 }
02663
02664 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02665 {
02666 if(myProcess_) myProcess_->postSlave(buf,true);
02667 }
02668
02669 void FUEventProcessor::makeStaticInfo()
02670 {
02671 using namespace utils;
02672 std::ostringstream ost;
02673 mDiv(&ost,"ve");
02674 ost<< "$Revision: 1.160 $ (" << edm::getReleaseVersion() <<")";
02675 cDiv(&ost);
02676 mDiv(&ost,"ou",outPut_.toString());
02677 mDiv(&ost,"sh",hasShMem_.toString());
02678 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02679 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02680
02681 xdata::Serializable *monsleep = 0;
02682 xdata::Serializable *lstimeout = 0;
02683 try{
02684 monsleep = applicationInfoSpace_->find("monSleepSec");
02685 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02686 }
02687 catch(xdata::exception::Exception e){
02688 }
02689
02690 if(monsleep!=0)
02691 mDiv(&ost,"ms",monsleep->toString());
02692 if(lstimeout!=0)
02693 mDiv(&ost,"lst",lstimeout->toString());
02694 char cbuf[sizeof(struct utsname)];
02695 struct utsname* buf = (struct utsname*)cbuf;
02696 uname(buf);
02697 mDiv(&ost,"sysinfo");
02698 ost << buf->sysname << " " << buf->nodename
02699 << " " << buf->release << " " << buf->version << " " << buf->machine;
02700 cDiv(&ost);
02701 updaterStatic_ = ost.str();
02702 }
02703
02704 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02705 {
02706
02707 sem_post(sigmon_sem_);
02708
02709
02710 sleep(2);
02711
02712
02713 alarm(5);
02714
02715 std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02716 std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02717 std::cout << "--- Stacktrace follows --" << std::endl;
02718 std::ostringstream stacktr;
02719 toolbox::stacktrace(20,stacktr);
02720 std::cout << stacktr.str();
02721 if (!rlimit_coresize_changed_)
02722 std::cout << "--- Dumping core." << " --- " << std::endl;
02723 else
02724 std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02725
02726 std::string hasdump = "";
02727 if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02728
02729 LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
02730 << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02731 << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02732 << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02733 );
02734
02735
02736 raise(sig);
02737 }
02738
02739
// XDAQ instantiation hook for evf::FUEventProcessor (XDAQ framework macro;
// presumably lets the executive create this application by class name).
XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)