00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00020 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00021 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00022
00023 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00024 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00025 #include "FWCore/Utilities/interface/Presence.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "FWCore/Version/interface/GetReleaseVersion.h"
00028 #include "FWCore/ServiceRegistry/interface/Service.h"
00029
00030 #include "toolbox/BSem.h"
00031 #include "toolbox/Runtime.h"
00032 #include "toolbox/stacktrace.h"
00033 #include "toolbox/net/Utils.h"
00034
00035 #include <boost/tokenizer.hpp>
00036
00037 #include "xcept/tools.h"
00038 #include "xgi/Method.h"
00039
00040 #include "cgicc/CgiDefs.h"
00041 #include "cgicc/Cgicc.h"
00042 #include "cgicc/FormEntry.h"
00043
00044
00045 #include <sys/wait.h>
00046 #include <sys/utsname.h>
00047 #include <sys/mman.h>
00048 #include <signal.h>
00049
00050 #include <typeinfo>
00051 #include <stdlib.h>
00052 #include <stdio.h>
00053 #include <errno.h>
00054
00055
00056 using namespace evf;
00057 using namespace cgicc;
// Pointer to a toolbox::BSem that is defined outside this file.
// enabling() deletes it and installs a new BSem in a freshly forked child.
namespace toolbox {
namespace mem {
extern toolbox::BSem * _s_mutex_ptr_;
}
}
00063
00064
00065 namespace evf {
00066 FUEventProcessor * FUInstancePtr_;
00067 int evfep_raised_signal;
00068 void evfep_sighandler(int sig, siginfo_t* info, void* c)
00069 {
00070 evfep_raised_signal=sig;
00071 FUInstancePtr_->handleSignalSlave(sig, info, c);
00072 }
00073 void evfep_alarmhandler(int sig, siginfo_t* info, void* c)
00074 {
00075 if (evfep_raised_signal) {
00076 signal(evfep_raised_signal,SIG_DFL);
00077 raise(evfep_raised_signal);
00078 }
00079 }
00080 }
00081
00083
00085
00086
00087 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00088 : xdaq::Application(s)
00089 , fsm_(this)
00090 , log_(getApplicationLogger())
00091 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00092 , runNumber_(0)
00093 , epInitialized_(false)
00094 , outPut_(true)
00095 , autoRestartSlaves_(false)
00096 , slaveRestartDelaySecs_(10)
00097 , hasShMem_(true)
00098 , hasPrescaleService_(true)
00099 , hasModuleWebRegistry_(true)
00100 , hasServiceWebRegistry_(true)
00101 , isRunNumberSetter_(true)
00102 , iDieStatisticsGathering_(false)
00103 , outprev_(true)
00104 , exitOnError_(true)
00105 , reasonForFailedState_()
00106 , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
00107 , logRing_(logRingSize_)
00108 , logRingIndex_(logRingSize_)
00109 , logWrap_(false)
00110 , nbSubProcesses_(0)
00111 , nbSubProcessesReporting_(0)
00112 , forkInEDM_(true)
00113 , nblive_(0)
00114 , nbdead_(0)
00115 , nbTotalDQM_(0)
00116 , wlReceiving_(0)
00117 , asReceiveMsgAndExecute_(0)
00118 , receiving_(false)
00119 , wlReceivingMonitor_(0)
00120 , asReceiveMsgAndRead_(0)
00121 , receivingM_(false)
00122 , myProcess_(0)
00123 , wlSupervising_(0)
00124 , asSupervisor_(0)
00125 , supervising_(false)
00126 , wlSignalMonitor_(0)
00127 , asSignalMonitor_(0)
00128 , signalMonitorActive_(false)
00129 , monitorInfoSpace_(0)
00130 , monitorLegendaInfoSpace_(0)
00131 , applicationInfoSpace_(0)
00132 , nbProcessed(0)
00133 , nbAccepted(0)
00134 , scalersInfoSpace_(0)
00135 , scalersLegendaInfoSpace_(0)
00136 , wlScalers_(0)
00137 , asScalers_(0)
00138 , wlScalersActive_(false)
00139 , scalersUpdates_(0)
00140 , wlSummarize_(0)
00141 , asSummarize_(0)
00142 , wlSummarizeActive_(false)
00143 , superSleepSec_(1)
00144 , iDieUrl_("none")
00145 , vulture_(0)
00146 , vp_(0)
00147 , cpustat_(0)
00148 , ratestat_(0)
00149 , mwrRef_(nullptr)
00150 , sorRef_(nullptr)
00151 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00152 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00153 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00154 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00155 , edm_init_done_(true)
00156 , crashesThisRun_(false)
00157 , rlimit_coresize_changed_(false)
00158 , crashesToDump_(2)
00159 , sigmon_sem_(0)
00160 , datasetCounting_(true)
00161 {
00162 using namespace utils;
00163
00164 FUInstancePtr_=this;
00165
00166 names_.push_back("nbProcessed" );
00167 names_.push_back("nbAccepted" );
00168 names_.push_back("epMacroStateInt");
00169 names_.push_back("epMicroStateInt");
00170
00171 int retpipe = pipe(anonymousPipe_);
00172 if(retpipe != 0)
00173 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00174
00175 squidPresent_ = squidnet_.check();
00176
00177 evtProcessor_.setAppDesc(getApplicationDescriptor());
00178 evtProcessor_.setAppCtxt(getApplicationContext());
00179
00180 fsm_.initialize<evf::FUEventProcessor>(this);
00181
00182
00183 url_ =
00184 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00185 getApplicationDescriptor()->getURN();
00186 class_ =getApplicationDescriptor()->getClassName();
00187 instance_=getApplicationDescriptor()->getInstance();
00188 sourceId_=class_.toString()+"_"+instance_.toString();
00189 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00190 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00191
00192 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00193
00194 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00195 applicationInfoSpace_ = ispace;
00196
00197
00198 ispace->fireItemAvailable("parameterSet", &configString_ );
00199 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00200 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00201 ispace->fireItemAvailable("runNumber", &runNumber_ );
00202 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00203
00204 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00205 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00206 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00207 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00208 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00209 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00210 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00211 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00212 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00213 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00214 ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
00215 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00216 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00217 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00218 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00219 ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
00220 ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
00221
00222
00223 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00224 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00225
00226
00227 fsm_.findRcmsStateListener();
00228
00229
00230
00231 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00232 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00233 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00234
00235 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00236 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00237 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00238
00239
00240 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00241 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00242 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00243 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00244 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00245
00246 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00247
00248 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00249 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00250 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00251
00252 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00253 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00254 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00255
00256
00257
00258 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00259 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00260
00261 evtProcessor_.setApplicationInfoSpace(ispace);
00262 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00263 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00264
00265
00266 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00267 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00268
00269
00270 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00271 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00272 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00273 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00274 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00275 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00276 xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
00277 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00278 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00279 xgi::bind(this, &FUEventProcessor::microState, "microState");
00280 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00281 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00282
00283
00284
00285 edm::AssertHandler ah;
00286
00287
00288 try{
00289 LOG4CPLUS_DEBUG(getApplicationLogger(),
00290 "Trying to create message service presence ");
00291 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00292 if(pf != 0) {
00293 messageServicePresence_= pf->makePresence("MessageServicePresence");
00294 }
00295 else {
00296 LOG4CPLUS_ERROR(getApplicationLogger(),
00297 "Unable to create message service presence ");
00298 }
00299 }
00300 catch(...) {
00301 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00302 }
00303 ML::MLlog4cplus::setAppl(this);
00304
00305 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00306 typedef AppDescSet_t::iterator AppDescIter_t;
00307
00308 AppDescSet_t rcms=
00309 getApplicationContext()->getDefaultZone()->
00310 getApplicationDescriptors("RCMSStateListener");
00311 if(rcms.size()==0)
00312 {
00313 LOG4CPLUS_WARN(getApplicationLogger(),
00314 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00315
00316 }
00317 else
00318 {
00319 AppDescIter_t it = rcms.begin();
00320 evtProcessor_.setRcms(*it);
00321 }
00322 pthread_mutex_init(&start_lock_,0);
00323 pthread_mutex_init(&stop_lock_,0);
00324 pthread_mutex_init(&pickup_lock_,0);
00325
00326 forkInfoObj_=nullptr;
00327 pthread_mutex_init(&forkObjLock_,0);
00328 makeStaticInfo();
00329 startSupervisorLoop();
00330
00331 if(vulture_==0) vulture_ = new Vulture(true);
00332
00334
00335 AppDescSet_t setOfiDie=
00336 getApplicationContext()->getDefaultZone()->
00337 getApplicationDescriptors("evf::iDie");
00338
00339 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00340 if ((*it)->getInstance()==0)
00341 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00342
00343
00344 getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00345
00346
00347 #ifdef linux
00348 if (sigmon_sem_==0) {
00349 sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
00350 PROT_READ | PROT_WRITE,
00351 MAP_ANONYMOUS | MAP_SHARED, 0, 0);
00352 if (!sigmon_sem_) {
00353 perror("mmap error\n");
00354 std::cout << "mmap error"<<std::endl;
00355 }
00356 else
00357 sem_init(sigmon_sem_,true,0);
00358 }
00359 #endif
00360 }
00361
00362
00363
00364
00365 FUEventProcessor::~FUEventProcessor()
00366 {
00367
00368
00369
00370 if(vulture_ != 0) delete vulture_;
00371 }
00372
00373
00375
00377
00378
00379
00380 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00381 {
00382
00383
00384
00385
00386
00387 unsigned short smap
00388 = (datasetCounting_.value_ ? 0x20 : 0 )
00389 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00390 + (((instance_.value_%80)==0) ? 0x8 : 0)
00391 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00392 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00393 + (hasPrescaleService_.value_ ? 0x1 : 0);
00394 if(nbSubProcesses_.value_==0)
00395 {
00396 spMStates_.setSize(1);
00397 spmStates_.setSize(1);
00398 }
00399 else
00400 {
00401 spMStates_.setSize(nbSubProcesses_.value_);
00402 spmStates_.setSize(nbSubProcesses_.value_);
00403 for(unsigned int i = 0; i < spMStates_.size(); i++)
00404 {
00405 spMStates_[i] = edm::event_processor::sInit;
00406 spmStates_[i] = 0;
00407 }
00408 }
00409 try {
00410 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00411 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00412 epInitialized_=true;
00413 if(evtProcessor_)
00414 {
00415
00416 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00417 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00418
00419 configuration_ = evtProcessor_.configuration();
00420 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00421 evtProcessor_->beginJob();
00422 evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00423 if(cpustat_) {delete cpustat_; cpustat_=0;}
00424 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00425 nbSubProcesses_.value_,
00426 instance_.value_,
00427 iDieUrl_.value_);
00428 if(ratestat_) {delete ratestat_; ratestat_=0;}
00429 ratestat_ = new RateStat(iDieUrl_.value_);
00430 if(iDieStatisticsGathering_.value_)
00431 {
00432 try
00433 {
00434 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00435 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00436 if(legenda !=0)
00437 {
00438 std::string slegenda = ((xdata::String*)legenda)->value_;
00439 ratestat_->sendLegenda(slegenda);
00440 }
00441 if (sorRef_ && datasetCounting_.value_)
00442 {
00443 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00444 if (dsLegenda.value_.size())
00445 ratestat_->sendAuxLegenda(dsLegenda);
00446 }
00447 }
00448 catch(evf::Exception &e)
00449 {
00450 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00451 << e.what());
00452 }
00453 catch (xcept::Exception& e) {
00454 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00455 << e.what());
00456 }
00457 }
00458
00459 fsm_.fireEvent("ConfigureDone",this);
00460 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00461 localLog("-I- Configuration completed");
00462
00463 }
00464 }
00465 catch (xcept::Exception &e) {
00466 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00467 fsm_.fireFailed(reasonForFailedState_,this);
00468 localLog(reasonForFailedState_);
00469 }
00470 catch(cms::Exception &e) {
00471 reasonForFailedState_ = e.explainSelf();
00472 fsm_.fireFailed(reasonForFailedState_,this);
00473 localLog(reasonForFailedState_);
00474 }
00475 catch(std::exception &e) {
00476 reasonForFailedState_ = e.what();
00477 fsm_.fireFailed(reasonForFailedState_,this);
00478 localLog(reasonForFailedState_);
00479 }
00480 catch(...) {
00481 fsm_.fireFailed("Unknown Exception",this);
00482 }
00483
00484 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00485
00486 return false;
00487 }
00488
00489
00490
00491
00492
00493 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00494 {
00495 nbTotalDQM_ = 0;
00496 scalersUpdates_ = 0;
00497 idleProcStats_ = 0;
00498 allProcStats_ = 0;
00499
00500
00501
00502
00503
00504 unsigned short smap
00505 = (datasetCounting_.value_ ? 0x20 : 0 )
00506 + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00507 + (((instance_.value_%80)==0) ? 0x8 : 0)
00508 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00509 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00510 + (hasPrescaleService_.value_ ? 0x1 : 0);
00511
00512 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00513
00514
00515 if (rlimit_coresize_changed_)
00516 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00517 rlimit_coresize_changed_=false;
00518 crashesThisRun_=0;
00519
00520 sem_destroy(sigmon_sem_);
00521 sem_init(sigmon_sem_,true,0);
00522
00523 if(!epInitialized_){
00524 evtProcessor_.forceInitEventProcessorMaybe();
00525 }
00526 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00527
00528 mwrRef_ = evtProcessor_.getModuleWebRegistry();
00529 sorRef_ = evtProcessor_.getShmOutputModuleRegistry();
00530
00531 if(!epInitialized_){
00532 evtProcessor_->beginJob();
00533 evtProcessor_.setupFastTimerService(nbSubProcesses_.value_>0 ? nbSubProcesses_.value_:1);
00534 if(cpustat_) {delete cpustat_; cpustat_=0;}
00535 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00536 nbSubProcesses_.value_,
00537 instance_.value_,
00538 iDieUrl_.value_);
00539 if(ratestat_) {delete ratestat_; ratestat_=0;}
00540 ratestat_ = new RateStat(iDieUrl_.value_);
00541 if(iDieStatisticsGathering_.value_)
00542 {
00543 try
00544 {
00545 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00546 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00547 if(legenda !=0)
00548 {
00549 std::string slegenda = ((xdata::String*)legenda)->value_;
00550 ratestat_->sendLegenda(slegenda);
00551 }
00552 if (sorRef_ && datasetCounting_.value_)
00553 {
00554 xdata::String dsLegenda = sorRef_->getDatasetCSV();
00555 if (dsLegenda.value_.size())
00556 ratestat_->sendAuxLegenda(dsLegenda);
00557 }
00558 }
00559 catch(evf::Exception &e)
00560 {
00561 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00562 << e.what());
00563 }
00564 catch (xcept::Exception& e) {
00565 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00566 << e.what());
00567 }
00568 }
00569 epInitialized_ = true;
00570 }
00571 configuration_ = evtProcessor_.configuration();
00572 evtProcessor_.resetLumiSectionReferenceIndex();
00573
00574 if(nbSubProcesses_.value_==0) return enableClassic();
00575
00576 pthread_mutex_lock(&start_lock_);
00577 pthread_mutex_lock(&pickup_lock_);
00578 subs_.clear();
00579 subs_.resize(nbSubProcesses_.value_);
00580 pid_t retval = -1;
00581
00582 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00583 {
00584 subs_[i]=SubProcess(i,retval);
00585 }
00586 pthread_mutex_unlock(&pickup_lock_);
00587
00588 pthread_mutex_unlock(&start_lock_);
00589
00590
00591 try {
00592 if (sorRef_) {
00593 unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
00594 if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;
00595 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
00596 for (unsigned int i=0;i<shmOutputs.size();i++) {
00597 shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
00598 }
00599 }
00600 }
00601 catch (...)
00602 {
00603 LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
00604 }
00605
00606
00607 edm_init_done_=true;
00608 if (forkInEDM_.value_) {
00609 edm_init_done_=false;
00610 enableForkInEDM();
00611 }
00612 else
00613 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00614 {
00615 retval = subs_[i].forkNew();
00616 if(retval==0)
00617 {
00618 myProcess_ = &subs_[i];
00619
00620 delete toolbox::mem::_s_mutex_ptr_;
00621 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00622 int retval = pthread_mutex_destroy(&stop_lock_);
00623 if(retval != 0) perror("error");
00624 retval = pthread_mutex_init(&stop_lock_,0);
00625 if(retval != 0) perror("error");
00626 fsm_.disableRcmsStateNotification();
00627
00628 return enableMPEPSlave();
00629
00630 }
00631 }
00632
00633 if (forkInEDM_.value_) {
00634
00635 edm::event_processor::State st;
00636 while (!edm_init_done_) {
00637 usleep(10000);
00638 st = evtProcessor_->getState();
00639 if (st==edm::event_processor::sError || st==edm::event_processor::sInvalid) break;
00640 }
00641 st = evtProcessor_->getState();
00642
00643 if (st!=edm::event_processor::sRunning) {
00644 reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
00645 localLog(reasonForFailedState_);
00646 fsm_.fireFailed(reasonForFailedState_,this);
00647 return false;
00648 }
00649
00650 sleep(1);
00651 startSummarizeWorkLoop();
00652 startSignalMonitorWorkLoop();
00653 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00654
00655
00656 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00657 fsm_.fireEvent("EnableDone",this);
00658 localLog("-I- Start completed");
00659 return false;
00660 }
00661
00662 startSummarizeWorkLoop();
00663 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00664 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00665 fsm_.fireEvent("EnableDone",this);
00666 localLog("-I- Start completed");
00667 return false;
00668 }
00669
00670 bool FUEventProcessor::doEndRunInEDM() {
00671
00672 if (forkInfoObj_) {
00673
00674 int count = 30;
00675 bool waitedForEDM=false;
00676 while (!edm_init_done_ && count) {
00677 ::sleep(1);
00678 if (count%5==0)
00679 LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
00680 count--;
00681 waitedForEDM=true;
00682 }
00683
00684 if (waitedForEDM) sleep(5);
00685
00686
00687
00688 if (evtProcessor_->getState()==edm::event_processor::sJobReady)
00689 return true;
00690
00691 forkInfoObj_->lock();
00692 forkInfoObj_->stopCondition=true;
00693 sem_post(forkInfoObj_->control_sem_);
00694 forkInfoObj_->unlock();
00695
00696 count = 30;
00697
00698 edm::event_processor::State st;
00699 while(count--) {
00700 st = evtProcessor_->getState();
00701 if (st==edm::event_processor::sRunning) ::usleep(100000);
00702 else if (st==edm::event_processor::sStopping || st==edm::event_processor::sJobReady) {
00703 break;
00704 }
00705 else {
00706 std::ostringstream ost;
00707 ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
00708 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00709 fsm_.fireFailed(ost.str(),this);
00710 return false;
00711 }
00712 if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
00713 forkInfoObj_->lock();
00714 forkInfoObj_->stopCondition=true;
00715 sem_post(forkInfoObj_->control_sem_);
00716 forkInfoObj_->unlock();
00717 LOG4CPLUS_WARN(getApplicationLogger(),
00718 "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
00719 }
00720 }
00721 if (count<0) {
00722 std::ostringstream ost;
00723 if (!forkInfoObj_->receivedStop_)
00724 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
00725 << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
00726 else
00727 ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
00728 LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
00729 fsm_.fireFailed(ost.str(),this);
00730 return false;
00731 }
00732 }
00733 return true;
00734 }
00735
00736
00737 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00738 {
00739 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00740 if(nbSubProcesses_.value_!=0) {
00741 stopSlavesAndAcknowledge();
00742 if (forkInEDM_.value_) {
00743
00744 sem_post(sigmon_sem_);
00745 if (!doEndRunInEDM())
00746 return false;
00747 }
00748 }
00749 vulture_->stop();
00750
00751 if (forkInEDM_.value_) {
00752
00753 bool tmpHasShMem_=hasShMem_;
00754 hasShMem_=false;
00755 stopClassic();
00756 hasShMem_=tmpHasShMem_;
00757 return false;
00758 }
00759 stopClassic();
00760 return false;
00761 }
00762
00763
00764
00765 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00766 {
00767 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00768 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
00769 if(nbSubProcesses_.value_!=0) {
00770 stopSlavesAndAcknowledge();
00771 if (forkInEDM_.value_) {
00772 sem_post(sigmon_sem_);
00773 if (!doEndRunInEDM())
00774 return false;
00775
00776 }
00777 }
00778 try{
00779 evtProcessor_.stopAndHalt();
00780
00781 if (forkInfoObj_) {
00782 delete forkInfoObj_;
00783 forkInfoObj_=0;
00784 }
00785 }
00786 catch (evf::Exception &e) {
00787 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00788 localLog(reasonForFailedState_);
00789 fsm_.fireFailed(reasonForFailedState_,this);
00790 }
00791
00792
00793 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00794 fsm_.fireEvent("HaltDone",this);
00795
00796 localLog("-I- Halt completed");
00797 return false;
00798 }
00799
00800
00801
00802 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00803 throw (xoap::exception::Exception)
00804 {
00805 return fsm_.commandCallback(msg);
00806 }
00807
00808
00809
00810 void FUEventProcessor::actionPerformed(xdata::Event& e)
00811 {
00812
00813 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00814 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00815
00816 if ( item == "parameterSet") {
00817 LOG4CPLUS_WARN(getApplicationLogger(),
00818 "HLT Menu changed, force reinitialization of EventProcessor");
00819 epInitialized_ = false;
00820 }
00821 if ( item == "outputEnabled") {
00822 if(outprev_ != outPut_) {
00823 LOG4CPLUS_WARN(getApplicationLogger(),
00824 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00825 evtProcessor_->enableEndPaths(outPut_);
00826 outprev_ = outPut_;
00827 }
00828 }
00829 if (item == "globalInputPrescale") {
00830 LOG4CPLUS_WARN(getApplicationLogger(),
00831 "Setting global input prescale has no effect "
00832 <<"in this version of the code");
00833 }
00834 if ( item == "globalOutputPrescale") {
00835 LOG4CPLUS_WARN(getApplicationLogger(),
00836 "Setting global output prescale has no effect "
00837 <<"in this version of the code");
00838 }
00839 }
00840
00841 }
00842
00843
00844 void FUEventProcessor::getSlavePids(xgi::Input *in, xgi::Output *out)
00845 {
00846 for (unsigned int i=0;i<subs_.size();i++)
00847 {
00848 if (i!=0) *out << ",";
00849 *out << subs_[i].pid();
00850 }
00851 }
00852
00853 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00854 {
00855 using namespace cgicc;
00856 pid_t pid = 0;
00857 std::ostringstream ost;
00858 ost << "&";
00859
00860 Cgicc cgi(in);
00861 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00862 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00863 mycgi->getEnvironment().begin();
00864 mit != mycgi->getEnvironment().end(); mit++)
00865 ost << mit->first << "%" << mit->second << ";";
00866 std::vector<FormEntry> els = cgi.getElements() ;
00867 std::vector<FormEntry> el1;
00868 cgi.getElement("method",el1);
00869 std::vector<FormEntry> el2;
00870 cgi.getElement("process",el2);
00871 if(el1.size()!=0) {
00872 std::string meth = el1[0].getValue();
00873 if(el2.size()!=0) {
00874 unsigned int i = 0;
00875 std::string mod = el2[0].getValue();
00876 pid = atoi(mod.c_str());
00877 for(; i < subs_.size(); i++)
00878 if(subs_[i].pid()==pid) break;
00879 if(i>=subs_.size()){
00880 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00881 return;
00882 }
00883 if(subs_[i].alive() != 1){
00884 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00885 return;
00886 }
00887 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00888 strncpy(msg1->mtext,meth.c_str(),meth.length());
00889 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00890 subs_[i].post(msg1,true);
00891 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00892 superSleepSec_.value_=10*keep_supersleep_original_value;
00893 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00894 bool done = false;
00895 std::vector<char *>pieces;
00896 while(!done){
00897 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00898 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00899 ::sleep(1);
00900 continue;
00901 }
00902 unsigned int nbytes = atoi(msg->mtext);
00903 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00904 char *buf= new char[nbytes];
00905 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00906 if(retval!=nbytes) std::cout
00907 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00908 pieces.push_back(buf);
00909 }
00910 superSleepSec_.value_=keep_supersleep_original_value;
00911 for(unsigned int j = 0; j < pieces.size(); j++){
00912 *out<<pieces[j];
00913 delete[] pieces[j];
00914 }
00915 }
00916 }
00917 }
00918
00919
00920
00921 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00922 throw (xgi::exception::Exception)
00923 {
00924
00925
00926 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00927 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00928 << getApplicationDescriptor()->getInstance() << "</title>"
00929 << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00930 << "</head></html>";
00931 }
00932
00933
00934
00935
00936
00937 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00938 throw (xgi::exception::Exception)
00939 {
00940
00941 std::string urn = getApplicationDescriptor()->getURN();
00942
00943 *out << "<!-- base href=\"/" << urn
00944 << "\"> -->" << std::endl;
00945 *out << "<html>" << std::endl;
00946 *out << "<head>" << std::endl;
00947 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00948 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00949 *out << "<title>" << getApplicationDescriptor()->getClassName()
00950 << getApplicationDescriptor()->getInstance()
00951 << " MAIN</title>" << std::endl;
00952 *out << "</head>" << std::endl;
00953 *out << "<body>" << std::endl;
00954 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00955 *out << "<tr>" << std::endl;
00956 *out << " <td align=\"left\">" << std::endl;
00957 *out << " <img" << std::endl;
00958 *out << " align=\"middle\"" << std::endl;
00959 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00960 *out << " alt=\"main\"" << std::endl;
00961 *out << " width=\"64\"" << std::endl;
00962 *out << " height=\"64\"" << std::endl;
00963 *out << " border=\"\"/>" << std::endl;
00964 *out << " <b>" << std::endl;
00965 *out << getApplicationDescriptor()->getClassName()
00966 << getApplicationDescriptor()->getInstance() << std::endl;
00967 *out << " " << fsm_.stateName()->toString() << std::endl;
00968 *out << " </b>" << std::endl;
00969 *out << " </td>" << std::endl;
00970 *out << " <td width=\"32\">" << std::endl;
00971 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00972 *out << " <img" << std::endl;
00973 *out << " align=\"middle\"" << std::endl;
00974 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00975 *out << " alt=\"HyperDAQ\"" << std::endl;
00976 *out << " width=\"32\"" << std::endl;
00977 *out << " height=\"32\"" << std::endl;
00978 *out << " border=\"\"/>" << std::endl;
00979 *out << " </a>" << std::endl;
00980 *out << " </td>" << std::endl;
00981 *out << " <td width=\"32\">" << std::endl;
00982 *out << " </td>" << std::endl;
00983 *out << " <td width=\"32\">" << std::endl;
00984 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00985 *out << " <img" << std::endl;
00986 *out << " align=\"middle\"" << std::endl;
00987 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00988 *out << " alt=\"main\"" << std::endl;
00989 *out << " width=\"32\"" << std::endl;
00990 *out << " height=\"32\"" << std::endl;
00991 *out << " border=\"\"/>" << std::endl;
00992 *out << " </a>" << std::endl;
00993 *out << " </td>" << std::endl;
00994 *out << "</tr>" << std::endl;
00995 *out << "</table>" << std::endl;
00996
00997 *out << "<hr/>" << std::endl;
00998
00999 std::ostringstream ost;
01000 if(myProcess_)
01001 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
01002 else
01003 ost << "/moduleWeb?";
01004 urn += ost.str();
01005 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
01006 evtProcessor_.taskWebPage(in,out,urn);
01007 else if(evtProcessor_)
01008 evtProcessor_.summaryWebPage(in,out,urn);
01009 else
01010 *out << "<td>HLT Unconfigured</td>" << std::endl;
01011 *out << "</table>" << std::endl;
01012
01013 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
01014 *out << configuration_ << std::endl;
01015 *out << "</textarea><P>" << std::endl;
01016
01017 *out << "</body>" << std::endl;
01018 *out << "</html>" << std::endl;
01019
01020
01021 }
01022 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
01023 throw (xgi::exception::Exception)
01024 {
01025
01026 out->getHTTPResponseHeader().addHeader( "Content-Type",
01027 "application/octet-stream" );
01028 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
01029 "binary" );
01030 if(evtProcessor_ != 0){
01031 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic));
01032 }
01033 }
01034
01035 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
01036 throw (xgi::exception::Exception)
01037 {
01038
01039 if(evtProcessor_ != 0){
01040 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
01041 if(legenda !=0){
01042 std::string slegenda = ((xdata::String*)legenda)->value_;
01043 *out << slegenda << std::endl;
01044 }
01045 }
01046 }
01047
01048
01049 void FUEventProcessor::setAttachDqmToShm() throw (evf::Exception)
01050 {
01051 std::string errmsg;
01052 try {
01053 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01054 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01055 edm::Service<FUShmDQMOutputService>()->setAttachToShm();
01056 }
01057 catch (cms::Exception& e) {
01058 errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
01059 }
01060 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01061 }
01062
01063
01064 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
01065 {
01066 std::string errmsg;
01067 bool success = false;
01068 try {
01069 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01070 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01071 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
01072 if (!success) errmsg = "Failed to attach DQM service to shared memory";
01073 }
01074 catch (cms::Exception& e) {
01075 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
01076 }
01077 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01078 }
01079
01080
01081
01082 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
01083 {
01084 std::string errmsg;
01085 bool success = false;
01086 try {
01087 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
01088 if(edm::Service<FUShmDQMOutputService>().isAvailable())
01089 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
01090 if (!success) errmsg = "Failed to detach DQM service from shared memory";
01091 }
01092 catch (cms::Exception& e) {
01093 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
01094 }
01095 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
01096 }
01097
01098
01099 std::string FUEventProcessor::logsAsString()
01100 {
01101 std::ostringstream oss;
01102 if(logWrap_)
01103 {
01104 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01105 oss << logRing_[i] << std::endl;
01106 for(unsigned int i = 0; i < logRingIndex_; i++)
01107 oss << logRing_[i] << std::endl;
01108 }
01109 else
01110 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
01111 oss << logRing_[i] << std::endl;
01112
01113 return oss.str();
01114 }
01115
01116 void FUEventProcessor::localLog(std::string m)
01117 {
01118 timeval tv;
01119
01120 gettimeofday(&tv,0);
01121 tm *uptm = localtime(&tv.tv_sec);
01122 char datestring[256];
01123 strftime(datestring, sizeof(datestring),"%c", uptm);
01124
01125 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
01126 logRingIndex_--;
01127 std::ostringstream timestamp;
01128 timestamp << " at " << datestring;
01129 m += timestamp.str();
01130 logRing_[logRingIndex_] = m;
01131 }
01132
01133 void FUEventProcessor::startSupervisorLoop()
01134 {
01135 try {
01136 wlSupervising_=
01137 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
01138 "waiting");
01139 if (!wlSupervising_->isActive()) wlSupervising_->activate();
01140 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
01141 "Supervisor");
01142 wlSupervising_->submit(asSupervisor_);
01143 supervising_ = true;
01144 }
01145 catch (xcept::Exception& e) {
01146 std::string msg = "Failed to start workloop 'Supervisor'.";
01147 XCEPT_RETHROW(evf::Exception,msg,e);
01148 }
01149 }
01150
01151 void FUEventProcessor::startReceivingLoop()
01152 {
01153 try {
01154 wlReceiving_=
01155 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
01156 "waiting");
01157 if (!wlReceiving_->isActive()) wlReceiving_->activate();
01158 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
01159 "Receiving");
01160 wlReceiving_->submit(asReceiveMsgAndExecute_);
01161 receiving_ = true;
01162 }
01163 catch (xcept::Exception& e) {
01164 std::string msg = "Failed to start workloop 'Receiving'.";
01165 XCEPT_RETHROW(evf::Exception,msg,e);
01166 }
01167 }
01168 void FUEventProcessor::startReceivingMonitorLoop()
01169 {
01170 try {
01171 wlReceivingMonitor_=
01172 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
01173 "waiting");
01174 if (!wlReceivingMonitor_->isActive())
01175 wlReceivingMonitor_->activate();
01176 asReceiveMsgAndRead_ =
01177 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
01178 "ReceivingM");
01179 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
01180 receivingM_ = true;
01181 }
01182 catch (xcept::Exception& e) {
01183 std::string msg = "Failed to start workloop 'ReceivingM'.";
01184 XCEPT_RETHROW(evf::Exception,msg,e);
01185 }
01186 }
01187
01188 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
01189 {
01190 MsgBuf msg;
01191 try{
01192 myProcess_->rcvSlave(msg,false);
01193 if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
01194 {
01195 rlimit rl;
01196 getrlimit(RLIMIT_CORE,&rl);
01197 rl.rlim_cur=0;
01198 setrlimit(RLIMIT_CORE,&rl);
01199 rlimit_coresize_changed_=true;
01200 }
01201 if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
01202 {
01203
01204 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01205 rlimit_coresize_changed_=false;
01206 }
01207 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
01208 {
01209 pthread_mutex_lock(&stop_lock_);
01210 try {
01211 fsm_.fireEvent("Stop",this);
01212 }
01213 catch (...) {
01214 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
01215 << getpid() << " The state on Stop event was not consistent");
01216 }
01217
01218 try {
01219 stopClassic();
01220 }
01221 catch (...) {
01222 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
01223 }
01224
01225
01226 try{
01227 messageServicePresence_.reset();
01228 }
01229 catch(...) {
01230 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
01231 }
01232
01233 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
01234 myProcess_->postSlave(msg1,false);
01235 pthread_mutex_unlock(&stop_lock_);
01236 fclose(stdout);
01237 fclose(stderr);
01238 _exit(EXIT_SUCCESS);
01239 }
01240 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
01241 _exit(EXIT_SUCCESS);
01242 }
01243 catch(evf::Exception &e){
01244 LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
01245 }
01246 return true;
01247 }
01248
// Action submitted to the "Supervisor" work loop. One pass:
//   1. sizes the per-slot bookkeeping to nbSubProcesses_,
//   2. reaps exited sub-processes with a non-blocking waitpid,
//   3. restores the core-size limit 15 minutes after the last crash,
//   4. restarts dead slots when autoRestartSlaves_ is set,
//   5. polls live slots for progress and sums their counters,
//   6. fires an item-group-changed on monitorInfoSpace_ and sleeps
//      superSleepSec_ seconds.
// Returns true on all paths except in a freshly forked child (rr==0 below),
// which returns false after enableMPEPSlave().
// NOTE(review): presumably the work loop re-runs the action while it returns
// true - confirm against the toolbox work-loop documentation.
01249 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
01250 {
01251 pthread_mutex_lock(&stop_lock_);
// Resize the slot tables if the configured sub-process count changed; the
// size is re-checked after taking pickup_lock_.
01252 if(subs_.size()!=nbSubProcesses_.value_)
01253 {
01254 pthread_mutex_lock(&pickup_lock_);
01255 if(subs_.size()!=nbSubProcesses_.value_) {
01256 subs_.resize(nbSubProcesses_.value_);
01257 spMStates_.resize(nbSubProcesses_.value_);
01258 spmStates_.resize(nbSubProcesses_.value_);
01259 for(unsigned int i = 0; i < spMStates_.size(); i++)
01260 {
01261 spMStates_[i] = edm::event_processor::sInit;
01262 spmStates_[i] = 0;
01263 }
01264 }
01265 pthread_mutex_unlock(&pickup_lock_);
01266 }
// FSM state at the start of this pass; both flags keep these values for the
// remainder of the pass.
01267 bool running = fsm_.stateName()->toString()=="Enabled";
01268 bool stopping = fsm_.stateName()->toString()=="stopping";
// Reap sub-processes that have terminated.
01269 for(unsigned int i = 0; i < subs_.size(); i++)
01270 {
// Slots reporting -1000 are skipped (presumably unused/disconnected - TODO confirm).
01271 if(subs_[i].alive()==-1000) continue;
01272 int sl;
01273 pid_t sub_pid = subs_[i].pid();
// WNOHANG: do not block when the child has not changed state.
01274 pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
01275
01276 if(killedOrNot && killedOrNot==sub_pid) {
01277 pthread_mutex_lock(&pickup_lock_);
01278
01279 if (i<subs_.size() && subs_[i].alive()!=-1000) {
// Status 0 for a normal exit, -1 for anything else.
01280 subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
01281 std::ostringstream ost;
01282 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
01283 else if(WIFSIGNALED(sl)!=0) {
01284 ost << " process terminated with signal " << WTERMSIG(sl);
01285 }
01286 else ost << " process stopped ";
01287
01288
01289
01290
// Arm the restart countdown and record why the slot failed.
01291 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
01292 subs_[i].setReasonForFailed(ost.str());
01293 spMStates_[i] = evtProcessor_.notstarted_state_code();
01294 spmStates_[i] = 0;
01295 std::ostringstream ost1;
01296 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
01297 localLog(ost1.str());
01298 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
01299 }
01300 pthread_mutex_unlock(&pickup_lock_);
01301 }
01302 }
01303 pthread_mutex_unlock(&stop_lock_);
// While stopping, nothing beyond reaping is done (no sleep either).
01304 if(stopping) return true;
01305
01306
// Core dumps were disabled earlier: re-enable them once 15 minutes have
// passed since lastCrashTime_, and tell the sub-processes to do the same.
01307 if (running && rlimit_coresize_changed_) {
01308 timeval newtv;
01309 gettimeofday(&newtv,0);
01310 int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
01311 if (delta>60*15) {
01312 std::ostringstream ostr;
01313 ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
01314 std::cout << ostr.str() << std::endl;
01315 LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
01316 setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
01317 MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
01318 for (unsigned int i = 0; i < subs_.size(); i++) {
01319 try {
// NOTE(review): alive() is compared with 0, 1, -1000 and ">0" elsewhere in
// this function; this truthiness test is also true for negative values,
// so the message is posted to slots that are not alive - confirm intent.
01320 if (subs_[i].alive())
01321 subs_[i].post(master_message_rlr_,false);
01322 }
// Failures to post are deliberately ignored (best effort).
01323 catch (...) {}
01324 }
01325 rlimit_coresize_changed_=false;
01326 crashesThisRun_=0;
01327 }
01328 }
01329
01330 if(running && edm_init_done_)
01331 {
01332
01333
// Automatic restart of slots that are not alive.
01334 if(autoRestartSlaves_.value_){
01335 pthread_mutex_lock(&stop_lock_);
01336 for(unsigned int i = 0; i < subs_.size(); i++)
01337 {
01338 if(subs_[i].alive() != 1){
// The countdown is decremented once per pass (see below); the restart
// happens on the pass where it reads exactly 0.
01339 if(subs_[i].countdown() == 0)
01340 {
// After more than two restarts the slot is given up: warn, notify,
// disconnect and move on. The extra decrement takes the countdown below 0
// so this branch is not entered again for the slot.
01341 if(subs_[i].restartCount()>2){
01342 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
01343 << " - maximum restart count reached");
01344 std::ostringstream ost1;
01345 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01346 localLog(ost1.str());
01347 subs_[i].countdown()--;
01348 XCEPT_DECLARE(evf::Exception,
01349 sentinelException, ost1.str());
01350 notifyQualified("error",sentinelException);
01351 subs_[i].disconnect();
01352 continue;
01353 }
01354 subs_[i].restartCount()++;
01355 if (forkInEDM_.value_) {
01356 restartForkInEDM(i);
01357 }
01358 else {
01359 pid_t rr = subs_[i].forkNew();
// rr==0: this is the newly forked child process.
01360 if(rr==0)
01361 {
01362 myProcess_=&subs_[i];
01363 scalersUpdates_ = 0;
// stop_lock_ was locked by the parent at fork time; the child destroys and
// re-initialises it instead of unlocking it.
01364 int retval = pthread_mutex_destroy(&stop_lock_);
01365 if(retval != 0) perror("error");
01366 retval = pthread_mutex_init(&stop_lock_,0);
01367 if(retval != 0) perror("error");
// Drive the child's FSM through Stop -> StopDone -> Enable with RCMS
// notification switched off.
01368 fsm_.disableRcmsStateNotification();
01369 fsm_.fireEvent("Stop",this);
01370 fsm_.fireEvent("StopDone",this);
01371 fsm_.fireEvent("Enable",this);
01372 try{
01373 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01374 if(lsid) {
01375 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01376 }
01377 }
01378 catch(...){
01379 std::cout << "trouble with lsindex during restart" << std::endl;
01380 }
01381 try{
01382 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01383 if(lstb) {
01384 ((xdata::Boolean*)(lstb))->value_ = false;
01385 }
01386 }
01387 catch(...){
01388 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01389 }
01390
01391 evtProcessor_.adjustLsIndexForRestart();
01392 evtProcessor_.resetPackedTriggerReport();
01393 enableMPEPSlave();
// The child leaves the supervisor action here.
01394 return false;
01395 }
01396 else
01397 {
// Parent side: record the new pid.
// NOTE(review): a negative rr (fork failure) would also be logged as a new
// process - confirm how forkNew() reports errors.
01398 std::ostringstream ost1;
01399 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01400 localLog(ost1.str());
01401 }
01402 }
01403 }
01404 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01405 }
01406 }
01407 pthread_mutex_unlock(&stop_lock_);
01408 }
01409 }
01410 xdata::Serializable *lsid = 0;
01411 xdata::Serializable *psid = 0;
01412 xdata::Serializable *dqmp = 0;
01413 xdata::UnsignedInteger32 *dqm = 0;
01414
01415
01416
// Poll the sub-processes and aggregate their counters.
01417 if(running && edm_init_done_){
01418 try{
01419 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01420 psid = applicationInfoSpace_->find("prescaleSetIndex");
01421 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01422 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01423 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01424 }
// NOTE(review): the exception is caught by value; catching by reference
// would avoid a copy and slicing.
01425 catch(xdata::exception::Exception e){
01426 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01427 }
01428
01429 try{
01430 if(nbProcessed !=0 && nbAccepted !=0)
01431 {
01432 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01433 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
// NOTE(review): lsid and psid are not null-checked here, yet ls and ps are
// dereferenced below when a slot reports; if either find() above returned 0
// or threw before assigning, that is a null dereference.
01434 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01435 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01436 if(dqmp!=0)
01437 dqm = (xdata::UnsignedInteger32*)dqmp;
// All totals are rebuilt from scratch on every pass.
01438 if(dqm) dqm->value_ = 0;
01439 nbTotalDQM_ = 0;
01440 nbp->value_ = 0;
01441 nba->value_ = 0;
01442 nblive_ = 0;
01443 nbdead_ = 0;
01444 scalersUpdates_ = 0;
01445
01446 for(unsigned int i = 0; i < subs_.size(); i++)
01447 {
01448 if(subs_[i].alive()>0)
01449 {
01450 nblive_++;
01451 try{
// Send a progress request and try to pick up the reply without blocking.
01452 subs_[i].post(master_message_prg_,true);
01453
01454 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
// A return value equal to the reply's message type is treated as
// "reply received".
01455 if(retval == (unsigned long) master_message_prr_->mtype){
01456 prg* p = (struct prg*)(master_message_prr_->mtext);
01457 subs_[i].setParams(p);
01458 spMStates_[i] = p->Ms;
01459 spmStates_[i] = p->ms;
01460 cpustat_->addEntry(p->ms);
01461 if(!subs_[i].inInconsistentState() &&
01462 (p->Ms == edm::event_processor::sError
01463 || p->Ms == edm::event_processor::sInvalid
01464 || p->Ms == edm::event_processor::sStopping))
01465 {
01466 std::ostringstream ost;
01467 ost << "edm::eventprocessor slot " << i << " process id "
01468 << subs_[i].pid() << " not in Running state : Mstate="
01469 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01470 << evtProcessor_.moduleNameFromIndex(p->ms)
01471 << " - Look into possible error messages from HLT process";
01472 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01473 }
01474 nbp->value_ += subs_[i].params().nbp;
01475 nba->value_ += subs_[i].params().nba;
01476 if(dqm)dqm->value_ += p->dqm;
01477 nbTotalDQM_ += p->dqm;
01478 scalersUpdates_ += p->trp;
// The lumi-section index only moves forward; the prescale index follows the
// most recently processed reply.
01479 if(p->ls > ls->value_) ls->value_ = p->ls;
01480 if(p->ps != ps->value_) ps->value_ = p->ps;
01481 }
01482 else{
// No reply this pass: fall back to the slot's saved counters.
01483 nbp->value_ += subs_[i].get_save_nbp();
01484 nba->value_ += subs_[i].get_save_nba();
01485 }
01486 }
01487 catch(evf::Exception &e){
01488 LOG4CPLUS_INFO(getApplicationLogger(),
01489 "could not send/receive msg on slot "
01490 << i << " - " << e.what());
01491 }
01492
01493 }
01494 else
01495 {
// Slots that are not alive still contribute their saved counters.
01496 nbp->value_ += subs_[i].get_save_nbp();
01497 nba->value_ += subs_[i].get_save_nba();
01498 nbdead_++;
01499 }
01500 }
// Once more than 64 events were processed in total, look for live slots
// that have processed nothing and report ms==0; after more than 60 such
// findings the slot is sent a force-stop message.
01501 if(nbp->value_>64){
01502 for(unsigned int i = 0; i < subs_.size(); i++)
01503 {
01504 if(subs_[i].params().nbp == 0){
01505
01506 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01507 {
01508 subs_[i].found_invalid();
01509 if(subs_[i].nfound_invalid() > 60){
01510 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01511 subs_[i].post(msg3,false);
01512 std::ostringstream ost1;
01513 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01514 localLog(ost1.str());
01515 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01516 XCEPT_DECLARE(evf::Exception,
01517 sentinelException, ost1.str());
01518 notifyQualified("error",sentinelException);
01519
01520 }
01521 }
01522 }
01523 }
01524 }
01525 }
01526 }
01527 catch(std::exception &e){
01528 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01529 }
01530 catch(...){
01531 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01532 }
01533 }
01534 else{
// Not running: reset the displayed states of slots that report -1000.
01535 for(unsigned int i = 0; i < subs_.size(); i++)
01536 {
01537 if(subs_[i].alive()==-1000)
01538 {
01539 spMStates_[i] = edm::event_processor::sInit;
01540 spmStates_[i] = 0;
01541 }
01542 }
01543 }
// Publish the updated monitoring items.
// NOTE(review): if fireItemGroupChanged throws, unlock() is skipped and
// monitorInfoSpace_ stays locked.
01544 try{
01545 monitorInfoSpace_->lock();
01546 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01547 monitorInfoSpace_->unlock();
01548 }
01549 catch(xdata::exception::Exception &e)
01550 {
01551 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01552
01553 }
// Pace the loop.
01554 ::sleep(superSleepSec_.value_);
01555 return true;
01556 }
01557
01558 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01559 {
01560 try {
01561 wlScalers_=
01562 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01563 "waiting");
01564 if (!wlScalers_->isActive()) wlScalers_->activate();
01565 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01566 "Scalers");
01567
01568 wlScalers_->submit(asScalers_);
01569 wlScalersActive_ = true;
01570 }
01571 catch (xcept::Exception& e) {
01572 std::string msg = "Failed to start workloop 'Scalers'.";
01573 XCEPT_RETHROW(evf::Exception,msg,e);
01574 }
01575 }
01576
01577
01578
01579 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01580 {
01581 try {
01582 wlSummarize_=
01583 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01584 "waiting");
01585 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01586
01587 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01588 "Summary");
01589
01590 wlSummarize_->submit(asSummarize_);
01591 wlSummarizeActive_ = true;
01592 }
01593 catch (xcept::Exception& e) {
01594 std::string msg = "Failed to start workloop 'Summarize'.";
01595 XCEPT_RETHROW(evf::Exception,msg,e);
01596 }
01597 }
01598
01599
01600
01601 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01602 {
01603 if(evtProcessor_)
01604 {
01605 if(!evtProcessor_.getTriggerReport(true)) {
01606 wlScalersActive_ = false;
01607 return false;
01608 }
01609 }
01610 else
01611 {
01612 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01613 wlScalersActive_ = false;
01614 return false;
01615 }
01616 if(myProcess_)
01617 {
01618
01619 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01620 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01621 scalersUpdates_++;
01622 }
01623 else
01624 evtProcessor_.fireScalersUpdate();
01625 return true;
01626 }
01627
01628
// Action of the "Summary" work loop. Collects one trigger report per live
// sub-process for the current reference lumi section, sums them into the
// packed report, fires a scalers update and, if enabled, sends CPU/rate
// statistics.
// Returns false once the FSM is no longer "Enabled", true otherwise.
01629 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01630 {
01631 evtProcessor_.resetPackedTriggerReport();
01632 bool atLeastOneProcessUpdatedSuccessfully = false;
01633 int msgCount = 0;
01634 for (unsigned int i = 0; i < subs_.size(); i++)
01635 {
01636 if(subs_[i].alive()>0)
01637 {
01638 int ret = 0;
// A report postponed in an earlier pass for this lumi section is used
// first, if the slot has one.
01639 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01640 evtProcessor_.getLumiSectionReferenceIndex()))
01641 {
01642 ret = MSQS_MESSAGE_TYPE_TRR;
01643 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01644 }
01645 else{
// Otherwise receive until a report arrives whose lumi section is at or
// beyond the reference index; older reports are dropped.
// NOTE(review): the loop has no bound - it only ends on such a report or on
// an evf::Exception from rcv().
01646 bool insync = false;
01647 bool exception_caught = false;
01648 while(!insync){
01649 try{
01650 ret = subs_[i].rcv(master_message_trr_,false);
01651 }
01652 catch(evf::Exception &e)
01653 {
01654 std::cout << "exception in msgrcv on " << i
01655 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01656 exception_caught = true;
01657 break;
01658
01659 }
01660 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01661 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01662 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01663 insync = true;
01664 }
01665 }
01666 }
// A slot whose receive failed is skipped and not counted.
01667 if(exception_caught) continue;
01668 }
// Counted for every slot that got this far, including reports that end up
// postponed below.
01669 msgCount++;
01670 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01671 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
// A report from a later lumi section is kept for a future pass.
01672 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01673 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01674 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01675 subs_[i].add_postponed_trigger_update(master_message_trr_);
01676 }else{
01677 atLeastOneProcessUpdatedSuccessfully = true;
01678 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01679 }
01680 }
01681 else std::cout << "msgrcv returned error " << errno << std::endl;
01682 }
01683 }
01684 if(atLeastOneProcessUpdatedSuccessfully){
// Stamp the summed report with expected/reporting counts and publish it.
01685 nbSubProcessesReporting_.value_ = msgCount;
01686 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01687 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01688 evtProcessor_.updateRollingReport();
01689 evtProcessor_.fireScalersUpdate();
01690 }
01691 else{
01692 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
// The lumi-section increment is withdrawn only when no slot delivered
// anything at all.
01693 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01694 nbSubProcessesReporting_.value_ = 0;
01695 ::sleep(10);
01696 }
01697 if(fsm_.stateName()->toString()!="Enabled"){
// NOTE(review): this clears wlScalersActive_, whereas
// startSummarizeWorkLoop() sets wlSummarizeActive_ - confirm which flag is
// meant here.
01698 wlScalersActive_ = false;
01699 return false;
01700 }
01701
01702 if(iDieStatisticsGathering_.value_){
01703 try{
// Keep the previous idle/total CPU counters, then sample new ones.
01704 unsigned long long idleTmp=idleProcStats_;
01705 unsigned long long allPSTmp=allProcStats_;
01706 idleProcStats_=allProcStats_=0;
01707
01708 utils::procCpuStat(idleProcStats_,allProcStats_);
01709 timeval oldtime=lastProcReport_;
01710 gettimeofday(&lastProcReport_,0);
01711
// With a previous sample and a changed total: busy share in per-mille
// (1000 minus the idle share) and elapsed wall time in milliseconds.
// Otherwise both are reported as 0.
01712 if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
01713 cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
01714 int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
01715 + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
01716 cpustat_->setElapsed(deltaTms);
01717 }
01718 else {
01719 cpustat_->setCPUStat(0);
01720 cpustat_->setElapsed(0);
01721 }
01722
01723 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01724 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01725 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01726 ratestat_->sendStat((unsigned char*)trsp,
01727 sizeof(TriggerReportStatic),
01728 evtProcessor_.getLumiSectionReferenceIndex());
01729 }catch(evf::Exception &e){
01730 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01731 << e.what());
01732 }
01733 }
// Reset on every pass, whether or not statistics were sent.
01734 cpustat_->reset();
01735 return true;
01736 }
01737
01738
01739
// Action of the "ReceivingM" work loop in a sub-process. Receives one
// monitoring request via myProcess_->rcvSlave() and answers it:
//   MCS - micro-state text
//   PRG - progress record (counters, states, lumi/prescale indices)
//   WEB - a rendered web page, streamed through anonymousPipe_ in chunks
//   TRP - no action
// An evf::Exception is printed to stdout. Always returns true.
01740 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01741 {
01742 try{
01743 myProcess_->rcvSlave(slave_message_monitoring_,true);
01744 switch(slave_message_monitoring_->mtype)
01745 {
01746 case MSQM_MESSAGE_TYPE_MCS:
01747 {
// Render the micro-state into a local output object and send its text back.
01748 xgi::Input *in = 0;
01749 xgi::Output out;
01750 evtProcessor_.microState(in,&out);
01751 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
// NOTE(review): exactly size() bytes are copied, so strncpy writes no
// terminating NUL - confirm the receiver does not rely on one.
01752 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01753 myProcess_->postSlave(msg1,true);
01754 break;
01755 }
01756
01757 case MSQM_MESSAGE_TYPE_PRG:
01758 {
01759 xdata::Serializable *dqmp = 0;
01760 xdata::UnsignedInteger32 *dqm = 0;
01761 evtProcessor_.monitoring(0);
01762 try{
01763 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
// NOTE(review): caught by value and silently discarded; dqmp then stays 0
// and the reply carries dqm == 0.
01764 } catch(xdata::exception::Exception e){}
01765 if(dqmp!=0)
01766 dqm = (xdata::UnsignedInteger32*)dqmp;
01767
01768
// Fill the progress record directly inside the reply message buffer.
01769 prg * data = (prg*)slave_message_prr_->mtext;
01770 data->ls = evtProcessor_.lsid_;
01771 data->eols = evtProcessor_.lastLumiUsingEol_;
01772 data->ps = evtProcessor_.psid_;
01773 data->nbp = evtProcessor_->totalEvents();
01774 data->nba = evtProcessor_->totalEventsPassed();
01775 data->Ms = evtProcessor_.epMAltState_.value_;
01776 data->ms = evtProcessor_.epmAltState_.value_;
01777 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01778 data->trp = scalersUpdates_;
01779
01780 myProcess_->postSlave(slave_message_prr_,true);
01781 if(exitOnError_.value_)
01782 {
01783
01784
// In the error state with exitOnError_ set: check the FSM once per second
// under stop_lock_ and leave the loop as soon as it is no longer "Enabled";
// if it is still "Enabled" after more than five checks, the process exits.
// NOTE(review): _exit(-1) is called while stop_lock_ is held.
01785 if(data->Ms == edm::event_processor::sError)
01786 {
01787 bool running = true;
01788 int count = 0;
01789 while(running){
01790 int retval = pthread_mutex_lock(&stop_lock_);
01791 if(retval != 0) perror("error");
01792 running = fsm_.stateName()->toString()=="Enabled";
01793 if(count>5) _exit(-1);
01794 pthread_mutex_unlock(&stop_lock_);
01795 if(running) {::sleep(1); count++;}
01796 }
01797 }
01798 }
01799 break;
01800 }
01801 case MSQM_MESSAGE_TYPE_WEB:
01802 {
01803 xgi::Input *in = 0;
01804 xgi::Output out;
01805 unsigned int bytesToSend = 0;
01806 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
// The request text has the form "<method>" or "<method>&<args>".
// NOTE(review): assumes mtext is NUL-terminated - confirm with the sender.
01807 std::string query = slave_message_monitoring_->mtext;
01808 size_t pos = query.find_first_of("&");
01809 std::string method;
01810 std::string args;
01811 if(pos!=std::string::npos)
01812 {
01813 method = query.substr(0,pos);
01814 args = query.substr(pos+1,query.length()-pos-1);
01815 }
01816 else
01817 method=query;
01818
01819 if(method=="Spotlight")
01820 {
01821 spotlightWebPage(in,&out);
01822 }
01823 else if(method=="procStat")
01824 {
01825 procStat(in,&out);
01826 }
01827 else if(method=="moduleWeb")
01828 {
// args is a ';'-separated list of "key%value" pairs; each pair is stored in
// the CGI environment handed to moduleWeb(). Tokens without '%' are ignored.
01829 internal::MyCgi mycgi;
01830 boost::char_separator<char> sep(";");
01831 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01832 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01833 tok_iter != tokens.end(); ++tok_iter){
01834 size_t pos = (*tok_iter).find_first_of("%");
01835 if(pos != std::string::npos){
01836 std::string first = (*tok_iter).substr(0 , pos);
01837 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01838 mycgi.getEnvironment()[first]=second;
01839 }
01840 }
01841 moduleWeb(&mycgi,&out);
01842 }
01843 else if(method=="Default")
01844 {
01845 defaultWebPage(in,&out);
01846 }
01847 else
01848 {
01849 out << "Error 404!!!!!!!!" << std::endl;
01850 }
01851
01852
// Send the page: each chunk of at most MAX_PIPE_BUFFER_SIZE bytes is
// written to the pipe, followed by a message carrying the chunk length as
// decimal text. An empty page is announced with a single "0" message.
01853 bytesToSend = out.str().size();
01854 unsigned int cycle = 0;
01855 if(bytesToSend==0)
01856 {
// NOTE(review): "%d" is used with an unsigned int argument here and below.
01857 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01858 myProcess_->postSlave(msg1,true);
01859 }
01860 while(bytesToSend !=0){
01861 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
// NOTE(review): the return value of write() is ignored, so a short or
// failed write is still announced with the full chunk length.
01862 write(anonymousPipe_[PIPE_WRITE],
01863 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01864 msgSize);
01865 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01866 myProcess_->postSlave(msg1,true);
01867 bytesToSend -= msgSize;
01868 cycle++;
01869 }
01870 break;
01871 }
01872 case MSQM_MESSAGE_TYPE_TRP:
01873 {
01874 break;
01875 }
01876 }
01877 }
01878 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01879 return true;
01880 }
01881
01882 void FUEventProcessor::startSignalMonitorWorkLoop() throw (evf::Exception)
01883 {
01884
01885
01886 try {
01887 wlSignalMonitor_=
01888 toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
01889 "waiting");
01890
01891 if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
01892 asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
01893 "SignalMonitor");
01894 wlSignalMonitor_->submit(asSignalMonitor_);
01895 signalMonitorActive_ = true;
01896 }
01897 catch (xcept::Exception& e) {
01898 std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
01899 std::cout << e.what() << std::endl;
01900 XCEPT_RETHROW(evf::Exception,msg,e);
01901 }
01902 }
01903
01904
01905 bool FUEventProcessor::sigmon(toolbox::task::WorkLoop* wl)
01906 {
01907 while (1) {
01908 sem_wait(sigmon_sem_);
01909 std::cout << " received signal notification from slave!"<< std::endl;
01910
01911
01912 bool running = fsm_.stateName()->toString()=="Enabled";
01913 bool stopping = fsm_.stateName()->toString()=="stopping";
01914 bool enabling = fsm_.stateName()->toString()=="enabling";
01915 if (!running && !enabling) {
01916 signalMonitorActive_ = false;
01917 return false;
01918 }
01919
01920 crashesThisRun_++;
01921 gettimeofday(&lastCrashTime_,0);
01922
01923
01924 if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
01925
01926 rlimit rlold;
01927 getrlimit(RLIMIT_CORE,&rlold);
01928 rlimit rlnew = rlold;
01929 rlnew.rlim_cur=0;
01930 setrlimit(RLIMIT_CORE,&rlnew);
01931 rlimit_coresize_changed_=true;
01932 MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
01933
01934 unsigned int min=1;
01935 for (unsigned int i = min; i < subs_.size(); i++) {
01936 try {
01937 if (subs_[i].alive()) {
01938 subs_[i].post(master_message_rli_,false);
01939 }
01940 }
01941 catch (...) {}
01942 }
01943 std::ostringstream ostr;
01944 ostr << "Number of recent slave crashes reaches " << crashesThisRun_
01945 << ". Disabling core dumps for next 15 minutes in this FilterUnit";
01946 LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
01947 }
01948 }
01949 signalMonitorActive_ = false;
01950 return false;
01951 }
01952
01953
01954 bool FUEventProcessor::enableCommon()
01955 {
01956 try {
01957 if(hasShMem_) attachDqmToShm();
01958 int sc = 0;
01959 evtProcessor_->clearCounters();
01960 if(isRunNumberSetter_)
01961 evtProcessor_->setRunNumber(runNumber_.value_);
01962 else
01963 evtProcessor_->declareRunNumber(runNumber_.value_);
01964
01965 try{
01966 ::sleep(1);
01967 evtProcessor_->runAsync();
01968 sc = evtProcessor_->statusAsync();
01969 }
01970 catch(cms::Exception &e) {
01971 reasonForFailedState_ = e.explainSelf();
01972 fsm_.fireFailed(reasonForFailedState_,this);
01973 localLog(reasonForFailedState_);
01974 return false;
01975 }
01976 catch(std::exception &e) {
01977 reasonForFailedState_ = e.what();
01978 fsm_.fireFailed(reasonForFailedState_,this);
01979 localLog(reasonForFailedState_);
01980 return false;
01981 }
01982 catch(...) {
01983 reasonForFailedState_ = "Unknown Exception";
01984 fsm_.fireFailed(reasonForFailedState_,this);
01985 localLog(reasonForFailedState_);
01986 return false;
01987 }
01988 if(sc != 0) {
01989 std::ostringstream oss;
01990 oss<<"EventProcessor::runAsync returned status code " << sc;
01991 reasonForFailedState_ = oss.str();
01992 fsm_.fireFailed(reasonForFailedState_,this);
01993 localLog(reasonForFailedState_);
01994 return false;
01995 }
01996 }
01997 catch (xcept::Exception &e) {
01998 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01999 fsm_.fireFailed(reasonForFailedState_,this);
02000 localLog(reasonForFailedState_);
02001 return false;
02002 }
02003 try{
02004 fsm_.fireEvent("EnableDone",this);
02005 }
02006 catch (xcept::Exception &e) {
02007 std::cout << "exception " << (std::string)e.what() << std::endl;
02008 throw;
02009 }
02010
02011 return false;
02012 }
02013
02014 void FUEventProcessor::forkProcessFromEDM_helper(void * addr) {
02015 ((FUEventProcessor*)addr)->forkProcessesFromEDM();
02016 }
02017
02018 void FUEventProcessor::forkProcessesFromEDM() {
02019
02020 moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
02021 unsigned int forkFrom=0;
02022 unsigned int forkTo=nbSubProcesses_.value_;
02023 if (forkParams->slotId>=0) {
02024 forkFrom=forkParams->slotId;
02025 forkTo=forkParams->slotId+1;
02026 }
02027
02028
02029 try {
02030 if (sorRef_) {
02031 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02032 for (unsigned int i=0;i<shmOutputs.size();i++) {
02033
02034 shmOutputs[i]->unregisterFromShm();
02035
02036 shmOutputs[i]->stop();
02037 }
02038 }
02039 }
02040 catch (std::exception &e)
02041 {
02042 reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
02043 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02044 fsm_.fireFailed(reasonForFailedState_,this);
02045 localLog(reasonForFailedState_);
02046 }
02047 catch (...) {
02048 reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
02049 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02050 fsm_.fireFailed(reasonForFailedState_,this);
02051 localLog(reasonForFailedState_);
02052 }
02053
02054 std::string currentState = fsm_.stateName()->toString();
02055
02056
02057 if (currentState!="stopping") {
02058 try {
02059 messageServicePresence_.reset();
02060 }
02061 catch (...) {
02062 LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
02063 }
02064 }
02065
02066 if (currentState=="stopping") {
02067 LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
02068 forkParams->isMaster=1;
02069 forkInfoObj_->forkParams.slotId=-1;
02070 forkInfoObj_->forkParams.restart=0;
02071 }
02072
02073 else for(unsigned int i=forkFrom; i<forkTo; i++)
02074 {
02075 int retval = subs_[i].forkNew();
02076 if(retval==0)
02077 {
02078 forkParams->isMaster=0;
02079 myProcess_ = &subs_[i];
02080
02081 delete toolbox::mem::_s_mutex_ptr_;
02082 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
02083 int retval = pthread_mutex_destroy(&stop_lock_);
02084 if(retval != 0) perror("error");
02085 retval = pthread_mutex_init(&stop_lock_,0);
02086 if(retval != 0) perror("error");
02087 fsm_.disableRcmsStateNotification();
02088
02089
02090 try{
02091 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02092 if(pf != 0) {
02093 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02094 }
02095 else {
02096 LOG4CPLUS_ERROR(getApplicationLogger(),
02097 "SLAVE: Unable to create message service presence. pid:"<<getpid());
02098 }
02099 }
02100 catch(...) {
02101 LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
02102 }
02103
02104 ML::MLlog4cplus::setAppl(this);
02105
02106
02107 try {
02108 if (sorRef_) {
02109 std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
02110 for (unsigned int i=0;i<shmOutputs.size();i++)
02111 shmOutputs[i]->start();
02112 }
02113 }
02114 catch (...)
02115 {
02116 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
02117 }
02118
02119 if (forkParams->restart) {
02120
02121 scalersUpdates_ = 0;
02122 try {
02123 fsm_.fireEvent("Stop",this);
02124 fsm_.fireEvent("StopDone",this);
02125 fsm_.fireEvent("Enable",this);
02126 } catch (...) {
02127 LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
02128 }
02129 try{
02130 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
02131 if(lsid) {
02132 ((xdata::UnsignedInteger32*)(lsid))->value_--;
02133 }
02134 }
02135 catch(...){
02136 std::cout << "trouble with lsindex during restart" << std::endl;
02137 }
02138 try{
02139 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
02140 if(lstb) {
02141 ((xdata::Boolean*)(lstb))->value_ = false;
02142 }
02143 }
02144 catch(...){
02145 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
02146 }
02147 evtProcessor_.adjustLsIndexForRestart();
02148 evtProcessor_.resetPackedTriggerReport();
02149 }
02150
02151
02152 startReceivingLoop();
02153 startReceivingMonitorLoop();
02154 startScalersWorkLoop();
02155 while(!evtProcessor_.isWaitingForLs())
02156 ::usleep(100000);
02157
02158
02159 if(hasShMem_) attachDqmToShm();
02160
02161
02162 try {
02163 fsm_.fireEvent("EnableDone",this);
02164 }
02165 catch (...) {}
02166
02167
02168 while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
02169
02170
02171 sigset_t tmpset_thread;
02172 sigemptyset(&tmpset_thread);
02173 sigaddset(&tmpset_thread, SIGQUIT);
02174 sigaddset(&tmpset_thread, SIGILL);
02175 sigaddset(&tmpset_thread, SIGABRT);
02176 sigaddset(&tmpset_thread, SIGFPE);
02177 sigaddset(&tmpset_thread, SIGSEGV);
02178 sigaddset(&tmpset_thread, SIGALRM);
02179
02180 pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
02181
02182
02183 struct sigaction sa;
02184 sigset_t tmpset;
02185 memset(&tmpset,0,sizeof(tmpset));
02186 sigemptyset(&tmpset);
02187 sa.sa_mask=tmpset;
02188 sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
02189 sa.sa_handler=0;
02190 sa.sa_sigaction=evfep_sighandler;
02191
02192 sigaction(SIGQUIT,&sa,0);
02193 sigaction(SIGILL,&sa,0);
02194 sigaction(SIGABRT,&sa,0);
02195 sigaction(SIGFPE,&sa,0);
02196 sigaction(SIGSEGV,&sa,0);
02197 sa.sa_sigaction=evfep_alarmhandler;
02198 sigaction(SIGALRM,&sa,0);
02199
02200
02201 return ;
02202 }
02203 else {
02204
02205 forkParams->isMaster=1;
02206 forkInfoObj_->forkParams.slotId=-1;
02207 if (forkParams->restart) {
02208 std::ostringstream ost1;
02209 ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
02210 localLog(ost1.str());
02211 }
02212 forkInfoObj_->forkParams.restart=0;
02213
02214 }
02215 }
02216
02217
02218 try{
02219
02220 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02221 if(pf != 0) {
02222 messageServicePresence_ = pf->makePresence("MessageServicePresence");
02223 }
02224 else {
02225 LOG4CPLUS_ERROR(getApplicationLogger(),
02226 "Unable to recreate message service presence ");
02227 }
02228 }
02229 catch(...) {
02230 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
02231 }
02232 restart_in_progress_=false;
02233 edm_init_done_=true;
02234 }
02235
02236 bool FUEventProcessor::enableForkInEDM()
02237 {
02238 evtProcessor_.resetWaiting();
02239 try {
02240
02241
02242
02243 int sc = 0;
02244
02245 evtProcessor_->clearCounters();
02246 if(isRunNumberSetter_)
02247 evtProcessor_->setRunNumber(runNumber_.value_);
02248 else
02249 evtProcessor_->declareRunNumber(runNumber_.value_);
02250
02251
02252 pthread_mutex_destroy(&forkObjLock_);
02253 pthread_mutex_init(&forkObjLock_,0);
02254 if (forkInfoObj_) delete forkInfoObj_;
02255 forkInfoObj_ = new moduleweb::ForkInfoObj();
02256 forkInfoObj_->mst_lock_=&forkObjLock_;
02257 forkInfoObj_->fuAddr=(void*)this;
02258 forkInfoObj_->forkHandler = forkProcessFromEDM_helper;
02259 forkInfoObj_->forkParams.slotId=-1;
02260 forkInfoObj_->forkParams.restart=0;
02261 forkInfoObj_->forkParams.isMaster=-1;
02262 forkInfoObj_->stopCondition=0;
02263 if (mwrRef_)
02264 mwrRef_->publishForkInfo(std::string("DaqSource"),forkInfoObj_);
02265
02266 evtProcessor_->runAsync();
02267 sc = evtProcessor_->statusAsync();
02268
02269 if(sc != 0) {
02270 std::ostringstream oss;
02271 oss<<"EventProcessor::runAsync returned status code " << sc;
02272 reasonForFailedState_ = oss.str();
02273 fsm_.fireFailed(reasonForFailedState_,this);
02274 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02275 return false;
02276 }
02277 }
02278
02279 catch(cms::Exception &e) {
02280 reasonForFailedState_ = e.explainSelf();
02281 fsm_.fireFailed(reasonForFailedState_,this);
02282 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02283 return false;
02284 }
02285 catch(std::exception &e) {
02286 reasonForFailedState_ = e.what();
02287 fsm_.fireFailed(reasonForFailedState_,this);
02288 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02289 return false;
02290 }
02291 catch(...) {
02292 reasonForFailedState_ = "Unknown Exception";
02293 fsm_.fireFailed(reasonForFailedState_,this);
02294 LOG4CPLUS_FATAL(log_,reasonForFailedState_);
02295 return false;
02296 }
02297 return true;
02298 }
02299
02300 bool FUEventProcessor::restartForkInEDM(unsigned int slotId) {
02301
02302
02303 forkInfoObj_->lock();
02304 forkInfoObj_->forkParams.slotId=slotId;
02305 forkInfoObj_->forkParams.restart=true;
02306 forkInfoObj_->forkParams.isMaster=1;
02307 forkInfoObj_->stopCondition=0;
02308 LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
02309 sem_post(forkInfoObj_->control_sem_);
02310 forkInfoObj_->unlock();
02311 usleep(1000);
02312
02313 int count=50;
02314 restart_in_progress_=true;
02315 while (restart_in_progress_ && count--) usleep(20000);
02316 return true;
02317 }
02318
02319 bool FUEventProcessor::enableClassic()
02320 {
02321 bool retval = enableCommon();
02322 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
02323 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
02324 ::sleep(1);
02325 }
02326
02327
02328
02329 localLog("-I- Start completed");
02330 return retval;
02331 }
02332 bool FUEventProcessor::enableMPEPSlave()
02333 {
02334
02335
02336 startReceivingLoop();
02337 startReceivingMonitorLoop();
02338 evtProcessor_.resetWaiting();
02339 startScalersWorkLoop();
02340 while(!evtProcessor_.isWaitingForLs())
02341 ::sleep(1);
02342
02343
02344
02345 try{
02346
02347 try{
02348 edm::PresenceFactory *pf = edm::PresenceFactory::get();
02349 if(pf != 0) {
02350 pf->makePresence("MessageServicePresence").release();
02351 }
02352 else {
02353 LOG4CPLUS_ERROR(getApplicationLogger(),
02354 "Unable to create message service presence ");
02355 }
02356 }
02357 catch(...) {
02358 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
02359 }
02360 ML::MLlog4cplus::setAppl(this);
02361 }
02362 catch (xcept::Exception &e) {
02363 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
02364 fsm_.fireFailed(reasonForFailedState_,this);
02365 localLog(reasonForFailedState_);
02366 }
02367 bool retval = enableCommon();
02368
02369
02370
02371
02372 return retval;
02373 }
02374
02375 bool FUEventProcessor::stopClassic()
02376 {
02377 bool failed=false;
02378 try {
02379 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
02380 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
02381 if(rc == edm::EventProcessor::epSuccess)
02382 fsm_.fireEvent("StopDone",this);
02383 else
02384 {
02385 failed=true;
02386
02387 if(rc == edm::EventProcessor::epTimedOut)
02388 reasonForFailedState_ = "EventProcessor stop timed out";
02389 else
02390 reasonForFailedState_ = "EventProcessor did not receive STOP event";
02391 }
02392 }
02393 catch (xcept::Exception &e) {
02394 failed=true;
02395 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02396 }
02397 catch (edm::Exception &e) {
02398 failed=true;
02399 reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
02400 }
02401 catch (...) {
02402 failed=true;
02403 reasonForFailedState_= "Stopping FAILED: unknown exception";
02404 }
02405 try {
02406 if (hasShMem_) {
02407 detachDqmFromShm();
02408 if (failed)
02409 LOG4CPLUS_WARN(getApplicationLogger(),
02410 "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
02411 }
02412 }
02413 catch (cms::Exception & e) {
02414 failed=true;
02415 reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
02416 }
02417 catch (...) {
02418 failed=true;
02419 reasonForFailedState_= "DQM detach failed: Unknown exception";
02420 }
02421
02422 if (failed) {
02423 LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
02424 << reasonForFailedState_ << " (pid:" << getpid()<<")");
02425 localLog(reasonForFailedState_);
02426 fsm_.fireFailed(reasonForFailedState_,this);
02427 }
02428
02429 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
02430 localLog("-I- Stop completed");
02431 return false;
02432 }
02433
02434 void FUEventProcessor::stopSlavesAndAcknowledge()
02435 {
02436 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
02437 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
02438
02439 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
02440 for(unsigned int i = 0; i < subs_.size(); i++)
02441 {
02442 pthread_mutex_lock(&stop_lock_);
02443 if(subs_[i].alive()>0){
02444 processes_to_stop[i] = true;
02445 subs_[i].post(msg,false);
02446 }
02447 pthread_mutex_unlock(&stop_lock_);
02448 }
02449 for(unsigned int i = 0; i < subs_.size(); i++)
02450 {
02451 pthread_mutex_lock(&stop_lock_);
02452 if(processes_to_stop[i]){
02453 try{
02454 subs_[i].rcv(msg1,false);
02455 }
02456 catch(evf::Exception &e){
02457 std::ostringstream ost;
02458 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
02459 reasonForFailedState_ = ost.str();
02460 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
02461
02462 localLog(reasonForFailedState_);
02463 pthread_mutex_unlock(&stop_lock_);
02464 continue;
02465 }
02466 }
02467 else {
02468 pthread_mutex_unlock(&stop_lock_);
02469 continue;
02470 }
02471 pthread_mutex_unlock(&stop_lock_);
02472 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
02473 while(subs_[i].alive()>0) ::usleep(10000);
02474 subs_[i].disconnect();
02475 }
02476
02477
02478 }
02479
02480 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
02481 {
02482 std::string urn = getApplicationDescriptor()->getURN();
02483 try{
02484 evtProcessor_.stateNameFromIndex(0);
02485 evtProcessor_.moduleNameFromIndex(0);
02486 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
02487 *out << "<tr><td>" << fsm_.stateName()->toString()
02488 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
02489 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
02490 evtProcessor_.microState(in,out);
02491 *out << "<td></td><td>" << nbTotalDQM_
02492 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
02493 if(nbSubProcesses_.value_!=0 && !myProcess_)
02494 {
02495 pthread_mutex_lock(&start_lock_);
02496 for(unsigned int i = 0; i < subs_.size(); i++)
02497 {
02498 try{
02499 if(subs_[i].alive()>0)
02500 {
02501 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
02502 << i << "\">""Alive</td><td>S</td><td>"
02503 << subs_[i].queueId() << "<td>"
02504 << subs_[i].queueStatus()<< "/"
02505 << subs_[i].queueOccupancy() << "/"
02506 << subs_[i].queuePidOfLastSend() << "/"
02507 << subs_[i].queuePidOfLastReceive()
02508 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
02509 << subs_[i].pid() << "&method=procStat\">"
02510 << subs_[i].pid()<<"</a></td>"
02511 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
02512 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
02513 << subs_[i].params().nba << "/" << subs_[i].params().nbp
02514 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
02515 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
02516 << "</td><td>" << subs_[i].params().ps
02517 << "</td><td"
02518 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
02519 << subs_[i].params().eols
02520 << "</td><td>" << subs_[i].params().dqm
02521 << "</td><td>" << subs_[i].params().trp << "</td>";
02522 }
02523 else
02524 {
02525 pthread_mutex_lock(&pickup_lock_);
02526 *out << "<tr><td id=\"a"<< i << "\" ";
02527 if(subs_[i].alive()==-1000)
02528 *out << " bgcolor=\"#bbaabb\">NotInitialized";
02529 else
02530 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
02531 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02532 << subs_[i].queueStatus() << "/"
02533 << subs_[i].queueOccupancy() << "/"
02534 << subs_[i].queuePidOfLastSend() << "/"
02535 << subs_[i].queuePidOfLastReceive()
02536 << "</td><td id=\"p"<< i << "\">"
02537 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
02538 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
02539 {
02540 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
02541 *out << " will restart in " << subs_[i].countdown() << " s";
02542 else if(autoRestartSlaves_)
02543 *out << " reached maximum restart count";
02544 else *out << " autoRestart is disabled ";
02545 }
02546 *out << "</td><td"
02547 << ((subs_[i].params().eols<subs_[i].params().ls) ?
02548 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
02549 << ">"
02550 << subs_[i].params().eols
02551 << "</td><td>" << subs_[i].params().dqm
02552 << "</td><td>" << subs_[i].params().trp << "</td>";
02553 pthread_mutex_unlock(&pickup_lock_);
02554 }
02555 *out << "</tr>";
02556 }
02557 catch(evf::Exception &e){
02558 *out << "<tr><td id=\"a"<< i << "\" "
02559 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
02560 << subs_[i].queueStatus() << "/"
02561 << subs_[i].queueOccupancy() << "/"
02562 << subs_[i].queuePidOfLastSend() << "/"
02563 << subs_[i].queuePidOfLastReceive()
02564 << "</td><td id=\"p"<< i << "\">"
02565 <<subs_[i].pid()<<"</td>";
02566 }
02567 }
02568 pthread_mutex_unlock(&start_lock_);
02569 }
02570 }
02571 catch(evf::Exception &e)
02572 {
02573 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
02574 }
02575 catch(cms::Exception &e)
02576 {
02577 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
02578 }
02579 catch(std::exception &e)
02580 {
02581 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
02582 }
02583 catch(...)
02584 {
02585 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
02586 }
02587
02588 }
02589
02590
02591 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
02592 {
02593 using namespace utils;
02594
02595 *out << updaterStatic_;
02596 mDiv(out,"loads");
02597 uptime(out);
02598 cDiv(out);
02599 mDiv(out,"st",fsm_.stateName()->toString());
02600 mDiv(out,"ru",runNumber_.toString());
02601 mDiv(out,"nsl",nbSubProcesses_.value_);
02602 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
02603 mDiv(out,"cl");
02604 *out << getApplicationDescriptor()->getClassName()
02605 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
02606 cDiv(out);
02607 mDiv(out,"in",getApplicationDescriptor()->getInstance());
02608 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
02609 mDiv(out,"hlt");
02610 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
02611 cDiv(out);
02612 *out << std::endl;
02613 }
02614 else
02615 mDiv(out,"hlt","Not yet...");
02616
02617 mDiv(out,"sq",squidPresent_.toString());
02618 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
02619 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
02620 if(nbProcessed != 0 && nbAccepted != 0)
02621 {
02622 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
02623 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
02624 }
02625 else
02626 {
02627 mDiv(out,"tt",0);
02628 mDiv(out,"ac",0);
02629 }
02630 if(!myProcess_)
02631 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
02632 else
02633 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
02634
02635 mDiv(out,"idi",iDieUrl_.value_);
02636 if(vp_!=0){
02637 mDiv(out,"vpi",(unsigned int) vp_);
02638 if(vulture_->hasStarted()>=0)
02639 mDiv(out,"vul","Prowling");
02640 else
02641 mDiv(out,"vul","Dead");
02642 }
02643 else{
02644 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
02645 }
02646 if(evtProcessor_){
02647 mDiv(out,"ll");
02648 *out << evtProcessor_.lastLumi().ls
02649 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
02650 cDiv(out);
02651 }
02652 mDiv(out,"lg");
02653 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
02654 *out << logRing_[i] << std::endl;
02655 if(logWrap_)
02656 for(unsigned int i = 0; i<logRingIndex_; i++)
02657 *out << logRing_[i] << std::endl;
02658 cDiv(out);
02659 mDiv(out,"cha");
02660 if(cpustat_) *out << cpustat_->getChart();
02661 cDiv(out);
02662 }
02663
02664 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
02665 {
02666 evf::utils::procStat(out);
02667 }
02668
02669 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
02670 {
02671 if(myProcess_) myProcess_->postSlave(buf,true);
02672 }
02673
02674 void FUEventProcessor::makeStaticInfo()
02675 {
02676 using namespace utils;
02677 std::ostringstream ost;
02678 mDiv(&ost,"ve");
02679 ost<< "$Revision: 1.162 $ (" << edm::getReleaseVersion() <<")";
02680 cDiv(&ost);
02681 mDiv(&ost,"ou",outPut_.toString());
02682 mDiv(&ost,"sh",hasShMem_.toString());
02683 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
02684 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
02685
02686 xdata::Serializable *monsleep = 0;
02687 xdata::Serializable *lstimeout = 0;
02688 try{
02689 monsleep = applicationInfoSpace_->find("monSleepSec");
02690 lstimeout = applicationInfoSpace_->find("lsTimeOut");
02691 }
02692 catch(xdata::exception::Exception e){
02693 }
02694
02695 if(monsleep!=0)
02696 mDiv(&ost,"ms",monsleep->toString());
02697 if(lstimeout!=0)
02698 mDiv(&ost,"lst",lstimeout->toString());
02699 char cbuf[sizeof(struct utsname)];
02700 struct utsname* buf = (struct utsname*)cbuf;
02701 uname(buf);
02702 mDiv(&ost,"sysinfo");
02703 ost << buf->sysname << " " << buf->nodename
02704 << " " << buf->release << " " << buf->version << " " << buf->machine;
02705 cDiv(&ost);
02706 updaterStatic_ = ost.str();
02707 }
02708
02709 void FUEventProcessor::handleSignalSlave(int sig, siginfo_t* info, void* c)
02710 {
02711
02712 sem_post(sigmon_sem_);
02713
02714
02715 sleep(2);
02716
02717
02718 alarm(5);
02719
02720 std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
02721 std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
02722 std::cout << "--- Stacktrace follows --" << std::endl;
02723 std::ostringstream stacktr;
02724 toolbox::stacktrace(20,stacktr);
02725 std::cout << stacktr.str();
02726 if (!rlimit_coresize_changed_)
02727 std::cout << "--- Dumping core." << " --- " << std::endl;
02728 else
02729 std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
02730
02731 std::string hasdump = "";
02732 if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
02733
02734 LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
02735 << " on node " << toolbox::net::getHostName() << " ---" << std::endl
02736 << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
02737 << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
02738 );
02739
02740
02741 raise(sig);
02742 }
02743
02744
// XDAQ instantiator macro for evf::FUEventProcessor.
// NOTE(review): presumably generates the factory entry point through which the
// XDAQ executive creates this application - confirm against the XDAQ headers.
02745 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)