00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034
00035 #include <boost/tokenizer.hpp>
00036
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043
00044
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054
00055
00056 using namespace evf;
00057 using namespace cgicc;
00058 namespace toolbox {
00059 namespace mem {
00060 extern toolbox::BSem * _s_mutex_ptr_;
00061 }
00062 }
00063
00064
00065 namespace evf {
00066 FUEventProcessor * FUInstancePtr_;
00067 int evfep_raised_signal;
00068 void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069 {
00070 evfep_raised_signal=sig;
00071 FUInstancePtr_->handleSignalSlave(sig, info, c);
00072 }
00073 void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074 {
00075 if (evfep_raised_signal) {
00076 signal(evfep_raised_signal,SIG_DFL);
00077 raise(evfep_raised_signal);
00078 }
00079 }
00080 }
00081
00083
00085
00086
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00088 : xdaq::Application(s)
00089 , fsm_(this)
00090 , log_(getApplicationLogger())
00091 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092 , runNumber_(0)
00093 , epInitialized_(false)
00094 , outPut_(true)
00095 , autoRestartSlaves_(false)
00096 , slaveRestartDelaySecs_(10)
00097 , hasShMem_(true)
00098 , hasPrescaleService_(true)
00099 , hasModuleWebRegistry_(true)
00100 , hasServiceWebRegistry_(true)
00101 , isRunNumberSetter_(true)
00102 , iDieStatisticsGathering_(false)
00103 , outprev_(true)
00104 , exitOnError_(true)
00105 , reasonForFailedState_()
00106 , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
00107 , logRing_(logRingSize_)
00108 , logRingIndex_(logRingSize_)
00109 , logWrap_(false)
00110 , nbSubProcesses_(0)
00111 , nbSubProcessesReporting_(0)
00112 , forkInEDM_(true)
00113 , nblive_(0)
00114 , nbdead_(0)
00115 , nbTotalDQM_(0)
00116 , wlReceiving_(0)
00117 , asReceiveMsgAndExecute_(0)
00118 , receiving_(false)
00119 , wlReceivingMonitor_(0)
00120 , asReceiveMsgAndRead_(0)
00121 , receivingM_(false)
00122 , myProcess_(0)
00123 , wlSupervising_(0)
00124 , asSupervisor_(0)
00125 , supervising_(false)
00126 , wlSignalMonitor_(0)
00127 , asSignalMonitor_(0)
00128 , signalMonitorActive_(false)
00129 , monitorInfoSpace_(0)
00130 , monitorLegendaInfoSpace_(0)
00131 , applicationInfoSpace_(0)
00132 , nbProcessed(0)
00133 , nbAccepted(0)
00134 , scalersInfoSpace_(0)
00135 , scalersLegendaInfoSpace_(0)
00136 , wlScalers_(0)
00137 , asScalers_(0)
00138 , wlScalersActive_(false)
00139 , scalersUpdates_(0)
00140 , wlSummarize_(0)
00141 , asSummarize_(0)
00142 , wlSummarizeActive_(false)
00143 , superSleepSec_(1)
00144 , iDieUrl_("none")
00145 , vulture_(0)
00146 , vp_(0)
00147 , cpustat_(0)
00148 , ratestat_(0)
00149 , mwrRef_(nullptr)
00150 , sorRef_(nullptr)
00151 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155 , edm_init_done_(true)
00156 , crashesThisRun_(false)
00157 , rlimit_coresize_changed_(false)
00158 , crashesToDump_(2)
00159 , sigmon_sem_(0)
00160 , datasetCounting_(true)
00161 {
00162 using namespace utils;
00163
00164 FUInstancePtr_=this;
00165
00166 names_.push_back("nbProcessed" );
00167 names_.push_back("nbAccepted" );
00168 names_.push_back("epMacroStateInt");
00169 names_.push_back("epMicroStateInt");
00170
00171 int retpipe = pipe(anonymousPipe_);
00172 if(retpipe != 0)
00173 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00174
00175 squidPresent_ = squidnet_.check();
00176
00177 evtProcessor_.setAppDesc(getApplicationDescriptor());
00178 evtProcessor_.setAppCtxt(getApplicationContext());
00179
00180 fsm_.initialize<evf::FUEventProcessor>(this);
00181
00182
00183 url_ =
00184 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00185 getApplicationDescriptor()->getURN();
00186 class_ =getApplicationDescriptor()->getClassName();
00187 instance_=getApplicationDescriptor()->getInstance();
00188 sourceId_=class_.toString()+"_"+instance_.toString();
00189 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00190 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00191
00192 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00193
00194 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00195 applicationInfoSpace_ = ispace;
00196
00197
00198 ispace->fireItemAvailable("parameterSet", &configString_ );
00199 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00200 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00201 ispace->fireItemAvailable("runNumber", &runNumber_ );
00202 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00203
00204 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00205 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00206 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00207 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00208 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00209 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00210 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00211 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00212 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00213 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00214 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00215 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00216 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00217 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00218 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00219 ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
00220 ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
00221
00222
00223 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00224 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00225
00226
00227 fsm_.findRcmsStateListener();
00228
00229
00230
00231 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00232 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00233 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00234
00235 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00236 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00237 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00238
00239
00240 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00241 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00242 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00243 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00244 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00245
00246 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00247
00248 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00249 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00250 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00251
00252 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00253 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00254 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00255
00256
00257
00258 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00259 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00260
00261 evtProcessor_.setApplicationInfoSpace(ispace);
00262 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00263 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00264
00265
00266 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00267 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00268
00269
00270 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00271 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00272 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00273 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00274 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00275 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00276 xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
00277 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00278 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00279 xgi::bind(this, &FUEventProcessor::microState, "microState");
00280 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00281 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00282
00283
00284
00285 edm::AssertHandler ah;
00286
00287
00288 try{
00289 LOG4CPLUS_DEBUG(getApplicationLogger(),
00290 "Trying to create message service presence ");
00291 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00292 if(pf != 0) {
00293 messageServicePresence_= pf->makePresence("MessageServicePresence");
00294 }
00295 else {
00296 LOG4CPLUS_ERROR(getApplicationLogger(),
00297 "Unable to create message service presence ");
00298 }
00299 }
00300 catch(...) {
00301 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00302 }
00303 ML::MLlog4cplus::setAppl(this);
00304
00305 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00306 typedef AppDescSet_t::iterator AppDescIter_t;
00307
00308 AppDescSet_t rcms=
00309 getApplicationContext()->getDefaultZone()->
00310 getApplicationDescriptors("RCMSStateListener");
00311 if(rcms.size()==0)
00312 {
00313 LOG4CPLUS_WARN(getApplicationLogger(),
00314 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00315
00316 }
00317 else
00318 {
00319 AppDescIter_t it = rcms.begin();
00320 evtProcessor_.setRcms(*it);
00321 }
00322 pthread_mutex_init(&start_lock_,0);
00323 pthread_mutex_init(&stop_lock_,0);
00324 pthread_mutex_init(&pickup_lock_,0);
00325
00326 forkInfoObj_=nullptr;
00327 pthread_mutex_init(&forkObjLock_,0);
00328 makeStaticInfo();
00329 startSupervisorLoop();
00330
00331 if(vulture_==0) vulture_ = new Vulture(true);
00332
00334
00335 AppDescSet_t setOfiDie=
00336 getApplicationContext()->getDefaultZone()->
00337 getApplicationDescriptors("evf::iDie");
00338
00339 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00340 if ((*it)->getInstance()==0)
00341 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00342
00343
00344 getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00345
00346
00347 #ifdef linux
00348 if (sigmon_sem_==0) {
00349 sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00350 PROT_READ | PROT_WRITE,
00351 MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00352 if (!sigmon_sem_) {
00353 perror("mmap error\n");
00354 std::cout << "mmap error"<<std::endl;
00355 }
00356 else
00357 sem_init(sigmon_sem_,true,0);
00358 }
00359 #endif
00360 }
00361
00362
00363
00364
00365 FUEventProcessor::~FUEventProcessor()
00366 {
00367
00368
00369
00370 if(vulture_ != 0) delete vulture_;
00371 }
00372
00373
00375
00377
00378
00379
00380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00381 {
00382
00383
00384
00385
00386
00387 unsigned short smap
00388 = (datasetCounting_.value_ ? 0x20 : 0 )
00389 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00390 + (((instance_.value_%80)==0) ? 0x8 : 0)
00391 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00392 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00393 + (hasPrescaleService_.value_ ? 0x1 : 0);
00394 if(nbSubProcesses_.value_==0)
00395 {
00396 spMStates_.setSize(1);
00397 spmStates_.setSize(1);
00398 }
00399 else
00400 {
00401 spMStates_.setSize(nbSubProcesses_.value_);
00402 spmStates_.setSize(nbSubProcesses_.value_);
00403 for(unsigned int i = 0; i < spMStates_.size(); i++)
00404 {
00405 spMStates_[i] = edm::event_processor::sInit;
00406 spmStates_[i] = 0;
00407 }
00408 }
00409 try {
00410 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00411 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00412 epInitialized_=true;
00413 if(evtProcessor_)
00414 {
00415
00416 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00417 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00418
00419 configuration_ = evtProcessor_.configuration();
00420 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00421 evtProcessor_->beginJob();
00422 evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00423 if(cpustat_) {delete cpustat_; cpustat_=0;}
00424 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00425 nbSubProcesses_.value_,
00426 instance_.value_,
00427 iDieUrl_.value_);
00428 if(ratestat_) {delete ratestat_; ratestat_=0;}
00429 ratestat_ = new RateStat(iDieUrl_.value_);
00430 if(iDieStatisticsGathering_.value_)
00431 {
00432 try
00433 {
00434 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00435 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00436 if(legenda !=0)
00437 {
00438 std::string slegenda = ((xdata::String*)legenda)->value_;
00439 ratestat_->sendLegenda(slegenda);
00440 }
00441 if (sorRef_ && datasetCounting_.value_)
00442 {
00443 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00444 if (dsLegenda.value_.size())
00445 ratestat_->sendAuxLegenda(dsLegenda);
00446 }
00447 }
00448 catch(evf::Exception &e)
00449 {
00450 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00451 << e.what());
00452 }
00453 catch (xcept::Exception& e) {
00454 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00455 << e.what());
00456 }
00457 }
00458
00459 fsm_.fireEvent("ConfigureDone",this);
00460 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00461 localLog("-I- Configuration completed");
00462
00463 }
00464 }
00465 catch (xcept::Exception &e) {
00466 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00467 fsm_.fireFailed(reasonForFailedState_,this);
00468 localLog(reasonForFailedState_);
00469 }
00470 catch(cms::Exception &e) {
00471 reasonForFailedState_ = e.explainSelf();
00472 fsm_.fireFailed(reasonForFailedState_,this);
00473 localLog(reasonForFailedState_);
00474 }
00475 catch(std::exception &e) {
00476 reasonForFailedState_ = e.what();
00477 fsm_.fireFailed(reasonForFailedState_,this);
00478 localLog(reasonForFailedState_);
00479 }
00480 catch(...) {
00481 fsm_.fireFailed("Unknown Exception",this);
00482 }
00483
00484 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00485
00486 return false;
00487 }
00488
00489
00490
00491
00492
00493 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00494 {
00495 nbTotalDQM_ = 0;
00496 scalersUpdates_ = 0;
00497 idleProcStats_ = 0;
00498 allProcStats_ = 0;
00499
00500
00501
00502
00503
00504 unsigned short smap
00505 = (datasetCounting_.value_ ? 0x20 : 0 )
00506 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00507 + (((instance_.value_%80)==0) ? 0x8 : 0)
00508 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00509 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00510 + (hasPrescaleService_.value_ ? 0x1 : 0);
00511
00512 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00513
00514
00515 if (rlimit_coresize_changed_)
00516 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00517 rlimit_coresize_changed_=false;
00518 crashesThisRun_=0;
00519
00520 sem_destroy(sigmon_sem_);
00521 sem_init(sigmon_sem_,true,0);
00522
00523 if(!epInitialized_){
00524 evtProcessor_.forceInitEventProcessorMaybe();
00525 }
00526 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00527
00528 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00529 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00530
00531 if(!epInitialized_){
00532 evtProcessor_->beginJob();
00533 evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00534 if(cpustat_) {delete cpustat_; cpustat_=0;}
00535 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00536 nbSubProcesses_.value_,
00537 instance_.value_,
00538 iDieUrl_.value_);
00539 if(ratestat_) {delete ratestat_; ratestat_=0;}
00540 ratestat_ = new RateStat(iDieUrl_.value_);
00541 if(iDieStatisticsGathering_.value_)
00542 {
00543 try
00544 {
00545 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00546 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00547 if(legenda !=0)
00548 {
00549 std::string slegenda = ((xdata::String*)legenda)->value_;
00550 ratestat_->sendLegenda(slegenda);
00551 }
00552 if (sorRef_ && datasetCounting_.value_)
00553 {
00554 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00555 if (dsLegenda.value_.size())
00556 ratestat_->sendAuxLegenda(dsLegenda);
00557 }
00558 }
00559 catch(evf::Exception &e)
00560 {
00561 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00562 << e.what());
00563 }
00564 catch (xcept::Exception& e) {
00565 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00566 << e.what());
00567 }
00568 }
00569 epInitialized_ = true;
00570 }
00571 configuration_ = evtProcessor_.configuration();
00572 evtProcessor_.resetLumiSectionReferenceIndex();
00573
00574 if(nbSubProcesses_.value_==0) return enableClassic();
00575
00576 pthread_mutex_lock(&start_lock_);
00577 pthread_mutex_lock(&pickup_lock_);
00578 subs_.clear();
00579 subs_.resize(nbSubProcesses_.value_);
00580 pid_t retval = -1;
00581
00582 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00583 {
00584 subs_[i]=SubProcess(i,retval);
00585 }
00586 pthread_mutex_unlock(&pickup_lock_);
00587
00588 pthread_mutex_unlock(&start_lock_);
00589
00590
00591 try {
00592 if (sorRef_) {
00593 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00594 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00595 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00596 for (unsigned int i=0;i<shmOutputs.size();i++) {
00597 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00598 }
00599 }
00600 }
00601 catch (...)
00602 {
00603 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00604 }
00605
00606
00607 edm_init_done_=true;
00608 if (forkInEDM_.value_) {
00609 edm_init_done_=false;
00610 enableForkInEDM();
00611 }
00612 else
00613 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00614 {
00615 retval = subs_[i].forkNew();
00616 if(retval==0)
00617 {
00618 myProcess_ = &subs_[i];
00619
00620 delete toolbox::mem::_s_mutex_ptr_;
00621 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00622 int retval = pthread_mutex_destroy(&stop_lock_);
00623 if(retval != 0) perror("error");
00624 retval = pthread_mutex_init(&stop_lock_,0);
00625 if(retval != 0) perror("error");
00626 fsm_.disableRcmsStateNotification();
00627
00628 return enableMPEPSlave();
00629
00630 }
00631 }
00632
00633 if (forkInEDM_.value_) {
00634
00635 edm::event_processor::State st;
00636 while (!edm_init_done_) {
00637 usleep(10000);
00638 st = evtProcessor_->getState();
00639 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00640 }
00641 st = evtProcessor_->getState();
00642
00643 if (st!=edm::event_processor::sRunning) {
00644 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00645 localLog(reasonForFailedState_);
00646 fsm_.fireFailed(reasonForFailedState_,this);
00647 return false;
00648 }
00649
00650 sleep(1);
00651 startSummarizeWorkLoop();
00652 startSignalMonitorWorkLoop();
00653 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00654
00655
00656 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00657 fsm_.fireEvent("EnableDone",this);
00658 localLog("-I- Start completed");
00659 return false;
00660 }
00661
00662 startSummarizeWorkLoop();
00663 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00664 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00665 fsm_.fireEvent("EnableDone",this);
00666 localLog("-I- Start completed");
00667 return false;
00668 }
00669
00670 bool FUEventProcessor::doEndRunInEDM() {
00671
00672 if (forkInfoObj_) {
00673
00674 int count = 30;
00675 bool waitedForEDM=false;
00676 while (!edm_init_done_ && count) {
00677 ::sleep(1);
00678 if (count%5==0)
00679 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00680 count--;
00681 waitedForEDM=true;
00682 }
00683
00684 if (waitedForEDM) sleep(5);
00685
00686
00687
00688 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00689 return true;
00690
00691 forkInfoObj_->lock();
00692 forkInfoObj_->stopCondition=true;
00693 sem_post(forkInfoObj_->control_sem_);
00694 forkInfoObj_->unlock();
00695
00696 count = 30;
00697
00698 edm::event_processor::State st;
00699 while(count--) {
00700 st = evtProcessor_->getState();
00701 if (st==edm::event_processor::sRunning) ::usleep(100000);
00702 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00703 break;
00704 }
00705 else {
00706 std::ostringstream ost;
00707 ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00708 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00709 fsm_.fireFailed(ost.str(),this);
00710 return false;
00711 }
00712 if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00713 forkInfoObj_->lock();
00714 forkInfoObj_->stopCondition=true;
00715 sem_post(forkInfoObj_->control_sem_);
00716 forkInfoObj_->unlock();
00717 LOG4CPLUS_WARN(getApplicationLogger(),
00718 "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00719 }
00720 }
00721 if (count<0) {
00722 std::ostringstream ost;
00723 if (!forkInfoObj_->receivedStop_)
00724 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00725 << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00726 else
00727 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00728 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00729 fsm_.fireFailed(ost.str(),this);
00730 return false;
00731 }
00732 }
00733 return true;
00734 }
00735
00736
00737 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00738 {
00739 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00740 if(nbSubProcesses_.value_!=0) {
00741 stopSlavesAndAcknowledge();
00742 if (forkInEDM_.value_) {
00743
00744 sem_post(sigmon_sem_);
00745 if (!doEndRunInEDM())
00746 return false;
00747 }
00748 }
00749 vulture_->stop();
00750
00751 if (forkInEDM_.value_) {
00752
00753 bool tmpHasShMem_=hasShMem_;
00754 hasShMem_=false;
00755 stopClassic();
00756 hasShMem_=tmpHasShMem_;
00757 return false;
00758 }
00759 stopClassic();
00760 return false;
00761 }
00762
00763
00764
00765 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00766 {
00767 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00768 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00769 if(nbSubProcesses_.value_!=0) {
00770 stopSlavesAndAcknowledge();
00771 if (forkInEDM_.value_) {
00772 sem_post(sigmon_sem_);
00773 if (!doEndRunInEDM())
00774 return false;
00775
00776 }
00777 }
00778 try{
00779 evtProcessor_.stopAndHalt();
00780
00781 if (forkInfoObj_) {
00782 delete forkInfoObj_;
00783 forkInfoObj_=0;
00784 }
00785 }
00786 catch (evf::Exception &e) {
00787 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00788 localLog(reasonForFailedState_);
00789 fsm_.fireFailed(reasonForFailedState_,this);
00790 }
00791
00792
00793 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00794 fsm_.fireEvent("HaltDone",this);
00795
00796 localLog("-I- Halt completed");
00797 return false;
00798 }
00799
00800
00801
00802 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00803 throw (xoap::exception::Exception)
00804 {
00805 return fsm_.commandCallback(msg);
00806 }
00807
00808
00809
00810 void FUEventProcessor::actionPerformed(xdata::Event& e)
00811 {
00812
00813 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00814 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00815
00816 if ( item == "parameterSet") {
00817 LOG4CPLUS_WARN(getApplicationLogger(),
00818 "HLT Menu changed, force reinitialization of EventProcessor");
00819 epInitialized_ = false;
00820 }
00821 if ( item == "outputEnabled") {
00822 if(outprev_ != outPut_) {
00823 LOG4CPLUS_WARN(getApplicationLogger(),
00824 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00825 evtProcessor_->enableEndPaths(outPut_);
00826 outprev_ = outPut_;
00827 }
00828 }
00829 if (item == "globalInputPrescale") {
00830 LOG4CPLUS_WARN(getApplicationLogger(),
00831 "Setting global input prescale has no effect "
00832 <<"in this version of the code");
00833 }
00834 if ( item == "globalOutputPrescale") {
00835 LOG4CPLUS_WARN(getApplicationLogger(),
00836 "Setting global output prescale has no effect "
00837 <<"in this version of the code");
00838 }
00839 }
00840
00841 }
00842
00843
00844 void FUEventProcessor::getSlavePids(xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
00845 {
00846 for (unsigned int i=0;i<subs_.size();i++)
00847 {
00848 if (i!=0) *out << ",";
00849 *out << subs_[i].pid();
00850 }
00851 }
00852
00853 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
00854 {
00855 using namespace cgicc;
00856 pid_t pid = 0;
00857 std::ostringstream ost;
00858 ost << "&";
00859
00860 Cgicc cgi(in);
00861 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00862 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00863 mycgi->getEnvironment().begin();
00864 mit != mycgi->getEnvironment().end(); mit++)
00865 ost << mit->first << "%" << mit->second << ";";
00866 std::vector<FormEntry> els = cgi.getElements() ;
00867 std::vector<FormEntry> el1;
00868 cgi.getElement("method",el1);
00869 std::vector<FormEntry> el2;
00870 cgi.getElement("process",el2);
00871 if(el1.size()!=0) {
00872 std::string meth = el1[0].getValue();
00873 if(el2.size()!=0) {
00874 unsigned int i = 0;
00875 std::string mod = el2[0].getValue();
00876 pid = atoi(mod.c_str());
00877 for(; i < subs_.size(); i++)
00878 if(subs_[i].pid()==pid) break;
00879 if(i>=subs_.size()){
00880 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00881 return;
00882 }
00883 if(subs_[i].alive() != 1){
00884 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00885 return;
00886 }
00887 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00888 strncpy(msg1->mtext,meth.c_str(),meth.length());
00889 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00890 subs_[i].post(msg1,true);
00891 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00892 superSleepSec_.value_=10*keep_supersleep_original_value;
00893 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00894 bool done = false;
00895 std::vector<char *>pieces;
00896 while(!done){
00897 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00898 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00899 ::sleep(1);
00900 continue;
00901 }
00902 unsigned int nbytes = atoi(msg->mtext);
00903 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00904 char *buf= new char[nbytes];
00905 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00906 if(retval<0){
00907 std::cout << "Failed to read from pipe." << std::endl;
00908 continue;
00909 }
00910 if(static_cast<unsigned int>(retval) != nbytes) std::cout
00911 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00912 pieces.push_back(buf);
00913 }
00914 superSleepSec_.value_=keep_supersleep_original_value;
00915 for(unsigned int j = 0; j < pieces.size(); j++){
00916 *out<<pieces[j];
00917 delete[] pieces[j];
00918 }
00919 }
00920 }
00921 }
00922
00923
00924
00925 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00926 throw (xgi::exception::Exception)
00927 {
00928
00929
00930 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00931 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00932 << getApplicationDescriptor()->getInstance() << "</title>"
00933 << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00934 << "</head></html>";
00935 }
00936
00937
00938
00939
00940
00941 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00942 throw (xgi::exception::Exception)
00943 {
00944
00945 std::string urn = getApplicationDescriptor()->getURN();
00946
00947 *out << "<!-- base href=\"/" << urn
00948 << "\"> -->" << std::endl;
00949 *out << "<html>" << std::endl;
00950 *out << "<head>" << std::endl;
00951 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00952 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00953 *out << "<title>" << getApplicationDescriptor()->getClassName()
00954 << getApplicationDescriptor()->getInstance()
00955 << " MAIN</title>" << std::endl;
00956 *out << "</head>" << std::endl;
00957 *out << "<body>" << std::endl;
00958 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00959 *out << "<tr>" << std::endl;
00960 *out << " <td align=\"left\">" << std::endl;
00961 *out << " <img" << std::endl;
00962 *out << " align=\"middle\"" << std::endl;
00963 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00964 *out << " alt=\"main\"" << std::endl;
00965 *out << " width=\"64\"" << std::endl;
00966 *out << " height=\"64\"" << std::endl;
00967 *out << " border=\"\"/>" << std::endl;
00968 *out << " <b>" << std::endl;
00969 *out << getApplicationDescriptor()->getClassName()
00970 << getApplicationDescriptor()->getInstance() << std::endl;
00971 *out << " " << fsm_.stateName()->toString() << std::endl;
00972 *out << " </b>" << std::endl;
00973 *out << " </td>" << std::endl;
00974 *out << " <td width=\"32\">" << std::endl;
00975 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00976 *out << " <img" << std::endl;
00977 *out << " align=\"middle\"" << std::endl;
00978 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00979 *out << " alt=\"HyperDAQ\"" << std::endl;
00980 *out << " width=\"32\"" << std::endl;
00981 *out << " height=\"32\"" << std::endl;
00982 *out << " border=\"\"/>" << std::endl;
00983 *out << " </a>" << std::endl;
00984 *out << " </td>" << std::endl;
00985 *out << " <td width=\"32\">" << std::endl;
00986 *out << " </td>" << std::endl;
00987 *out << " <td width=\"32\">" << std::endl;
00988 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00989 *out << " <img" << std::endl;
00990 *out << " align=\"middle\"" << std::endl;
00991 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00992 *out << " alt=\"main\"" << std::endl;
00993 *out << " width=\"32\"" << std::endl;
00994 *out << " height=\"32\"" << std::endl;
00995 *out << " border=\"\"/>" << std::endl;
00996 *out << " </a>" << std::endl;
00997 *out << " </td>" << std::endl;
00998 *out << "</tr>" << std::endl;
00999 *out << "</table>" << std::endl;
01000
01001 *out << "<hr/>" << std::endl;
01002
01003 std::ostringstream ost;
01004 if(myProcess_)
01005 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
01006 else
01007 ost << "/moduleWeb?";
01008 urn += ost.str();
01009 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
01010 evtProcessor_.taskWebPage(in,out,urn);
01011 else if(evtProcessor_)
01012 evtProcessor_.summaryWebPage(in,out,urn);
01013 else
01014 *out << "<td>HLT Unconfigured</td>" << std::endl;
01015 *out << "</table>" << std::endl;
01016
01017 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
01018 *out << configuration_ << std::endl;
01019 *out << "</textarea><P>" << std::endl;
01020
01021 *out << "</body>" << std::endl;
01022 *out << "</html>" << std::endl;
01023
01024
01025 }
01026 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
01027 throw (xgi::exception::Exception)
01028 {
01029
01030 out->getHTTPResponseHeader().addHeader( "Content-Type",
01031 "application/octet-stream" );
01032 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01033 "binary" );
01034 if(evtProcessor_ != 0){
01035 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic));
01036 }
01037 }
01038
01039 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
01040 throw (xgi::exception::Exception)
01041 {
01042
01043 if(evtProcessor_ != 0){
01044 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01045 if(legenda !=0){
01046 std::string slegenda = ((xdata::String*)legenda)->value_;
01047 *out << slegenda << std::endl;
01048 }
01049 }
01050 }
01051
01052
01053 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
01054 {
01055 std::string errmsg;
01056 try {
01057 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01058 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01059 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01060 }
01061 catch (cms::Exception& e) {
01062 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01063 }
01064 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01065 }
01066
01067
01068 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
01069 {
01070 std::string errmsg;
01071 bool success = false;
01072 try {
01073 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01074 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01075 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01076 if (!success) errmsg = "Failed to attach DQM service to shared memory";
01077 }
01078 catch (cms::Exception& e) {
01079 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01080 }
01081 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01082 }
01083
01084
01085
01086 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01087 {
01088 std::string errmsg;
01089 bool success = false;
01090 try {
01091 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01092 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01093 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01094 if (!success) errmsg = "Failed to detach DQM service from shared memory";
01095 }
01096 catch (cms::Exception& e) {
01097 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01098 }
01099 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01100 }
01101
01102
01103 std::string FUEventProcessor::logsAsString()
01104 {
01105 std::ostringstream oss;
01106 if(logWrap_)
01107 {
01108 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01109 oss << logRing_[i] << std::endl;
01110 for(unsigned int i = 0; i < logRingIndex_; i++)
01111 oss << logRing_[i] << std::endl;
01112 }
01113 else
01114 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01115 oss << logRing_[i] << std::endl;
01116
01117 return oss.str();
01118 }
01119
01120 void FUEventProcessor::localLog(std::string m)
01121 {
01122 timeval tv;
01123
01124 gettimeofday(&tv,0);
01125 tm *uptm = localtime(&tv.tv_sec);
01126 char datestring[256];
01127 strftime(datestring, sizeof(datestring),"%c", uptm);
01128
01129 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01130 logRingIndex_--;
01131 std::ostringstream timestamp;
01132 timestamp << " at " << datestring;
01133 m += timestamp.str();
01134 logRing_[logRingIndex_] = m;
01135 }
01136
01137 void FUEventProcessor::startSupervisorLoop()
01138 {
01139 try {
01140 wlSupervising_=
01141 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01142 "waiting");
01143 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01144 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01145 "Supervisor");
01146 wlSupervising_->submit(asSupervisor_);
01147 supervising_ = true;
01148 }
01149 catch (xcept::Exception& e) {
01150 std::string msg = "Failed to start workloop 'Supervisor'.";
01151 XCEPT_RETHROW(evf::Exception,msg,e);
01152 }
01153 }
01154
01155 void FUEventProcessor::startReceivingLoop()
01156 {
01157 try {
01158 wlReceiving_=
01159 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01160 "waiting");
01161 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01162 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01163 "Receiving");
01164 wlReceiving_->submit(asReceiveMsgAndExecute_);
01165 receiving_ = true;
01166 }
01167 catch (xcept::Exception& e) {
01168 std::string msg = "Failed to start workloop 'Receiving'.";
01169 XCEPT_RETHROW(evf::Exception,msg,e);
01170 }
01171 }
01172 void FUEventProcessor::startReceivingMonitorLoop()
01173 {
01174 try {
01175 wlReceivingMonitor_=
01176 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01177 "waiting");
01178 if (!wlReceivingMonitor_->isActive())
01179 wlReceivingMonitor_->activate();
01180 asReceiveMsgAndRead_ =
01181 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01182 "ReceivingM");
01183 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01184 receivingM_ = true;
01185 }
01186 catch (xcept::Exception& e) {
01187 std::string msg = "Failed to start workloop 'ReceivingM'.";
01188 XCEPT_RETHROW(evf::Exception,msg,e);
01189 }
01190 }
01191
// Work-loop action bound to the "Receiving" work loop by startReceivingLoop().
// Receives one control message through myProcess_->rcvSlave() and acts on
// its type:
//   MSQM_MESSAGE_TYPE_RLI   - set the soft RLIMIT_CORE limit to 0 and set
//                             rlimit_coresize_changed_
//   MSQM_MESSAGE_TYPE_RLR   - re-apply rlimit_coresize_default_ and clear
//                             rlimit_coresize_changed_
//   MSQM_MESSAGE_TYPE_STOP  - stop sequence below, ending in
//                             _exit(EXIT_SUCCESS)
//   MSQM_MESSAGE_TYPE_FSTOP - _exit(EXIT_SUCCESS) immediately
// An evf::Exception from the message calls is logged and swallowed.
// Returns true whenever the process does not exit (presumably so that the
// work loop schedules the action again - confirm against the toolbox
// work-loop contract).
01192 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01193 {
01194 MsgBuf msg;
01195 try{
01196 myProcess_->rcvSlave(msg,false);
01197 if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01198 {
// Keep the hard limit, drop only the current (soft) core size to zero.
01199 rlimit rl;
01200 getrlimit(RLIMIT_CORE,&rl);
01201 rl.rlim_cur=0;
01202 setrlimit(RLIMIT_CORE,&rl);
01203 rlimit_coresize_changed_=true;
01204 }
01205 if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01206 {
01207
01208 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01209 rlimit_coresize_changed_=false;
01210 }
01211 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01212 {
// The whole stop sequence runs under stop_lock_. Each step is best effort:
// a failure is logged and the sequence continues to the next step.
01213 pthread_mutex_lock(&stop_lock_);
01214 try {
01215 fsm_.fireEvent("Stop",this);
01216 }
01217 catch (...) {
01218 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01219 << getpid() << " The state on Stop event was not consistent");
01220 }
01221
01222 try {
01223 stopClassic();
01224 }
01225 catch (...) {
01226 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01227 }
01228
01229
01230 try{
01231 messageServicePresence_.reset();
01232 }
01233 catch(...) {
01234 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01235 }
01236
// Post the MSQS_MESSAGE_TYPE_STOP reply (zero-length payload), then leave
// without running exit handlers (_exit).
01237 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01238 myProcess_->postSlave(msg1,false);
01239 pthread_mutex_unlock(&stop_lock_);
01240 fclose(stdout);
01241 fclose(stderr);
01242 _exit(EXIT_SUCCESS);
01243 }
// Forced stop: no FSM transition, no reply, just terminate.
01244 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01245 _exit(EXIT_SUCCESS);
01246 }
01247 catch(evf::Exception &e){
01248 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01249 }
01250 return true;
01251 }
01252
// Work-loop action bound to the "Supervisor" work loop by
// startSupervisorLoop(). One pass does, in order:
//   1. under stop_lock_: resize subs_/spMStates_/spmStates_ to
//      nbSubProcesses_ if the sizes differ, then poll every slot with
//      waitpid(WNOHANG) and record how a terminated child ended;
//   2. return true right away (no sleep) while the FSM state is "stopping";
//   3. in state "Enabled" with rlimit_coresize_changed_ set: once more than
//      15 minutes have passed since lastCrashTime_, restore
//      rlimit_coresize_default_ and post MSQM_MESSAGE_TYPE_RLR to the slots;
//   4. in state "Enabled" with edm_init_done_ and autoRestartSlaves_: fork a
//      replacement for every slot that is not alive and whose countdown
//      reached 0, unless its restartCount() already exceeds 2;
//   5. in state "Enabled" with edm_init_done_: ask every live slot for its
//      progress record (prg) and accumulate the totals into the info-space
//      counters;
//   6. fire an item-group-changed on monitorInfoSpace_ and sleep for
//      superSleepSec_ seconds.
// Returns true, except in a freshly forked child (forkNew() returned 0),
// where it returns false after enableMPEPSlave().
01253 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01254 {
01255 pthread_mutex_lock(&stop_lock_);
01256 if(subs_.size()!=nbSubProcesses_.value_)
01257 {
// Re-check under pickup_lock_ before resizing. Note that the loop below
// resets the state of every slot, not only of newly added ones.
01258 pthread_mutex_lock(&pickup_lock_);
01259 if(subs_.size()!=nbSubProcesses_.value_) {
01260 subs_.resize(nbSubProcesses_.value_);
01261 spMStates_.resize(nbSubProcesses_.value_);
01262 spmStates_.resize(nbSubProcesses_.value_);
01263 for(unsigned int i = 0; i < spMStates_.size(); i++)
01264 {
01265 spMStates_[i] = edm::event_processor::sInit;
01266 spmStates_[i] = 0;
01267 }
01268 }
01269 pthread_mutex_unlock(&pickup_lock_);
01270 }
// The FSM state is sampled once here; "running"/"stopping" are not refreshed
// later in this pass.
01271 bool running = fsm_.stateName()->toString()=="Enabled";
01272 bool stopping = fsm_.stateName()->toString()=="stopping";
01273 for(unsigned int i = 0; i < subs_.size(); i++)
01274 {
// -1000 is a sentinel status for slots that are skipped here - presumably
// "no process attached to this slot"; confirm against SubProcess.
01275 if(subs_[i].alive()==-1000) continue;
01276 int sl;
01277 pid_t sub_pid = subs_[i].pid();
// Non-blocking reap: waitpid returns the pid only if the child changed state.
01278 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01279
01280 if(killedOrNot && killedOrNot==sub_pid) {
01281 pthread_mutex_lock(&pickup_lock_);
01282
01283 if (i<subs_.size() && subs_[i].alive()!=-1000) {
// Status 0 = exited normally, -1 = anything else (signal or stop).
01284 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01285 std::ostringstream ost;
01286 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01287 else if(WIFSIGNALED(sl)!=0) {
01288 ost << " process terminated with signal " << WTERMSIG(sl);
01289 }
01290 else ost << " process stopped ";
01291
01292
01293
01294
// Arm the restart delay, remember the reason and reset the cached states.
01295 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01296 subs_[i].setReasonForFailed(ost.str());
01297 spMStates_[i] = evtProcessor_.notstarted_state_code();
01298 spmStates_[i] = 0;
01299 std::ostringstream ost1;
01300 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01301 localLog(ost1.str());
01302 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01303 }
01304 pthread_mutex_unlock(&pickup_lock_);
01305 }
01306 }
01307 pthread_mutex_unlock(&stop_lock_);
// While stopping, skip restarts, monitoring and the sleep at the end.
01308 if(stopping) return true;
01309
01310
01311 if (running && rlimit_coresize_changed_) {
01312 timeval newtv;
01313 gettimeofday(&newtv,0);
// Whole seconds since the last crash recorded in lastCrashTime_.
01314 int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01315 if (delta>60*15) {
01316 std::ostringstream ostr;
01317 ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01318 std::cout << ostr.str() << std::endl;
01319 LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01320 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01321 MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01322 for (unsigned int i = 0; i < subs_.size(); i++) {
01323 try {
// NOTE(review): alive() is tested for non-zero here, while other loops in
// this function use alive()>0 or alive()!=1; negative status values (-1,
// -1000) also pass this test. Any resulting post failure is swallowed by the
// catch(...) below - confirm this is intended.
01324 if (subs_[i].alive())
01325 subs_[i].post(master_message_rlr_,false);
01326 }
01327 catch (...) {}
01328 }
01329 rlimit_coresize_changed_=false;
01330 crashesThisRun_=0;
01331 }
01332 }
01333
01334 if(running && edm_init_done_)
01335 {
01336
01337
01338 if(autoRestartSlaves_.value_){
01339 pthread_mutex_lock(&stop_lock_);
01340 for(unsigned int i = 0; i < subs_.size(); i++)
01341 {
01342 if(subs_[i].alive() != 1){
01343 if(subs_[i].countdown() == 0)
01344 {
01345 if(subs_[i].restartCount()>2){
// Give up on this slot: log, notify, and disconnect it. The countdown is
// decremented once more so that the "== 0" test above does not match on the
// next pass (assuming countdown() is a signed counter - see the ">=0" test
// further down).
01346 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01347 << " - maximum restart count reached");
01348 std::ostringstream ost1;
01349 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01350 localLog(ost1.str());
01351 subs_[i].countdown()--;
01352 XCEPT_DECLARE(evf::Exception,
01353 sentinelException, ost1.str());
01354 notifyQualified("error",sentinelException);
01355 subs_[i].disconnect();
01356 continue;
01357 }
01358 subs_[i].restartCount()++;
01359 if (forkInEDM_.value_) {
01360 restartForkInEDM(i);
01361 }
01362 else {
01363 pid_t rr = subs_[i].forkNew();
01364 if(rr==0)
01365 {
// ---- child process from here on ----
01366 myProcess_=&subs_[i];
01367 scalersUpdates_ = 0;
// stop_lock_ was taken before the fork (see above), so the child destroys and
// re-initialises its copy instead of unlocking it.
01368 int retval = pthread_mutex_destroy(&stop_lock_);
01369 if(retval != 0) perror("error");
01370 retval = pthread_mutex_init(&stop_lock_,0);
01371 if(retval != 0) perror("error");
// Drive the child's FSM through Stop -> StopDone -> Enable with RCMS state
// notification disabled.
01372 fsm_.disableRcmsStateNotification();
01373 fsm_.fireEvent("Stop",this);
01374 fsm_.fireEvent("StopDone",this);
01375 fsm_.fireEvent("Enable",this);
01376 try{
01377 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01378 if(lsid) {
01379 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01380 }
01381 }
01382 catch(...){
01383 std::cout << "trouble with lsindex during restart" << std::endl;
01384 }
01385 try{
01386 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01387 if(lstb) {
01388 ((xdata::Boolean*)(lstb))->value_ = false;
01389 }
01390 }
01391 catch(...){
01392 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01393 }
01394
01395 evtProcessor_.adjustLsIndexForRestart();
01396 evtProcessor_.resetPackedTriggerReport();
01397 enableMPEPSlave();
// The child leaves the supervisor action for good.
01398 return false;
01399 }
01400 else
01401 {
// ---- parent process: just record the new pid ----
01402 std::ostringstream ost1;
01403 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01404 localLog(ost1.str());
01405 }
01406 }
01407 }
// One tick of the restart delay per pass for slots that are not alive.
01408 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01409 }
01410 }
01411 pthread_mutex_unlock(&stop_lock_);
01412 }
01413 }
01414 xdata::Serializable *lsid = 0;
01415 xdata::Serializable *psid = 0;
01416 xdata::Serializable *dqmp = 0;
01417 xdata::UnsignedInteger32 *dqm = 0;
01418
01419
01420
01421 if(running && edm_init_done_){
01422 try{
// nbProcessed / nbAccepted are not declared in this function; they are
// presumably members declared elsewhere in the class.
01423 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01424 psid = applicationInfoSpace_->find("prescaleSetIndex");
01425 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01426 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01427 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01428 }
// NOTE(review): the exception is caught by value here; catching by reference
// would avoid the copy and any slicing.
01429 catch(xdata::exception::Exception e){
01430 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01431 }
01432
01433 try{
01434 if(nbProcessed !=0 && nbAccepted !=0)
01435 {
01436 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01437 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
// NOTE(review): ls and ps are dereferenced below without a null check; lsid
// and psid stay 0 if the lookups above did not complete.
01438 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01439 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01440 if(dqmp!=0)
01441 dqm = (xdata::UnsignedInteger32*)dqmp;
// All totals are rebuilt from scratch on every pass.
01442 if(dqm) dqm->value_ = 0;
01443 nbTotalDQM_ = 0;
01444 nbp->value_ = 0;
01445 nba->value_ = 0;
01446 nblive_ = 0;
01447 nbdead_ = 0;
01448 scalersUpdates_ = 0;
01449
01450 for(unsigned int i = 0; i < subs_.size(); i++)
01451 {
01452 if(subs_[i].alive()>0)
01453 {
01454 nblive_++;
01455 try{
// Send the progress request, then poll (non-blocking) for the reply.
01456 subs_[i].post(master_message_prg_,true);
01457
01458 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01459 if(retval == (unsigned long) master_message_prr_->mtype){
01460 prg* p = (struct prg*)(master_message_prr_->mtext);
01461 subs_[i].setParams(p);
01462 spMStates_[i] = p->Ms;
01463 spmStates_[i] = p->ms;
01464 cpustat_->addEntry(p->ms);
01465 if(!subs_[i].inInconsistentState() &&
01466 (p->Ms == edm::event_processor::sError
01467 || p->Ms == edm::event_processor::sInvalid
01468 || p->Ms == edm::event_processor::sStopping))
01469 {
01470 std::ostringstream ost;
01471 ost << "edm::eventprocessor slot " << i << " process id "
01472 << subs_[i].pid() << " not in Running state : Mstate="
01473 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01474 << evtProcessor_.moduleNameFromIndex(p->ms)
01475 << " - Look into possible error messages from HLT process";
01476 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01477 }
01478 nbp->value_ += subs_[i].params().nbp;
01479 nba->value_ += subs_[i].params().nba;
01480 if(dqm)dqm->value_ += p->dqm;
01481 nbTotalDQM_ += p->dqm;
01482 scalersUpdates_ += p->trp;
// Lumi section index only moves forward; prescale index follows the child.
01483 if(p->ls > ls->value_) ls->value_ = p->ls;
01484 if(p->ps != ps->value_) ps->value_ = p->ps;
01485 }
01486 else{
// No matching reply this pass: fall back to the slot's saved counters.
01487 nbp->value_ += subs_[i].get_save_nbp();
01488 nba->value_ += subs_[i].get_save_nba();
01489 }
01490 }
01491 catch(evf::Exception &e){
01492 LOG4CPLUS_INFO(getApplicationLogger(),
01493 "could not send/receive msg on slot "
01494 << i << " - " << e.what());
01495 }
01496
01497 }
01498 else
01499 {
// Slot not alive: count it as dead but keep its saved counters in the totals.
01500 nbp->value_ += subs_[i].get_save_nbp();
01501 nba->value_ += subs_[i].get_save_nba();
01502 nbdead_++;
01503 }
01504 }
// Once more than 64 events were processed in total, a live slot that still
// reports nbp == 0 and ms == 0 is counted via found_invalid(); after more
// than 60 such findings it is sent MSQM_MESSAGE_TYPE_FSTOP.
01505 if(nbp->value_>64){
01506 for(unsigned int i = 0; i < subs_.size(); i++)
01507 {
01508 if(subs_[i].params().nbp == 0){
01509
01510 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01511 {
01512 subs_[i].found_invalid();
01513 if(subs_[i].nfound_invalid() > 60){
01514 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01515 subs_[i].post(msg3,false);
01516 std::ostringstream ost1;
01517 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01518 localLog(ost1.str());
01519 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01520 XCEPT_DECLARE(evf::Exception,
01521 sentinelException, ost1.str());
01522 notifyQualified("error",sentinelException);
01523
01524 }
01525 }
01526 }
01527 }
01528 }
01529 }
01530 }
01531 catch(std::exception &e){
01532 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01533 }
01534 catch(...){
01535 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01536 }
01537 }
01538 else{
// Not running (or EDM not initialised): only reset the cached states of the
// slots carrying the -1000 sentinel status.
01539 for(unsigned int i = 0; i < subs_.size(); i++)
01540 {
01541 if(subs_[i].alive()==-1000)
01542 {
01543 spMStates_[i] = edm::event_processor::sInit;
01544 spmStates_[i] = 0;
01545 }
01546 }
01547 }
01548 try{
01549 monitorInfoSpace_->lock();
01550 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01551 monitorInfoSpace_->unlock();
01552 }
01553 catch(xdata::exception::Exception &e)
01554 {
// NOTE(review): if fireItemGroupChanged throws, unlock() above is skipped.
01555 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01556
01557 }
// Pace the supervisor: one pass every superSleepSec_ seconds.
01558 ::sleep(superSleepSec_.value_);
01559 return true;
01560 }
01561
01562 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01563 {
01564 try {
01565 wlScalers_=
01566 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01567 "waiting");
01568 if (!wlScalers_->isActive()) wlScalers_->activate();
01569 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01570 "Scalers");
01571
01572 wlScalers_->submit(asScalers_);
01573 wlScalersActive_ = true;
01574 }
01575 catch (xcept::Exception& e) {
01576 std::string msg = "Failed to start workloop 'Scalers'.";
01577 XCEPT_RETHROW(evf::Exception,msg,e);
01578 }
01579 }
01580
01581
01582
01583 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01584 {
01585 try {
01586 wlSummarize_=
01587 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01588 "waiting");
01589 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01590
01591 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01592 "Summary");
01593
01594 wlSummarize_->submit(asSummarize_);
01595 wlSummarizeActive_ = true;
01596 }
01597 catch (xcept::Exception& e) {
01598 std::string msg = "Failed to start workloop 'Summarize'.";
01599 XCEPT_RETHROW(evf::Exception,msg,e);
01600 }
01601 }
01602
01603
01604
01605 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01606 {
01607 if(evtProcessor_)
01608 {
01609 if(!evtProcessor_.getTriggerReport(true)) {
01610 wlScalersActive_ = false;
01611 return false;
01612 }
01613 }
01614 else
01615 {
01616 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01617 wlScalersActive_ = false;
01618 return false;
01619 }
01620 if(myProcess_)
01621 {
01622
01623 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01624 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01625 scalersUpdates_++;
01626 }
01627 else
01628 evtProcessor_.fireScalersUpdate();
01629 return true;
01630 }
01631
01632
// Work-loop action bound to the "Summary" work loop by
// startSummarizeWorkLoop().
// Collects one trigger-report message (MSQS_MESSAGE_TYPE_TRR) from every
// live sub-process, sums the reports for the current reference lumi section
// into the packed trigger report and fires a scalers update. Reports that
// belong to a later lumi section are stored per slot and used on a later
// pass. When iDieStatisticsGathering_ is set, CPU and rate statistics are
// sent afterwards.
// Returns false once the FSM state is no longer "Enabled", true otherwise.
01633 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01634 {
01635 evtProcessor_.resetPackedTriggerReport();
01636 bool atLeastOneProcessUpdatedSuccessfully = false;
01637 int msgCount = 0;
01638 for (unsigned int i = 0; i < subs_.size(); i++)
01639 {
01640 if(subs_[i].alive()>0)
01641 {
01642 int ret = 0;
// First choice: a report for this lumi section that was postponed earlier.
01643 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01644 evtProcessor_.getLumiSectionReferenceIndex()))
01645 {
01646 ret = MSQS_MESSAGE_TYPE_TRR;
01647 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01648 }
01649 else{
// Otherwise receive until a TRR message for the reference lumi section (or a
// later one) arrives; older reports are dropped.
// NOTE(review): a message of any other type leaves insync false, so the loop
// simply receives again - it only ends on a matching TRR or an exception.
01650 bool insync = false;
01651 bool exception_caught = false;
01652 while(!insync){
01653 try{
01654 ret = subs_[i].rcv(master_message_trr_,false);
01655 }
01656 catch(evf::Exception &e)
01657 {
01658 std::cout << "exception in msgrcv on " << i
01659 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01660 exception_caught = true;
01661 break;
01662
01663 }
01664 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01665 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01666 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01667 insync = true;
01668 }
01669 }
01670 }
// A failed receive skips this slot; it is not counted in msgCount.
01671 if(exception_caught) continue;
01672 }
01673 msgCount++;
01674 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01675 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01676 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
// Report is ahead of the reference lumi section: keep it for a later pass.
01677 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01678 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01679 subs_[i].add_postponed_trigger_update(master_message_trr_);
01680 }else{
01681 atLeastOneProcessUpdatedSuccessfully = true;
01682 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01683 }
01684 }
01685 else std::cout << "msgrcv returned error " << errno << std::endl;
01686 }
01687 }
01688 if(atLeastOneProcessUpdatedSuccessfully){
// Publish how many sub-processes were expected and how many delivered a
// message (postponed reports are included in msgCount).
01689 nbSubProcessesReporting_.value_ = msgCount;
01690 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01691 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01692 evtProcessor_.updateRollingReport();
01693 evtProcessor_.fireScalersUpdate();
01694 }
01695 else{
01696 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
// Nothing received at all: undo the lumi-section increment before retrying.
01697 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01698 nbSubProcessesReporting_.value_ = 0;
01699 ::sleep(10);
01700 }
01701 if(fsm_.stateName()->toString()!="Enabled"){
// NOTE(review): this clears wlScalersActive_, while startSummarizeWorkLoop()
// sets wlSummarizeActive_; wlSummarizeActive_ is never cleared in this
// function. Possibly a copy/paste slip - confirm against the code that waits
// on these flags before changing it.
01702 wlScalersActive_ = false;
01703 return false;
01704 }
01705
01706 if(iDieStatisticsGathering_.value_){
01707 try{
// Keep the previous counter samples, then take new ones.
01708 unsigned long long idleTmp=idleProcStats_;
01709 unsigned long long allPSTmp=allProcStats_;
01710 idleProcStats_=allProcStats_=0;
01711
01712 utils::procCpuStat(idleProcStats_,allProcStats_);
01713 timeval oldtime=lastProcReport_;
01714 gettimeofday(&lastProcReport_,0);
01715
// CPU usage in per-mille: 1000 minus the idle share of the counter delta.
// The first branch needs a previous sample and a non-zero total delta
// (which also guards the division); otherwise zeros are reported.
01716 if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01717 cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
// Elapsed wall-clock time between the two samples, in milliseconds.
01718 int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01719 + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01720 cpustat_->setElapsed(deltaTms);
01721 }
01722 else {
01723 cpustat_->setCPUStat(0);
01724 cpustat_->setElapsed(0);
01725 }
01726
01727 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01728 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01729 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01730 ratestat_->sendStat((unsigned char*)trsp,
01731 sizeof(TriggerReportStatic),
01732 evtProcessor_.getLumiSectionReferenceIndex());
01733 }catch(evf::Exception &e){
01734 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01735 << e.what());
01736 }
01737 }
// Reset runs on every pass, whether or not statistics were sent.
01738 cpustat_->reset();
01739 return true;
01740 }
01741
01742
01743
// Work-loop action bound to the "ReceivingM" work loop by
// startReceivingMonitorLoop().
// Receives one monitoring request through myProcess_->rcvSlave() and
// answers it according to its type:
//   MSQM_MESSAGE_TYPE_MCS - reply with the output of
//                           evtProcessor_.microState()
//   MSQM_MESSAGE_TYPE_PRG - reply with a filled prg progress record; with
//                           exitOnError_ set and the state being sError the
//                           process may terminate itself (see below)
//   MSQM_MESSAGE_TYPE_WEB - render a web page and send it back through
//                           anonymousPipe_, announcing each chunk by message
//   MSQM_MESSAGE_TYPE_TRP - nothing to do
// An evf::Exception is printed to std::cout and swallowed. Returns true.
01744 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01745 {
01746 try{
01747 myProcess_->rcvSlave(slave_message_monitoring_,true);
01748 switch(slave_message_monitoring_->mtype)
01749 {
01750 case MSQM_MESSAGE_TYPE_MCS:
01751 {
01752 xgi::Input *in = 0;
01753 xgi::Output out;
01754 evtProcessor_.microState(in,&out);
01755 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
// NOTE(review): exactly size() bytes are copied, so no terminating NUL is
// written by strncpy; the reader has to rely on the message length unless
// MsgBuf zero-fills its buffer - confirm.
01756 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01757 myProcess_->postSlave(msg1,true);
01758 break;
01759 }
01760
01761 case MSQM_MESSAGE_TYPE_PRG:
01762 {
01763 xdata::Serializable *dqmp = 0;
01764 xdata::UnsignedInteger32 *dqm = 0;
01765 evtProcessor_.monitoring(0);
01766 try{
01767 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
// NOTE(review): exception caught by value and silently ignored; dqm then
// stays 0 and data->dqm is reported as 0.
01768 } catch(xdata::exception::Exception e){}
01769 if(dqmp!=0)
01770 dqm = (xdata::UnsignedInteger32*)dqmp;
01771
01772
// Fill the progress record in place inside the reply message buffer.
01773 prg * data = (prg*)slave_message_prr_->mtext;
01774 data->ls = evtProcessor_.lsid_;
01775 data->eols = evtProcessor_.lastLumiUsingEol_;
01776 data->ps = evtProcessor_.psid_;
01777 data->nbp = evtProcessor_->totalEvents();
01778 data->nba = evtProcessor_->totalEventsPassed();
01779 data->Ms = evtProcessor_.epMAltState_.value_;
01780 data->ms = evtProcessor_.epmAltState_.value_;
01781 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01782 data->trp = scalersUpdates_;
01783
01784 myProcess_->postSlave(slave_message_prr_,true);
01785 if(exitOnError_.value_)
01786 {
01787
01788
01789 if(data->Ms == edm::event_processor::sError)
01790 {
// Error state with exitOnError_: check the FSM state once per second under
// stop_lock_. The loop ends as soon as the state is no longer "Enabled";
// after more than five one-second waits the process exits with -1 (while
// still holding stop_lock_, which is moot because the process is gone).
01791 bool running = true;
01792 int count = 0;
01793 while(running){
01794 int retval = pthread_mutex_lock(&stop_lock_);
01795 if(retval != 0) perror("error");
01796 running = fsm_.stateName()->toString()=="Enabled";
01797 if(count>5) _exit(-1);
01798 pthread_mutex_unlock(&stop_lock_);
01799 if(running) {::sleep(1); count++;}
01800 }
01801 }
01802 }
01803 break;
01804 }
01805 case MSQM_MESSAGE_TYPE_WEB:
01806 {
01807 xgi::Input *in = 0;
01808 xgi::Output out;
01809 unsigned int bytesToSend = 0;
01810 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
// Request text has the form "<method>" or "<method>&<args>".
01811 std::string query = slave_message_monitoring_->mtext;
01812 size_t pos = query.find_first_of("&");
01813 std::string method;
01814 std::string args;
01815 if(pos!=std::string::npos)
01816 {
01817 method = query.substr(0,pos);
01818 args = query.substr(pos+1,query.length()-pos-1);
01819 }
01820 else
01821 method=query;
01822
01823 if(method=="Spotlight")
01824 {
01825 spotlightWebPage(in,&out);
01826 }
01827 else if(method=="procStat")
01828 {
01829 procStat(in,&out);
01830 }
01831 else if(method=="moduleWeb")
01832 {
// args is a ';'-separated list of "key%value" pairs that is turned into the
// CGI environment handed to moduleWeb(); tokens without '%' are ignored.
01833 internal::MyCgi mycgi;
01834 boost::char_separator<char> sep(";");
01835 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01836 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01837 tok_iter != tokens.end(); ++tok_iter){
01838 size_t pos = (*tok_iter).find_first_of("%");
01839 if(pos != std::string::npos){
01840 std::string first = (*tok_iter).substr(0 , pos);
01841 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01842 mycgi.getEnvironment()[first]=second;
01843 }
01844 }
01845 moduleWeb(&mycgi,&out);
01846 }
01847 else if(method=="Default")
01848 {
01849 defaultWebPage(in,&out);
01850 }
01851 else
01852 {
01853 out << "Error 404!!!!!!!!" << std::endl;
01854 }
01855
01856
// Reply protocol: the page bytes go through anonymousPipe_ in chunks of at
// most MAX_PIPE_BUFFER_SIZE; each chunk is announced by a message whose text
// is the chunk size as a decimal number. An empty page is announced by a
// single "0" message.
// NOTE(review): "%d" is used with unsigned int arguments (bytesToSend,
// msgSize); "%u" would match the type. The return value of write() is not
// checked, so a short or failed write goes unnoticed.
01857 bytesToSend = out.str().size();
01858 unsigned int cycle = 0;
01859 if(bytesToSend==0)
01860 {
01861 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01862 myProcess_->postSlave(msg1,true);
01863 }
01864 while(bytesToSend !=0){
01865 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01866 write(anonymousPipe_[PIPE_WRITE],
01867 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01868 msgSize);
01869 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01870 myProcess_->postSlave(msg1,true);
01871 bytesToSend -= msgSize;
01872 cycle++;
01873 }
01874 break;
01875 }
01876 case MSQM_MESSAGE_TYPE_TRP:
01877 {
01878 break;
01879 }
01880 }
01881 }
01882 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01883 return true;
01884 }
01885
01886 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01887 {
01888
01889
01890 try {
01891 wlSignalMonitor_=
01892 toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01893 "waiting");
01894
01895 if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01896 asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01897 "SignalMonitor");
01898 wlSignalMonitor_->submit(asSignalMonitor_);
01899 signalMonitorActive_ = true;
01900 }
01901 catch (xcept::Exception& e) {
01902 std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01903 std::cout << e.what() << std::endl;
01904 XCEPT_RETHROW(evf::Exception,msg,e);
01905 }
01906 }
01907
01908
// Work-loop action bound to the "SignalMonitor" work loop by
// startSignalMonitorWorkLoop().
// Blocks on sigmon_sem_; every wake-up is treated as one crash notification
// from a sub-process: crashesThisRun_ is incremented and lastCrashTime_ is
// updated. Once crashesThisRun_ reaches crashesToDump_ (and core dumps have
// not been disabled yet) the soft RLIMIT_CORE limit of this process is set to
// 0 and MSQM_MESSAGE_TYPE_RLI is posted to the sub-processes.
// The loop never ends by itself; the function returns false - after clearing
// signalMonitorActive_ - when woken up in a state other than "Enabled" or
// "enabling".
01909 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01910 {
01911 while (1) {
01912 sem_wait(sigmon_sem_);
01913 std::cout << " received signal notification from slave!"<< std::endl;
01914
01915
01916 bool running = fsm_.stateName()->toString()=="Enabled";
01917 bool stopping = fsm_.stateName()->toString()=="stopping";
01918 bool enabling = fsm_.stateName()->toString()=="enabling";
01919 if (!running && !enabling) {
01920 signalMonitorActive_ = false;
01921 return false;
01922 }
01923
01924 crashesThisRun_++;
01925 gettimeofday(&lastCrashTime_,0);
01926
01927
// NOTE(review): after the early return above the state is either "Enabled"
// or "enabling", so `stopping` cannot be true here and the test reduces to
// `running`.
01928 if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01929
// Keep the hard limit, drop only the current (soft) core size to zero.
01930 rlimit rlold;
01931 getrlimit(RLIMIT_CORE,&rlold);
01932 rlimit rlnew = rlold;
01933 rlnew.rlim_cur=0;
01934 setrlimit(RLIMIT_CORE,&rlnew);
01935 rlimit_coresize_changed_=true;
01936 MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01937
// NOTE(review): the loop starts at slot 1, so slot 0 is never told to
// disable core dumps. The dedicated `min` variable suggests this is
// deliberate - confirm. As in supervisor(), alive() is tested for non-zero
// here, which also matches negative status values; post failures are
// swallowed.
01938 unsigned int min=1;
01939 for (unsigned int i = min; i < subs_.size(); i++) {
01940 try {
01941 if (subs_[i].alive()) {
01942 subs_[i].post(master_message_rli_,false);
01943 }
01944 }
01945 catch (...) {}
01946 }
01947 std::ostringstream ostr;
01948 ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01949 << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01950 LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01951 }
01952 }
// Not reachable: the while(1) loop above is only left through the return
// inside it.
01953 signalMonitorActive_ = false;
01954 return false;
01955 }
01956
01957
01958 bool FUEventProcessor::enableCommon()
01959 {
01960 try {
01961 if(hasShMem_) attachDqmToShm();
01962 int sc = 0;
01963 evtProcessor_->clearCounters();
01964 if(isRunNumberSetter_)
01965 evtProcessor_->setRunNumber(runNumber_.value_);
01966 else
01967 evtProcessor_->declareRunNumber(runNumber_.value_);
01968
01969 try{
01970 ::sleep(1);
01971 evtProcessor_->runAsync();
01972 sc = evtProcessor_->statusAsync();
01973 }
01974 catch(cms::Exception &e) {
01975 reasonForFailedState_ = e.explainSelf();
01976 fsm_.fireFailed(reasonForFailedState_,this);
01977 localLog(reasonForFailedState_);
01978 return false;
01979 }
01980 catch(std::exception &e) {
01981 reasonForFailedState_ = e.what();
01982 fsm_.fireFailed(reasonForFailedState_,this);
01983 localLog(reasonForFailedState_);
01984 return false;
01985 }
01986 catch(...) {
01987 reasonForFailedState_ = "Unknown Exception";
01988 fsm_.fireFailed(reasonForFailedState_,this);
01989 localLog(reasonForFailedState_);
01990 return false;
01991 }
01992 if(sc != 0) {
01993 std::ostringstream oss;
01994 oss<<"EventProcessor::runAsync returned status code " << sc;
01995 reasonForFailedState_ = oss.str();
01996 fsm_.fireFailed(reasonForFailedState_,this);
01997 localLog(reasonForFailedState_);
01998 return false;
01999 }
02000 }
02001 catch (xcept::Exception &e) {
02002 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02003 fsm_.fireFailed(reasonForFailedState_,this);
02004 localLog(reasonForFailedState_);
02005 return false;
02006 }
02007 try{
02008 fsm_.fireEvent("EnableDone",this);
02009 }
02010 catch (xcept::Exception &e) {
02011 std::cout << "exception " << (std::string)e.what() << std::endl;
02012 throw;
02013 }
02014
02015 return false;
02016 }
02017
02018 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
02019 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
02020 }
02021
02022 void FUEventProcessor::forkProcessesFromEDM() {
02023
02024 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
02025 unsigned int forkFrom=0;
02026 unsigned int forkTo=nbSubProcesses_.value_;
02027 if (forkParams->slotId>=0) {
02028 forkFrom=forkParams->slotId;
02029 forkTo=forkParams->slotId+1;
02030 }
02031
02032
02033 try {
02034 if (sorRef_) {
02035 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02036 for (unsigned int i=0;i<shmOutputs.size();i++) {
02037
02038 shmOutputs[i]->unregisterFromShm();
02039
02040 shmOutputs[i]->stop();
02041 }
02042 }
02043 }
02044 catch (std::exception &e)
02045 {
02046 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02047 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02048 fsm_.fireFailed(reasonForFailedState_,this);
02049 localLog(reasonForFailedState_);
02050 }
02051 catch (...) {
02052 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02053 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02054 fsm_.fireFailed(reasonForFailedState_,this);
02055 localLog(reasonForFailedState_);
02056 }
02057
02058 std::string currentState = fsm_.stateName()->toString();
02059
02060
02061 if (currentState!="stopping") {
02062 try {
02063 messageServicePresence_.reset();
02064 }
02065 catch (...) {
02066 LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02067 }
02068 }
02069
02070 if (currentState=="stopping") {
02071 LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02072 forkParams->isMaster=1;
02073 forkInfoObj_->forkParams.slotId=-1;
02074 forkInfoObj_->forkParams.restart=0;
02075 }
02076
02077 else for(unsigned int i=forkFrom; i<forkTo; i++)
02078 {
02079 int retval = subs_[i].forkNew();
02080 if(retval==0)
02081 {
02082 forkParams->isMaster=0;
02083 myProcess_ = &subs_[i];
02084
02085 delete toolbox::mem::_s_mutex_ptr_;
02086 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02087 int retval = pthread_mutex_destroy(&stop_lock_);
02088 if(retval != 0) perror("error");
02089 retval = pthread_mutex_init(&stop_lock_,0);
02090 if(retval != 0) perror("error");
02091 fsm_.disableRcmsStateNotification();
02092
02093
02094 try{
02095 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02096 if(pf != 0) {
02097 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02098 }
02099 else {
02100 LOG4CPLUS_ERROR(getApplicationLogger(),
02101 "SLAVE: Unable to create message service presence. pid:"<<getpid());
02102 }
02103 }
02104 catch(...) {
02105 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02106 }
02107
02108 ML::MLlog4cplus::setAppl(this);
02109
02110
02111 try {
02112 if (sorRef_) {
02113 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02114 for (unsigned int i=0;i<shmOutputs.size();i++)
02115 shmOutputs[i]->start();
02116 }
02117 }
02118 catch (...)
02119 {
02120 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02121 }
02122
02123 if (forkParams->restart) {
02124
02125 scalersUpdates_ = 0;
02126 try {
02127 fsm_.fireEvent("Stop",this);
02128 fsm_.fireEvent("StopDone",this);
02129 fsm_.fireEvent("Enable",this);
02130 } catch (...) {
02131 LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02132 }
02133 try{
02134 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02135 if(lsid) {
02136 ((xdata::UnsignedInteger32*)(lsid))->value_--;
02137 }
02138 }
02139 catch(...){
02140 std::cout << "trouble with lsindex during restart" << std::endl;
02141 }
02142 try{
02143 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02144 if(lstb) {
02145 ((xdata::Boolean*)(lstb))->value_ = false;
02146 }
02147 }
02148 catch(...){
02149 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02150 }
02151 evtProcessor_.adjustLsIndexForRestart();
02152 evtProcessor_.resetPackedTriggerReport();
02153 }
02154
02155
02156 startReceivingLoop();
02157 startReceivingMonitorLoop();
02158 startScalersWorkLoop();
02159 while(!evtProcessor_.isWaitingForLs())
02160 ::usleep(100000);
02161
02162
02163 if(hasShMem_) attachDqmToShm();
02164
02165
02166 try {
02167 fsm_.fireEvent("EnableDone",this);
02168 }
02169 catch (...) {}
02170
02171
02172 while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02173
02174
02175 sigset_t tmpset_thread;
02176 sigemptyset(&tmpset_thread);
02177 sigaddset(&tmpset_thread, SIGQUIT);
02178 sigaddset(&tmpset_thread, SIGILL);
02179 sigaddset(&tmpset_thread, SIGABRT);
02180 sigaddset(&tmpset_thread, SIGFPE);
02181 sigaddset(&tmpset_thread, SIGSEGV);
02182 sigaddset(&tmpset_thread, SIGALRM);
02183
02184 pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02185
02186
02187 struct sigaction sa;
02188 sigset_t tmpset;
02189 memset(&tmpset,0,sizeof(tmpset));
02190 sigemptyset(&tmpset);
02191 sa.sa_mask=tmpset;
02192 sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02193 sa.sa_handler=0;
02194 sa.sa_sigaction=evfep_sighandler;
02195
02196 sigaction(SIGQUIT,&sa,0);
02197 sigaction(SIGILL,&sa,0);
02198 sigaction(SIGABRT,&sa,0);
02199 sigaction(SIGFPE,&sa,0);
02200 sigaction(SIGSEGV,&sa,0);
02201 sa.sa_sigaction=evfep_alarmhandler;
02202 sigaction(SIGALRM,&sa,0);
02203
02204
02205 return ;
02206 }
02207 else {
02208
02209 forkParams->isMaster=1;
02210 forkInfoObj_->forkParams.slotId=-1;
02211 if (forkParams->restart) {
02212 std::ostringstream ost1;
02213 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02214 localLog(ost1.str());
02215 }
02216 forkInfoObj_->forkParams.restart=0;
02217
02218 }
02219 }
02220
02221
02222 try{
02223
02224 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02225 if(pf != 0) {
02226 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02227 }
02228 else {
02229 LOG4CPLUS_ERROR(getApplicationLogger(),
02230 "Unable to recreate message service presence ");
02231 }
02232 }
02233 catch(...) {
02234 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02235 }
02236 restart_in_progress_=false;
02237 edm_init_done_=true;
02238 }
02239
02240 bool FUEventProcessor::enableForkInEDM()
02241 {
02242 evtProcessor_.resetWaiting();
02243 try {
02244
02245
02246
02247 int sc = 0;
02248
02249 evtProcessor_->clearCounters();
02250 if(isRunNumberSetter_)
02251 evtProcessor_->setRunNumber(runNumber_.value_);
02252 else
02253 evtProcessor_->declareRunNumber(runNumber_.value_);
02254
02255
02256 pthread_mutex_destroy(&forkObjLock_);
02257 pthread_mutex_init(&forkObjLock_,0);
02258 if (forkInfoObj_) delete forkInfoObj_;
02259 forkInfoObj_ = new moduleweb::ForkInfoObj();
02260 forkInfoObj_->mst_lock_=&forkObjLock_;
02261 forkInfoObj_->fuAddr=(void*)this;
02262 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02263 forkInfoObj_->forkParams.slotId=-1;
02264 forkInfoObj_->forkParams.restart=0;
02265 forkInfoObj_->forkParams.isMaster=-1;
02266 forkInfoObj_->stopCondition=0;
02267 if (mwrRef_)
02268 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02269
02270 evtProcessor_->runAsync();
02271 sc = evtProcessor_->statusAsync();
02272
02273 if(sc != 0) {
02274 std::ostringstream oss;
02275 oss<<"EventProcessor::runAsync returned status code " << sc;
02276 reasonForFailedState_ = oss.str();
02277 fsm_.fireFailed(reasonForFailedState_,this);
02278 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02279 return false;
02280 }
02281 }
02282
02283 catch(cms::Exception &e) {
02284 reasonForFailedState_ = e.explainSelf();
02285 fsm_.fireFailed(reasonForFailedState_,this);
02286 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02287 return false;
02288 }
02289 catch(std::exception &e) {
02290 reasonForFailedState_ = e.what();
02291 fsm_.fireFailed(reasonForFailedState_,this);
02292 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02293 return false;
02294 }
02295 catch(...) {
02296 reasonForFailedState_ = "Unknown Exception";
02297 fsm_.fireFailed(reasonForFailedState_,this);
02298 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02299 return false;
02300 }
02301 return true;
02302 }
02303
02304 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02305
02306
02307 forkInfoObj_->lock();
02308 forkInfoObj_->forkParams.slotId=slotId;
02309 forkInfoObj_->forkParams.restart=true;
02310 forkInfoObj_->forkParams.isMaster=1;
02311 forkInfoObj_->stopCondition=0;
02312 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02313 sem_post(forkInfoObj_->control_sem_);
02314 forkInfoObj_->unlock();
02315 usleep(1000);
02316
02317 int count=50;
02318 restart_in_progress_=true;
02319 while (restart_in_progress_ && count--) usleep(20000);
02320 return true;
02321 }
02322
02323 bool FUEventProcessor::enableClassic()
02324 {
02325 bool retval = enableCommon();
02326 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02327 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02328 ::sleep(1);
02329 }
02330
02331
02332
02333 localLog("-I- Start completed");
02334 return retval;
02335 }
02336 bool FUEventProcessor::enableMPEPSlave()
02337 {
02338
02339
02340 startReceivingLoop();
02341 startReceivingMonitorLoop();
02342 evtProcessor_.resetWaiting();
02343 startScalersWorkLoop();
02344 while(!evtProcessor_.isWaitingForLs())
02345 ::sleep(1);
02346
02347
02348
02349 try{
02350
02351 try{
02352 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02353 if(pf != 0) {
02354 pf->makePresence("MessageServicePresence").release();
02355 }
02356 else {
02357 LOG4CPLUS_ERROR(getApplicationLogger(),
02358 "Unable to create message service presence ");
02359 }
02360 }
02361 catch(...) {
02362 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02363 }
02364 ML::MLlog4cplus::setAppl(this);
02365 }
02366 catch (xcept::Exception &e) {
02367 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02368 fsm_.fireFailed(reasonForFailedState_,this);
02369 localLog(reasonForFailedState_);
02370 }
02371 bool retval = enableCommon();
02372
02373
02374
02375
02376 return retval;
02377 }
02378
02379 bool FUEventProcessor::stopClassic()
02380 {
02381 bool failed=false;
02382 try {
02383 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02384 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02385 if(rc == edm::EventProcessor::epSuccess)
02386 fsm_.fireEvent("StopDone",this);
02387 else
02388 {
02389 failed=true;
02390
02391 if(rc == edm::EventProcessor::epTimedOut)
02392 reasonForFailedState_ = "EventProcessor stop timed out";
02393 else
02394 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02395 }
02396 }
02397 catch (xcept::Exception &e) {
02398 failed=true;
02399 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02400 }
02401 catch (edm::Exception &e) {
02402 failed=true;
02403 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02404 }
02405 catch (...) {
02406 failed=true;
02407 reasonForFailedState_= "Stopping FAILED: unknown exception";
02408 }
02409 try {
02410 if (hasShMem_) {
02411 detachDqmFromShm();
02412 if (failed)
02413 LOG4CPLUS_WARN(getApplicationLogger(),
02414 "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02415 }
02416 }
02417 catch (cms::Exception & e) {
02418 failed=true;
02419 reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02420 }
02421 catch (...) {
02422 failed=true;
02423 reasonForFailedState_= "DQM detach failed: Unknown exception";
02424 }
02425
02426 if (failed) {
02427 LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02428 << reasonForFailedState_ << " (pid:" << getpid()<<")");
02429 localLog(reasonForFailedState_);
02430 fsm_.fireFailed(reasonForFailedState_,this);
02431 }
02432
02433 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02434 localLog("-I- Stop completed");
02435 return false;
02436 }
02437
02438 void FUEventProcessor::stopSlavesAndAcknowledge()
02439 {
02440 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02441 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02442
02443 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02444 for(unsigned int i = 0; i < subs_.size(); i++)
02445 {
02446 pthread_mutex_lock(&stop_lock_);
02447 if(subs_[i].alive()>0){
02448 processes_to_stop[i] = true;
02449 subs_[i].post(msg,false);
02450 }
02451 pthread_mutex_unlock(&stop_lock_);
02452 }
02453 for(unsigned int i = 0; i < subs_.size(); i++)
02454 {
02455 pthread_mutex_lock(&stop_lock_);
02456 if(processes_to_stop[i]){
02457 try{
02458 subs_[i].rcv(msg1,false);
02459 }
02460 catch(evf::Exception &e){
02461 std::ostringstream ost;
02462 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02463 reasonForFailedState_ = ost.str();
02464 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02465
02466 localLog(reasonForFailedState_);
02467 pthread_mutex_unlock(&stop_lock_);
02468 continue;
02469 }
02470 }
02471 else {
02472 pthread_mutex_unlock(&stop_lock_);
02473 continue;
02474 }
02475 pthread_mutex_unlock(&stop_lock_);
02476 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02477 while(subs_[i].alive()>0) ::usleep(10000);
02478 subs_[i].disconnect();
02479 }
02480
02481
02482 }
02483
02484 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
02485 {
02486 std::string urn = getApplicationDescriptor()->getURN();
02487 try{
02488 evtProcessor_.stateNameFromIndex(0);
02489 evtProcessor_.moduleNameFromIndex(0);
02490 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02491 *out << "<tr><td>" << fsm_.stateName()->toString()
02492 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02493 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02494 evtProcessor_.microState(in,out);
02495 *out << "<td></td><td>" << nbTotalDQM_
02496 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02497 if(nbSubProcesses_.value_!=0 && !myProcess_)
02498 {
02499 pthread_mutex_lock(&start_lock_);
02500 for(unsigned int i = 0; i < subs_.size(); i++)
02501 {
02502 try{
02503 if(subs_[i].alive()>0)
02504 {
02505 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02506 << i << "\">""Alive</td><td>S</td><td>"
02507 << subs_[i].queueId() << "<td>"
02508 << subs_[i].queueStatus()<< "/"
02509 << subs_[i].queueOccupancy() << "/"
02510 << subs_[i].queuePidOfLastSend() << "/"
02511 << subs_[i].queuePidOfLastReceive()
02512 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02513 << subs_[i].pid() << "&method=procStat\">"
02514 << subs_[i].pid()<<"</a></td>"
02515 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02516 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02517 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02518 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02519 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02520 << "</td><td>" << subs_[i].params().ps
02521 << "</td><td"
02522 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02523 << subs_[i].params().eols
02524 << "</td><td>" << subs_[i].params().dqm
02525 << "</td><td>" << subs_[i].params().trp << "</td>";
02526 }
02527 else
02528 {
02529 pthread_mutex_lock(&pickup_lock_);
02530 *out << "<tr><td id=\"a"<< i << "\" ";
02531 if(subs_[i].alive()==-1000)
02532 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02533 else
02534 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02535 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02536 << subs_[i].queueStatus() << "/"
02537 << subs_[i].queueOccupancy() << "/"
02538 << subs_[i].queuePidOfLastSend() << "/"
02539 << subs_[i].queuePidOfLastReceive()
02540 << "</td><td id=\"p"<< i << "\">"
02541 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02542 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02543 {
02544 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02545 *out << " will restart in " << subs_[i].countdown() << " s";
02546 else if(autoRestartSlaves_)
02547 *out << " reached maximum restart count";
02548 else *out << " autoRestart is disabled ";
02549 }
02550 *out << "</td><td"
02551 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02552 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02553 << ">"
02554 << subs_[i].params().eols
02555 << "</td><td>" << subs_[i].params().dqm
02556 << "</td><td>" << subs_[i].params().trp << "</td>";
02557 pthread_mutex_unlock(&pickup_lock_);
02558 }
02559 *out << "</tr>";
02560 }
02561 catch(evf::Exception &e){
02562 *out << "<tr><td id=\"a"<< i << "\" "
02563 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02564 << subs_[i].queueStatus() << "/"
02565 << subs_[i].queueOccupancy() << "/"
02566 << subs_[i].queuePidOfLastSend() << "/"
02567 << subs_[i].queuePidOfLastReceive()
02568 << "</td><td id=\"p"<< i << "\">"
02569 <<subs_[i].pid()<<"</td>";
02570 }
02571 }
02572 pthread_mutex_unlock(&start_lock_);
02573 }
02574 }
02575 catch(evf::Exception &e)
02576 {
02577 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02578 }
02579 catch(cms::Exception &e)
02580 {
02581 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02582 }
02583 catch(std::exception &e)
02584 {
02585 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02586 }
02587 catch(...)
02588 {
02589 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02590 }
02591
02592 }
02593
02594
02595 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
02596 {
02597 using namespace utils;
02598
02599 *out << updaterStatic_;
02600 mDiv(out,"loads");
02601 uptime(out);
02602 cDiv(out);
02603 mDiv(out,"st",fsm_.stateName()->toString());
02604 mDiv(out,"ru",runNumber_.toString());
02605 mDiv(out,"nsl",nbSubProcesses_.value_);
02606 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02607 mDiv(out,"cl");
02608 *out << getApplicationDescriptor()->getClassName()
02609 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02610 cDiv(out);
02611 mDiv(out,"in",getApplicationDescriptor()->getInstance());
02612 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02613 mDiv(out,"hlt");
02614 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02615 cDiv(out);
02616 *out << std::endl;
02617 }
02618 else
02619 mDiv(out,"hlt","Not yet...");
02620
02621 mDiv(out,"sq",squidPresent_.toString());
02622 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02623 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02624 if(nbProcessed != 0 && nbAccepted != 0)
02625 {
02626 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02627 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02628 }
02629 else
02630 {
02631 mDiv(out,"tt",0);
02632 mDiv(out,"ac",0);
02633 }
02634 if(!myProcess_)
02635 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02636 else
02637 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02638
02639 mDiv(out,"idi",iDieUrl_.value_);
02640 if(vp_!=0){
02641 mDiv(out,"vpi",(unsigned int) vp_);
02642 if(vulture_->hasStarted()>=0)
02643 mDiv(out,"vul","Prowling");
02644 else
02645 mDiv(out,"vul","Dead");
02646 }
02647 else{
02648 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02649 }
02650 if(evtProcessor_){
02651 mDiv(out,"ll");
02652 *out << evtProcessor_.lastLumi().ls
02653 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02654 cDiv(out);
02655 }
02656 mDiv(out,"lg");
02657 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02658 *out << logRing_[i] << std::endl;
02659 if(logWrap_)
02660 for(unsigned int i = 0; i<logRingIndex_; i++)
02661 *out << logRing_[i] << std::endl;
02662 cDiv(out);
02663 mDiv(out,"cha");
02664 if(cpustat_) *out << cpustat_->getChart();
02665 cDiv(out);
02666 }
02667
02668 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
02669 {
02670 evf::utils::procStat(out);
02671 }
02672
02673 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02674 {
02675 if(myProcess_) myProcess_->postSlave(buf,true);
02676 }
02677
02678 void FUEventProcessor::makeStaticInfo()
02679 {
02680 using namespace utils;
02681 std::ostringstream ost;
02682 mDiv(&ost,"ve");
02683 ost<< "$Revision: 1.164 $ (" << edm::getReleaseVersion() <<")";
02684 cDiv(&ost);
02685 mDiv(&ost,"ou",outPut_.toString());
02686 mDiv(&ost,"sh",hasShMem_.toString());
02687 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02688 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02689
02690 xdata::Serializable *monsleep = 0;
02691 xdata::Serializable *lstimeout = 0;
02692 try{
02693 monsleep = applicationInfoSpace_->find("monSleepSec");
02694 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02695 }
02696 catch(xdata::exception::Exception e){
02697 }
02698
02699 if(monsleep!=0)
02700 mDiv(&ost,"ms",monsleep->toString());
02701 if(lstimeout!=0)
02702 mDiv(&ost,"lst",lstimeout->toString());
02703 char cbuf[sizeof(struct utsname)];
02704 struct utsname* buf = (struct utsname*)cbuf;
02705 uname(buf);
02706 mDiv(&ost,"sysinfo");
02707 ost << buf->sysname << " " << buf->nodename
02708 << " " << buf->release << " " << buf->version << " " << buf->machine;
02709 cDiv(&ost);
02710 updaterStatic_ = ost.str();
02711 }
02712
02713 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02714 {
02715
02716 sem_post(sigmon_sem_);
02717
02718
02719 sleep(2);
02720
02721
02722 alarm(5);
02723
02724 std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02725 std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02726 std::cout << "--- Stacktrace follows --" << std::endl;
02727 std::ostringstream stacktr;
02728 toolbox::stacktrace(20,stacktr);
02729 std::cout << stacktr.str();
02730 if (!rlimit_coresize_changed_)
02731 std::cout << "--- Dumping core." << " --- " << std::endl;
02732 else
02733 std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02734
02735 std::string hasdump = "";
02736 if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02737
02738 LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
02739 << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02740 << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02741 << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02742 );
02743
02744
02745 raise(sig);
02746 }
02747
02748
// XDAQ instantiator macro for evf::FUEventProcessor; presumably this provides
// the factory hook through which the XDAQ executive creates the application
// - confirm against the XDAQ macro definition.
02749 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)