00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00012 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00013 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00014
00015 #include "FWCore/Utilities/interface/CRC16.h"
00016
00017 #include "i2o/Method.h"
00018 #include "interface/shared/i2oXFunctionCodes.h"
00019 #include "xcept/tools.h"
00020
00021 #include "toolbox/mem/HeapAllocator.h"
00022 #include "toolbox/mem/Reference.h"
00023 #include "toolbox/mem/MemoryPoolFactory.h"
00024 #include "toolbox/mem/exception/Exception.h"
00025
00026 #include "xoap/MessageReference.h"
00027 #include "xoap/MessageFactory.h"
00028 #include "xoap/SOAPEnvelope.h"
00029 #include "xoap/SOAPBody.h"
00030 #include "xoap/domutils.h"
00031 #include "xoap/Method.h"
00032
00033 #include "cgicc/CgiDefs.h"
00034 #include "cgicc/Cgicc.h"
00035 #include "cgicc/HTMLClasses.h"
00036
00037 #include <signal.h>
00038 #include <iostream>
00039 #include <sstream>
00040
00041
00042 using namespace std;
00043 using namespace evf;
00044
00045
00047
00049
00050
00051 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00052 : xdaq::Application(s)
00053 , fsm_(this)
00054 , gui_(0)
00055 , log_(getApplicationLogger())
00056 , bu_(0)
00057 , sm_(0)
00058 , i2oPool_(0)
00059 , resourceTable_(0)
00060 , wlMonitoring_(0)
00061 , asMonitoring_(0)
00062 , wlWatching_(0)
00063 , asWatching_(0)
00064 , instance_(0)
00065 , runNumber_(0)
00066 , deltaT_(0.0)
00067 , deltaN_(0)
00068 , deltaSumOfSquares_(0)
00069 , deltaSumOfSizes_(0)
00070 , throughput_(0.0)
00071 , rate_(0.0)
00072 , average_(0.0)
00073 , rms_(0.0)
00074 , nbAllocatedEvents_(0)
00075 , nbPendingRequests_(0)
00076 , nbReceivedEvents_(0)
00077 , nbSentEvents_(0)
00078 , nbSentErrorEvents_(0)
00079 , nbPendingSMDiscards_(0)
00080 , nbDiscardedEvents_(0)
00081 , nbLostEvents_(0)
00082 , nbDataErrors_(0)
00083 , nbCrcErrors_(0)
00084 , segmentationMode_(false)
00085 , nbClients_(0)
00086 , clientPrcIds_("")
00087 , nbRawCells_(16)
00088 , nbRecoCells_(8)
00089 , nbDqmCells_(8)
00090 , rawCellSize_(0x400000)
00091 , recoCellSize_(0x800000)
00092 , dqmCellSize_(0x800000)
00093 , doDropEvents_(false)
00094 , doFedIdCheck_(true)
00095 , doCrcCheck_(1)
00096 , doDumpEvents_(0)
00097 , buClassName_("BU")
00098 , buInstance_(0)
00099 , smClassName_("StorageManager")
00100 , smInstance_(0)
00101 , monSleepSec_(2)
00102 , watchSleepSec_(10)
00103 , timeOutSec_(30)
00104 , processKillerEnabled_(true)
00105 , useEvmBoard_(true)
00106 , reasonForFailed_("")
00107 , nbAllocateSent_(0)
00108 , nbTakeReceived_(0)
00109 , nbDataDiscardReceived_(0)
00110 , nbDqmDiscardReceived_(0)
00111 , nbSentLast_(0)
00112 , sumOfSquaresLast_(0)
00113 , sumOfSizesLast_(0)
00114 , lock_(toolbox::BSem::FULL)
00115 {
00116
00117 fsm_.initialize<evf::FUResourceBroker>(this);
00118
00119
00120 url_ =
00121 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00122 getApplicationDescriptor()->getURN();
00123 class_ =getApplicationDescriptor()->getClassName();
00124 instance_=getApplicationDescriptor()->getInstance();
00125 sourceId_=class_.toString()+"_"+instance_.toString();
00126
00127
00128 i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00129 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00130 i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00131 I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00132 i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00133 I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00134
00135
00136
00137 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00138 gui_=new WebGUI(this,&fsm_);
00139 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00140 vector<toolbox::lang::Method*>::iterator it;
00141 for (it=methods.begin();it!=methods.end();++it) {
00142 if ((*it)->type()=="cgi") {
00143 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00144 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00145 }
00146 }
00147 xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00148
00149
00150
00151 string i2oPoolName=sourceId_+"_i2oPool";
00152 try {
00153 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00154 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00155 toolbox::mem::MemoryPoolFactory* poolFactory=
00156 toolbox::mem::getMemoryPoolFactory();
00157 i2oPool_=poolFactory->createPool(urn,allocator);
00158 }
00159 catch (toolbox::mem::exception::Exception& e) {
00160 string s="Failed to create pool: "+i2oPoolName;
00161 LOG4CPLUS_FATAL(log_,s);
00162 XCEPT_RETHROW(xcept::Exception,s,e);
00163 }
00164
00165
00166 exportParameters();
00167
00168
00169 fsm_.findRcmsStateListener();
00170
00171
00172 getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00173
00174 }
00175
00176
00177
00178 FUResourceBroker::~FUResourceBroker()
00179 {
00180
00181 }
00182
00183
00184
00186
00188
00189
00190 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00191 {
00192 try {
00193 LOG4CPLUS_INFO(log_, "Start configuring ...");
00194 connectToBUandSM();
00195 resourceTable_=new FUResourceTable(segmentationMode_.value_,
00196 nbRawCells_.value_,
00197 nbRecoCells_.value_,
00198 nbDqmCells_.value_,
00199 rawCellSize_.value_,
00200 recoCellSize_.value_,
00201 dqmCellSize_.value_,
00202 bu_,sm_,
00203 log_);
00204 FUResource::doFedIdCheck(doFedIdCheck_);
00205 FUResource::useEvmBoard(useEvmBoard_);
00206 resourceTable_->setDoCrcCheck(doCrcCheck_);
00207 resourceTable_->setDoDumpEvents(doDumpEvents_);
00208 reset();
00209 LOG4CPLUS_INFO(log_, "Finished configuring!");
00210
00211 fsm_.fireEvent("ConfigureDone",this);
00212 }
00213 catch (xcept::Exception &e) {
00214 std::string msg = "configuring FAILED: " + (string)e.what();
00215 reasonForFailed_ = e.what();
00216 fsm_.fireFailed(msg,this);
00217 }
00218
00219 return false;
00220 }
00221
00222
00223
00224 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00225 {
00226 try {
00227 LOG4CPLUS_INFO(log_, "Start enabling ...");
00228 startMonitoringWorkLoop();
00229 startWatchingWorkLoop();
00230 resourceTable_->setRunNumber(runNumber_);
00231 resourceTable_->resetCounters();
00232 resourceTable_->startDiscardWorkLoop();
00233 resourceTable_->startSendDataWorkLoop();
00234 resourceTable_->startSendDqmWorkLoop();
00235 resourceTable_->sendAllocate();
00236 LOG4CPLUS_INFO(log_, "Finished enabling!");
00237 fsm_.fireEvent("EnableDone",this);
00238 }
00239 catch (xcept::Exception &e) {
00240 std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00241 reasonForFailed_ = e.what();
00242 fsm_.fireFailed(msg,this);
00243 }
00244
00245 return false;
00246 }
00247
00248
00249
00250 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00251 {
00252 try {
00253 LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00254 resourceTable_->stop();
00255 UInt_t count = 0;
00256 while (count<10) {
00257 if (resourceTable_->isReadyToShutDown()) {
00258 LOG4CPLUS_INFO(log_,"ResourceTable successfully shutdown ("<<count+1<<").");
00259 break;
00260 }
00261 else {
00262 count++;
00263 LOG4CPLUS_DEBUG(log_,"Waiting for ResourceTable to shutdown ("<<count<<")");
00264 ::sleep(1);
00265 }
00266 }
00267
00268 if (count<10) {
00269 LOG4CPLUS_INFO(log_, "Finished stopping!");
00270 fsm_.fireEvent("StopDone",this);
00271 }
00272 else {
00273 std::string msg = "stopping FAILED: ResourceTable shutdown timed out.";
00274 reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT.";
00275 fsm_.fireFailed(msg,this);
00276 }
00277 }
00278 catch (xcept::Exception &e) {
00279 std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00280 reasonForFailed_ = e.what();
00281 fsm_.fireFailed(msg,this);
00282 }
00283
00284 return false;
00285 }
00286
00287
00288
00289 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00290 {
00291 try {
00292 LOG4CPLUS_INFO(log_, "Start halting ...");
00293 if (resourceTable_->isActive()) {
00294 resourceTable_->halt();
00295 UInt_t count = 0;
00296 while (count<10) {
00297 if (resourceTable_->isReadyToShutDown()) {
00298 lock();
00299 delete resourceTable_;
00300 resourceTable_=0;
00301 unlock();
00302 LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00303 break;
00304 }
00305 else {
00306 count++;
00307 LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00308 ::sleep(1);
00309 }
00310 }
00311 }
00312 else {
00313 lock();
00314 delete resourceTable_;
00315 resourceTable_=0;
00316 unlock();
00317 }
00318
00319 if (0==resourceTable_) {
00320 LOG4CPLUS_INFO(log_,"Finished halting!");
00321 fsm_.fireEvent("HaltDone",this);
00322 }
00323 else {
00324 std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
00325 reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00326 fsm_.fireFailed(msg,this);
00327 }
00328 }
00329 catch (xcept::Exception &e) {
00330 std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
00331 reasonForFailed_ = e.what();
00332 fsm_.fireFailed(msg,this);
00333 }
00334
00335 return false;
00336 }
00337
00338
00339
00340 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00341 throw (xoap::exception::Exception)
00342 {
00343 return fsm_.commandCallback(msg);
00344 }
00345
00346
00347
00348 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00349 {
00350 nbTakeReceived_.value_++;
00351 bool eventComplete=resourceTable_->buildResource(bufRef);
00352 if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00353
00354 }
00355
00356
00357
00358 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00359 {
00360 nbDataDiscardReceived_.value_++;
00361 resourceTable_->discardDataEvent(bufRef);
00362 }
00363
00364
00365
00366 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00367 {
00368 nbDqmDiscardReceived_.value_++;
00369 resourceTable_->discardDqmEvent(bufRef);
00370 }
00371
00372
00373
00374 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00375 {
00376 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00377 typedef AppDescSet_t::iterator AppDescIter_t;
00378
00379
00380 AppDescSet_t setOfBUs=
00381 getApplicationContext()->getDefaultZone()->
00382 getApplicationDescriptors(buClassName_.toString());
00383
00384 if (0!=bu_) { delete bu_; bu_=0; }
00385
00386 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00387 if ((*it)->getInstance()==buInstance_)
00388 bu_=new BUProxy(getApplicationDescriptor(),*it,
00389 getApplicationContext(),i2oPool_);
00390
00391 if (0==bu_) {
00392 string msg=sourceId_+" failed to locate input BU!";
00393 XCEPT_RAISE(evf::Exception,msg);
00394 }
00395
00396
00397 AppDescSet_t setOfSMs=
00398 getApplicationContext()->getDefaultZone()->
00399 getApplicationDescriptors(smClassName_.toString());
00400
00401 if (0!=sm_) { delete sm_; sm_=0; }
00402
00403 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00404 if ((*it)->getInstance()==smInstance_)
00405 sm_=new SMProxy(getApplicationDescriptor(),*it,
00406 getApplicationContext(),i2oPool_);
00407
00408 if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00409 }
00410
00411
00412
00413 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00414 throw (xgi::exception::Exception)
00415 {
00416 string name=in->getenv("PATH_INFO");
00417 if (name.empty()) name="defaultWebPage";
00418 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00419 }
00420
00421
00422
00423 void FUResourceBroker::actionPerformed(xdata::Event& e)
00424 {
00425 lock();
00426
00427 if (0!=resourceTable_) {
00428
00429
00430
00431 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00432 nbClients_ =resourceTable_->nbClients();
00433 clientPrcIds_ =resourceTable_->clientPrcIdsAsString();
00434 nbAllocatedEvents_ =resourceTable_->nbAllocated();
00435 nbPendingRequests_ =resourceTable_->nbPending();
00436 nbReceivedEvents_ =resourceTable_->nbCompleted();
00437 nbSentEvents_ =resourceTable_->nbSent();
00438 nbSentErrorEvents_ =resourceTable_->nbSentError();
00439 nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00440 nbDiscardedEvents_ =resourceTable_->nbDiscarded();
00441 nbLostEvents_ =resourceTable_->nbLost();
00442 nbDataErrors_ =resourceTable_->nbErrors();
00443 nbCrcErrors_ =resourceTable_->nbCrcErrors();
00444 nbAllocateSent_ =resourceTable_->nbAllocSent();
00445 }
00446 else if (e.type()=="ItemChangedEvent") {
00447
00448 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00449
00450 if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00451 if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
00452 if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
00453 if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00454 }
00455
00456
00457 }
00458 else {
00459 nbClients_ =0;
00460 clientPrcIds_ ="";
00461 nbAllocatedEvents_ =0;
00462 nbPendingRequests_ =0;
00463 nbReceivedEvents_ =0;
00464 nbSentEvents_ =0;
00465 nbSentErrorEvents_ =0;
00466 nbPendingSMDiscards_=0;
00467 nbDiscardedEvents_ =0;
00468 nbLostEvents_ =0;
00469 nbDataErrors_ =0;
00470 nbCrcErrors_ =0;
00471 nbAllocateSent_ =0;
00472 }
00473 unlock();
00474 }
00475
00476
00477
00478 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00479 {
00480 struct timezone timezone;
00481 gettimeofday(&monStartTime_,&timezone);
00482
00483 try {
00484 wlMonitoring_=
00485 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00486 "waiting");
00487 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00488 asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00489 sourceId_+"Monitoring");
00490 wlMonitoring_->submit(asMonitoring_);
00491 }
00492 catch (xcept::Exception& e) {
00493 string msg = "Failed to start workloop 'Monitoring'.";
00494 XCEPT_RETHROW(evf::Exception,msg,e);
00495 }
00496 }
00497
00498
00499
00500 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00501 {
00502 unsigned int nbSent;
00503 uint64_t sumOfSquares;
00504 unsigned int sumOfSizes;
00505 uint64_t deltaSumOfSquares;
00506
00507 lock();
00508 if (0==resourceTable_) {
00509 deltaT_.value_ =0.0;
00510 deltaN_.value_ = 0;
00511 deltaSumOfSquares_.value_=0.0;
00512 deltaSumOfSizes_.value_ = 0;
00513 throughput_ =0.0;
00514 rate_ =0.0;
00515 average_ =0.0;
00516 rms_ =0.0;
00517 unlock();
00518 return false;
00519 }
00520 else {
00521 nbSent =resourceTable_->nbSent();
00522 sumOfSquares=resourceTable_->sumOfSquares();
00523 sumOfSizes =resourceTable_->sumOfSizes();
00524 }
00525 unlock();
00526
00527 struct timeval monEndTime;
00528 struct timezone timezone;
00529
00530 gettimeofday(&monEndTime,&timezone);
00531
00532 xdata::getInfoSpaceFactory()->lock();
00533 gui_->monInfoSpace()->lock();
00534
00535 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00536 monStartTime_=monEndTime;
00537
00538 deltaN_.value_=nbSent-nbSentLast_;
00539 nbSentLast_=nbSent;
00540
00541 deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00542 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00543 sumOfSquaresLast_=sumOfSquares;
00544
00545 deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00546 sumOfSizesLast_=sumOfSizes;
00547
00548 if (deltaT_.value_!=0) {
00549 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00550 rate_ =deltaN_.value_/deltaT_.value_;
00551 }
00552 else {
00553 throughput_=0.0;
00554 rate_ =0.0;
00555 }
00556
00557 double meanOfSquares,mean,squareOfMean,variance;
00558
00559 if(deltaN_.value_!=0) {
00560 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00561 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00562 squareOfMean=mean*mean;
00563 variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00564
00565 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00566 rms_ =std::sqrt(variance);
00567 }
00568 else {
00569 average_=0.0;
00570 rms_ =0.0;
00571 }
00572
00573 gui_->monInfoSpace()->unlock();
00574 xdata::getInfoSpaceFactory()->unlock();
00575
00576 ::sleep(monSleepSec_.value_);
00577
00578 return true;
00579 }
00580
00581
00582
00583 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00584 {
00585 try {
00586 wlWatching_=
00587 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00588 "waiting");
00589 if (!wlWatching_->isActive()) wlWatching_->activate();
00590 asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00591 sourceId_+"Watching");
00592 wlWatching_->submit(asWatching_);
00593 }
00594 catch (xcept::Exception& e) {
00595 string msg = "Failed to start workloop 'Watching'.";
00596 XCEPT_RETHROW(evf::Exception,msg,e);
00597 }
00598 }
00599
00600
00601
00602 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00603 {
00604 lock();
00605
00606 if (0==resourceTable_) {
00607 unlock();
00608 return false;
00609 }
00610
00611 vector<pid_t> prcids=resourceTable_->clientPrcIds();
00612 for (UInt_t i=0;i<prcids.size();i++) {
00613 pid_t pid =prcids[i];
00614 int status=kill(pid,0);
00615 if (status!=0) {
00616 LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send raw data to err stream.");
00617 resourceTable_->handleCrashedEP(runNumber_,pid);
00618 }
00619 }
00620
00621 vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
00622 vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00623 vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
00624
00625 time_t tcurr=time(0);
00626 for (UInt_t i=0;i<evt_tstamps.size();i++) {
00627 pid_t pid =evt_prcids[i];
00628 UInt_t evt =evt_numbers[i];
00629 time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00630 double tdiff =difftime(tcurr,tstamp);
00631 if (tdiff>timeOutSec_) {
00632 if(processKillerEnabled_) {
00633 LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00634 kill(pid,9);
00635 }
00636 else {
00637 LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00638 <<timeOutSec_<<"sec for process "<<pid);
00639 }
00640 }
00641 }
00642
00643 unlock();
00644
00645 ::sleep(watchSleepSec_.value_);
00646
00647 return true;
00648 }
00649
00650
00651
00652 void FUResourceBroker::exportParameters()
00653 {
00654 assert(0!=gui_);
00655
00656 gui_->addMonitorParam("url", &url_);
00657 gui_->addMonitorParam("class", &class_);
00658 gui_->addMonitorParam("instance", &instance_);
00659 gui_->addMonitorParam("runNumber", &runNumber_);
00660 gui_->addMonitorParam("stateName", fsm_.stateName());
00661
00662 gui_->addMonitorParam("deltaT", &deltaT_);
00663 gui_->addMonitorParam("deltaN", &deltaN_);
00664 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00665 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00666
00667 gui_->addMonitorParam("throughput", &throughput_);
00668 gui_->addMonitorParam("rate", &rate_);
00669 gui_->addMonitorParam("average", &average_);
00670 gui_->addMonitorParam("rms", &rms_);
00671
00672 gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
00673 gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
00674 gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
00675 gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
00676 gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
00677 gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);
00678 gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
00679 gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
00680 gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
00681 gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
00682
00683 gui_->addStandardParam("segmentationMode", &segmentationMode_);
00684 gui_->addStandardParam("nbClients", &nbClients_);
00685 gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
00686 gui_->addStandardParam("nbRawCells", &nbRawCells_);
00687 gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
00688 gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
00689 gui_->addStandardParam("rawCellSize", &rawCellSize_);
00690 gui_->addStandardParam("recoCellSize", &recoCellSize_);
00691 gui_->addStandardParam("dqmCellSize", &dqmCellSize_);
00692
00693 gui_->addStandardParam("doDropEvents", &doDropEvents_);
00694 gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
00695 gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
00696 gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
00697 gui_->addStandardParam("buClassName", &buClassName_);
00698 gui_->addStandardParam("buInstance", &buInstance_);
00699 gui_->addStandardParam("smClassName", &smClassName_);
00700 gui_->addStandardParam("smInstance", &smInstance_);
00701 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00702 gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
00703 gui_->addStandardParam("timeOutSec", &timeOutSec_);
00704 gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
00705 gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
00706 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00707 gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
00708 gui_->addStandardParam("reasonForFailed", &reasonForFailed_);
00709
00710 gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
00711 gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
00712 gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);
00713 gui_->addDebugCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
00714
00715 gui_->exportParameters();
00716
00717 gui_->addItemChangedListener("doFedIdCheck", this);
00718 gui_->addItemChangedListener("useEvmBoard", this);
00719 gui_->addItemChangedListener("doCrcCheck", this);
00720 gui_->addItemChangedListener("doDumpEvents", this);
00721 }
00722
00723
00724
00725 void FUResourceBroker::reset()
00726 {
00727 gui_->resetCounters();
00728
00729 deltaT_ =0.0;
00730 deltaN_ = 0;
00731 deltaSumOfSquares_=0.0;
00732 deltaSumOfSizes_ = 0;
00733
00734 throughput_ =0.0;
00735 rate_ =0.0;
00736 average_ =0.0;
00737 rms_ =0.0;
00738
00739 nbSentLast_ = 0;
00740 sumOfSquaresLast_ = 0;
00741 sumOfSizesLast_ = 0;
00742 }
00743
00744
00745
00746 double FUResourceBroker::deltaT(const struct timeval *start,
00747 const struct timeval *end)
00748 {
00749 unsigned int sec;
00750 unsigned int usec;
00751
00752 sec = end->tv_sec - start->tv_sec;
00753
00754 if(end->tv_usec > start->tv_usec) {
00755 usec = end->tv_usec - start->tv_usec;
00756 }
00757 else {
00758 sec--;
00759 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00760 }
00761
00762 return ((double)sec) + ((double)usec) / 1000000.0;
00763 }
00764
00765
00766
00767
00768 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00769 throw (xgi::exception::Exception)
00770 {
00771 using namespace cgicc;
00772
00773 *out<<"<html>"<<endl;
00774 gui_->htmlHead(in,out,sourceId_);
00775 *out<<"<body>"<<endl;
00776 gui_->htmlHeadline(in,out);
00777
00778 lock();
00779
00780 if (0!=resourceTable_) {
00781
00782 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00783 *out<<table().set("frame","void").set("rules","rows")
00784 .set("class","modules").set("width","250")<<endl
00785 <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00786 <<tr()
00787 <<th("client").set("align","left")
00788 <<th("process id").set("align","center")
00789 <<th("status").set("align","center")
00790 <<tr()
00791 <<endl;
00792 for (UInt_t i=0;i<client_prc_ids.size();i++) {
00793
00794 pid_t pid =client_prc_ids[i];
00795 int status=kill(pid,0);
00796
00797 stringstream ssi; ssi<<i+1;
00798 stringstream sspid; sspid<<pid;
00799 stringstream ssstatus; ssstatus<<status;
00800
00801 string bg_status = (status==0) ? "#00ff00" : "ff0000";
00802 *out<<tr()
00803 <<td(ssi.str()).set("align","left")
00804 <<td(sspid.str()).set("align","center")
00805 <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00806 <<tr()<<endl;
00807 }
00808 *out<<table()<<endl;
00809 *out<<"<br><br>"<<endl;
00810
00811 vector<string> states = resourceTable_->cellStates();
00812 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00813 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
00814 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00815
00816 *out<<table().set("frame","void").set("rules","rows")
00817 .set("class","modules").set("width","500")<<endl
00818 <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00819 <<tr()
00820 <<th("cell").set("align","left")
00821 <<th("state").set("align","center")
00822 <<th("event").set("align","center")
00823 <<th("process id").set("align","center")
00824 <<th("timestamp").set("align","center")
00825 <<th("time").set("align","center")
00826 <<tr()
00827 <<endl;
00828 for (UInt_t i=0;i<states.size();i++) {
00829 string state=states[i];
00830 UInt_t evt = evt_numbers[i];
00831 pid_t pid = prc_ids[i];
00832 time_t tstamp= time_stamps[i];
00833 double tdiff = difftime(time(0),tstamp);
00834
00835 stringstream ssi; ssi<<i;
00836 stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00837 stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
00838 stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00839 stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00840
00841 string bg_state = "#ffffff";
00842 if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00843 state=="RAWREADING"||state=="RAWREAD")
00844 bg_state="#99CCff";
00845 else if (state=="PROCESSING")
00846 bg_state="#ff0000";
00847 else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00848 bg_state="#CCff99";
00849 else if (state=="SENDING")
00850 bg_state="#00FF33";
00851 else if (state=="SENT")
00852 bg_state="#006633";
00853 else if (state=="DISCARDING")
00854 bg_state="#FFFF00";
00855
00856 *out<<tr()
00857 <<td(ssi.str()).set("align","left")
00858 <<td(state).set("align","center").set("bgcolor",bg_state)
00859 <<td(ssevt.str()).set("align","center")
00860 <<td(sspid.str()).set("align","center")
00861 <<td(sststamp.str()).set("align","center")
00862 <<td(sstdiff.str()).set("align","center")
00863 <<tr()<<endl;
00864 }
00865 *out<<table()<<endl;
00866
00867
00868 }
00869 *out<<"</body>"<<endl<<"</html>"<<endl;
00870
00871 unlock();
00872 }
00873
00874
00875
00877
00879
// XDAQ instantiator for FUResourceBroker.
// NOTE(review): presumably expands to the factory function the XDAQ
// framework uses to create this application -- the macro definition is not
// visible in this file; confirm against the XDAQ headers.
XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)