00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019
00020
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027
00028 #include "toolbox/BSem.h"
00029
00030 #include <boost/tokenizer.hpp>
00031
00032 #include "xcept/tools.h"
00033 #include "xgi/Method.h"
00034
00035 #include "cgicc/CgiDefs.h"
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038
00039
00040 #include <sys/wait.h>
00041 #include <sys/utsname.h>
00042
00043 #include <typeinfo>
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046 #include <errno.h>
00047
00048 using namespace evf;
00049 using namespace cgicc;
00050 namespace toolbox {
00051 namespace mem {
00052 extern toolbox::BSem * _s_mutex_ptr_;
00053 }
00054 }
00055
00057
00059
00060
00061 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00062 : xdaq::Application(s)
00063 , fsm_(this)
00064 , log_(getApplicationLogger())
00065 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00066 , runNumber_(0)
00067 , epInitialized_(false)
00068 , outPut_(true)
00069 , autoRestartSlaves_(false)
00070 , slaveRestartDelaySecs_(10)
00071 , hasShMem_(true)
00072 , hasPrescaleService_(true)
00073 , hasModuleWebRegistry_(true)
00074 , hasServiceWebRegistry_(true)
00075 , isRunNumberSetter_(true)
00076 , iDieStatisticsGathering_(false)
00077 , outprev_(true)
00078 , exitOnError_(true)
00079 , reasonForFailedState_()
00080 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00081 , logRing_(logRingSize_)
00082 , logRingIndex_(logRingSize_)
00083 , logWrap_(false)
00084 , nbSubProcesses_(0)
00085 , nbSubProcessesReporting_(0)
00086 , nblive_(0)
00087 , nbdead_(0)
00088 , nbTotalDQM_(0)
00089 , wlReceiving_(0)
00090 , asReceiveMsgAndExecute_(0)
00091 , receiving_(false)
00092 , wlReceivingMonitor_(0)
00093 , asReceiveMsgAndRead_(0)
00094 , receivingM_(false)
00095 , myProcess_(0)
00096 , wlSupervising_(0)
00097 , asSupervisor_(0)
00098 , supervising_(false)
00099 , monitorInfoSpace_(0)
00100 , monitorLegendaInfoSpace_(0)
00101 , applicationInfoSpace_(0)
00102 , nbProcessed(0)
00103 , nbAccepted(0)
00104 , scalersInfoSpace_(0)
00105 , scalersLegendaInfoSpace_(0)
00106 , wlScalers_(0)
00107 , asScalers_(0)
00108 , wlScalersActive_(false)
00109 , scalersUpdates_(0)
00110 , wlSummarize_(0)
00111 , asSummarize_(0)
00112 , wlSummarizeActive_(false)
00113 , superSleepSec_(1)
00114 , iDieUrl_("none")
00115 , vulture_(0)
00116 , vp_(0)
00117 , cpustat_(0)
00118 , ratestat_(0)
00119 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00120 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00121 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00122 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00123 {
00124 using namespace utils;
00125
00126 names_.push_back("nbProcessed" );
00127 names_.push_back("nbAccepted" );
00128 names_.push_back("epMacroStateInt");
00129 names_.push_back("epMicroStateInt");
00130
00131 int retpipe = pipe(anonymousPipe_);
00132 if(retpipe != 0)
00133 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00134
00135 squidPresent_ = squidnet_.check();
00136
00137 evtProcessor_.setAppDesc(getApplicationDescriptor());
00138 evtProcessor_.setAppCtxt(getApplicationContext());
00139
00140 fsm_.initialize<evf::FUEventProcessor>(this);
00141
00142
00143 url_ =
00144 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00145 getApplicationDescriptor()->getURN();
00146 class_ =getApplicationDescriptor()->getClassName();
00147 instance_=getApplicationDescriptor()->getInstance();
00148 sourceId_=class_.toString()+"_"+instance_.toString();
00149 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00150 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00151
00152 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00153
00154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155 applicationInfoSpace_ = ispace;
00156
00157
00158 ispace->fireItemAvailable("parameterSet", &configString_ );
00159 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00160 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00161 ispace->fireItemAvailable("runNumber", &runNumber_ );
00162 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00163
00164 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00165 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00166 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00167 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00168 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00169 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00170 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00171 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00172 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00173 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00174 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00175 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00176 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00177 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00178
00179
00180 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00181 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00182
00183
00184 fsm_.findRcmsStateListener();
00185
00186
00187
00188 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00189 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00190 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00191
00192 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00193 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00194 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00195
00196
00197 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00198 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00199 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00200 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00201 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00202
00203 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00204
00205 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00206 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00207 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00208
00209 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00210 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00211 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00212
00213
00214
00215 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00216 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00217
00218 evtProcessor_.setApplicationInfoSpace(ispace);
00219 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00220 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00221
00222
00223 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00224 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00225
00226
00227 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00228 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00229 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00230 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00231 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00232 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00233 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00234 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00235 xgi::bind(this, &FUEventProcessor::microState, "microState");
00236 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00237 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00238
00239
00240
00241 edm::AssertHandler ah;
00242
00243 try{
00244 LOG4CPLUS_DEBUG(getApplicationLogger(),
00245 "Trying to create message service presence ");
00246 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00247 if(pf != 0) {
00248 pf->makePresence("MessageServicePresence").release();
00249 }
00250 else {
00251 LOG4CPLUS_ERROR(getApplicationLogger(),
00252 "Unable to create message service presence ");
00253 }
00254 }
00255 catch(...) {
00256 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00257 }
00258 ML::MLlog4cplus::setAppl(this);
00259
00260 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00261 typedef AppDescSet_t::iterator AppDescIter_t;
00262
00263 AppDescSet_t rcms=
00264 getApplicationContext()->getDefaultZone()->
00265 getApplicationDescriptors("RCMSStateListener");
00266 if(rcms.size()==0)
00267 {
00268 LOG4CPLUS_WARN(getApplicationLogger(),
00269 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00270
00271 }
00272 else
00273 {
00274 AppDescIter_t it = rcms.begin();
00275 evtProcessor_.setRcms(*it);
00276 }
00277 pthread_mutex_init(&start_lock_,0);
00278 pthread_mutex_init(&stop_lock_,0);
00279 pthread_mutex_init(&pickup_lock_,0);
00280
00281 makeStaticInfo();
00282 startSupervisorLoop();
00283
00284 if(vulture_==0) vulture_ = new Vulture(true);
00285
00287
00288 AppDescSet_t setOfiDie=
00289 getApplicationContext()->getDefaultZone()->
00290 getApplicationDescriptors("evf::iDie");
00291
00292 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00293 if ((*it)->getInstance()==0)
00294 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00295 }
00296
00297
00298
00299
00300 FUEventProcessor::~FUEventProcessor()
00301 {
00302
00303
00304
00305 if(vulture_ != 0) delete vulture_;
00306 }
00307
00308
00310
00312
00313
00314
00315 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00316 {
00317
00318
00319
00320
00321
00322 unsigned short smap
00323 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00324 + ((instance_.value_==0) ? 0x8 : 0)
00325 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00326 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00327 + (hasPrescaleService_.value_ ? 0x1 : 0);
00328 if(nbSubProcesses_.value_==0)
00329 {
00330 spMStates_.setSize(1);
00331 spmStates_.setSize(1);
00332 }
00333 else
00334 {
00335 spMStates_.setSize(nbSubProcesses_.value_);
00336 spmStates_.setSize(nbSubProcesses_.value_);
00337 for(unsigned int i = 0; i < spMStates_.size(); i++)
00338 {
00339 spMStates_[i] = edm::event_processor::sInit;
00340 spmStates_[i] = 0;
00341 }
00342 }
00343 try {
00344 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00345 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00346 epInitialized_=true;
00347 if(evtProcessor_)
00348 {
00349
00350 configuration_ = evtProcessor_.configuration();
00351 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00352 evtProcessor_->beginJob();
00353 if(cpustat_) {delete cpustat_; cpustat_=0;}
00354 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00355 iDieUrl_.value_);
00356 if(ratestat_) {delete ratestat_; ratestat_=0;}
00357 ratestat_ = new RateStat(iDieUrl_.value_);
00358 if(iDieStatisticsGathering_.value_){
00359 try{
00360 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00361 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00362 if(legenda !=0){
00363 std::string slegenda = ((xdata::String*)legenda)->value_;
00364 ratestat_->sendLegenda(slegenda);
00365 }
00366
00367 }
00368 catch(evf::Exception &e){
00369 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00370 << e.what());
00371 }
00372 }
00373
00374 fsm_.fireEvent("ConfigureDone",this);
00375 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00376 localLog("-I- Configuration completed");
00377
00378 }
00379 }
00380 catch (xcept::Exception &e) {
00381 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00382 fsm_.fireFailed(reasonForFailedState_,this);
00383 localLog(reasonForFailedState_);
00384 }
00385 catch(cms::Exception &e) {
00386 reasonForFailedState_ = e.explainSelf();
00387 fsm_.fireFailed(reasonForFailedState_,this);
00388 localLog(reasonForFailedState_);
00389 }
00390 catch(std::exception &e) {
00391 reasonForFailedState_ = e.what();
00392 fsm_.fireFailed(reasonForFailedState_,this);
00393 localLog(reasonForFailedState_);
00394 }
00395 catch(...) {
00396 fsm_.fireFailed("Unknown Exception",this);
00397 }
00398
00399 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00400
00401 return false;
00402 }
00403
00404
00405
00406
00407
00408 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00409 {
00410 nbTotalDQM_ = 0;
00411 scalersUpdates_ = 0;
00412
00413
00414
00415
00416
00417 unsigned short smap
00418 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00419 + ((instance_.value_==0) ? 0x8 : 0)
00420 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00421 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00422 + (hasPrescaleService_.value_ ? 0x1 : 0);
00423
00424 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00425 if(!epInitialized_){
00426 evtProcessor_.forceInitEventProcessorMaybe();
00427 }
00428 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00429
00430 if(!epInitialized_){
00431 evtProcessor_->beginJob();
00432 if(cpustat_) {delete cpustat_; cpustat_=0;}
00433 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00434 iDieUrl_.value_);
00435 if(iDieStatisticsGathering_.value_){
00436 try{
00437 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00438 xdata::Serializable *legenda = scalersInfoSpace_->find("scalersLegenda");
00439 if(legenda !=0){
00440 std::string slegenda = ((xdata::String*)legenda)->value_;
00441 ratestat_->sendLegenda(slegenda);
00442 }
00443 }
00444 catch(evf::Exception &e){
00445 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00446 << e.what());
00447 }
00448 }
00449 if(ratestat_) {delete ratestat_; ratestat_=0;}
00450 ratestat_ = new RateStat(iDieUrl_.value_);
00451 epInitialized_ = true;
00452 }
00453 configuration_ = evtProcessor_.configuration();
00454 evtProcessor_.resetLumiSectionReferenceIndex();
00455
00456 if(nbSubProcesses_.value_==0) return enableClassic();
00457
00458 pthread_mutex_lock(&start_lock_);
00459 subs_.clear();
00460 subs_.resize(nbSubProcesses_.value_);
00461 pid_t retval = -1;
00462 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00463 {
00464 subs_[i]=SubProcess(i,retval);
00465 }
00466 pthread_mutex_unlock(&start_lock_);
00467
00468 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00469 {
00470 retval = subs_[i].forkNew();
00471 if(retval==0)
00472 {
00473 myProcess_ = &subs_[i];
00474
00475 delete toolbox::mem::_s_mutex_ptr_;
00476 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00477 int retval = pthread_mutex_destroy(&stop_lock_);
00478 if(retval != 0) perror("error");
00479 retval = pthread_mutex_init(&stop_lock_,0);
00480 if(retval != 0) perror("error");
00481 fsm_.disableRcmsStateNotification();
00482 return enableMPEPSlave();
00483
00484 }
00485 }
00486
00487 startSummarizeWorkLoop();
00488 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00489
00490 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00491 fsm_.fireEvent("EnableDone",this);
00492 localLog("-I- Start completed");
00493 return false;
00494 }
00495
00496
00497
00498 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00499 {
00500 if(nbSubProcesses_.value_!=0)
00501 stopSlavesAndAcknowledge();
00502 vulture_->stop();
00503 return stopClassic();
00504 }
00505
00506
00507
00508 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00509 {
00510 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00511 if(nbSubProcesses_.value_!=0)
00512 stopSlavesAndAcknowledge();
00513 try{
00514 evtProcessor_.stopAndHalt();
00515 }
00516 catch (evf::Exception &e) {
00517 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00518 localLog(reasonForFailedState_);
00519 fsm_.fireFailed(reasonForFailedState_,this);
00520 }
00521
00522
00523 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00524 fsm_.fireEvent("HaltDone",this);
00525
00526 localLog("-I- Halt completed");
00527 return false;
00528 }
00529
00530
00531
00532 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00533 throw (xoap::exception::Exception)
00534 {
00535 return fsm_.commandCallback(msg);
00536 }
00537
00538
00539
00540 void FUEventProcessor::actionPerformed(xdata::Event& e)
00541 {
00542
00543 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00544 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00545
00546 if ( item == "parameterSet") {
00547 LOG4CPLUS_WARN(getApplicationLogger(),
00548 "HLT Menu changed, force reinitialization of EventProcessor");
00549 epInitialized_ = false;
00550 }
00551 if ( item == "outputEnabled") {
00552 if(outprev_ != outPut_) {
00553 LOG4CPLUS_WARN(getApplicationLogger(),
00554 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00555 evtProcessor_->enableEndPaths(outPut_);
00556 outprev_ = outPut_;
00557 }
00558 }
00559 if (item == "globalInputPrescale") {
00560 LOG4CPLUS_WARN(getApplicationLogger(),
00561 "Setting global input prescale has no effect "
00562 <<"in this version of the code");
00563 }
00564 if ( item == "globalOutputPrescale") {
00565 LOG4CPLUS_WARN(getApplicationLogger(),
00566 "Setting global output prescale has no effect "
00567 <<"in this version of the code");
00568 }
00569 }
00570
00571 }
00572
00573
00574 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00575 {
00576 using namespace cgicc;
00577 pid_t pid = 0;
00578 std::ostringstream ost;
00579 ost << "&";
00580
00581 Cgicc cgi(in);
00582 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00583 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00584 mycgi->getEnvironment().begin();
00585 mit != mycgi->getEnvironment().end(); mit++)
00586 ost << mit->first << "%" << mit->second << ";";
00587 std::vector<FormEntry> els = cgi.getElements() ;
00588 std::vector<FormEntry> el1;
00589 cgi.getElement("method",el1);
00590 std::vector<FormEntry> el2;
00591 cgi.getElement("process",el2);
00592 if(el1.size()!=0) {
00593 std::string meth = el1[0].getValue();
00594 if(el2.size()!=0) {
00595 unsigned int i = 0;
00596 std::string mod = el2[0].getValue();
00597 pid = atoi(mod.c_str());
00598 for(; i < subs_.size(); i++)
00599 if(subs_[i].pid()==pid) break;
00600 if(i>=subs_.size()){
00601 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00602 return;
00603 }
00604 if(subs_[i].alive() != 1){
00605 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00606 return;
00607 }
00608 MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
00609 strcpy(msg1->mtext,meth.c_str());
00610 strcpy(msg1->mtext+meth.length(),ost.str().c_str());
00611 subs_[i].post(msg1,true);
00612 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00613 superSleepSec_.value_=10*keep_supersleep_original_value;
00614 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00615 bool done = false;
00616 std::vector<char *>pieces;
00617 while(!done){
00618 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00619 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00620 ::sleep(1);
00621 continue;
00622 }
00623 unsigned int nbytes = atoi(msg->mtext);
00624 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00625 char *buf= new char[nbytes];
00626 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00627 if(retval!=nbytes) std::cout
00628 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00629 pieces.push_back(buf);
00630 }
00631 superSleepSec_.value_=keep_supersleep_original_value;
00632 for(unsigned int j = 0; j < pieces.size(); j++){
00633 *out<<pieces[j];
00634 delete[] pieces[j];
00635 }
00636 }
00637 }
00638 }
00639
00640
00641
00642 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00643 throw (xgi::exception::Exception)
00644 {
00645
00646
00647 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00648 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00649 << getApplicationDescriptor()->getInstance() << "</title>"
00650 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00651 << "</head></html>";
00652 }
00653
00654
00655
00656
00657
00658 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00659 throw (xgi::exception::Exception)
00660 {
00661
00662 std::string urn = getApplicationDescriptor()->getURN();
00663
00664 *out << "<!-- base href=\"/" << urn
00665 << "\"> -->" << std::endl;
00666 *out << "<html>" << std::endl;
00667 *out << "<head>" << std::endl;
00668 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00669 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00670 *out << "<title>" << getApplicationDescriptor()->getClassName()
00671 << getApplicationDescriptor()->getInstance()
00672 << " MAIN</title>" << std::endl;
00673 *out << "</head>" << std::endl;
00674 *out << "<body>" << std::endl;
00675 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00676 *out << "<tr>" << std::endl;
00677 *out << " <td align=\"left\">" << std::endl;
00678 *out << " <img" << std::endl;
00679 *out << " align=\"middle\"" << std::endl;
00680 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00681 *out << " alt=\"main\"" << std::endl;
00682 *out << " width=\"64\"" << std::endl;
00683 *out << " height=\"64\"" << std::endl;
00684 *out << " border=\"\"/>" << std::endl;
00685 *out << " <b>" << std::endl;
00686 *out << getApplicationDescriptor()->getClassName()
00687 << getApplicationDescriptor()->getInstance() << std::endl;
00688 *out << " " << fsm_.stateName()->toString() << std::endl;
00689 *out << " </b>" << std::endl;
00690 *out << " </td>" << std::endl;
00691 *out << " <td width=\"32\">" << std::endl;
00692 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00693 *out << " <img" << std::endl;
00694 *out << " align=\"middle\"" << std::endl;
00695 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00696 *out << " alt=\"HyperDAQ\"" << std::endl;
00697 *out << " width=\"32\"" << std::endl;
00698 *out << " height=\"32\"" << std::endl;
00699 *out << " border=\"\"/>" << std::endl;
00700 *out << " </a>" << std::endl;
00701 *out << " </td>" << std::endl;
00702 *out << " <td width=\"32\">" << std::endl;
00703 *out << " </td>" << std::endl;
00704 *out << " <td width=\"32\">" << std::endl;
00705 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00706 *out << " <img" << std::endl;
00707 *out << " align=\"middle\"" << std::endl;
00708 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00709 *out << " alt=\"main\"" << std::endl;
00710 *out << " width=\"32\"" << std::endl;
00711 *out << " height=\"32\"" << std::endl;
00712 *out << " border=\"\"/>" << std::endl;
00713 *out << " </a>" << std::endl;
00714 *out << " </td>" << std::endl;
00715 *out << "</tr>" << std::endl;
00716 *out << "</table>" << std::endl;
00717
00718 *out << "<hr/>" << std::endl;
00719
00720 std::ostringstream ost;
00721 if(myProcess_)
00722 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00723 else
00724 ost << "/moduleWeb?";
00725 urn += ost.str();
00726 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00727 evtProcessor_.taskWebPage(in,out,urn);
00728 else if(evtProcessor_)
00729 evtProcessor_.summaryWebPage(in,out,urn);
00730 else
00731 *out << "<td>HLT Unconfigured</td>" << std::endl;
00732 *out << "</table>" << std::endl;
00733
00734 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00735 *out << configuration_ << std::endl;
00736 *out << "</textarea><P>" << std::endl;
00737
00738 *out << "</body>" << std::endl;
00739 *out << "</html>" << std::endl;
00740
00741
00742 }
00743 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00744 throw (xgi::exception::Exception)
00745 {
00746
00747 out->getHTTPResponseHeader().addHeader( "Content-Type",
00748 "application/octet-stream" );
00749 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00750 "binary" );
00751 if(evtProcessor_ != 0){
00752 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00753 }
00754 }
00755
00756 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
00757 throw (xgi::exception::Exception)
00758 {
00759
00760 if(evtProcessor_ != 0){
00761 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00762 if(legenda !=0){
00763 std::string slegenda = ((xdata::String*)legenda)->value_;
00764 *out << slegenda << std::endl;
00765 }
00766 }
00767 }
00768
00769 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
00770 {
00771 std::string errmsg;
00772 bool success = false;
00773 try {
00774 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00775 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00776 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00777 if (!success) errmsg = "Failed to attach DQM service to shared memory";
00778 }
00779 catch (cms::Exception& e) {
00780 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00781 }
00782 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00783 }
00784
00785
00786
00787 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00788 {
00789 std::string errmsg;
00790 bool success = false;
00791 try {
00792 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00793 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00794 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00795 if (!success) errmsg = "Failed to detach DQM service from shared memory";
00796 }
00797 catch (cms::Exception& e) {
00798 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00799 }
00800 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00801 }
00802
00803
00804 std::string FUEventProcessor::logsAsString()
00805 {
00806 std::ostringstream oss;
00807 if(logWrap_)
00808 {
00809 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00810 oss << logRing_[i] << std::endl;
00811 for(unsigned int i = 0; i < logRingIndex_; i++)
00812 oss << logRing_[i] << std::endl;
00813 }
00814 else
00815 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00816 oss << logRing_[i] << std::endl;
00817
00818 return oss.str();
00819 }
00820
00821 void FUEventProcessor::localLog(std::string m)
00822 {
00823 timeval tv;
00824
00825 gettimeofday(&tv,0);
00826 tm *uptm = localtime(&tv.tv_sec);
00827 char datestring[256];
00828 strftime(datestring, sizeof(datestring),"%c", uptm);
00829
00830 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00831 logRingIndex_--;
00832 std::ostringstream timestamp;
00833 timestamp << " at " << datestring;
00834 m += timestamp.str();
00835 logRing_[logRingIndex_] = m;
00836 }
00837
00838 void FUEventProcessor::startSupervisorLoop()
00839 {
00840 try {
00841 wlSupervising_=
00842 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00843 "waiting");
00844 if (!wlSupervising_->isActive()) wlSupervising_->activate();
00845 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00846 "Supervisor");
00847 wlSupervising_->submit(asSupervisor_);
00848 supervising_ = true;
00849 }
00850 catch (xcept::Exception& e) {
00851 std::string msg = "Failed to start workloop 'Supervisor'.";
00852 XCEPT_RETHROW(evf::Exception,msg,e);
00853 }
00854 }
00855
00856 void FUEventProcessor::startReceivingLoop()
00857 {
00858 try {
00859 wlReceiving_=
00860 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00861 "waiting");
00862 if (!wlReceiving_->isActive()) wlReceiving_->activate();
00863 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00864 "Receiving");
00865 wlReceiving_->submit(asReceiveMsgAndExecute_);
00866 receiving_ = true;
00867 }
00868 catch (xcept::Exception& e) {
00869 std::string msg = "Failed to start workloop 'Receiving'.";
00870 XCEPT_RETHROW(evf::Exception,msg,e);
00871 }
00872 }
00873 void FUEventProcessor::startReceivingMonitorLoop()
00874 {
00875 try {
00876 wlReceivingMonitor_=
00877 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00878 "waiting");
00879 if (!wlReceivingMonitor_->isActive())
00880 wlReceivingMonitor_->activate();
00881 asReceiveMsgAndRead_ =
00882 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00883 "ReceivingM");
00884 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00885 receivingM_ = true;
00886 }
00887 catch (xcept::Exception& e) {
00888 std::string msg = "Failed to start workloop 'ReceivingM'.";
00889 XCEPT_RETHROW(evf::Exception,msg,e);
00890 }
00891 }
00892
00893 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00894 {
00895 MsgBuf msg;
00896 try{
00897 myProcess_->rcvSlave(msg,false);
00898 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00899 {
00900 pthread_mutex_lock(&stop_lock_);
00901 fsm_.fireEvent("Stop",this);
00902 try{
00903 LOG4CPLUS_DEBUG(getApplicationLogger(),
00904 "Trying to create message service presence ");
00905 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00906 if(pf != 0) {
00907 pf->makePresence("MessageServicePresence").release();
00908 }
00909 else {
00910 LOG4CPLUS_ERROR(getApplicationLogger(),
00911 "Unable to create message service presence ");
00912 }
00913 }
00914 catch(...) {
00915 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00916 }
00917 stopClassic();
00918 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00919 myProcess_->postSlave(msg1,false);
00920 pthread_mutex_unlock(&stop_lock_);
00921 fclose(stdout);
00922 fclose(stderr);
00923 exit(EXIT_SUCCESS);
00924 }
00925 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
00926 _exit(EXIT_SUCCESS);
00927 }
00928 catch(evf::Exception &e){}
00929 return true;
00930 }
00931
00932 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00933 {
00934 pthread_mutex_lock(&stop_lock_);
00935 if(subs_.size()!=nbSubProcesses_.value_)
00936 {
00937 subs_.resize(nbSubProcesses_.value_);
00938 spMStates_.resize(nbSubProcesses_.value_);
00939 spmStates_.resize(nbSubProcesses_.value_);
00940 for(unsigned int i = 0; i < spMStates_.size(); i++)
00941 {
00942 spMStates_[i] = edm::event_processor::sInit;
00943 spmStates_[i] = 0;
00944 }
00945 }
00946 bool running = fsm_.stateName()->toString()=="Enabled";
00947 bool stopping = fsm_.stateName()->toString()=="stopping";
00948 for(unsigned int i = 0; i < subs_.size(); i++)
00949 {
00950 if(subs_[i].alive()==-1000) continue;
00951 int sl;
00952
00953 pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00954
00955 if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00956 else continue;
00957 pthread_mutex_lock(&pickup_lock_);
00958 std::ostringstream ost;
00959 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00960 else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00961 else ost << " process stopped ";
00962 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00963 subs_[i].setReasonForFailed(ost.str());
00964 spMStates_[i] = evtProcessor_.notstarted_state_code();
00965 spmStates_[i] = 0;
00966 std::ostringstream ost1;
00967 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00968 localLog(ost1.str());
00969 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00970 pthread_mutex_unlock(&pickup_lock_);
00971 }
00972 pthread_mutex_unlock(&stop_lock_);
00973 if(stopping) return true;
00974
00975 if(running)
00976 {
00977
00978
00979 if(autoRestartSlaves_.value_){
00980 pthread_mutex_lock(&stop_lock_);
00981 for(unsigned int i = 0; i < subs_.size(); i++)
00982 {
00983 if(subs_[i].alive() != 1){
00984 if(subs_[i].countdown() == 0)
00985 {
00986 if(subs_[i].restartCount()>2){
00987 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
00988 << " - maximum restart count reached");
00989 std::ostringstream ost1;
00990 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
00991 localLog(ost1.str());
00992 subs_[i].countdown()--;
00993 XCEPT_DECLARE(evf::Exception,
00994 sentinelException, ost1.str());
00995 notifyQualified("error",sentinelException);
00996 subs_[i].disconnect();
00997 continue;
00998 }
00999 subs_[i].restartCount()++;
01000 pid_t rr = subs_[i].forkNew();
01001 if(rr==0)
01002 {
01003 myProcess_=&subs_[i];
01004 scalersUpdates_ = 0;
01005 int retval = pthread_mutex_destroy(&stop_lock_);
01006 if(retval != 0) perror("error");
01007 retval = pthread_mutex_init(&stop_lock_,0);
01008 if(retval != 0) perror("error");
01009 fsm_.disableRcmsStateNotification();
01010 fsm_.fireEvent("Stop",this);
01011 fsm_.fireEvent("StopDone",this);
01012 fsm_.fireEvent("Enable",this);
01013 try{
01014 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01015 if(lsid) {
01016 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01017 }
01018 }
01019 catch(...){
01020 std::cout << "trouble with lsindex during restart" << std::endl;
01021 }
01022 try{
01023 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01024 if(lstb) {
01025 ((xdata::Boolean*)(lstb))->value_ = false;
01026 }
01027 }
01028 catch(...){
01029 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01030 }
01031
01032 evtProcessor_.adjustLsIndexForRestart();
01033 evtProcessor_.resetPackedTriggerReport();
01034 enableMPEPSlave();
01035 return false;
01036 }
01037 else
01038 {
01039 std::ostringstream ost1;
01040 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01041 localLog(ost1.str());
01042 }
01043 }
01044 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01045 }
01046 }
01047 pthread_mutex_unlock(&stop_lock_);
01048 }
01049 }
01050 xdata::Serializable *lsid = 0;
01051 xdata::Serializable *psid = 0;
01052 xdata::Serializable *epMAltState = 0;
01053 xdata::Serializable *epmAltState = 0;
01054 xdata::Serializable *dqmp = 0;
01055 xdata::UnsignedInteger32 *dqm = 0;
01056
01057
01058
01059 if(running){
01060 try{
01061 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01062 psid = applicationInfoSpace_->find("prescaleSetIndex");
01063 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01064 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01065 epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
01066 epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
01067 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01068 }
01069 catch(xdata::exception::Exception e){
01070 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01071 }
01072
01073 try{
01074 if(nbProcessed !=0 && nbAccepted !=0)
01075 {
01076 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01077 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01078 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01079 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01080 if(dqmp!=0)
01081 dqm = (xdata::UnsignedInteger32*)dqmp;
01082 if(dqm) dqm->value_ = 0;
01083 nbTotalDQM_ = 0;
01084 nbp->value_ = 0;
01085 nba->value_ = 0;
01086 nblive_ = 0;
01087 nbdead_ = 0;
01088 scalersUpdates_ = 0;
01089
01090 for(unsigned int i = 0; i < subs_.size(); i++)
01091 {
01092 if(subs_[i].alive()>0)
01093 {
01094 nblive_++;
01095 try{
01096 subs_[i].post(master_message_prg_,true);
01097
01098 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01099 if(retval == (unsigned long) master_message_prr_->mtype){
01100 prg* p = (struct prg*)(master_message_prr_->mtext);
01101 subs_[i].setParams(p);
01102 spMStates_[i] = p->Ms;
01103 spmStates_[i] = p->ms;
01104 cpustat_->addEntry(p->ms);
01105 if(!subs_[i].inInconsistentState() &&
01106 (p->Ms == edm::event_processor::sError
01107 || p->Ms == edm::event_processor::sInvalid
01108 || p->Ms == edm::event_processor::sStopping))
01109 {
01110 std::ostringstream ost;
01111 ost << "edm::eventprocessor slot " << i << " process id "
01112 << subs_[i].pid() << " not in Running state : Mstate="
01113 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01114 << evtProcessor_.moduleNameFromIndex(p->ms)
01115 << " - Look into possible error messages from HLT process";
01116 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01117 }
01118 nbp->value_ += subs_[i].params().nbp;
01119 nba->value_ += subs_[i].params().nba;
01120 if(dqm)dqm->value_ += p->dqm;
01121 nbTotalDQM_ += p->dqm;
01122 scalersUpdates_ += p->trp;
01123 if(p->ls > ls->value_) ls->value_ = p->ls;
01124 if(p->ps != ps->value_) ps->value_ = p->ps;
01125 }
01126 else{
01127 nbp->value_ += subs_[i].get_save_nbp();
01128 nba->value_ += subs_[i].get_save_nba();
01129 }
01130 }
01131 catch(evf::Exception &e){
01132 LOG4CPLUS_INFO(getApplicationLogger(),
01133 "could not send/receive msg on slot "
01134 << i << " - " << e.what());
01135 }
01136
01137 }
01138 else
01139 {
01140 nbp->value_ += subs_[i].get_save_nbp();
01141 nba->value_ += subs_[i].get_save_nba();
01142 nbdead_++;
01143 }
01144 }
01145 if(nbp->value_>64){
01146 for(unsigned int i = 0; i < subs_.size(); i++)
01147 {
01148 if(subs_[i].params().nbp == 0){
01149
01150 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01151 {
01152 subs_[i].found_invalid();
01153 if(subs_[i].nfound_invalid() > 60){
01154 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01155 subs_[i].post(msg3,false);
01156 std::ostringstream ost1;
01157 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01158 localLog(ost1.str());
01159 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01160 XCEPT_DECLARE(evf::Exception,
01161 sentinelException, ost1.str());
01162 notifyQualified("error",sentinelException);
01163
01164 }
01165 }
01166 }
01167 }
01168 }
01169 }
01170 }
01171 catch(std::exception &e){
01172 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01173 }
01174 catch(...){
01175 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01176 }
01177 }
01178 else{
01179 for(unsigned int i = 0; i < subs_.size(); i++)
01180 {
01181 if(subs_[i].alive()==-1000)
01182 {
01183 spMStates_[i] = edm::event_processor::sInit;
01184 spmStates_[i] = 0;
01185 }
01186 }
01187 }
01188 try{
01189 monitorInfoSpace_->lock();
01190 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01191 monitorInfoSpace_->unlock();
01192 }
01193 catch(xdata::exception::Exception &e)
01194 {
01195 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01196
01197 }
01198 ::sleep(superSleepSec_.value_);
01199 return true;
01200 }
01201
01202 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01203 {
01204 try {
01205 wlScalers_=
01206 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01207 "waiting");
01208 if (!wlScalers_->isActive()) wlScalers_->activate();
01209 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01210 "Scalers");
01211
01212 wlScalers_->submit(asScalers_);
01213 wlScalersActive_ = true;
01214 }
01215 catch (xcept::Exception& e) {
01216 std::string msg = "Failed to start workloop 'Scalers'.";
01217 XCEPT_RETHROW(evf::Exception,msg,e);
01218 }
01219 }
01220
01221
01222
01223 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01224 {
01225 try {
01226 wlSummarize_=
01227 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01228 "waiting");
01229 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01230
01231 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01232 "Summary");
01233
01234 wlSummarize_->submit(asSummarize_);
01235 wlSummarizeActive_ = true;
01236 }
01237 catch (xcept::Exception& e) {
01238 std::string msg = "Failed to start workloop 'Summarize'.";
01239 XCEPT_RETHROW(evf::Exception,msg,e);
01240 }
01241 }
01242
01243
01244
01245 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01246 {
01247 if(evtProcessor_)
01248 {
01249 if(!evtProcessor_.getTriggerReport(true)) {
01250 wlScalersActive_ = false;
01251 return false;
01252 }
01253 }
01254 else
01255 {
01256 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01257 wlScalersActive_ = false;
01258 return false;
01259 }
01260 if(myProcess_)
01261 {
01262
01263 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01264 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01265 scalersUpdates_++;
01266 }
01267 else
01268 evtProcessor_.fireScalersUpdate();
01269 return true;
01270 }
01271
01272
01273 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01274 {
01275 evtProcessor_.resetPackedTriggerReport();
01276 bool atLeastOneProcessUpdatedSuccessfully = false;
01277 int msgCount = 0;
01278 for (unsigned int i = 0; i < subs_.size(); i++)
01279 {
01280 if(subs_[i].alive()>0)
01281 {
01282 int ret = 0;
01283 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01284 evtProcessor_.getLumiSectionReferenceIndex()))
01285 {
01286 ret = MSQS_MESSAGE_TYPE_TRR;
01287 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01288 }
01289 else{
01290 bool insync = false;
01291 bool exception_caught = false;
01292 while(!insync){
01293 try{
01294 ret = subs_[i].rcv(master_message_trr_,false);
01295 }
01296 catch(evf::Exception &e)
01297 {
01298 std::cout << "exception in msgrcv on " << i
01299 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01300 exception_caught = true;
01301 break;
01302
01303 }
01304 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01305 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01306 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01307 insync = true;
01308 }
01309 }
01310 }
01311 if(exception_caught) continue;
01312 }
01313 msgCount++;
01314 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01315 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01316 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01317 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01318 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01319 subs_[i].add_postponed_trigger_update(master_message_trr_);
01320 }else{
01321 atLeastOneProcessUpdatedSuccessfully = true;
01322 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01323 }
01324 }
01325 else std::cout << "msgrcv returned error " << errno << std::endl;
01326 }
01327 }
01328 if(atLeastOneProcessUpdatedSuccessfully){
01329 nbSubProcessesReporting_.value_ = msgCount;
01330 evtProcessor_.updateRollingReport();
01331 evtProcessor_.fireScalersUpdate();
01332 }
01333 else{
01334 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01335 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01336 nbSubProcessesReporting_.value_ = 0;
01337 ::sleep(10);
01338 }
01339 if(fsm_.stateName()->toString()!="Enabled"){
01340 wlScalersActive_ = false;
01341 return false;
01342 }
01343
01344 if(iDieStatisticsGathering_.value_){
01345 try{
01346 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01347 ratestat_->sendStat((unsigned char*)(evtProcessor_.getPackedTriggerReportAsStruct()),
01348 sizeof(TriggerReportStatic),
01349 evtProcessor_.getLumiSectionReferenceIndex());
01350 }catch(evf::Exception &e){
01351 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01352 << e.what());
01353 }
01354 }
01355 cpustat_->reset();
01356 return true;
01357 }
01358
01359
01360
01361 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01362 {
01363 try{
01364 myProcess_->rcvSlave(slave_message_monitoring_,true);
01365 switch(slave_message_monitoring_->mtype)
01366 {
01367 case MSQM_MESSAGE_TYPE_MCS:
01368 {
01369 xgi::Input *in = 0;
01370 xgi::Output out;
01371 evtProcessor_.microState(in,&out);
01372 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01373 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01374 myProcess_->postSlave(msg1,true);
01375 break;
01376 }
01377
01378 case MSQM_MESSAGE_TYPE_PRG:
01379 {
01380 xdata::Serializable *dqmp = 0;
01381 xdata::UnsignedInteger32 *dqm = 0;
01382 evtProcessor_.monitoring(0);
01383 try{
01384 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01385 } catch(xdata::exception::Exception e){}
01386 if(dqmp!=0)
01387 dqm = (xdata::UnsignedInteger32*)dqmp;
01388
01389
01390 prg * data = (prg*)slave_message_prr_->mtext;
01391 data->ls = evtProcessor_.lsid_;
01392 data->eols = evtProcessor_.lastLumiUsingEol_;
01393 data->ps = evtProcessor_.psid_;
01394 data->nbp = evtProcessor_->totalEvents();
01395 data->nba = evtProcessor_->totalEventsPassed();
01396 data->Ms = evtProcessor_.epMAltState_.value_;
01397 data->ms = evtProcessor_.epmAltState_.value_;
01398 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01399 data->trp = scalersUpdates_;
01400
01401 myProcess_->postSlave(slave_message_prr_,true);
01402 if(exitOnError_.value_)
01403 {
01404
01405
01406 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01407 {
01408 bool running = true;
01409 int count = 0;
01410 while(running){
01411 int retval = pthread_mutex_lock(&stop_lock_);
01412 if(retval != 0) perror("error");
01413 running = fsm_.stateName()->toString()=="Enabled";
01414 if(count>5) _exit(-1);
01415 pthread_mutex_unlock(&stop_lock_);
01416 if(running) {::sleep(1); count++;}
01417 }
01418 }
01419
01420 }
01421
01422 break;
01423 }
01424 case MSQM_MESSAGE_TYPE_WEB:
01425 {
01426 xgi::Input *in = 0;
01427 xgi::Output out;
01428 unsigned int bytesToSend = 0;
01429 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01430 std::string query = slave_message_monitoring_->mtext;
01431 size_t pos = query.find_first_of("&");
01432 std::string method;
01433 std::string args;
01434 if(pos!=std::string::npos)
01435 {
01436 method = query.substr(0,pos);
01437 args = query.substr(pos+1,query.length()-pos-1);
01438 }
01439 else
01440 method=query;
01441
01442 if(method=="Spotlight")
01443 {
01444 spotlightWebPage(in,&out);
01445 }
01446 else if(method=="procStat")
01447 {
01448 procStat(in,&out);
01449 }
01450 else if(method=="moduleWeb")
01451 {
01452 internal::MyCgi mycgi;
01453 boost::char_separator<char> sep(";");
01454 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01455 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01456 tok_iter != tokens.end(); ++tok_iter){
01457 size_t pos = (*tok_iter).find_first_of("%");
01458 if(pos != std::string::npos){
01459 std::string first = (*tok_iter).substr(0 , pos);
01460 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01461 mycgi.getEnvironment()[first]=second;
01462 }
01463 }
01464 moduleWeb(&mycgi,&out);
01465 }
01466 else if(method=="Default")
01467 {
01468 defaultWebPage(in,&out);
01469 }
01470 else
01471 {
01472 out << "Error 404!!!!!!!!" << std::endl;
01473 }
01474
01475
01476 bytesToSend = out.str().size();
01477 unsigned int cycle = 0;
01478 if(bytesToSend==0)
01479 {
01480 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01481 myProcess_->postSlave(msg1,true);
01482 }
01483 while(bytesToSend !=0){
01484 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01485 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01486 write(anonymousPipe_[PIPE_WRITE],
01487 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01488 msgSize);
01489 myProcess_->postSlave(msg1,true);
01490 bytesToSend -= msgSize;
01491 cycle++;
01492 }
01493 break;
01494 }
01495 case MSQM_MESSAGE_TYPE_TRP:
01496 {
01497 break;
01498 }
01499 }
01500 }
01501 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01502 return true;
01503 }
01504
01505 bool FUEventProcessor::enableCommon()
01506 {
01507 try {
01508 if(hasShMem_) attachDqmToShm();
01509 int sc = 0;
01510 evtProcessor_->clearCounters();
01511 if(isRunNumberSetter_)
01512 evtProcessor_->setRunNumber(runNumber_.value_);
01513 else
01514 evtProcessor_->declareRunNumber(runNumber_.value_);
01515 try{
01516 ::sleep(1);
01517 evtProcessor_->runAsync();
01518 sc = evtProcessor_->statusAsync();
01519 }
01520 catch(cms::Exception &e) {
01521 reasonForFailedState_ = e.explainSelf();
01522 fsm_.fireFailed(reasonForFailedState_,this);
01523 localLog(reasonForFailedState_);
01524 return false;
01525 }
01526 catch(std::exception &e) {
01527 reasonForFailedState_ = e.what();
01528 fsm_.fireFailed(reasonForFailedState_,this);
01529 localLog(reasonForFailedState_);
01530 return false;
01531 }
01532 catch(...) {
01533 reasonForFailedState_ = "Unknown Exception";
01534 fsm_.fireFailed(reasonForFailedState_,this);
01535 localLog(reasonForFailedState_);
01536 return false;
01537 }
01538 if(sc != 0) {
01539 std::ostringstream oss;
01540 oss<<"EventProcessor::runAsync returned status code " << sc;
01541 reasonForFailedState_ = oss.str();
01542 fsm_.fireFailed(reasonForFailedState_,this);
01543 localLog(reasonForFailedState_);
01544 return false;
01545 }
01546 }
01547 catch (xcept::Exception &e) {
01548 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01549 fsm_.fireFailed(reasonForFailedState_,this);
01550 localLog(reasonForFailedState_);
01551 return false;
01552 }
01553 try{
01554 fsm_.fireEvent("EnableDone",this);
01555 }
01556 catch (xcept::Exception &e) {
01557 std::cout << "exception " << (std::string)e.what() << std::endl;
01558 throw;
01559 }
01560
01561 return false;
01562 }
01563
01564 bool FUEventProcessor::enableClassic()
01565 {
01566 bool retval = enableCommon();
01567 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01568 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01569 ::sleep(1);
01570 }
01571
01572
01573
01574 localLog("-I- Start completed");
01575 return retval;
01576 }
01577 bool FUEventProcessor::enableMPEPSlave()
01578 {
01579
01580 startReceivingLoop();
01581 startReceivingMonitorLoop();
01582 evtProcessor_.resetWaiting();
01583 startScalersWorkLoop();
01584 while(!evtProcessor_.isWaitingForLs())
01585 ::sleep(1);
01586
01587
01588 try{
01589
01590 try{
01591 LOG4CPLUS_DEBUG(getApplicationLogger(),
01592 "Trying to create message service presence ");
01593 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01594 if(pf != 0) {
01595 pf->makePresence("MessageServicePresence").release();
01596 }
01597 else {
01598 LOG4CPLUS_ERROR(getApplicationLogger(),
01599 "Unable to create message service presence ");
01600 }
01601 }
01602 catch(...) {
01603 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01604 }
01605 ML::MLlog4cplus::setAppl(this);
01606 }
01607 catch (xcept::Exception &e) {
01608 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01609 fsm_.fireFailed(reasonForFailedState_,this);
01610 localLog(reasonForFailedState_);
01611 }
01612 bool retval = enableCommon();
01613
01614
01615
01616
01617 return retval;
01618 }
01619
01620 bool FUEventProcessor::stopClassic()
01621 {
01622 try {
01623 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01624 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01625 if(rc == edm::EventProcessor::epSuccess)
01626 fsm_.fireEvent("StopDone",this);
01627 else
01628 {
01629
01630 if(rc == edm::EventProcessor::epTimedOut)
01631 reasonForFailedState_ = "EventProcessor stop timed out";
01632 else
01633 reasonForFailedState_ = "EventProcessor did not receive STOP event";
01634 fsm_.fireFailed(reasonForFailedState_,this);
01635 localLog(reasonForFailedState_);
01636 }
01637 if(hasShMem_) detachDqmFromShm();
01638 }
01639 catch (xcept::Exception &e) {
01640 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01641 localLog(reasonForFailedState_);
01642 fsm_.fireFailed(reasonForFailedState_,this);
01643 }
01644 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01645 localLog("-I- Stop completed");
01646 return false;
01647 }
01648
01649 void FUEventProcessor::stopSlavesAndAcknowledge()
01650 {
01651 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01652 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01653
01654 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
01655 for(unsigned int i = 0; i < subs_.size(); i++)
01656 {
01657 pthread_mutex_lock(&stop_lock_);
01658 if(subs_[i].alive()>0){
01659 processes_to_stop[i] = true;
01660 subs_[i].post(msg,false);
01661 }
01662 pthread_mutex_unlock(&stop_lock_);
01663 }
01664 for(unsigned int i = 0; i < subs_.size(); i++)
01665 {
01666 pthread_mutex_lock(&stop_lock_);
01667 if(processes_to_stop[i]){
01668 try{
01669 subs_[i].rcv(msg1,false);
01670 }
01671 catch(evf::Exception &e){
01672 std::ostringstream ost;
01673 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
01674 reasonForFailedState_ = ost.str();
01675 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01676
01677 localLog(reasonForFailedState_);
01678 break;
01679 }
01680 }
01681 else {
01682 pthread_mutex_unlock(&stop_lock_);
01683 continue;
01684 }
01685 pthread_mutex_unlock(&stop_lock_);
01686 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01687 while(subs_[i].alive()>0) ::usleep(10000);
01688 subs_[i].disconnect();
01689 }
01690
01691
01692 }
01693
01694 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01695 {
01696 std::string urn = getApplicationDescriptor()->getURN();
01697 try{
01698 evtProcessor_.stateNameFromIndex(0);
01699 evtProcessor_.moduleNameFromIndex(0);
01700 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01701 *out << "<tr><td>" << fsm_.stateName()->toString()
01702 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01703 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01704 evtProcessor_.microState(in,out);
01705 *out << "<td></td><td>" << nbTotalDQM_
01706 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01707 if(nbSubProcesses_.value_!=0 && !myProcess_)
01708 {
01709 pthread_mutex_lock(&start_lock_);
01710 for(unsigned int i = 0; i < subs_.size(); i++)
01711 {
01712 try{
01713 if(subs_[i].alive()>0)
01714 {
01715 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
01716 << i << "\">""Alive</td><td>S</td><td>"
01717 << subs_[i].queueId() << "<td>"
01718 << subs_[i].queueStatus()<< "/"
01719 << subs_[i].queueOccupancy() << "/"
01720 << subs_[i].queuePidOfLastSend() << "/"
01721 << subs_[i].queuePidOfLastReceive()
01722 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
01723 << subs_[i].pid() << "&method=procStat\">"
01724 << subs_[i].pid()<<"</a></td>"
01725 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
01726 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
01727 << subs_[i].params().nba << "/" << subs_[i].params().nbp
01728 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
01729 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
01730 << "</td><td>" << subs_[i].params().ps
01731 << "</td><td"
01732 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
01733 << subs_[i].params().eols
01734 << "</td><td>" << subs_[i].params().dqm
01735 << "</td><td>" << subs_[i].params().trp << "</td>";
01736 }
01737 else
01738 {
01739 pthread_mutex_lock(&pickup_lock_);
01740 *out << "<tr><td id=\"a"<< i << "\" ";
01741 if(subs_[i].alive()==-1000)
01742 *out << " bgcolor=\"#bbaabb\">NotInitialized";
01743 else
01744 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01745 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01746 << subs_[i].queueStatus() << "/"
01747 << subs_[i].queueOccupancy() << "/"
01748 << subs_[i].queuePidOfLastSend() << "/"
01749 << subs_[i].queuePidOfLastReceive()
01750 << "</td><td id=\"p"<< i << "\">"
01751 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01752 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
01753 {
01754 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
01755 *out << " will restart in " << subs_[i].countdown() << " s";
01756 else if(autoRestartSlaves_)
01757 *out << " reached maximum restart count";
01758 else *out << " autoRestart is disabled ";
01759 }
01760 *out << "</td><td"
01761 << ((subs_[i].params().eols<subs_[i].params().ls) ?
01762 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
01763 << ">"
01764 << subs_[i].params().eols
01765 << "</td><td>" << subs_[i].params().dqm
01766 << "</td><td>" << subs_[i].params().trp << "</td>";
01767 pthread_mutex_unlock(&pickup_lock_);
01768 }
01769 *out << "</tr>";
01770 }
01771 catch(evf::Exception &e){
01772 *out << "<tr><td id=\"a"<< i << "\" "
01773 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01774 << subs_[i].queueStatus() << "/"
01775 << subs_[i].queueOccupancy() << "/"
01776 << subs_[i].queuePidOfLastSend() << "/"
01777 << subs_[i].queuePidOfLastReceive()
01778 << "</td><td id=\"p"<< i << "\">"
01779 <<subs_[i].pid()<<"</td>";
01780 }
01781 }
01782 pthread_mutex_unlock(&start_lock_);
01783 }
01784 }
01785 catch(evf::Exception &e)
01786 {
01787 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
01788 }
01789 catch(cms::Exception &e)
01790 {
01791 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
01792 }
01793 catch(std::exception &e)
01794 {
01795 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
01796 }
01797 catch(...)
01798 {
01799 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
01800 }
01801
01802 }
01803
01804
01805 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01806 {
01807 using namespace utils;
01808
01809 *out << updaterStatic_;
01810 mDiv(out,"loads");
01811 uptime(out);
01812 cDiv(out);
01813 mDiv(out,"st",fsm_.stateName()->toString());
01814 mDiv(out,"ru",runNumber_.toString());
01815 mDiv(out,"nsl",nbSubProcesses_.value_);
01816 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
01817 mDiv(out,"cl");
01818 *out << getApplicationDescriptor()->getClassName()
01819 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01820 cDiv(out);
01821 mDiv(out,"in",getApplicationDescriptor()->getInstance());
01822 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01823 mDiv(out,"hlt");
01824 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01825 cDiv(out);
01826 *out << std::endl;
01827 }
01828 else
01829 mDiv(out,"hlt","Not yet...");
01830
01831 mDiv(out,"sq",squidPresent_.toString());
01832 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01833 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01834 if(nbProcessed != 0 && nbAccepted != 0)
01835 {
01836 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01837 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01838 }
01839 else
01840 {
01841 mDiv(out,"tt",0);
01842 mDiv(out,"ac",0);
01843 }
01844 if(!myProcess_)
01845 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01846 else
01847 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01848
01849 mDiv(out,"idi",iDieUrl_.value_);
01850 if(vp_!=0){
01851 mDiv(out,"vpi",(unsigned int) vp_);
01852 if(vulture_->hasStarted()>=0)
01853 mDiv(out,"vul","Prowling");
01854 else
01855 mDiv(out,"vul","Dead");
01856 }
01857 else{
01858 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01859 }
01860 if(evtProcessor_){
01861 mDiv(out,"ll");
01862 *out << evtProcessor_.lastLumi().ls
01863 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01864 cDiv(out);
01865 }
01866 mDiv(out,"lg");
01867 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01868 *out << logRing_[i] << std::endl;
01869 if(logWrap_)
01870 for(unsigned int i = 0; i<logRingIndex_; i++)
01871 *out << logRing_[i] << std::endl;
01872 cDiv(out);
01873
01874 }
01875
01876 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01877 {
01878 evf::utils::procStat(out);
01879 }
01880
01881 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01882 {
01883 if(myProcess_) myProcess_->postSlave(buf,true);
01884 }
01885
01886 void FUEventProcessor::makeStaticInfo()
01887 {
01888 using namespace utils;
01889 std::ostringstream ost;
01890 mDiv(&ost,"ve");
01891 ost<< "$Revision: 1.132 $ (" << edm::getReleaseVersion() <<")";
01892 cDiv(&ost);
01893 mDiv(&ost,"ou",outPut_.toString());
01894 mDiv(&ost,"sh",hasShMem_.toString());
01895 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01896 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01897
01898 xdata::Serializable *monsleep = 0;
01899 xdata::Serializable *lstimeout = 0;
01900 try{
01901 monsleep = applicationInfoSpace_->find("monSleepSec");
01902 lstimeout = applicationInfoSpace_->find("lsTimeOut");
01903 }
01904 catch(xdata::exception::Exception e){
01905 }
01906
01907 if(monsleep!=0)
01908 mDiv(&ost,"ms",monsleep->toString());
01909 if(lstimeout!=0)
01910 mDiv(&ost,"lst",lstimeout->toString());
01911 char cbuf[sizeof(struct utsname)];
01912 struct utsname* buf = (struct utsname*)cbuf;
01913 uname(buf);
01914 mDiv(&ost,"sysinfo");
01915 ost << buf->sysname << " " << buf->nodename
01916 << " " << buf->release << " " << buf->version << " " << buf->machine;
01917 cDiv(&ost);
01918 updaterStatic_ = ost.str();
01919 }
01920
// Expands the XDAQ instantiator macro for evf::FUEventProcessor.
// NOTE(review): presumably this provides the factory hook the XDAQ executive
// uses to create the application - confirm against the xdaq headers.
01921 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)