00001
00002
00003
00004
00005
00007
00008 #include "FUEventProcessor.h"
00009 #include "procUtils.h"
00010 #include "EventFilter/Utilities/interface/CPUStat.h"
00011 #include "EventFilter/Utilities/interface/RateStat.h"
00012
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include "EventFilter/Message2log4cplus/interface/MLlog4cplus.h"
00016 #include "EventFilter/Modules/interface/FUShmDQMOutputService.h"
00017 #include "EventFilter/Utilities/interface/ServiceWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ServiceWeb.h"
00019
00020
00021 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00022 #include "FWCore/PluginManager/interface/PresenceFactory.h"
00023 #include "FWCore/Utilities/interface/Presence.h"
00024 #include "FWCore/Utilities/interface/Exception.h"
00025 #include "FWCore/Version/interface/GetReleaseVersion.h"
00026 #include "FWCore/ServiceRegistry/interface/Service.h"
00027
00028 #include <boost/tokenizer.hpp>
00029
00030 #include "xcept/tools.h"
00031 #include "xgi/Method.h"
00032
00033 #include "cgicc/CgiDefs.h"
00034 #include "cgicc/Cgicc.h"
00035 #include "cgicc/FormEntry.h"
00036
00037
00038 #include <sys/wait.h>
00039 #include <sys/utsname.h>
00040
00041 #include <typeinfo>
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <errno.h>
00045
00046 using namespace evf;
00047 using namespace cgicc;
00048
00049
00051
00053
00054
00055 FUEventProcessor::FUEventProcessor(xdaq::ApplicationStub *s)
00056 : xdaq::Application(s)
00057 , fsm_(this)
00058 , log_(getApplicationLogger())
00059 , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
00060 , runNumber_(0)
00061 , epInitialized_(false)
00062 , outPut_(true)
00063 , autoRestartSlaves_(false)
00064 , slaveRestartDelaySecs_(10)
00065 , hasShMem_(true)
00066 , hasPrescaleService_(true)
00067 , hasModuleWebRegistry_(true)
00068 , hasServiceWebRegistry_(true)
00069 , isRunNumberSetter_(true)
00070 , iDieStatisticsGathering_(false)
00071 , outprev_(true)
00072 , exitOnError_(true)
00073 , reasonForFailedState_()
00074 , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
00075 , logRing_(logRingSize_)
00076 , logRingIndex_(logRingSize_)
00077 , logWrap_(false)
00078 , nbSubProcesses_(0)
00079 , nbSubProcessesReporting_(0)
00080 , nblive_(0)
00081 , nbdead_(0)
00082 , nbTotalDQM_(0)
00083 , wlReceiving_(0)
00084 , asReceiveMsgAndExecute_(0)
00085 , receiving_(false)
00086 , wlReceivingMonitor_(0)
00087 , asReceiveMsgAndRead_(0)
00088 , receivingM_(false)
00089 , myProcess_(0)
00090 , wlSupervising_(0)
00091 , asSupervisor_(0)
00092 , supervising_(false)
00093 , monitorInfoSpace_(0)
00094 , monitorLegendaInfoSpace_(0)
00095 , applicationInfoSpace_(0)
00096 , nbProcessed(0)
00097 , nbAccepted(0)
00098 , scalersInfoSpace_(0)
00099 , scalersLegendaInfoSpace_(0)
00100 , wlScalers_(0)
00101 , asScalers_(0)
00102 , wlScalersActive_(false)
00103 , scalersUpdates_(0)
00104 , wlSummarize_(0)
00105 , asSummarize_(0)
00106 , wlSummarizeActive_(false)
00107 , superSleepSec_(1)
00108 , iDieUrl_("none")
00109 , vulture_(0)
00110 , vp_(0)
00111 , cpustat_(0)
00112 , ratestat_(0)
00113 {
00114 using namespace utils;
00115
00116 names_.push_back("nbProcessed" );
00117 names_.push_back("nbAccepted" );
00118 names_.push_back("epMacroStateInt");
00119 names_.push_back("epMicroStateInt");
00120
00121 int retpipe = pipe(anonymousPipe_);
00122 if(retpipe != 0)
00123 LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
00124
00125 squidPresent_ = squidnet_.check();
00126
00127 evtProcessor_.setAppDesc(getApplicationDescriptor());
00128 evtProcessor_.setAppCtxt(getApplicationContext());
00129
00130 fsm_.initialize<evf::FUEventProcessor>(this);
00131
00132
00133 url_ =
00134 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00135 getApplicationDescriptor()->getURN();
00136 class_ =getApplicationDescriptor()->getClassName();
00137 instance_=getApplicationDescriptor()->getInstance();
00138 sourceId_=class_.toString()+"_"+instance_.toString();
00139 LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
00140 LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
00141
00142 getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
00143
00144 xdata::InfoSpace *ispace = getApplicationInfoSpace();
00145 applicationInfoSpace_ = ispace;
00146
00147
00148 ispace->fireItemAvailable("parameterSet", &configString_ );
00149 ispace->fireItemAvailable("epInitialized", &epInitialized_ );
00150 ispace->fireItemAvailable("stateName", fsm_.stateName() );
00151 ispace->fireItemAvailable("runNumber", &runNumber_ );
00152 ispace->fireItemAvailable("outputEnabled", &outPut_ );
00153
00154 ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
00155 ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
00156 ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
00157 ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
00158 ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
00159 ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
00160 ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
00161 ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
00162 ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
00163 ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
00164 ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
00165 ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
00166 ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
00167 ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
00168
00169
00170 getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
00171 getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
00172
00173
00174 fsm_.findRcmsStateListener();
00175
00176
00177
00178 std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
00179 toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
00180 monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00181
00182 std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
00183 urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
00184 monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00185
00186
00187 monitorInfoSpace_->fireItemAvailable("url", &url_ );
00188 monitorInfoSpace_->fireItemAvailable("class", &class_ );
00189 monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
00190 monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
00191 monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
00192
00193 monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
00194
00195 std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
00196 urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
00197 scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00198
00199 std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
00200 urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
00201 scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
00202
00203
00204
00205 evtProcessor_.setScalersInfoSpace(scalersInfoSpace_,scalersLegendaInfoSpace_);
00206 scalersInfoSpace_->fireItemAvailable("instance", &instance_);
00207
00208 evtProcessor_.setApplicationInfoSpace(ispace);
00209 evtProcessor_.setMonitorInfoSpace(monitorInfoSpace_,monitorLegendaInfoSpace_);
00210 evtProcessor_.publishConfigAndMonitorItems(nbSubProcesses_.value_!=0);
00211
00212
00213 monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
00214 monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
00215
00216
00217 xgi::bind(this, &FUEventProcessor::css, "styles.css");
00218 xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
00219 xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
00220 xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
00221 xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
00222 xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
00223 xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
00224 xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
00225 xgi::bind(this, &FUEventProcessor::microState, "microState");
00226 xgi::bind(this, &FUEventProcessor::updater, "updater" );
00227 xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
00228
00229
00230
00231 edm::AssertHandler ah;
00232
00233 try{
00234 LOG4CPLUS_DEBUG(getApplicationLogger(),
00235 "Trying to create message service presence ");
00236 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00237 if(pf != 0) {
00238 pf->makePresence("MessageServicePresence").release();
00239 }
00240 else {
00241 LOG4CPLUS_ERROR(getApplicationLogger(),
00242 "Unable to create message service presence ");
00243 }
00244 }
00245 catch(...) {
00246 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00247 }
00248 ML::MLlog4cplus::setAppl(this);
00249
00250 typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00251 typedef AppDescSet_t::iterator AppDescIter_t;
00252
00253 AppDescSet_t rcms=
00254 getApplicationContext()->getDefaultZone()->
00255 getApplicationDescriptors("RCMSStateListener");
00256 if(rcms.size()==0)
00257 {
00258 LOG4CPLUS_WARN(getApplicationLogger(),
00259 "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
00260
00261 }
00262 else
00263 {
00264 AppDescIter_t it = rcms.begin();
00265 evtProcessor_.setRcms(*it);
00266 }
00267 pthread_mutex_init(&start_lock_,0);
00268 pthread_mutex_init(&stop_lock_,0);
00269 pthread_mutex_init(&pickup_lock_,0);
00270
00271 makeStaticInfo();
00272 startSupervisorLoop();
00273
00274 if(vulture_==0) vulture_ = new Vulture(true);
00275
00277
00278 AppDescSet_t setOfiDie=
00279 getApplicationContext()->getDefaultZone()->
00280 getApplicationDescriptors("evf::iDie");
00281
00282 for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
00283 if ((*it)->getInstance()==0)
00284 iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
00285 }
00286
00287
00288
00289
00290 FUEventProcessor::~FUEventProcessor()
00291 {
00292
00293
00294
00295 if(vulture_ != 0) delete vulture_;
00296 }
00297
00298
00300
00302
00303
00304
00305 bool FUEventProcessor::configuring(toolbox::task::WorkLoop* wl)
00306 {
00307
00308
00309
00310
00311
00312 unsigned short smap
00313 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00314 + ((instance_.value_==0) ? 0x8 : 0)
00315 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00316 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00317 + (hasPrescaleService_.value_ ? 0x1 : 0);
00318 if(nbSubProcesses_.value_==0)
00319 {
00320 spMStates_.setSize(1);
00321 spmStates_.setSize(1);
00322 }
00323 else
00324 {
00325 spMStates_.setSize(nbSubProcesses_.value_);
00326 spmStates_.setSize(nbSubProcesses_.value_);
00327 for(unsigned int i = 0; i < spMStates_.size(); i++)
00328 {
00329 spMStates_[i] = edm::event_processor::sInit;
00330 spmStates_[i] = 0;
00331 }
00332 }
00333 try {
00334 LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
00335 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00336 epInitialized_=true;
00337 if(evtProcessor_)
00338 {
00339
00340 configuration_ = evtProcessor_.configuration();
00341 if(nbSubProcesses_.value_==0) evtProcessor_.startMonitoringWorkLoop();
00342 evtProcessor_->beginJob();
00343 if(cpustat_) {delete cpustat_; cpustat_=0;}
00344 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00345 iDieUrl_.value_);
00346 if(ratestat_) {delete ratestat_; ratestat_=0;}
00347 ratestat_ = new RateStat(iDieUrl_.value_);
00348 if(iDieStatisticsGathering_.value_){
00349 try{
00350 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00351 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00352 if(legenda !=0){
00353 std::string slegenda = ((xdata::String*)legenda)->value_;
00354 ratestat_->sendLegenda(slegenda);
00355 }
00356
00357 }
00358 catch(evf::Exception &e){
00359 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00360 << e.what());
00361 }
00362 }
00363
00364 fsm_.fireEvent("ConfigureDone",this);
00365 LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
00366 localLog("-I- Configuration completed");
00367
00368 }
00369 }
00370 catch (xcept::Exception &e) {
00371 reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
00372 fsm_.fireFailed(reasonForFailedState_,this);
00373 localLog(reasonForFailedState_);
00374 }
00375 catch(cms::Exception &e) {
00376 reasonForFailedState_ = e.explainSelf();
00377 fsm_.fireFailed(reasonForFailedState_,this);
00378 localLog(reasonForFailedState_);
00379 }
00380 catch(std::exception &e) {
00381 reasonForFailedState_ = e.what();
00382 fsm_.fireFailed(reasonForFailedState_,this);
00383 localLog(reasonForFailedState_);
00384 }
00385 catch(...) {
00386 fsm_.fireFailed("Unknown Exception",this);
00387 }
00388
00389 if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
00390
00391 return false;
00392 }
00393
00394
00395
00396
00397
00398 bool FUEventProcessor::enabling(toolbox::task::WorkLoop* wl)
00399 {
00400 nbTotalDQM_ = 0;
00401 scalersUpdates_ = 0;
00402
00403
00404
00405
00406
00407 unsigned short smap
00408 = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
00409 + ((instance_.value_==0) ? 0x8 : 0)
00410 + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
00411 + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
00412 + (hasPrescaleService_.value_ ? 0x1 : 0);
00413
00414 LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
00415 if(!epInitialized_){
00416 evtProcessor_.forceInitEventProcessorMaybe();
00417 }
00418 std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
00419
00420 if(!epInitialized_){
00421 evtProcessor_->beginJob();
00422 if(cpustat_) {delete cpustat_; cpustat_=0;}
00423 cpustat_ = new CPUStat(evtProcessor_.getNumberOfMicrostates(),
00424 iDieUrl_.value_);
00425 if(iDieStatisticsGathering_.value_){
00426 try{
00427 cpustat_->sendLegenda(evtProcessor_.getmicromap());
00428 xdata::Serializable *legenda = scalersInfoSpace_->find("scalersLegenda");
00429 if(legenda !=0){
00430 std::string slegenda = ((xdata::String*)legenda)->value_;
00431 ratestat_->sendLegenda(slegenda);
00432 }
00433 }
00434 catch(evf::Exception &e){
00435 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
00436 << e.what());
00437 }
00438 }
00439 if(ratestat_) {delete ratestat_; ratestat_=0;}
00440 ratestat_ = new RateStat(iDieUrl_.value_);
00441 epInitialized_ = true;
00442 }
00443 configuration_ = evtProcessor_.configuration();
00444 evtProcessor_.resetLumiSectionReferenceIndex();
00445
00446 if(nbSubProcesses_.value_==0) return enableClassic();
00447
00448 pthread_mutex_lock(&start_lock_);
00449
00450
00451 pid_t retval = -1;
00452 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00453 {
00454 subs_[i]=SubProcess(i,retval);
00455 }
00456 pthread_mutex_unlock(&start_lock_);
00457
00458 for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
00459 {
00460 retval = subs_[i].forkNew();
00461 if(retval==0)
00462 {
00463 myProcess_ = &subs_[i];
00464 int retval = pthread_mutex_destroy(&stop_lock_);
00465 if(retval != 0) perror("error");
00466 retval = pthread_mutex_init(&stop_lock_,0);
00467 if(retval != 0) perror("error");
00468 fsm_.disableRcmsStateNotification();
00469 return enableMPEPSlave();
00470
00471 }
00472 }
00473
00474 startSummarizeWorkLoop();
00475 vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
00476
00477 LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
00478 fsm_.fireEvent("EnableDone",this);
00479 localLog("-I- Start completed");
00480 return false;
00481 }
00482
00483
00484
00485 bool FUEventProcessor::stopping(toolbox::task::WorkLoop* wl)
00486 {
00487 if(nbSubProcesses_.value_!=0)
00488 stopSlavesAndAcknowledge();
00489 vulture_->stop();
00490 return stopClassic();
00491 }
00492
00493
00494
00495 bool FUEventProcessor::halting(toolbox::task::WorkLoop* wl)
00496 {
00497 LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
00498 if(nbSubProcesses_.value_!=0)
00499 stopSlavesAndAcknowledge();
00500 try{
00501 evtProcessor_.stopAndHalt();
00502 }
00503 catch (evf::Exception &e) {
00504 reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
00505 localLog(reasonForFailedState_);
00506 fsm_.fireFailed(reasonForFailedState_,this);
00507 }
00508
00509
00510 LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
00511 fsm_.fireEvent("HaltDone",this);
00512
00513 localLog("-I- Halt completed");
00514 return false;
00515 }
00516
00517
00518
00519 xoap::MessageReference FUEventProcessor::fsmCallback(xoap::MessageReference msg)
00520 throw (xoap::exception::Exception)
00521 {
00522 return fsm_.commandCallback(msg);
00523 }
00524
00525
00526
00527 void FUEventProcessor::actionPerformed(xdata::Event& e)
00528 {
00529
00530 if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
00531 std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00532
00533 if ( item == "parameterSet") {
00534 LOG4CPLUS_WARN(getApplicationLogger(),
00535 "HLT Menu changed, force reinitialization of EventProcessor");
00536 epInitialized_ = false;
00537 }
00538 if ( item == "outputEnabled") {
00539 if(outprev_ != outPut_) {
00540 LOG4CPLUS_WARN(getApplicationLogger(),
00541 (outprev_ ? "Disabling " : "Enabling ")<<"global output");
00542 evtProcessor_->enableEndPaths(outPut_);
00543 outprev_ = outPut_;
00544 }
00545 }
00546 if (item == "globalInputPrescale") {
00547 LOG4CPLUS_WARN(getApplicationLogger(),
00548 "Setting global input prescale has no effect "
00549 <<"in this version of the code");
00550 }
00551 if ( item == "globalOutputPrescale") {
00552 LOG4CPLUS_WARN(getApplicationLogger(),
00553 "Setting global output prescale has no effect "
00554 <<"in this version of the code");
00555 }
00556 }
00557
00558 }
00559
00560
00561 void FUEventProcessor::subWeb(xgi::Input *in, xgi::Output *out)
00562 {
00563 using namespace cgicc;
00564 pid_t pid = 0;
00565 std::ostringstream ost;
00566 ost << "&";
00567
00568 Cgicc cgi(in);
00569 internal::MyCgi *mycgi = (internal::MyCgi*)in;
00570 for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
00571 mycgi->getEnvironment().begin();
00572 mit != mycgi->getEnvironment().end(); mit++)
00573 ost << mit->first << "%" << mit->second << ";";
00574 std::vector<FormEntry> els = cgi.getElements() ;
00575 std::vector<FormEntry> el1;
00576 cgi.getElement("method",el1);
00577 std::vector<FormEntry> el2;
00578 cgi.getElement("process",el2);
00579 if(el1.size()!=0) {
00580 std::string meth = el1[0].getValue();
00581 if(el2.size()!=0) {
00582 unsigned int i = 0;
00583 std::string mod = el2[0].getValue();
00584 pid = atoi(mod.c_str());
00585 for(; i < subs_.size(); i++)
00586 if(subs_[i].pid()==pid) break;
00587 if(i>=subs_.size()){
00588 *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
00589 return;
00590 }
00591 if(subs_[i].alive() != 1){
00592 *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
00593 return;
00594 }
00595 MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
00596 strcpy(msg1->mtext,meth.c_str());
00597 strcpy(msg1->mtext+meth.length(),ost.str().c_str());
00598 subs_[i].post(msg1,true);
00599 unsigned int keep_supersleep_original_value = superSleepSec_.value_;
00600 superSleepSec_.value_=10*keep_supersleep_original_value;
00601 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_WEB);
00602 bool done = false;
00603 std::vector<char *>pieces;
00604 while(!done){
00605 unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
00606 if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
00607 ::sleep(1);
00608 continue;
00609 }
00610 unsigned int nbytes = atoi(msg->mtext);
00611 if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true;
00612 char *buf= new char[nbytes];
00613 ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
00614 if(retval!=nbytes) std::cout
00615 << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
00616 pieces.push_back(buf);
00617 }
00618 superSleepSec_.value_=keep_supersleep_original_value;
00619 for(unsigned int j = 0; j < pieces.size(); j++){
00620 *out<<pieces[j];
00621 delete[] pieces[j];
00622 }
00623 }
00624 }
00625 }
00626
00627
00628
00629 void FUEventProcessor::defaultWebPage(xgi::Input *in, xgi::Output *out)
00630 throw (xgi::exception::Exception)
00631 {
00632
00633
00634 *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
00635 << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
00636 << getApplicationDescriptor()->getInstance() << "</title>"
00637 << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
00638 << "</head></html>";
00639 }
00640
00641
00642
00643
00644
00645 void FUEventProcessor::spotlightWebPage(xgi::Input *in, xgi::Output *out)
00646 throw (xgi::exception::Exception)
00647 {
00648
00649 std::string urn = getApplicationDescriptor()->getURN();
00650
00651 *out << "<!-- base href=\"/" << urn
00652 << "\"> -->" << std::endl;
00653 *out << "<html>" << std::endl;
00654 *out << "<head>" << std::endl;
00655 *out << "<link type=\"text/css\" rel=\"stylesheet\"";
00656 *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
00657 *out << "<title>" << getApplicationDescriptor()->getClassName()
00658 << getApplicationDescriptor()->getInstance()
00659 << " MAIN</title>" << std::endl;
00660 *out << "</head>" << std::endl;
00661 *out << "<body>" << std::endl;
00662 *out << "<table border=\"0\" width=\"100%\">" << std::endl;
00663 *out << "<tr>" << std::endl;
00664 *out << " <td align=\"left\">" << std::endl;
00665 *out << " <img" << std::endl;
00666 *out << " align=\"middle\"" << std::endl;
00667 *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
00668 *out << " alt=\"main\"" << std::endl;
00669 *out << " width=\"64\"" << std::endl;
00670 *out << " height=\"64\"" << std::endl;
00671 *out << " border=\"\"/>" << std::endl;
00672 *out << " <b>" << std::endl;
00673 *out << getApplicationDescriptor()->getClassName()
00674 << getApplicationDescriptor()->getInstance() << std::endl;
00675 *out << " " << fsm_.stateName()->toString() << std::endl;
00676 *out << " </b>" << std::endl;
00677 *out << " </td>" << std::endl;
00678 *out << " <td width=\"32\">" << std::endl;
00679 *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
00680 *out << " <img" << std::endl;
00681 *out << " align=\"middle\"" << std::endl;
00682 *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
00683 *out << " alt=\"HyperDAQ\"" << std::endl;
00684 *out << " width=\"32\"" << std::endl;
00685 *out << " height=\"32\"" << std::endl;
00686 *out << " border=\"\"/>" << std::endl;
00687 *out << " </a>" << std::endl;
00688 *out << " </td>" << std::endl;
00689 *out << " <td width=\"32\">" << std::endl;
00690 *out << " </td>" << std::endl;
00691 *out << " <td width=\"32\">" << std::endl;
00692 *out << " <a href=\"/" << urn << "/\">" << std::endl;
00693 *out << " <img" << std::endl;
00694 *out << " align=\"middle\"" << std::endl;
00695 *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
00696 *out << " alt=\"main\"" << std::endl;
00697 *out << " width=\"32\"" << std::endl;
00698 *out << " height=\"32\"" << std::endl;
00699 *out << " border=\"\"/>" << std::endl;
00700 *out << " </a>" << std::endl;
00701 *out << " </td>" << std::endl;
00702 *out << "</tr>" << std::endl;
00703 *out << "</table>" << std::endl;
00704
00705 *out << "<hr/>" << std::endl;
00706
00707 std::ostringstream ost;
00708 if(myProcess_)
00709 ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
00710 else
00711 ost << "/moduleWeb?";
00712 urn += ost.str();
00713 if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
00714 evtProcessor_.taskWebPage(in,out,urn);
00715 else if(evtProcessor_)
00716 evtProcessor_.summaryWebPage(in,out,urn);
00717 else
00718 *out << "<td>HLT Unconfigured</td>" << std::endl;
00719 *out << "</table>" << std::endl;
00720
00721 *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
00722 *out << configuration_ << std::endl;
00723 *out << "</textarea><P>" << std::endl;
00724
00725 *out << "</body>" << std::endl;
00726 *out << "</html>" << std::endl;
00727
00728
00729 }
00730 void FUEventProcessor::scalersWeb(xgi::Input *in, xgi::Output *out)
00731 throw (xgi::exception::Exception)
00732 {
00733
00734 out->getHTTPResponseHeader().addHeader( "Content-Type",
00735 "application/octet-stream" );
00736 out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
00737 "binary" );
00738 if(evtProcessor_ != 0){
00739 out->write( (char*)(evtProcessor_.getPackedTriggerReportAsStruct()), sizeof(TriggerReportStatic) );
00740 }
00741 }
00742
00743 void FUEventProcessor::pathNames(xgi::Input *in, xgi::Output *out)
00744 throw (xgi::exception::Exception)
00745 {
00746
00747 if(evtProcessor_ != 0){
00748 xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
00749 if(legenda !=0){
00750 std::string slegenda = ((xdata::String*)legenda)->value_;
00751 *out << slegenda << std::endl;
00752 }
00753 }
00754 }
00755
00756 void FUEventProcessor::attachDqmToShm() throw (evf::Exception)
00757 {
00758 std::string errmsg;
00759 bool success = false;
00760 try {
00761 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00762 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00763 success = edm::Service<FUShmDQMOutputService>()->attachToShm();
00764 if (!success) errmsg = "Failed to attach DQM service to shared memory";
00765 }
00766 catch (cms::Exception& e) {
00767 errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
00768 }
00769 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00770 }
00771
00772
00773
00774 void FUEventProcessor::detachDqmFromShm() throw (evf::Exception)
00775 {
00776 std::string errmsg;
00777 bool success = false;
00778 try {
00779 edm::ServiceRegistry::Operate operate(evtProcessor_->getToken());
00780 if(edm::Service<FUShmDQMOutputService>().isAvailable())
00781 success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
00782 if (!success) errmsg = "Failed to detach DQM service from shared memory";
00783 }
00784 catch (cms::Exception& e) {
00785 errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
00786 }
00787 if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
00788 }
00789
00790
00791 std::string FUEventProcessor::logsAsString()
00792 {
00793 std::ostringstream oss;
00794 if(logWrap_)
00795 {
00796 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00797 oss << logRing_[i] << std::endl;
00798 for(unsigned int i = 0; i < logRingIndex_; i++)
00799 oss << logRing_[i] << std::endl;
00800 }
00801 else
00802 for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
00803 oss << logRing_[i] << std::endl;
00804
00805 return oss.str();
00806 }
00807
00808 void FUEventProcessor::localLog(std::string m)
00809 {
00810 timeval tv;
00811
00812 gettimeofday(&tv,0);
00813 tm *uptm = localtime(&tv.tv_sec);
00814 char datestring[256];
00815 strftime(datestring, sizeof(datestring),"%c", uptm);
00816
00817 if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
00818 logRingIndex_--;
00819 std::ostringstream timestamp;
00820 timestamp << " at " << datestring;
00821 m += timestamp.str();
00822 logRing_[logRingIndex_] = m;
00823 }
00824
00825 void FUEventProcessor::startSupervisorLoop()
00826 {
00827 try {
00828 wlSupervising_=
00829 toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
00830 "waiting");
00831 if (!wlSupervising_->isActive()) wlSupervising_->activate();
00832 asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
00833 "Supervisor");
00834 wlSupervising_->submit(asSupervisor_);
00835 supervising_ = true;
00836 }
00837 catch (xcept::Exception& e) {
00838 std::string msg = "Failed to start workloop 'Supervisor'.";
00839 XCEPT_RETHROW(evf::Exception,msg,e);
00840 }
00841 }
00842
00843 void FUEventProcessor::startReceivingLoop()
00844 {
00845 try {
00846 wlReceiving_=
00847 toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
00848 "waiting");
00849 if (!wlReceiving_->isActive()) wlReceiving_->activate();
00850 asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
00851 "Receiving");
00852 wlReceiving_->submit(asReceiveMsgAndExecute_);
00853 receiving_ = true;
00854 }
00855 catch (xcept::Exception& e) {
00856 std::string msg = "Failed to start workloop 'Receiving'.";
00857 XCEPT_RETHROW(evf::Exception,msg,e);
00858 }
00859 }
00860 void FUEventProcessor::startReceivingMonitorLoop()
00861 {
00862 try {
00863 wlReceivingMonitor_=
00864 toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
00865 "waiting");
00866 if (!wlReceivingMonitor_->isActive())
00867 wlReceivingMonitor_->activate();
00868 asReceiveMsgAndRead_ =
00869 toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
00870 "ReceivingM");
00871 wlReceivingMonitor_->submit(asReceiveMsgAndRead_);
00872 receivingM_ = true;
00873 }
00874 catch (xcept::Exception& e) {
00875 std::string msg = "Failed to start workloop 'ReceivingM'.";
00876 XCEPT_RETHROW(evf::Exception,msg,e);
00877 }
00878 }
00879
00880 bool FUEventProcessor::receiving(toolbox::task::WorkLoop *)
00881 {
00882 MsgBuf msg;
00883 try{
00884 myProcess_->rcvSlave(msg,false);
00885 if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
00886 {
00887 pthread_mutex_lock(&stop_lock_);
00888 fsm_.fireEvent("Stop",this);
00889 try{
00890 LOG4CPLUS_DEBUG(getApplicationLogger(),
00891 "Trying to create message service presence ");
00892 edm::PresenceFactory *pf = edm::PresenceFactory::get();
00893 if(pf != 0) {
00894 pf->makePresence("MessageServicePresence").release();
00895 }
00896 else {
00897 LOG4CPLUS_ERROR(getApplicationLogger(),
00898 "Unable to create message service presence ");
00899 }
00900 }
00901 catch(...) {
00902 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
00903 }
00904 stopClassic();
00905 MsgBuf msg1(0,MSQS_MESSAGE_TYPE_STOP);
00906 myProcess_->postSlave(msg1,false);
00907 pthread_mutex_unlock(&stop_lock_);
00908 fclose(stdout);
00909 fclose(stderr);
00910 exit(EXIT_SUCCESS);
00911 }
00912 }
00913 catch(evf::Exception &e){}
00914 return true;
00915 }
00916
00917 bool FUEventProcessor::supervisor(toolbox::task::WorkLoop *)
00918 {
00919 pthread_mutex_lock(&stop_lock_);
00920 if(subs_.size()!=nbSubProcesses_.value_)
00921 {
00922 subs_.resize(nbSubProcesses_.value_);
00923 spMStates_.resize(nbSubProcesses_.value_);
00924 spmStates_.resize(nbSubProcesses_.value_);
00925 for(unsigned int i = 0; i < spMStates_.size(); i++)
00926 {
00927 spMStates_[i] = edm::event_processor::sInit;
00928 spmStates_[i] = 0;
00929 }
00930 }
00931 bool running = fsm_.stateName()->toString()=="Enabled";
00932 bool stopping = fsm_.stateName()->toString()=="stopping";
00933 for(unsigned int i = 0; i < subs_.size(); i++)
00934 {
00935 if(subs_[i].alive()==-1000) continue;
00936 int sl;
00937
00938 pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
00939
00940 if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
00941 else continue;
00942 pthread_mutex_lock(&pickup_lock_);
00943 std::ostringstream ost;
00944 if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
00945 else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
00946 else ost << " process stopped ";
00947 subs_[i].countdown()=slaveRestartDelaySecs_.value_;
00948 subs_[i].setReasonForFailed(ost.str());
00949 spMStates_[i] = evtProcessor_.notstarted_state_code();
00950 spmStates_[i] = 0;
00951 std::ostringstream ost1;
00952 ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
00953 localLog(ost1.str());
00954 if(!autoRestartSlaves_.value_) subs_[i].disconnect();
00955 pthread_mutex_unlock(&pickup_lock_);
00956 }
00957 pthread_mutex_unlock(&stop_lock_);
00958 if(stopping) return true;
00959
00960 if(running)
00961 {
00962
00963
00964 if(autoRestartSlaves_.value_){
00965 pthread_mutex_lock(&stop_lock_);
00966 for(unsigned int i = 0; i < subs_.size(); i++)
00967 {
00968 if(subs_[i].alive() != 1){
00969 if(subs_[i].countdown() == 0)
00970 {
00971 if(subs_[i].restartCount()>2){
00972 LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
00973 << " - maximum restart count reached");
00974 std::ostringstream ost1;
00975 ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
00976 localLog(ost1.str());
00977 subs_[i].countdown()--;
00978 XCEPT_DECLARE(evf::Exception,
00979 sentinelException, ost1.str());
00980 notifyQualified("error",sentinelException);
00981 continue;
00982 }
00983 subs_[i].restartCount()++;
00984 pid_t rr = subs_[i].forkNew();
00985 if(rr==0)
00986 {
00987 myProcess_=&subs_[i];
00988 scalersUpdates_ = 0;
00989 int retval = pthread_mutex_destroy(&stop_lock_);
00990 if(retval != 0) perror("error");
00991 retval = pthread_mutex_init(&stop_lock_,0);
00992 if(retval != 0) perror("error");
00993 fsm_.disableRcmsStateNotification();
00994 fsm_.fireEvent("Stop",this);
00995 fsm_.fireEvent("StopDone",this);
00996 fsm_.fireEvent("Enable",this);
00997 try{
00998 xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
00999 if(lsid) {
01000 ((xdata::UnsignedInteger32*)(lsid))->value_--;
01001 }
01002 }
01003 catch(...){
01004 std::cout << "trouble with lsindex during restart" << std::endl;
01005 }
01006 try{
01007 xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
01008 if(lstb) {
01009 ((xdata::Boolean*)(lstb))->value_ = false;
01010 }
01011 }
01012 catch(...){
01013 std::cout << "trouble with resetting flag for eol recovery " << std::endl;
01014 }
01015
01016 evtProcessor_.adjustLsIndexForRestart();
01017 evtProcessor_.resetPackedTriggerReport();
01018 enableMPEPSlave();
01019 return false;
01020 }
01021 else
01022 {
01023 std::ostringstream ost1;
01024 ost1 << "-I- New Process " << rr << " forked for slot " << i;
01025 localLog(ost1.str());
01026 }
01027 }
01028 if(subs_[i].countdown()>=0) subs_[i].countdown()--;
01029 }
01030 }
01031 pthread_mutex_unlock(&stop_lock_);
01032 }
01033 }
01034 xdata::Serializable *lsid = 0;
01035 xdata::Serializable *psid = 0;
01036 xdata::Serializable *epMAltState = 0;
01037 xdata::Serializable *epmAltState = 0;
01038 xdata::Serializable *dqmp = 0;
01039 xdata::UnsignedInteger32 *dqm = 0;
01040
01041
01042
01043 MsgBuf msg1(0,MSQM_MESSAGE_TYPE_PRG);
01044 MsgBuf msg2(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_PRR);
01045 if(running){
01046 try{
01047 lsid = applicationInfoSpace_->find("lumiSectionIndex");
01048 psid = applicationInfoSpace_->find("prescaleSetIndex");
01049 nbProcessed = monitorInfoSpace_->find("nbProcessed");
01050 nbAccepted = monitorInfoSpace_->find("nbAccepted");
01051 epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
01052 epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
01053 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01054 }
01055 catch(xdata::exception::Exception e){
01056 LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
01057 }
01058
01059 try{
01060 if(nbProcessed !=0 && nbAccepted !=0)
01061 {
01062 xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
01063 xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
01064 xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
01065 xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
01066 if(dqmp!=0)
01067 dqm = (xdata::UnsignedInteger32*)dqmp;
01068 if(dqm) dqm->value_ = 0;
01069 nbTotalDQM_ = 0;
01070 nbp->value_ = 0;
01071 nba->value_ = 0;
01072 nblive_ = 0;
01073 nbdead_ = 0;
01074 scalersUpdates_ = 0;
01075
01076 for(unsigned int i = 0; i < subs_.size(); i++)
01077 {
01078 if(subs_[i].alive()>0)
01079 {
01080 nblive_++;
01081 try{
01082 subs_[i].post(msg1,true);
01083
01084 unsigned long retval = subs_[i].rcvNonBlocking(msg2,true);
01085 if(retval == (unsigned long) msg2->mtype){
01086 prg* p = (struct prg*)(msg2->mtext);
01087 subs_[i].setParams(p);
01088 spMStates_[i] = p->Ms;
01089 spmStates_[i] = p->ms;
01090 cpustat_->addEntry(p->ms);
01091 if(!subs_[i].inInconsistentState() &&
01092 (p->Ms == edm::event_processor::sError
01093 || p->Ms == edm::event_processor::sInvalid
01094 || p->Ms == edm::event_processor::sStopping))
01095 {
01096 std::ostringstream ost;
01097 ost << "edm::eventprocessor slot " << i << " process id "
01098 << subs_[i].pid() << " not in Running state : Mstate="
01099 << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
01100 << evtProcessor_.moduleNameFromIndex(p->ms)
01101 << " - Look into possible error messages from HLT process";
01102 LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
01103 }
01104 nbp->value_ += subs_[i].params().nbp;
01105 nba->value_ += subs_[i].params().nba;
01106 if(dqm)dqm->value_ += p->dqm;
01107 nbTotalDQM_ += p->dqm;
01108 scalersUpdates_ += p->trp;
01109 if(p->ls > ls->value_) ls->value_ = p->ls;
01110 if(p->ps != ps->value_) ps->value_ = p->ps;
01111 }
01112 else{
01113 nbp->value_ += subs_[i].get_save_nbp();
01114 nba->value_ += subs_[i].get_save_nba();
01115 }
01116 }
01117 catch(evf::Exception &e){
01118 LOG4CPLUS_INFO(getApplicationLogger(),
01119 "could not send/receive msg on slot "
01120 << i << " - " << e.what());
01121 }
01122
01123 }
01124 else
01125 {
01126 nbp->value_ += subs_[i].get_save_nbp();
01127 nba->value_ += subs_[i].get_save_nba();
01128 nbdead_++;
01129 }
01130 }
01131 }
01132
01133 }
01134 catch(std::exception &e){
01135 LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
01136 }
01137 catch(...){
01138 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
01139 }
01140 }
01141 else{
01142 for(unsigned int i = 0; i < subs_.size(); i++)
01143 {
01144 if(subs_[i].alive()==-1000)
01145 {
01146 spMStates_[i] = edm::event_processor::sInit;
01147 spmStates_[i] = 0;
01148 }
01149 }
01150 }
01151 try{
01152 monitorInfoSpace_->lock();
01153 monitorInfoSpace_->fireItemGroupChanged(names_,0);
01154 monitorInfoSpace_->unlock();
01155 }
01156 catch(xdata::exception::Exception &e)
01157 {
01158 LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
01159
01160 }
01161 ::sleep(superSleepSec_.value_);
01162 return true;
01163 }
01164
01165 void FUEventProcessor::startScalersWorkLoop() throw (evf::Exception)
01166 {
01167 try {
01168 wlScalers_=
01169 toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
01170 "waiting");
01171 if (!wlScalers_->isActive()) wlScalers_->activate();
01172 asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
01173 "Scalers");
01174
01175 wlScalers_->submit(asScalers_);
01176 wlScalersActive_ = true;
01177 }
01178 catch (xcept::Exception& e) {
01179 std::string msg = "Failed to start workloop 'Scalers'.";
01180 XCEPT_RETHROW(evf::Exception,msg,e);
01181 }
01182 }
01183
01184
01185
01186 void FUEventProcessor::startSummarizeWorkLoop() throw (evf::Exception)
01187 {
01188 try {
01189 wlSummarize_=
01190 toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
01191 "waiting");
01192 if (!wlSummarize_->isActive()) wlSummarize_->activate();
01193
01194 asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
01195 "Summary");
01196
01197 wlSummarize_->submit(asSummarize_);
01198 wlSummarizeActive_ = true;
01199 }
01200 catch (xcept::Exception& e) {
01201 std::string msg = "Failed to start workloop 'Summarize'.";
01202 XCEPT_RETHROW(evf::Exception,msg,e);
01203 }
01204 }
01205
01206
01207
01208 bool FUEventProcessor::scalers(toolbox::task::WorkLoop* wl)
01209 {
01210 if(evtProcessor_)
01211 {
01212 if(!evtProcessor_.getTriggerReport(true)) {
01213 wlScalersActive_ = false;
01214 return false;
01215 }
01216 }
01217 else
01218 {
01219 std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
01220 wlScalersActive_ = false;
01221 return false;
01222 }
01223 if(myProcess_)
01224 {
01225
01226 int ret = myProcess_->postSlave(evtProcessor_.getPackedTriggerReport(),false);
01227 if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
01228 scalersUpdates_++;
01229 }
01230 else
01231 evtProcessor_.fireScalersUpdate();
01232 return true;
01233 }
01234
01235
01236 bool FUEventProcessor::summarize(toolbox::task::WorkLoop* wl)
01237 {
01238 MsgBuf msg(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_TRR);
01239 evtProcessor_.resetPackedTriggerReport();
01240 bool atLeastOneProcessUpdatedSuccessfully = false;
01241 int msgCount = 0;
01242 for (unsigned int i = 0; i < subs_.size(); i++)
01243 {
01244 if(subs_[i].alive()>0)
01245 {
01246
01247 int ret = 0;
01248 try{
01249 ret = subs_[i].rcv(msg,false);
01250 msgCount++;
01251 }
01252 catch(evf::Exception &e)
01253 {
01254 std::cout << "exception in msgrcv on " << i
01255 << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
01256 continue;
01257
01258 }
01259
01260
01261 if(ret==MSQS_MESSAGE_TYPE_TRR) {
01262 TriggerReportStatic *trp = (TriggerReportStatic *)msg->mtext;
01263 if(trp->lumiSection > evtProcessor_.getLumiSectionReferenceIndex()){
01264 std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
01265 << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
01266 subs_[i].post(msg,false);
01267 }else{
01268 atLeastOneProcessUpdatedSuccessfully = true;
01269 evtProcessor_.sumAndPackTriggerReport(msg);
01270 }
01271 }
01272 else std::cout << "msgrcv returned error " << errno << std::endl;
01273 }
01274 }
01275 if(atLeastOneProcessUpdatedSuccessfully){
01276 nbSubProcessesReporting_.value_ = msgCount;
01277 evtProcessor_.updateRollingReport();
01278 evtProcessor_.fireScalersUpdate();
01279 }
01280 else{
01281 LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
01282 if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
01283 nbSubProcessesReporting_.value_ = 0;
01284 ::sleep(10);
01285 }
01286 if(fsm_.stateName()->toString()!="Enabled"){
01287 wlScalersActive_ = false;
01288 return false;
01289 }
01290
01291 if(iDieStatisticsGathering_.value_){
01292 try{
01293 cpustat_ ->sendStat(evtProcessor_.getLumiSectionReferenceIndex());
01294 ratestat_->sendStat((unsigned char*)(evtProcessor_.getPackedTriggerReportAsStruct()),
01295 sizeof(TriggerReportStatic),
01296 evtProcessor_.getLumiSectionReferenceIndex());
01297 }catch(evf::Exception &e){
01298 LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
01299 << e.what());
01300 }
01301 }
01302 cpustat_->reset();
01303 return true;
01304 }
01305
01306
01307
01308 bool FUEventProcessor::receivingAndMonitor(toolbox::task::WorkLoop *)
01309 {
01310 MsgBuf msg;
01311 try{
01312 myProcess_->rcvSlave(msg,true);
01313 switch(msg->mtype)
01314 {
01315 case MSQM_MESSAGE_TYPE_MCS:
01316 {
01317 xgi::Input *in = 0;
01318 xgi::Output out;
01319 evtProcessor_.microState(in,&out);
01320 MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
01321 strncpy(msg1->mtext,out.str().c_str(),out.str().size());
01322 myProcess_->postSlave(msg1,true);
01323 break;
01324 }
01325
01326 case MSQM_MESSAGE_TYPE_PRG:
01327 {
01328 xdata::Serializable *dqmp = 0;
01329 xdata::UnsignedInteger32 *dqm = 0;
01330 evtProcessor_.monitoring(0);
01331 try{
01332 dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
01333 } catch(xdata::exception::Exception e){}
01334 if(dqmp!=0)
01335 dqm = (xdata::UnsignedInteger32*)dqmp;
01336 MsgBuf msg1(sizeof(prg),MSQS_MESSAGE_TYPE_PRR);
01337
01338 prg * data = (prg*)msg1->mtext;
01339 data->ls = evtProcessor_.lsid_;
01340 data->ps = evtProcessor_.psid_;
01341 data->nbp = evtProcessor_->totalEvents();
01342 data->nba = evtProcessor_->totalEventsPassed();
01343 data->Ms = evtProcessor_.epMAltState_.value_;
01344 data->ms = evtProcessor_.epmAltState_.value_;
01345 if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
01346 data->trp = scalersUpdates_;
01347
01348 myProcess_->postSlave(msg1,true);
01349 if(exitOnError_.value_)
01350 {
01351
01352
01353 if(data->Ms == edm::event_processor::sStopping || data->Ms == edm::event_processor::sError)
01354 {
01355 bool running = true;
01356 int count = 0;
01357 while(running){
01358 int retval = pthread_mutex_lock(&stop_lock_);
01359 if(retval != 0) perror("error");
01360 running = fsm_.stateName()->toString()=="Enabled";
01361 if(count>5) exit(-1);
01362 pthread_mutex_unlock(&stop_lock_);
01363 if(running) {::sleep(1); count++;}
01364 }
01365 }
01366
01367 }
01368
01369 break;
01370 }
01371 case MSQM_MESSAGE_TYPE_WEB:
01372 {
01373 xgi::Input *in = 0;
01374 xgi::Output out;
01375 unsigned int bytesToSend = 0;
01376 MsgBuf msg1(NUMERIC_MESSAGE_SIZE,MSQS_MESSAGE_TYPE_WEB);
01377 std::string query = msg->mtext;
01378 size_t pos = query.find_first_of("&");
01379 std::string method;
01380 std::string args;
01381 if(pos!=std::string::npos)
01382 {
01383 method = query.substr(0,pos);
01384 args = query.substr(pos+1,query.length()-pos-1);
01385 }
01386 else
01387 method=query;
01388
01389 if(method=="Spotlight")
01390 {
01391 spotlightWebPage(in,&out);
01392 }
01393 else if(method=="procStat")
01394 {
01395 procStat(in,&out);
01396 }
01397 else if(method=="moduleWeb")
01398 {
01399 internal::MyCgi mycgi;
01400 boost::char_separator<char> sep(";");
01401 boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
01402 for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
01403 tok_iter != tokens.end(); ++tok_iter){
01404 size_t pos = (*tok_iter).find_first_of("%");
01405 if(pos != std::string::npos){
01406 std::string first = (*tok_iter).substr(0 , pos);
01407 std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
01408 mycgi.getEnvironment()[first]=second;
01409 }
01410 }
01411 moduleWeb(&mycgi,&out);
01412 }
01413 else if(method=="Default")
01414 {
01415 defaultWebPage(in,&out);
01416 }
01417 else
01418 {
01419 out << "Error 404!!!!!!!!" << std::endl;
01420 }
01421
01422
01423 bytesToSend = out.str().size();
01424 unsigned int cycle = 0;
01425 if(bytesToSend==0)
01426 {
01427 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
01428 myProcess_->postSlave(msg1,true);
01429 }
01430 while(bytesToSend !=0){
01431 unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
01432 snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
01433 write(anonymousPipe_[PIPE_WRITE],
01434 out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
01435 msgSize);
01436 myProcess_->postSlave(msg1,true);
01437 bytesToSend -= msgSize;
01438 cycle++;
01439 }
01440 break;
01441 }
01442 case MSQM_MESSAGE_TYPE_TRP:
01443 {
01444 break;
01445 }
01446 }
01447 }
01448 catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
01449 return true;
01450 }
01451
01452 bool FUEventProcessor::enableCommon()
01453 {
01454 try {
01455 if(hasShMem_) attachDqmToShm();
01456 int sc = 0;
01457 evtProcessor_->clearCounters();
01458 if(isRunNumberSetter_)
01459 evtProcessor_->setRunNumber(runNumber_.value_);
01460 else
01461 evtProcessor_->declareRunNumber(runNumber_.value_);
01462 try{
01463 ::sleep(1);
01464 evtProcessor_->runAsync();
01465 sc = evtProcessor_->statusAsync();
01466 }
01467 catch(cms::Exception &e) {
01468 reasonForFailedState_ = e.explainSelf();
01469 fsm_.fireFailed(reasonForFailedState_,this);
01470 localLog(reasonForFailedState_);
01471 return false;
01472 }
01473 catch(std::exception &e) {
01474 reasonForFailedState_ = e.what();
01475 fsm_.fireFailed(reasonForFailedState_,this);
01476 localLog(reasonForFailedState_);
01477 return false;
01478 }
01479 catch(...) {
01480 reasonForFailedState_ = "Unknown Exception";
01481 fsm_.fireFailed(reasonForFailedState_,this);
01482 localLog(reasonForFailedState_);
01483 return false;
01484 }
01485 if(sc != 0) {
01486 std::ostringstream oss;
01487 oss<<"EventProcessor::runAsync returned status code " << sc;
01488 reasonForFailedState_ = oss.str();
01489 fsm_.fireFailed(reasonForFailedState_,this);
01490 localLog(reasonForFailedState_);
01491 return false;
01492 }
01493 }
01494 catch (xcept::Exception &e) {
01495 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01496 fsm_.fireFailed(reasonForFailedState_,this);
01497 localLog(reasonForFailedState_);
01498 return false;
01499 }
01500 try{
01501 fsm_.fireEvent("EnableDone",this);
01502 }
01503 catch (xcept::Exception &e) {
01504 std::cout << "exception " << (std::string)e.what() << std::endl;
01505 throw;
01506 }
01507
01508 return false;
01509 }
01510
01511 bool FUEventProcessor::enableClassic()
01512 {
01513 bool retval = enableCommon();
01514 while(evtProcessor_->getState()!= edm::event_processor::sRunning){
01515 LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
01516 ::sleep(1);
01517 }
01518
01519
01520
01521 localLog("-I- Start completed");
01522 return retval;
01523 }
01524 bool FUEventProcessor::enableMPEPSlave()
01525 {
01526
01527 startReceivingLoop();
01528 startReceivingMonitorLoop();
01529 evtProcessor_.resetWaiting();
01530 startScalersWorkLoop();
01531 while(!evtProcessor_.isWaitingForLs())
01532 ::sleep(1);
01533
01534
01535 try{
01536
01537 try{
01538 LOG4CPLUS_DEBUG(getApplicationLogger(),
01539 "Trying to create message service presence ");
01540 edm::PresenceFactory *pf = edm::PresenceFactory::get();
01541 if(pf != 0) {
01542 pf->makePresence("MessageServicePresence").release();
01543 }
01544 else {
01545 LOG4CPLUS_ERROR(getApplicationLogger(),
01546 "Unable to create message service presence ");
01547 }
01548 }
01549 catch(...) {
01550 LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
01551 }
01552 ML::MLlog4cplus::setAppl(this);
01553 }
01554 catch (xcept::Exception &e) {
01555 reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
01556 fsm_.fireFailed(reasonForFailedState_,this);
01557 localLog(reasonForFailedState_);
01558 }
01559 bool retval = enableCommon();
01560
01561
01562
01563
01564 return retval;
01565 }
01566
01567 bool FUEventProcessor::stopClassic()
01568 {
01569 try {
01570 LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
01571 edm::EventProcessor::StatusCode rc = evtProcessor_.stop();
01572 if(rc == edm::EventProcessor::epSuccess)
01573 fsm_.fireEvent("StopDone",this);
01574 else
01575 {
01576
01577 if(rc == edm::EventProcessor::epTimedOut)
01578 reasonForFailedState_ = "EventProcessor stop timed out";
01579 else
01580 reasonForFailedState_ = "EventProcessor did not receive STOP event";
01581 fsm_.fireFailed(reasonForFailedState_,this);
01582 localLog(reasonForFailedState_);
01583 }
01584 if(hasShMem_) detachDqmFromShm();
01585 }
01586 catch (xcept::Exception &e) {
01587 reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
01588 localLog(reasonForFailedState_);
01589 fsm_.fireFailed(reasonForFailedState_,this);
01590 }
01591 LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
01592 localLog("-I- Stop completed");
01593 return false;
01594 }
01595
01596 void FUEventProcessor::stopSlavesAndAcknowledge()
01597 {
01598 MsgBuf msg(0,MSQM_MESSAGE_TYPE_STOP);
01599 MsgBuf msg1(MAX_MSG_SIZE,MSQS_MESSAGE_TYPE_STOP);
01600
01601 for(unsigned int i = 0; i < subs_.size(); i++)
01602 {
01603 pthread_mutex_lock(&stop_lock_);
01604 if(subs_[i].alive()>0)subs_[i].post(msg,false);
01605 pthread_mutex_unlock(&stop_lock_);
01606 }
01607 for(unsigned int i = 0; i < subs_.size(); i++)
01608 {
01609 pthread_mutex_lock(&stop_lock_);
01610 if(subs_[i].alive()>0){
01611 try{
01612 subs_[i].rcv(msg1,false);
01613 }
01614 catch(evf::Exception &e){
01615 std::ostringstream ost;
01616 ost << "failed to get STOP - errno ->" << errno << " " << e.what();
01617 reasonForFailedState_ = ost.str();
01618 LOG4CPLUS_WARN(getApplicationLogger(),reasonForFailedState_);
01619 fsm_.fireFailed(reasonForFailedState_,this);
01620 localLog(reasonForFailedState_);
01621 break;
01622 }
01623 }
01624 else {
01625 pthread_mutex_unlock(&stop_lock_);
01626 continue;
01627 }
01628 pthread_mutex_unlock(&stop_lock_);
01629 if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
01630 while(subs_[i].alive()>0) ::usleep(10000);
01631 subs_[i].disconnect();
01632 }
01633
01634
01635 }
01636
01637 void FUEventProcessor::microState(xgi::Input *in,xgi::Output *out)
01638 {
01639 std::string urn = getApplicationDescriptor()->getURN();
01640 try{
01641 evtProcessor_.stateNameFromIndex(0);
01642 evtProcessor_.moduleNameFromIndex(0);
01643 if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
01644 *out << "<tr><td>" << fsm_.stateName()->toString()
01645 << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
01646 << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
01647 evtProcessor_.microState(in,out);
01648 *out << "<td>" << nbTotalDQM_
01649 << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
01650 if(nbSubProcesses_.value_!=0 && !myProcess_)
01651 {
01652 pthread_mutex_lock(&start_lock_);
01653 for(unsigned int i = 0; i < subs_.size(); i++)
01654 {
01655 try{
01656 if(subs_[i].alive()>0)
01657 {
01658 *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
01659 << i << "\">""Alive</td><td>S</td><td>"
01660 << subs_[i].queueId() << "<td>"
01661 << subs_[i].queueStatus()<< "/"
01662 << subs_[i].queueOccupancy() << "/"
01663 << subs_[i].queuePidOfLastSend() << "/"
01664 << subs_[i].queuePidOfLastReceive()
01665 << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
01666 << subs_[i].pid() << "&method=procStat\">"
01667 << subs_[i].pid()<<"</a></td>"
01668 << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
01669 << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
01670 << subs_[i].params().nba << "/" << subs_[i].params().nbp
01671 << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
01672 << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
01673 << "</td><td>" << subs_[i].params().ps
01674 << "</td><td>" << subs_[i].params().dqm
01675 << "</td><td>" << subs_[i].params().trp << "</td>";
01676 }
01677 else
01678 {
01679 pthread_mutex_lock(&pickup_lock_);
01680 *out << "<tr><td id=\"a"<< i << "\" ";
01681 if(subs_[i].alive()==-1000)
01682 *out << " bgcolor=\"#bbaabb\">NotInitialized";
01683 else
01684 *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
01685 *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01686 << subs_[i].queueStatus() << "/"
01687 << subs_[i].queueOccupancy() << "/"
01688 << subs_[i].queuePidOfLastSend() << "/"
01689 << subs_[i].queuePidOfLastReceive()
01690 << "</td><td id=\"p"<< i << "\">"
01691 <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
01692 if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
01693 {
01694 if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
01695 *out << " will restart in " << subs_[i].countdown() << " s";
01696 else if(autoRestartSlaves_)
01697 *out << " reached maximum restart count";
01698 else *out << " autoRestart is disabled ";
01699 }
01700 *out << "</td><td>" << subs_[i].params().dqm
01701 << "</td><td>" << subs_[i].params().trp << "</td>";
01702 pthread_mutex_unlock(&pickup_lock_);
01703 }
01704 *out << "</tr>";
01705 }
01706 catch(evf::Exception &e){
01707 *out << "<tr><td id=\"a"<< i << "\" "
01708 <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
01709 << subs_[i].queueStatus() << "/"
01710 << subs_[i].queueOccupancy() << "/"
01711 << subs_[i].queuePidOfLastSend() << "/"
01712 << subs_[i].queuePidOfLastReceive()
01713 << "</td><td id=\"p"<< i << "\">"
01714 <<subs_[i].pid()<<"</td>";
01715 }
01716 }
01717 pthread_mutex_unlock(&start_lock_);
01718 }
01719 }
01720 catch(evf::Exception &e)
01721 {
01722 LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
01723 }
01724 catch(cms::Exception &e)
01725 {
01726 LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
01727 }
01728 catch(std::exception &e)
01729 {
01730 LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
01731 }
01732 catch(...)
01733 {
01734 LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
01735 }
01736
01737 }
01738
01739
01740 void FUEventProcessor::updater(xgi::Input *in,xgi::Output *out)
01741 {
01742 using namespace utils;
01743
01744 *out << updaterStatic_;
01745 mDiv(out,"loads");
01746 uptime(out);
01747 cDiv(out);
01748 mDiv(out,"st",fsm_.stateName()->toString());
01749 mDiv(out,"ru",runNumber_.toString());
01750 mDiv(out,"nsl",nbSubProcesses_.value_);
01751 mDiv(out,"cl");
01752 *out << getApplicationDescriptor()->getClassName()
01753 << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
01754 cDiv(out);
01755 mDiv(out,"in",getApplicationDescriptor()->getInstance());
01756 if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
01757 mDiv(out,"hlt");
01758 *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
01759 cDiv(out);
01760 *out << std::endl;
01761 }
01762 else
01763 mDiv(out,"hlt","Not yet...");
01764
01765 mDiv(out,"sq",squidPresent_.toString());
01766 mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
01767 mDiv(out,"mwl",evtProcessor_.wlMonitoring());
01768 if(nbProcessed != 0 && nbAccepted != 0)
01769 {
01770 mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
01771 mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
01772 }
01773 else
01774 {
01775 mDiv(out,"tt",0);
01776 mDiv(out,"ac",0);
01777 }
01778 if(!myProcess_)
01779 mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
01780 else
01781 mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
01782
01783 mDiv(out,"idi",iDieUrl_.value_);
01784 if(vp_!=0){
01785 mDiv(out,"vpi",(unsigned int) vp_);
01786 if(vulture_->hasStarted()>=0)
01787 mDiv(out,"vul","Prowling");
01788 else
01789 mDiv(out,"vul","Dead");
01790 }
01791 else{
01792 mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
01793 }
01794 if(evtProcessor_){
01795 mDiv(out,"ll");
01796 *out << evtProcessor_.lastLumi().ls
01797 << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
01798 cDiv(out);
01799 }
01800 mDiv(out,"lg");
01801 for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
01802 *out << logRing_[i] << std::endl;
01803 if(logWrap_)
01804 for(unsigned int i = 0; i<logRingIndex_; i++)
01805 *out << logRing_[i] << std::endl;
01806 cDiv(out);
01807
01808 }
01809
01810 void FUEventProcessor::procStat(xgi::Input *in, xgi::Output *out)
01811 {
01812 evf::utils::procStat(out);
01813 }
01814
01815 void FUEventProcessor::sendMessageOverMonitorQueue(MsgBuf &buf)
01816 {
01817 if(myProcess_) myProcess_->postSlave(buf,true);
01818 }
01819
01820 void FUEventProcessor::makeStaticInfo()
01821 {
01822 using namespace utils;
01823 std::ostringstream ost;
01824 mDiv(&ost,"ve");
01825 ost<< "$Revision: 1.124 $ (" << edm::getReleaseVersion() <<")";
01826 cDiv(&ost);
01827 mDiv(&ost,"ou",outPut_.toString());
01828 mDiv(&ost,"sh",hasShMem_.toString());
01829 mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
01830 mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
01831
01832 xdata::Serializable *monsleep = 0;
01833 xdata::Serializable *lstimeout = 0;
01834 try{
01835 monsleep = applicationInfoSpace_->find("monSleepSec");
01836 lstimeout = applicationInfoSpace_->find("lsTimeOut");
01837 }
01838 catch(xdata::exception::Exception e){
01839 }
01840
01841 if(monsleep!=0)
01842 mDiv(&ost,"ms",monsleep->toString());
01843 if(lstimeout!=0)
01844 mDiv(&ost,"lst",lstimeout->toString());
01845 char cbuf[sizeof(struct utsname)];
01846 struct utsname* buf = (struct utsname*)cbuf;
01847 uname(buf);
01848 mDiv(&ost,"sysinfo");
01849 ost << buf->sysname << " " << buf->nodename
01850 << " " << buf->release << " " << buf->version << " " << buf->machine;
01851 cDiv(&ost);
01852 updaterStatic_ = ost.str();
01853 }
01854
01855 XDAQ_INSTANTIATOR_IMPL(evf::FUEventProcessor)