00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019
00020
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027
00028 #include "toolbox/BSem.h"
00029
00030 #include <boost/tokenizer.hpp>
00031
00032 #include "xcept/tools.h"
00033 #include "xgi/Method.h"
00034
00035 #include "cgicc/CgiDefs.h"
00036 #include "cgicc/Cgicc.h"
00037 #include "cgicc/FormEntry.h"
00038
00039
00040 #include <sys/wait.h>
00041 #include <sys/utsname.h>
00042
00043 #include <typeinfo>
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046 #include <errno.h>
00047
00048 using namespace evf;
00049 using namespace cgicc;
00050 namespace toolbox {
00051 namespace mem {
00052 extern toolbox::BSem * _s_mutex_ptr_;
00053 }
00054 }
00055
00057
00059
00060
00061 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00062 : xdaq::Application(s)
00063 , fsm_(this)
00064 , log_(getApplicationLogger())
00065 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00066 , runNumber_(0)
00067 , epInitialized_(false)
00068 , outPut_(true)
00069 , autoRestartSlaves_(false)
00070 , slaveRestartDelaySecs_(10)
00071 , hasShMem_(true)
00072 , hasPrescaleService_(true)
00073 , hasModuleWebRegistry_(true)
00074 , hasServiceWebRegistry_(true)
00075 , isRunNumberSetter_(true)
00076 , iDieStatisticsGathering_(false)
00077 , outprev_(true)
00078 , exitOnError_(true)
00079 , reasonForFailedState_()
00080 , squidnet_(3128,"https://localhost:8000/RELEASE-NOTES.txt")
00081 , logRing_(logRingSize_)
00082 , logRingIndex_(logRingSize_)
00083 , logWrap_(false)
00084 , nbSubProcesses_(0)
00085 , nbSubProcessesReporting_(0)
00086 , nblive_(0)
00087 , nbdead_(0)
00088 , nbTotalDQM_(0)
00089 , wlReceiving_(0)
00090 , asReceiveMsgAndExecute_(0)
00091 , receiving_(false)
00092 , wlReceivingMonitor_(0)
00093 , asReceiveMsgAndRead_(0)
00094 , receivingM_(false)
00095 , myProcess_(0)
00096 , wlSupervising_(0)
00097 , asSupervisor_(0)
00098 , supervising_(false)
00099 , monitorInfoSpace_(0)
00100 , monitorLegendaInfoSpace_(0)
00101 , applicationInfoSpace_(0)
00102 , nbProcessed(0)
00103 , nbAccepted(0)
00104 , scalersInfoSpace_(0)
00105 , scalersLegendaInfoSpace_(0)
00106 , wlScalers_(0)
00107 , asScalers_(0)
00108 , wlScalersActive_(false)
00109 , scalersUpdates_(0)
00110 , wlSummarize_(0)
00111 , asSummarize_(0)
00112 , wlSummarizeActive_(false)
00113 , superSleepSec_(1)
00114 , iDieUrl_("none")
00115 , vulture_(0)
00116 , vp_(0)
00117 , cpustat_(0)
00118 , ratestat_(0)
00119 , master_message_prg_(0,MSQM_MESSAGE_TYPE_PRG)
00120 , master_message_prr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR)
00121 , slave_message_prr_(sizeof(prg),MSQS_MESSAGE_TYPE_PRR)
00122 , master_message_trr_(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR)
00123 {
00124 using namespace utils;
00125
00126 names_.push_back("nbProcessed" );
00127 names_.push_back("nbAccepted" );
00128 names_.push_back("epMacroStateInt");
00129 names_.push_back("epMicroStateInt");
00130
00131 int retpipe = pipe(anonymousPipe_);
00132 if(retpipe != 0)
00133 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00134
00135 squidPresent_ = squidnet_.check();
00136
00137 evtProcessor_.setAppDesc(getApplicationDescriptor());
00138 evtProcessor_.setAppCtxt(getApplicationContext());
00139
00140 fsm_.initialize<evf::FUEventProcessor>(this);
00141
00142
00143 url_ =
00144 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00145 getApplicationDescriptor()->getURN();
00146 class_ =getApplicationDescriptor()->getClassName();
00147 instance_=getApplicationDescriptor()->getInstance();
00148 sourceId_=class_.toString()+"_"+instance_.toString();
00149 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00150 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00151
00152 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00153
00154 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00155 applicationInfoSpace_ = ispace;
00156
00157
00158 ispace->fireItemAvailable("parameterSet", &configString_ );
00159 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00160 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00161 ispace->fireItemAvailable("runNumber", &runNumber_ );
00162 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00163
00164 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00165 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00166 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00167 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00168 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00169 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00170 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00171 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00172 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00173 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00174 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00175 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00176 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00177 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00178
00179
00180 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00181 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00182
00183
00184 fsm_.findRcmsStateListener();
00185
00186
00187
00188 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00189 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00190 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00191
00192 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00193 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00194 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00195
00196
00197 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00198 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00199 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00200 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00201 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00202
00203 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00204
00205 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00206 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00207 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00208
00209 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00210 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00211 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00212
00213
00214
00215 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00216 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00217
00218 evtProcessor_.setApplicationInfoSpace(ispace);
00219 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00220 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00221
00222
00223 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00224 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00225
00226
00227 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00228 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00229 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00230 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00231 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00232 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00233 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00234 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00235 xgi::bind(this, &FUEventProcessor::microState, "microState");
00236 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00237 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00238
00239
00240
00241 edm::AssertHandler ah;
00242
00243 try{
00244 LOG4CPLUS_DEBUG(getApplicationLogger(),
00245 "Trying to create message service presence ");
00246 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00247 if(pf != 0) {
00248 pf->makePresence("MessageServicePresence").release();
00249 }
00250 else {
00251 LOG4CPLUS_ERROR(getApplicationLogger(),
00252 "Unable to create message service presence ");
00253 }
00254 }
00255 catch(...) {
00256 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00257 }
00258 ML::MLlog4cplus::setAppl(this);
00259
00260 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00261 typedef AppDescSet_t::iterator AppDescIter_t;
00262
00263 AppDescSet_t rcms=
00264 getApplicationContext()->getDefaultZone()->
00265 getApplicationDescriptors("RCMSStateListener");
00266 if(rcms.size()==0)
00267 {
00268 LOG4CPLUS_WARN(getApplicationLogger(),
00269 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00270
00271 }
00272 else
00273 {
00274 AppDescIter_t it = rcms.begin();
00275 evtProcessor_.setRcms(*it);
00276 }
00277 pthread_mutex_init(&start_lock_,0);
00278 pthread_mutex_init(&stop_lock_,0);
00279 pthread_mutex_init(&pickup_lock_,0);
00280
00281 makeStaticInfo();
00282 startSupervisorLoop();
00283
00284 if(vulture_==0) vulture_ = new Vulture(true);
00285
00287
00288 AppDescSet_t setOfiDie=
00289 getApplicationContext()->getDefaultZone()->
00290 getApplicationDescriptors("evf::iDie");
00291
00292 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00293 if ((*it)->getInstance()==0)
00294 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00295 }
00296
00297
00298
00299
00300 FUEventProcessor::~FUEventProcessor()
00301 {
00302
00303
00304
00305 if(vulture_ != 0) delete vulture_;
00306 }
00307
00308
00310
00312
00313
00314
00315 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00316 {
00317
00318
00319
00320
00321
00322 unsigned short smap
00323 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00324 + (((instance_.value_%80)==0) ? 0x8 : 0)
00325 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00326 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00327 + (hasPrescaleService_.value_ ? 0x1 : 0);
00328 if(nbSubProcesses_.value_==0)
00329 {
00330 spMStates_.setSize(1);
00331 spmStates_.setSize(1);
00332 }
00333 else
00334 {
00335 spMStates_.setSize(nbSubProcesses_.value_);
00336 spmStates_.setSize(nbSubProcesses_.value_);
00337 for(unsigned int i = 0; i < spMStates_.size(); i++)
00338 {
00339 spMStates_[i] = edm::event_processor::sInit;
00340 spmStates_[i] = 0;
00341 }
00342 }
00343 try {
00344 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00345 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00346 epInitialized_=true;
00347 if(evtProcessor_)
00348 {
00349
00350 configuration_ = evtProcessor_.configuration();
00351 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00352 evtProcessor_->beginJob();
00353 if(cpustat_) {delete cpustat_; cpustat_=0;}
00354 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00355 nbSubProcesses_.value_,
00356 instance_.value_,
00357 iDieUrl_.value_);
00358 if(ratestat_) {delete ratestat_; ratestat_=0;}
00359 ratestat_ = new RateStat(iDieUrl_.value_);
00360 if(iDieStatisticsGathering_.value_){
00361 try{
00362 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00363 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00364 if(legenda !=0){
00365 std::string slegenda = ((xdata::String*)legenda)->value_;
00366 ratestat_->sendLegenda(slegenda);
00367 }
00368
00369 }
00370 catch(evf::Exception &e){
00371 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00372 << e.what());
00373 }
00374 }
00375
00376 fsm_.fireEvent("ConfigureDone",this);
00377 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00378 localLog("-I- Configuration completed");
00379
00380 }
00381 }
00382 catch (xcept::Exception &e) {
00383 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00384 fsm_.fireFailed(reasonForFailedState_,this);
00385 localLog(reasonForFailedState_);
00386 }
00387 catch(cms::Exception &e) {
00388 reasonForFailedState_ = e.explainSelf();
00389 fsm_.fireFailed(reasonForFailedState_,this);
00390 localLog(reasonForFailedState_);
00391 }
00392 catch(std::exception &e) {
00393 reasonForFailedState_ = e.what();
00394 fsm_.fireFailed(reasonForFailedState_,this);
00395 localLog(reasonForFailedState_);
00396 }
00397 catch(...) {
00398 fsm_.fireFailed("Unknown Exception",this);
00399 }
00400
00401 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00402
00403 return false;
00404 }
00405
00406
00407
00408
00409
00410 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00411 {
00412 nbTotalDQM_ = 0;
00413 scalersUpdates_ = 0;
00414
00415
00416
00417
00418
00419 unsigned short smap
00420 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00421 + (((instance_.value_%80)==0) ? 0x8 : 0)
00422 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00423 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00424 + (hasPrescaleService_.value_ ? 0x1 : 0);
00425
00426 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00427 if(!epInitialized_){
00428 evtProcessor_.forceInitEventProcessorMaybe();
00429 }
00430 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00431
00432 if(!epInitialized_){
00433 evtProcessor_->beginJob();
00434 if(cpustat_) {delete cpustat_; cpustat_=0;}
00435 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00436 nbSubProcesses_.value_,
00437 instance_.value_,
00438 iDieUrl_.value_);
00439 if(ratestat_) {delete ratestat_; ratestat_=0;}
00440 ratestat_ = new RateStat(iDieUrl_.value_);
00441 if(iDieStatisticsGathering_.value_){
00442 try
00443 {
00444 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00445 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00446 if(legenda !=0)
00447 {
00448 std::string slegenda = ((xdata::String*)legenda)->value_;
00449 ratestat_->sendLegenda(slegenda);
00450 }
00451 }
00452 catch(evf::Exception &e)
00453 {
00454 LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
00455 << e.what());
00456 }
00457 catch (xcept::Exception& e) {
00458 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
00459 << e.what());
00460 }
00461 }
00462 epInitialized_ = true;
00463 }
00464 configuration_ = evtProcessor_.configuration();
00465 evtProcessor_.resetLumiSectionReferenceIndex();
00466
00467 if(nbSubProcesses_.value_==0) return enableClassic();
00468
00469 pthread_mutex_lock(&start_lock_);
00470 subs_.clear();
00471 subs_.resize(nbSubProcesses_.value_);
00472 pid_t retval = -1;
00473 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00474 {
00475 subs_[i]=SubProcess(i,retval);
00476 }
00477 pthread_mutex_unlock(&start_lock_);
00478
00479 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00480 {
00481 retval = subs_[i].forkNew();
00482 if(retval==0)
00483 {
00484 myProcess_ = &subs_[i];
00485
00486 delete toolbox::mem::_s_mutex_ptr_;
00487 toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
00488 int retval = pthread_mutex_destroy(&stop_lock_);
00489 if(retval != 0) perror("error");
00490 retval = pthread_mutex_init(&stop_lock_,0);
00491 if(retval != 0) perror("error");
00492 fsm_.disableRcmsStateNotification();
00493 return enableMPEPSlave();
00494
00495 }
00496 }
00497
00498 startSummarizeWorkLoop();
00499 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00500
00501 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00502 fsm_.fireEvent("EnableDone",this);
00503 localLog("-I- Start completed");
00504 return false;
00505 }
00506
00507
00508
00509 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00510 {
00511 if(nbSubProcesses_.value_!=0)
00512 stopSlavesAndAcknowledge();
00513 vulture_->stop();
00514 return stopClassic();
00515 }
00516
00517
00518
00519 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00520 {
00521 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00522 if(nbSubProcesses_.value_!=0)
00523 stopSlavesAndAcknowledge();
00524 try{
00525 evtProcessor_.stopAndHalt();
00526 }
00527 catch (evf::Exception &e) {
00528 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00529 localLog(reasonForFailedState_);
00530 fsm_.fireFailed(reasonForFailedState_,this);
00531 }
00532
00533
00534 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00535 fsm_.fireEvent("HaltDone",this);
00536
00537 localLog("-I- Halt completed");
00538 return false;
00539 }
00540
00541
00542
00543 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00544 throw (xoap::exception::Exception)
00545 {
00546 return fsm_.commandCallback(msg);
00547 }
00548
00549
00550
00551 void FUEventProcessor::actionPerformed(xdata::Event& e)
00552 {
00553
00554 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00555 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00556
00557 if ( item == "parameterSet") {
00558 LOG4CPLUS_WARN(getApplicationLogger(),
00559 "HLT Menu changed, force reinitialization of EventProcessor");
00560 epInitialized_ = false;
00561 }
00562 if ( item == "outputEnabled") {
00563 if(outprev_ != outPut_) {
00564 LOG4CPLUS_WARN(getApplicationLogger(),
00565 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00566 evtProcessor_->enableEndPaths(outPut_);
00567 outprev_ = outPut_;
00568 }
00569 }
00570 if (item == "globalInputPrescale") {
00571 LOG4CPLUS_WARN(getApplicationLogger(),
00572 "Setting global input prescale has no effect "
00573 <<"in this version of the code");
00574 }
00575 if ( item == "globalOutputPrescale") {
00576 LOG4CPLUS_WARN(getApplicationLogger(),
00577 "Setting global output prescale has no effect "
00578 <<"in this version of the code");
00579 }
00580 }
00581
00582 }
00583
00584
00585 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00586 {
00587 using namespace cgicc;
00588 pid_t pid = 0;
00589 std::ostringstream ost;
00590 ost << "&";
00591
00592 Cgicc cgi(in);
00593 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00594 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00595 mycgi->getEnvironment().begin();
00596 mit != mycgi->getEnvironment().end(); mit++)
00597 ost << mit->first << "%" << mit->second << ";";
00598 std::vector<FormEntry> els = cgi.getElements() ;
00599 std::vector<FormEntry> el1;
00600 cgi.getElement("method",el1);
00601 std::vector<FormEntry> el2;
00602 cgi.getElement("process",el2);
00603 if(el1.size()!=0) {
00604 std::string meth = el1[0].getValue();
00605 if(el2.size()!=0) {
00606 unsigned int i = 0;
00607 std::string mod = el2[0].getValue();
00608 pid = atoi(mod.c_str());
00609 for(; i < subs_.size(); i++)
00610 if(subs_[i].pid()==pid) break;
00611 if(i>=subs_.size()){
00612 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00613 return;
00614 }
00615 if(subs_[i].alive() != 1){
00616 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00617 return;
00618 }
00619 MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
00620 strncpy(msg1->mtext,meth.c_str(),meth.length());
00621 strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
00622 subs_[i].post(msg1,true);
00623 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00624 superSleepSec_.value_=10*keep_supersleep_original_value;
00625 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00626 bool done = false;
00627 std::vector<char *>pieces;
00628 while(!done){
00629 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00630 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00631 ::sleep(1);
00632 continue;
00633 }
00634 unsigned int nbytes = atoi(msg->mtext);
00635 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00636 char *buf= new char[nbytes];
00637 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00638 if(retval!=nbytes) std::cout
00639 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00640 pieces.push_back(buf);
00641 }
00642 superSleepSec_.value_=keep_supersleep_original_value;
00643 for(unsigned int j = 0; j < pieces.size(); j++){
00644 *out<<pieces[j];
00645 delete[] pieces[j];
00646 }
00647 }
00648 }
00649 }
00650
00651
00652
00653 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00654 throw (xgi::exception::Exception)
00655 {
00656
00657
00658 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00659 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00660 << getApplicationDescriptor()->getInstance() << "</title>"
00661 << "<meta https-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00662 << "</head></html>";
00663 }
00664
00665
00666
00667
00668
00669 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00670 throw (xgi::exception::Exception)
00671 {
00672
00673 std::string urn = getApplicationDescriptor()->getURN();
00674
00675 *out << "<!-- base href=\"/" << urn
00676 << "\"> -->" << std::endl;
00677 *out << "<html>" << std::endl;
00678 *out << "<head>" << std::endl;
00679 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00680 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00681 *out << "<title>" << getApplicationDescriptor()->getClassName()
00682 << getApplicationDescriptor()->getInstance()
00683 << " MAIN</title>" << std::endl;
00684 *out << "</head>" << std::endl;
00685 *out << "<body>" << std::endl;
00686 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00687 *out << "<tr>" << std::endl;
00688 *out << " <td align=\"left\">" << std::endl;
00689 *out << " <img" << std::endl;
00690 *out << " align=\"middle\"" << std::endl;
00691 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00692 *out << " alt=\"main\"" << std::endl;
00693 *out << " width=\"64\"" << std::endl;
00694 *out << " height=\"64\"" << std::endl;
00695 *out << " border=\"\"/>" << std::endl;
00696 *out << " <b>" << std::endl;
00697 *out << getApplicationDescriptor()->getClassName()
00698 << getApplicationDescriptor()->getInstance() << std::endl;
00699 *out << " " << fsm_.stateName()->toString() << std::endl;
00700 *out << " </b>" << std::endl;
00701 *out << " </td>" << std::endl;
00702 *out << " <td width=\"32\">" << std::endl;
00703 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00704 *out << " <img" << std::endl;
00705 *out << " align=\"middle\"" << std::endl;
00706 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00707 *out << " alt=\"HyperDAQ\"" << std::endl;
00708 *out << " width=\"32\"" << std::endl;
00709 *out << " height=\"32\"" << std::endl;
00710 *out << " border=\"\"/>" << std::endl;
00711 *out << " </a>" << std::endl;
00712 *out << " </td>" << std::endl;
00713 *out << " <td width=\"32\">" << std::endl;
00714 *out << " </td>" << std::endl;
00715 *out << " <td width=\"32\">" << std::endl;
00716 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00717 *out << " <img" << std::endl;
00718 *out << " align=\"middle\"" << std::endl;
00719 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00720 *out << " alt=\"main\"" << std::endl;
00721 *out << " width=\"32\"" << std::endl;
00722 *out << " height=\"32\"" << std::endl;
00723 *out << " border=\"\"/>" << std::endl;
00724 *out << " </a>" << std::endl;
00725 *out << " </td>" << std::endl;
00726 *out << "</tr>" << std::endl;
00727 *out << "</table>" << std::endl;
00728
00729 *out << "<hr/>" << std::endl;
00730
00731 std::ostringstream ost;
00732 if(myProcess_)
00733 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00734 else
00735 ost << "/moduleWeb?";
00736 urn += ost.str();
00737 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00738 evtProcessor_.taskWebPage(in,out,urn);
00739 else if(evtProcessor_)
00740 evtProcessor_.summaryWebPage(in,out,urn);
00741 else
00742 *out << "<td>HLT Unconfigured</td>" << std::endl;
00743 *out << "</table>" << std::endl;
00744
00745 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00746 *out << configuration_ << std::endl;
00747 *out << "</textarea><P>" << std::endl;
00748
00749 *out << "</body>" << std::endl;
00750 *out << "</html>" << std::endl;
00751
00752
00753 }
00754 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00755 throw (xgi::exception::Exception)
00756 {
00757
00758 out->getHTTPResponseHeader().addHeader( "Content-Type",
00759 "application/octet-stream" );
00760 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00761 "binary" );
00762 if(evtProcessor_ != 0){
00763 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00764 }
00765 }
00766
00767 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
00768 throw (xgi::exception::Exception)
00769 {
00770
00771 if(evtProcessor_ != 0){
00772 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00773 if(legenda !=0){
00774 std::string slegenda = ((xdata::String*)legenda)->value_;
00775 *out << slegenda << std::endl;
00776 }
00777 }
00778 }
00779
00780 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
00781 {
00782 std::string errmsg;
00783 bool success = false;
00784 try {
00785 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00786 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00787 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00788 if (!success) errmsg = "Failed to attach DQM service to shared memory";
00789 }
00790 catch (cms::Exception& e) {
00791 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00792 }
00793 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00794 }
00795
00796
00797
00798 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00799 {
00800 std::string errmsg;
00801 bool success = false;
00802 try {
00803 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00804 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00805 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00806 if (!success) errmsg = "Failed to detach DQM service from shared memory";
00807 }
00808 catch (cms::Exception& e) {
00809 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00810 }
00811 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00812 }
00813
00814
00815 std::string FUEventProcessor::logsAsString()
00816 {
00817 std::ostringstream oss;
00818 if(logWrap_)
00819 {
00820 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00821 oss << logRing_[i] << std::endl;
00822 for(unsigned int i = 0; i < logRingIndex_; i++)
00823 oss << logRing_[i] << std::endl;
00824 }
00825 else
00826 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00827 oss << logRing_[i] << std::endl;
00828
00829 return oss.str();
00830 }
00831
00832 void FUEventProcessor::localLog(std::string m)
00833 {
00834 timeval tv;
00835
00836 gettimeofday(&tv,0);
00837 tm *uptm = localtime(&tv.tv_sec);
00838 char datestring[256];
00839 strftime(datestring, sizeof(datestring),"%c", uptm);
00840
00841 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00842 logRingIndex_--;
00843 std::ostringstream timestamp;
00844 timestamp << " at " << datestring;
00845 m += timestamp.str();
00846 logRing_[logRingIndex_] = m;
00847 }
00848
00849 void FUEventProcessor::startSupervisorLoop()
00850 {
00851 try {
00852 wlSupervising_=
00853 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00854 "waiting");
00855 if (!wlSupervising_->isActive()) wlSupervising_->activate();
00856 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00857 "Supervisor");
00858 wlSupervising_->submit(asSupervisor_);
00859 supervising_ = true;
00860 }
00861 catch (xcept::Exception& e) {
00862 std::string msg = "Failed to start workloop 'Supervisor'.";
00863 XCEPT_RETHROW(evf::Exception,msg,e);
00864 }
00865 }
00866
00867 void FUEventProcessor::startReceivingLoop()
00868 {
00869 try {
00870 wlReceiving_=
00871 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00872 "waiting");
00873 if (!wlReceiving_->isActive()) wlReceiving_->activate();
00874 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00875 "Receiving");
00876 wlReceiving_->submit(asReceiveMsgAndExecute_);
00877 receiving_ = true;
00878 }
00879 catch (xcept::Exception& e) {
00880 std::string msg = "Failed to start workloop 'Receiving'.";
00881 XCEPT_RETHROW(evf::Exception,msg,e);
00882 }
00883 }
00884 void FUEventProcessor::startReceivingMonitorLoop()
00885 {
00886 try {
00887 wlReceivingMonitor_=
00888 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00889 "waiting");
00890 if (!wlReceivingMonitor_->isActive())
00891 wlReceivingMonitor_->activate();
00892 asReceiveMsgAndRead_ =
00893 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00894 "ReceivingM");
00895 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00896 receivingM_ = true;
00897 }
00898 catch (xcept::Exception& e) {
00899 std::string msg = "Failed to start workloop 'ReceivingM'.";
00900 XCEPT_RETHROW(evf::Exception,msg,e);
00901 }
00902 }
00903
00904 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00905 {
00906 MsgBuf msg;
00907 try{
00908 myProcess_->rcvSlave(msg,false);
00909 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00910 {
00911 pthread_mutex_lock(&stop_lock_);
00912 fsm_.fireEvent("Stop",this);
00913 try{
00914 LOG4CPLUS_DEBUG(getApplicationLogger(),
00915 "Trying to create message service presence ");
00916 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00917 if(pf != 0) {
00918 pf->makePresence("MessageServicePresence").release();
00919 }
00920 else {
00921 LOG4CPLUS_ERROR(getApplicationLogger(),
00922 "Unable to create message service presence ");
00923 }
00924 }
00925 catch(...) {
00926 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00927 }
00928 stopClassic();
00929 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00930 myProcess_->postSlave(msg1,false);
00931 pthread_mutex_unlock(&stop_lock_);
00932 fclose(stdout);
00933 fclose(stderr);
00934 _exit(EXIT_SUCCESS);
00935 }
00936 if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
00937 _exit(EXIT_SUCCESS);
00938 }
00939 catch(evf::Exception &e){}
00940 return true;
00941 }
00942
00943 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00944 {
00945 pthread_mutex_lock(&stop_lock_);
00946 if(subs_.size()!=nbSubProcesses_.value_)
00947 {
00948 subs_.resize(nbSubProcesses_.value_);
00949 spMStates_.resize(nbSubProcesses_.value_);
00950 spmStates_.resize(nbSubProcesses_.value_);
00951 for(unsigned int i = 0; i < spMStates_.size(); i++)
00952 {
00953 spMStates_[i] = edm::event_processor::sInit;
00954 spmStates_[i] = 0;
00955 }
00956 }
00957 bool running = fsm_.stateName()->toString()=="Enabled";
00958 bool stopping = fsm_.stateName()->toString()=="stopping";
00959 for(unsigned int i = 0; i < subs_.size(); i++)
00960 {
00961 if(subs_[i].alive()==-1000) continue;
00962 int sl;
00963
00964 pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00965
00966 if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00967 else continue;
00968 pthread_mutex_lock(&pickup_lock_);
00969 std::ostringstream ost;
00970 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00971 else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00972 else ost << " process stopped ";
00973 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00974 subs_[i].setReasonForFailed(ost.str());
00975 spMStates_[i] = evtProcessor_.notstarted_state_code();
00976 spmStates_[i] = 0;
00977 std::ostringstream ost1;
00978 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00979 localLog(ost1.str());
00980 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00981 pthread_mutex_unlock(&pickup_lock_);
00982 }
00983 pthread_mutex_unlock(&stop_lock_);
00984 if(stopping) return true;
00985
00986 if(running)
00987 {
00988
00989
00990 if(autoRestartSlaves_.value_){
00991 pthread_mutex_lock(&stop_lock_);
00992 for(unsigned int i = 0; i < subs_.size(); i++)
00993 {
00994 if(subs_[i].alive() != 1){
00995 if(subs_[i].countdown() == 0)
00996 {
00997 if(subs_[i].restartCount()>2){
00998 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
00999 << " - maximum restart count reached");
01000 std::ostringstream ost1;
01001 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
01002 localLog(ost1.str());
01003 subs_[i].countdown()--;
01004 XCEPT_DECLARE(evf::Exception,
01005 sentinelException, ost1.str());
01006 notifyQualified("error",sentinelException);
01007 subs_[i].disconnect();
01008 continue;
01009 }
01010 subs_[i].restartCount()++;
01011 pid_t rr = subs_[i].forkNew();
01012 if(rr==0)
01013 {
01014 myProcess_=&subs_[i];
01015 scalersUpdates_ = 0;
01016 int retval = pthread_mutex_destroy(&stop_lock_);
01017 if(retval != 0) perror("error");
01018 retval = pthread_mutex_init(&stop_lock_,0);
01019 if(retval != 0) perror("error");
01020 fsm_.disableRcmsStateNotification();
01021 fsm_.fireEvent("Stop",this);
01022 fsm_.fireEvent("StopDone",this);
01023 fsm_.fireEvent("Enable",this);
01024 try{
01025 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
01026 if(lsid) {
01027 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01028 }
01029 }
01030 catch(...){
01031 std::cout << "trouble with lsindex during restart" << std::endl;
01032 }
01033 try{
01034 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01035 if(lstb) {
01036 ((xdata::Boolean*)(lstb))->value_ = false;
01037 }
01038 }
01039 catch(...){
01040 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01041 }
01042
01043 evtProcessor_.adjustLsIndexForRestart();
01044 evtProcessor_.resetPackedTriggerReport();
01045 enableMPEPSlave();
01046 return false;
01047 }
01048 else
01049 {
01050 std::ostringstream ost1;
01051 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01052 localLog(ost1.str());
01053 }
01054 }
01055 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01056 }
01057 }
01058 pthread_mutex_unlock(&stop_lock_);
01059 }
01060 }
01061 xdata::Serializable *lsid = 0;
01062 xdata::Serializable *psid = 0;
01063 xdata::Serializable *dqmp = 0;
01064 xdata::UnsignedInteger32 *dqm = 0;
01065
01066
01067
01068 if(running){
01069 try{
01070 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01071 psid = applicationInfoSpace_->find("prescaleSetIndex");
01072 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01073 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01074 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01075 }
01076 catch(xdata::exception::Exception e){
01077 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01078 }
01079
01080 try{
01081 if(nbProcessed !=0 && nbAccepted !=0)
01082 {
01083 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01084 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01085 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01086 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01087 if(dqmp!=0)
01088 dqm = (xdata::UnsignedInteger32*)dqmp;
01089 if(dqm) dqm->value_ = 0;
01090 nbTotalDQM_ = 0;
01091 nbp->value_ = 0;
01092 nba->value_ = 0;
01093 nblive_ = 0;
01094 nbdead_ = 0;
01095 scalersUpdates_ = 0;
01096
01097 for(unsigned int i = 0; i < subs_.size(); i++)
01098 {
01099 if(subs_[i].alive()>0)
01100 {
01101 nblive_++;
01102 try{
01103 subs_[i].post(master_message_prg_,true);
01104
01105 unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
01106 if(retval == (unsigned long) master_message_prr_->mtype){
01107 prg* p = (struct prg*)(master_message_prr_->mtext);
01108 subs_[i].setParams(p);
01109 spMStates_[i] = p->Ms;
01110 spmStates_[i] = p->ms;
01111 cpustat_->addEntry(p->ms);
01112 if(!subs_[i].inInconsistentState() &&
01113 (p->Ms == edm::event_processor::sError
01114 || p->Ms == edm::event_processor::sInvalid
01115 || p->Ms == edm::event_processor::sStopping))
01116 {
01117 std::ostringstream ost;
01118 ost << "edm::eventprocessor slot " << i << " process id "
01119 << subs_[i].pid() << " not in Running state : Mstate="
01120 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01121 << evtProcessor_.moduleNameFromIndex(p->ms)
01122 << " - Look into possible error messages from HLT process";
01123 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01124 }
01125 nbp->value_ += subs_[i].params().nbp;
01126 nba->value_ += subs_[i].params().nba;
01127 if(dqm)dqm->value_ += p->dqm;
01128 nbTotalDQM_ += p->dqm;
01129 scalersUpdates_ += p->trp;
01130 if(p->ls > ls->value_) ls->value_ = p->ls;
01131 if(p->ps != ps->value_) ps->value_ = p->ps;
01132 }
01133 else{
01134 nbp->value_ += subs_[i].get_save_nbp();
01135 nba->value_ += subs_[i].get_save_nba();
01136 }
01137 }
01138 catch(evf::Exception &e){
01139 LOG4CPLUS_INFO(getApplicationLogger(),
01140 "could not send/receive msg on slot "
01141 << i << " - " << e.what());
01142 }
01143
01144 }
01145 else
01146 {
01147 nbp->value_ += subs_[i].get_save_nbp();
01148 nba->value_ += subs_[i].get_save_nba();
01149 nbdead_++;
01150 }
01151 }
01152 if(nbp->value_>64){
01153 for(unsigned int i = 0; i < subs_.size(); i++)
01154 {
01155 if(subs_[i].params().nbp == 0){
01156
01157 if(subs_[i].alive()>0 && subs_[i].params().ms == 0)
01158 {
01159 subs_[i].found_invalid();
01160 if(subs_[i].nfound_invalid() > 60){
01161 MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP);
01162 subs_[i].post(msg3,false);
01163 std::ostringstream ost1;
01164 ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
01165 localLog(ost1.str());
01166 LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
01167 XCEPT_DECLARE(evf::Exception,
01168 sentinelException, ost1.str());
01169 notifyQualified("error",sentinelException);
01170
01171 }
01172 }
01173 }
01174 }
01175 }
01176 }
01177 }
01178 catch(std::exception &e){
01179 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01180 }
01181 catch(...){
01182 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01183 }
01184 }
01185 else{
01186 for(unsigned int i = 0; i < subs_.size(); i++)
01187 {
01188 if(subs_[i].alive()==-1000)
01189 {
01190 spMStates_[i] = edm::event_processor::sInit;
01191 spmStates_[i] = 0;
01192 }
01193 }
01194 }
01195 try{
01196 monitorInfoSpace_->lock();
01197 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01198 monitorInfoSpace_->unlock();
01199 }
01200 catch(xdata::exception::Exception &e)
01201 {
01202 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01203
01204 }
01205 ::sleep(superSleepSec_.value_);
01206 return true;
01207 }
01208
01209 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01210 {
01211 try {
01212 wlScalers_=
01213 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01214 "waiting");
01215 if (!wlScalers_->isActive()) wlScalers_->activate();
01216 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01217 "Scalers");
01218
01219 wlScalers_->submit(asScalers_);
01220 wlScalersActive_ = true;
01221 }
01222 catch (xcept::Exception& e) {
01223 std::string msg = "Failed to start workloop 'Scalers'.";
01224 XCEPT_RETHROW(evf::Exception,msg,e);
01225 }
01226 }
01227
01228
01229
01230 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01231 {
01232 try {
01233 wlSummarize_=
01234 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01235 "waiting");
01236 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01237
01238 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01239 "Summary");
01240
01241 wlSummarize_->submit(asSummarize_);
01242 wlSummarizeActive_ = true;
01243 }
01244 catch (xcept::Exception& e) {
01245 std::string msg = "Failed to start workloop 'Summarize'.";
01246 XCEPT_RETHROW(evf::Exception,msg,e);
01247 }
01248 }
01249
01250
01251
01252 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01253 {
01254 if(evtProcessor_)
01255 {
01256 if(!evtProcessor_.getTriggerReport(true)) {
01257 wlScalersActive_ = false;
01258 return false;
01259 }
01260 }
01261 else
01262 {
01263 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01264 wlScalersActive_ = false;
01265 return false;
01266 }
01267 if(myProcess_)
01268 {
01269
01270 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01271 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01272 scalersUpdates_++;
01273 }
01274 else
01275 evtProcessor_.fireScalersUpdate();
01276 return true;
01277 }
01278
01279
01280 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01281 {
01282 evtProcessor_.resetPackedTriggerReport();
01283 bool atLeastOneProcessUpdatedSuccessfully = false;
01284 int msgCount = 0;
01285 for (unsigned int i = 0; i < subs_.size(); i++)
01286 {
01287 if(subs_[i].alive()>0)
01288 {
01289 int ret = 0;
01290 if(subs_[i].check_postponed_trigger_update(master_message_trr_,
01291 evtProcessor_.getLumiSectionReferenceIndex()))
01292 {
01293 ret = MSQS_MESSAGE_TYPE_TRR;
01294 std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01295 }
01296 else{
01297 bool insync = false;
01298 bool exception_caught = false;
01299 while(!insync){
01300 try{
01301 ret = subs_[i].rcv(master_message_trr_,false);
01302 }
01303 catch(evf::Exception &e)
01304 {
01305 std::cout << "exception in msgrcv on " << i
01306 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01307 exception_caught = true;
01308 break;
01309
01310 }
01311 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01312 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01313 if(trp->lumiSection >= evtProcessor_.getLumiSectionReferenceIndex()){
01314 insync = true;
01315 }
01316 }
01317 }
01318 if(exception_caught) continue;
01319 }
01320 msgCount++;
01321 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01322 TriggerReportStatic *trp = (TriggerReportStatic *)master_message_trr_->mtext;
01323 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01324 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01325 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01326 subs_[i].add_postponed_trigger_update(master_message_trr_);
01327 }else{
01328 atLeastOneProcessUpdatedSuccessfully = true;
01329 evtProcessor_.sumAndPackTriggerReport(master_message_trr_);
01330 }
01331 }
01332 else std::cout << "msgrcv returned error " << errno << std::endl;
01333 }
01334 }
01335 if(atLeastOneProcessUpdatedSuccessfully){
01336 nbSubProcessesReporting_.value_ = msgCount;
01337 evtProcessor_.getPackedTriggerReportAsStruct()->nbExpected = nbSubProcesses_.value_;
01338 evtProcessor_.getPackedTriggerReportAsStruct()->nbReporting = nbSubProcessesReporting_.value_;
01339 evtProcessor_.updateRollingReport();
01340 evtProcessor_.fireScalersUpdate();
01341 }
01342 else{
01343 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01344 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01345 nbSubProcessesReporting_.value_ = 0;
01346 ::sleep(10);
01347 }
01348 if(fsm_.stateName()->toString()!="Enabled"){
01349 wlScalersActive_ = false;
01350 return false;
01351 }
01352
01353 if(iDieStatisticsGathering_.value_){
01354 try{
01355 TriggerReportStatic *trsp = evtProcessor_.getPackedTriggerReportAsStruct();
01356 cpustat_ ->setNproc(trsp->eventSummary.totalEvents);
01357 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01358 ratestat_->sendStat((unsigned char*)trsp,
01359 sizeof(TriggerReportStatic),
01360 evtProcessor_.getLumiSectionReferenceIndex());
01361 }catch(evf::Exception &e){
01362 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01363 << e.what());
01364 }
01365 }
01366 cpustat_->reset();
01367 return true;
01368 }
01369
01370
01371
01372 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01373 {
01374 try{
01375 myProcess_->rcvSlave(slave_message_monitoring_,true);
01376 switch(slave_message_monitoring_->mtype)
01377 {
01378 case MSQM_MESSAGE_TYPE_MCS:
01379 {
01380 xgi::Input *in = 0;
01381 xgi::Output out;
01382 evtProcessor_.microState(in,&out);
01383 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01384 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01385 myProcess_->postSlave(msg1,true);
01386 break;
01387 }
01388
01389 case MSQM_MESSAGE_TYPE_PRG:
01390 {
01391 xdata::Serializable *dqmp = 0;
01392 xdata::UnsignedInteger32 *dqm = 0;
01393 evtProcessor_.monitoring(0);
01394 try{
01395 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01396 } catch(xdata::exception::Exception e){}
01397 if(dqmp!=0)
01398 dqm = (xdata::UnsignedInteger32*)dqmp;
01399
01400
01401 prg * data = (prg*)slave_message_prr_->mtext;
01402 data->ls = evtProcessor_.lsid_;
01403 data->eols = evtProcessor_.lastLumiUsingEol_;
01404 data->ps = evtProcessor_.psid_;
01405 data->nbp = evtProcessor_->totalEvents();
01406 data->nba = evtProcessor_->totalEventsPassed();
01407 data->Ms = evtProcessor_.epMAltState_.value_;
01408 data->ms = evtProcessor_.epmAltState_.value_;
01409 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01410 data->trp = scalersUpdates_;
01411
01412 myProcess_->postSlave(slave_message_prr_,true);
01413 if(exitOnError_.value_)
01414 {
01415
01416
01417 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01418 {
01419 bool running = true;
01420 int count = 0;
01421 while(running){
01422 int retval = pthread_mutex_lock(&stop_lock_);
01423 if(retval != 0) perror("error");
01424 running = fsm_.stateName()->toString()=="Enabled";
01425 if(count>5) _exit(-1);
01426 pthread_mutex_unlock(&stop_lock_);
01427 if(running) {::sleep(1); count++;}
01428 }
01429 }
01430
01431 }
01432
01433 break;
01434 }
01435 case MSQM_MESSAGE_TYPE_WEB:
01436 {
01437 xgi::Input *in = 0;
01438 xgi::Output out;
01439 unsigned int bytesToSend = 0;
01440 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01441 std::string query = slave_message_monitoring_->mtext;
01442 size_t pos = query.find_first_of("&");
01443 std::string method;
01444 std::string args;
01445 if(pos!=std::string::npos)
01446 {
01447 method = query.substr(0,pos);
01448 args = query.substr(pos+1,query.length()-pos-1);
01449 }
01450 else
01451 method=query;
01452
01453 if(method=="Spotlight")
01454 {
01455 spotlightWebPage(in,&out);
01456 }
01457 else if(method=="procStat")
01458 {
01459 procStat(in,&out);
01460 }
01461 else if(method=="moduleWeb")
01462 {
01463 internal::MyCgi mycgi;
01464 boost::char_separator<char> sep(";");
01465 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01466 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01467 tok_iter != tokens.end(); ++tok_iter){
01468 size_t pos = (*tok_iter).find_first_of("%");
01469 if(pos != std::string::npos){
01470 std::string first = (*tok_iter).substr(0 , pos);
01471 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01472 mycgi.getEnvironment()[first]=second;
01473 }
01474 }
01475 moduleWeb(&mycgi,&out);
01476 }
01477 else if(method=="Default")
01478 {
01479 defaultWebPage(in,&out);
01480 }
01481 else
01482 {
01483 out << "Error 404!!!!!!!!" << std::endl;
01484 }
01485
01486
01487 bytesToSend = out.str().size();
01488 unsigned int cycle = 0;
01489 if(bytesToSend==0)
01490 {
01491 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01492 myProcess_->postSlave(msg1,true);
01493 }
01494 while(bytesToSend !=0){
01495 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01496 write(anonymousPipe_[PIPE_WRITE],
01497 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01498 msgSize);
01499 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01500 myProcess_->postSlave(msg1,true);
01501 bytesToSend -= msgSize;
01502 cycle++;
01503 }
01504 break;
01505 }
01506 case MSQM_MESSAGE_TYPE_TRP:
01507 {
01508 break;
01509 }
01510 }
01511 }
01512 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01513 return true;
01514 }
01515
01516 bool FUEventProcessor::enableCommon()
01517 {
01518 try {
01519 if(hasShMem_) attachDqmToShm();
01520 int sc = 0;
01521 evtProcessor_->clearCounters();
01522 if(isRunNumberSetter_)
01523 evtProcessor_->setRunNumber(runNumber_.value_);
01524 else
01525 evtProcessor_->declareRunNumber(runNumber_.value_);
01526 try{
01527 ::sleep(1);
01528 evtProcessor_->runAsync();
01529 sc = evtProcessor_->statusAsync();
01530 }
01531 catch(cms::Exception &e) {
01532 reasonForFailedState_ = e.explainSelf();
01533 fsm_.fireFailed(reasonForFailedState_,this);
01534 localLog(reasonForFailedState_);
01535 return false;
01536 }
01537 catch(std::exception &e) {
01538 reasonForFailedState_ = e.what();
01539 fsm_.fireFailed(reasonForFailedState_,this);
01540 localLog(reasonForFailedState_);
01541 return false;
01542 }
01543 catch(...) {
01544 reasonForFailedState_ = "Unknown Exception";
01545 fsm_.fireFailed(reasonForFailedState_,this);
01546 localLog(reasonForFailedState_);
01547 return false;
01548 }
01549 if(sc != 0) {
01550 std::ostringstream oss;
01551 oss<<"EventProcessor::runAsync returned status code " << sc;
01552 reasonForFailedState_ = oss.str();
01553 fsm_.fireFailed(reasonForFailedState_,this);
01554 localLog(reasonForFailedState_);
01555 return false;
01556 }
01557 }
01558 catch (xcept::Exception &e) {
01559 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01560 fsm_.fireFailed(reasonForFailedState_,this);
01561 localLog(reasonForFailedState_);
01562 return false;
01563 }
01564 try{
01565 fsm_.fireEvent("EnableDone",this);
01566 }
01567 catch (xcept::Exception &e) {
01568 std::cout << "exception " << (std::string)e.what() << std::endl;
01569 throw;
01570 }
01571
01572 return false;
01573 }
01574
01575 bool FUEventProcessor::enableClassic()
01576 {
01577 bool retval = enableCommon();
01578 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01579 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01580 ::sleep(1);
01581 }
01582
01583
01584
01585 localLog("-I- Start completed");
01586 return retval;
01587 }
01588 bool FUEventProcessor::enableMPEPSlave()
01589 {
01590
01591 startReceivingLoop();
01592 startReceivingMonitorLoop();
01593 evtProcessor_.resetWaiting();
01594 startScalersWorkLoop();
01595 while(!evtProcessor_.isWaitingForLs())
01596 ::sleep(1);
01597
01598
01599 try{
01600
01601 try{
01602 LOG4CPLUS_DEBUG(getApplicationLogger(),
01603 "Trying to create message service presence ");
01604 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01605 if(pf != 0) {
01606 pf->makePresence("MessageServicePresence").release();
01607 }
01608 else {
01609 LOG4CPLUS_ERROR(getApplicationLogger(),
01610 "Unable to create message service presence ");
01611 }
01612 }
01613 catch(...) {
01614 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01615 }
01616 ML::MLlog4cplus::setAppl(this);
01617 }
01618 catch (xcept::Exception &e) {
01619 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01620 fsm_.fireFailed(reasonForFailedState_,this);
01621 localLog(reasonForFailedState_);
01622 }
01623 bool retval = enableCommon();
01624
01625
01626
01627
01628 return retval;
01629 }
01630
01631 bool FUEventProcessor::stopClassic()
01632 {
01633 try {
01634 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01635 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01636 if(rc == edm::EventProcessor::epSuccess)
01637 fsm_.fireEvent("StopDone",this);
01638 else
01639 {
01640
01641 if(rc == edm::EventProcessor::epTimedOut)
01642 reasonForFailedState_ = "EventProcessor stop timed out";
01643 else
01644 reasonForFailedState_ = "EventProcessor did not receive STOP event";
01645 fsm_.fireFailed(reasonForFailedState_,this);
01646 localLog(reasonForFailedState_);
01647 }
01648 if(hasShMem_) detachDqmFromShm();
01649 }
01650 catch (xcept::Exception &e) {
01651 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01652 localLog(reasonForFailedState_);
01653 fsm_.fireFailed(reasonForFailedState_,this);
01654 }
01655 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01656 localLog("-I- Stop completed");
01657 return false;
01658 }
01659
01660 void FUEventProcessor::stopSlavesAndAcknowledge()
01661 {
01662 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01663 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01664
01665 std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
01666 for(unsigned int i = 0; i < subs_.size(); i++)
01667 {
01668 pthread_mutex_lock(&stop_lock_);
01669 if(subs_[i].alive()>0){
01670 processes_to_stop[i] = true;
01671 subs_[i].post(msg,false);
01672 }
01673 pthread_mutex_unlock(&stop_lock_);
01674 }
01675 for(unsigned int i = 0; i < subs_.size(); i++)
01676 {
01677 pthread_mutex_lock(&stop_lock_);
01678 if(processes_to_stop[i]){
01679 try{
01680 subs_[i].rcv(msg1,false);
01681 }
01682 catch(evf::Exception &e){
01683 std::ostringstream ost;
01684 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
01685 reasonForFailedState_ = ost.str();
01686 LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
01687
01688 localLog(reasonForFailedState_);
01689 pthread_mutex_unlock(&stop_lock_);
01690 continue;
01691 }
01692 }
01693 else {
01694 pthread_mutex_unlock(&stop_lock_);
01695 continue;
01696 }
01697 pthread_mutex_unlock(&stop_lock_);
01698 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01699 while(subs_[i].alive()>0) ::usleep(10000);
01700 subs_[i].disconnect();
01701 }
01702
01703
01704 }
01705
01706 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01707 {
01708 std::string urn = getApplicationDescriptor()->getURN();
01709 try{
01710 evtProcessor_.stateNameFromIndex(0);
01711 evtProcessor_.moduleNameFromIndex(0);
01712 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01713 *out << "<tr><td>" << fsm_.stateName()->toString()
01714 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01715 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01716 evtProcessor_.microState(in,out);
01717 *out << "<td></td><td>" << nbTotalDQM_
01718 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01719 if(nbSubProcesses_.value_!=0 && !myProcess_)
01720 {
01721 pthread_mutex_lock(&start_lock_);
01722 for(unsigned int i = 0; i < subs_.size(); i++)
01723 {
01724 try{
01725 if(subs_[i].alive()>0)
01726 {
01727 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
01728 << i << "\">""Alive</td><td>S</td><td>"
01729 << subs_[i].queueId() << "<td>"
01730 << subs_[i].queueStatus()<< "/"
01731 << subs_[i].queueOccupancy() << "/"
01732 << subs_[i].queuePidOfLastSend() << "/"
01733 << subs_[i].queuePidOfLastReceive()
01734 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
01735 << subs_[i].pid() << "&method=procStat\">"
01736 << subs_[i].pid()<<"</a></td>"
01737 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
01738 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
01739 << subs_[i].params().nba << "/" << subs_[i].params().nbp
01740 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
01741 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
01742 << "</td><td>" << subs_[i].params().ps
01743 << "</td><td"
01744 << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
01745 << subs_[i].params().eols
01746 << "</td><td>" << subs_[i].params().dqm
01747 << "</td><td>" << subs_[i].params().trp << "</td>";
01748 }
01749 else
01750 {
01751 pthread_mutex_lock(&pickup_lock_);
01752 *out << "<tr><td id=\"a"<< i << "\" ";
01753 if(subs_[i].alive()==-1000)
01754 *out << " bgcolor=\"#bbaabb\">NotInitialized";
01755 else
01756 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01757 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01758 << subs_[i].queueStatus() << "/"
01759 << subs_[i].queueOccupancy() << "/"
01760 << subs_[i].queuePidOfLastSend() << "/"
01761 << subs_[i].queuePidOfLastReceive()
01762 << "</td><td id=\"p"<< i << "\">"
01763 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01764 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
01765 {
01766 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
01767 *out << " will restart in " << subs_[i].countdown() << " s";
01768 else if(autoRestartSlaves_)
01769 *out << " reached maximum restart count";
01770 else *out << " autoRestart is disabled ";
01771 }
01772 *out << "</td><td"
01773 << ((subs_[i].params().eols<subs_[i].params().ls) ?
01774 " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
01775 << ">"
01776 << subs_[i].params().eols
01777 << "</td><td>" << subs_[i].params().dqm
01778 << "</td><td>" << subs_[i].params().trp << "</td>";
01779 pthread_mutex_unlock(&pickup_lock_);
01780 }
01781 *out << "</tr>";
01782 }
01783 catch(evf::Exception &e){
01784 *out << "<tr><td id=\"a"<< i << "\" "
01785 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01786 << subs_[i].queueStatus() << "/"
01787 << subs_[i].queueOccupancy() << "/"
01788 << subs_[i].queuePidOfLastSend() << "/"
01789 << subs_[i].queuePidOfLastReceive()
01790 << "</td><td id=\"p"<< i << "\">"
01791 <<subs_[i].pid()<<"</td>";
01792 }
01793 }
01794 pthread_mutex_unlock(&start_lock_);
01795 }
01796 }
01797 catch(evf::Exception &e)
01798 {
01799 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
01800 }
01801 catch(cms::Exception &e)
01802 {
01803 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
01804 }
01805 catch(std::exception &e)
01806 {
01807 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
01808 }
01809 catch(...)
01810 {
01811 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
01812 }
01813
01814 }
01815
01816
01817 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01818 {
01819 using namespace utils;
01820
01821 *out << updaterStatic_;
01822 mDiv(out,"loads");
01823 uptime(out);
01824 cDiv(out);
01825 mDiv(out,"st",fsm_.stateName()->toString());
01826 mDiv(out,"ru",runNumber_.toString());
01827 mDiv(out,"nsl",nbSubProcesses_.value_);
01828 mDiv(out,"nsr",nbSubProcessesReporting_.value_);
01829 mDiv(out,"cl");
01830 *out << getApplicationDescriptor()->getClassName()
01831 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01832 cDiv(out);
01833 mDiv(out,"in",getApplicationDescriptor()->getInstance());
01834 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01835 mDiv(out,"hlt");
01836 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01837 cDiv(out);
01838 *out << std::endl;
01839 }
01840 else
01841 mDiv(out,"hlt","Not yet...");
01842
01843 mDiv(out,"sq",squidPresent_.toString());
01844 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01845 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01846 if(nbProcessed != 0 && nbAccepted != 0)
01847 {
01848 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01849 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01850 }
01851 else
01852 {
01853 mDiv(out,"tt",0);
01854 mDiv(out,"ac",0);
01855 }
01856 if(!myProcess_)
01857 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01858 else
01859 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01860
01861 mDiv(out,"idi",iDieUrl_.value_);
01862 if(vp_!=0){
01863 mDiv(out,"vpi",(unsigned int) vp_);
01864 if(vulture_->hasStarted()>=0)
01865 mDiv(out,"vul","Prowling");
01866 else
01867 mDiv(out,"vul","Dead");
01868 }
01869 else{
01870 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01871 }
01872 if(evtProcessor_){
01873 mDiv(out,"ll");
01874 *out << evtProcessor_.lastLumi().ls
01875 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01876 cDiv(out);
01877 }
01878 mDiv(out,"lg");
01879 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01880 *out << logRing_[i] << std::endl;
01881 if(logWrap_)
01882 for(unsigned int i = 0; i<logRingIndex_; i++)
01883 *out << logRing_[i] << std::endl;
01884 cDiv(out);
01885 mDiv(out,"cha");
01886 if(cpustat_) *out << cpustat_->getChart();
01887 cDiv(out);
01888 }
01889
01890 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01891 {
01892 evf::utils::procStat(out);
01893 }
01894
01895 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01896 {
01897 if(myProcess_) myProcess_->postSlave(buf,true);
01898 }
01899
01900 void FUEventProcessor::makeStaticInfo()
01901 {
01902 using namespace utils;
01903 std::ostringstream ost;
01904 mDiv(&ost,"ve");
01905 ost<< "$Revision: 1.134 $ (" << edm::getReleaseVersion() <<")";
01906 cDiv(&ost);
01907 mDiv(&ost,"ou",outPut_.toString());
01908 mDiv(&ost,"sh",hasShMem_.toString());
01909 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01910 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01911
01912 xdata::Serializable *monsleep = 0;
01913 xdata::Serializable *lstimeout = 0;
01914 try{
01915 monsleep = applicationInfoSpace_->find("monSleepSec");
01916 lstimeout = applicationInfoSpace_->find("lsTimeOut");
01917 }
01918 catch(xdata::exception::Exception e){
01919 }
01920
01921 if(monsleep!=0)
01922 mDiv(&ost,"ms",monsleep->toString());
01923 if(lstimeout!=0)
01924 mDiv(&ost,"lst",lstimeout->toString());
01925 char cbuf[sizeof(struct utsname)];
01926 struct utsname* buf = (struct utsname*)cbuf;
01927 uname(buf);
01928 mDiv(&ost,"sysinfo");
01929 ost << buf->sysname << " " << buf->nodename
01930 << " " << buf->release << " " << buf->version << " " << buf->machine;
01931 cDiv(&ost);
01932 updaterStatic_ = ost.str();
01933 }
01934
01935 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)