00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017
00018 #include "EvffedFillerRB.h"
00019
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045
00046
00047 using namespace std;
00048 using namespace evf;
00049
00050
00052
00054
00055
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057 : xdaq::Application(s)
00058 , fsm_(this)
00059 , gui_(0)
00060 , log_(getApplicationLogger())
00061 , bu_(0)
00062 , sm_(0)
00063 , i2oPool_(0)
00064 , resourceTable_(0)
00065 , wlMonitoring_(0)
00066 , asMonitoring_(0)
00067 , wlWatching_(0)
00068 , asWatching_(0)
00069 , instance_(0)
00070 , runNumber_(0)
00071 , deltaT_(0.0)
00072 , deltaN_(0)
00073 , deltaSumOfSquares_(0)
00074 , deltaSumOfSizes_(0)
00075 , throughput_(0.0)
00076 , rate_(0.0)
00077 , average_(0.0)
00078 , rms_(0.0)
00079 , nbAllocatedEvents_(0)
00080 , nbPendingRequests_(0)
00081 , nbReceivedEvents_(0)
00082 , nbSentEvents_(0)
00083 , nbSentDqmEvents_(0)
00084 , nbSentErrorEvents_(0)
00085 , nbPendingSMDiscards_(0)
00086 , nbPendingSMDqmDiscards_(0)
00087 , nbDiscardedEvents_(0)
00088 , nbLostEvents_(0)
00089 , nbDataErrors_(0)
00090 , nbCrcErrors_(0)
00091 , nbTimeoutsWithEvent_(0)
00092 , nbTimeoutsWithoutEvent_(0)
00093 , dataErrorFlag_(0)
00094 , segmentationMode_(false)
00095 , nbClients_(0)
00096 , clientPrcIds_("")
00097 , nbRawCells_(16)
00098 , nbRecoCells_(8)
00099 , nbDqmCells_(8)
00100 , rawCellSize_(0x400000)
00101 , recoCellSize_(0x800000)
00102 , dqmCellSize_(0x800000)
00103 , doDropEvents_(false)
00104 , doFedIdCheck_(true)
00105 , doCrcCheck_(1)
00106 , doDumpEvents_(0)
00107 , buClassName_("BU")
00108 , buInstance_(0)
00109 , smClassName_("StorageManager")
00110 , smInstance_(0)
00111 , shmResourceTableTimeout_(200000)
00112 , monSleepSec_(2)
00113 , watchSleepSec_(10)
00114 , timeOutSec_(30)
00115 , processKillerEnabled_(true)
00116 , useEvmBoard_(true)
00117 , reasonForFailed_("")
00118 , nbAllocateSent_(0)
00119 , nbTakeReceived_(0)
00120 , nbDataDiscardReceived_(0)
00121 , nbDqmDiscardReceived_(0)
00122 , nbSentLast_(0)
00123 , sumOfSquaresLast_(0)
00124 , sumOfSizesLast_(0)
00125 , lock_(toolbox::BSem::FULL)
00126 , frb_(0)
00127 , shmInconsistent_(false)
00128 {
00129
00130 fsm_.initialize<evf::FUResourceBroker>(this);
00131
00132
00133 url_ =
00134 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00135 getApplicationDescriptor()->getURN();
00136 class_ =getApplicationDescriptor()->getClassName();
00137 instance_=getApplicationDescriptor()->getInstance();
00138 sourceId_=class_.toString()+"_"+instance_.toString();
00139
00140
00141 i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00142 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00143 i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00144 I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00145 i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00146 I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00147 i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00148 I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00149
00150
00151
00152 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00153 gui_=new WebGUI(this,&fsm_);
00154 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00155 vector<toolbox::lang::Method*>::iterator it;
00156 for (it=methods.begin();it!=methods.end();++it) {
00157 if ((*it)->type()=="cgi") {
00158 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00159 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00160 }
00161 }
00162 xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00163
00164
00165
00166 string i2oPoolName=sourceId_+"_i2oPool";
00167 try {
00168 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00169 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00170 toolbox::mem::MemoryPoolFactory* poolFactory=
00171 toolbox::mem::getMemoryPoolFactory();
00172 i2oPool_=poolFactory->createPool(urn,allocator);
00173 }
00174 catch (toolbox::mem::exception::Exception& e) {
00175 string s="Failed to create pool: "+i2oPoolName;
00176 LOG4CPLUS_FATAL(log_,s);
00177 XCEPT_RETHROW(xcept::Exception,s,e);
00178 }
00179
00180
00181 exportParameters();
00182
00183
00184 fsm_.findRcmsStateListener();
00185
00186
00187 getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00188
00189 }
00190
00191
00192
00193 FUResourceBroker::~FUResourceBroker()
00194 {
00195
00196 }
00197
00198
00199
00201
00203
00204
00205 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00206 {
00207 try {
00208 LOG4CPLUS_INFO(log_, "Start configuring ...");
00209 connectToBUandSM();
00210 frb_ = new EvffedFillerRB(this);
00211 configureResources();
00212 if(shmInconsistent_){
00213 std::ostringstream ost;
00214 ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw="
00215 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00216 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00217 XCEPT_RAISE(evf::Exception,ost.str());
00218 }
00219 LOG4CPLUS_INFO(log_, "Finished configuring!");
00220 fsm_.fireEvent("ConfigureDone",this);
00221
00222 }
00223 catch (xcept::Exception &e) {
00224 std::string msg = "configuring FAILED: " + (string)e.what();
00225 reasonForFailed_ = e.what();
00226 fsm_.fireFailed(msg,this);
00227 }
00228
00229 return false;
00230 }
00231
00232
00233
00234 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00235 {
00236 try {
00237 LOG4CPLUS_INFO(log_, "Start enabling ...");
00238 startMonitoringWorkLoop();
00239 startWatchingWorkLoop();
00240 resourceTable_->setRunNumber(runNumber_);
00241 resourceTable_->resetCounters();
00242 resourceTable_->startDiscardWorkLoop();
00243 resourceTable_->startSendDataWorkLoop();
00244 resourceTable_->startSendDqmWorkLoop();
00245 resourceTable_->sendAllocate();
00246
00247 nbTimeoutsWithEvent_ = 0;
00248 nbTimeoutsWithoutEvent_ = 0;
00249 dataErrorFlag_ = 0;
00250
00251 LOG4CPLUS_INFO(log_, "Finished enabling!");
00252 fsm_.fireEvent("EnableDone",this);
00253 }
00254 catch (xcept::Exception &e) {
00255 std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00256 reasonForFailed_ = e.what();
00257 fsm_.fireFailed(msg,this);
00258 }
00259
00260 return false;
00261 }
00262
00263
00264
00265 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00266 {
00267 try {
00268 LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00269 resourceTable_->stop();
00270 timeval now;
00271 timeval then;
00272 gettimeofday(&then,0);
00273 while (!resourceTable_->isReadyToShutDown()) {
00274 ::usleep(shmResourceTableTimeout_.value_*10);
00275 gettimeofday(&now,0);
00276 if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00277 std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
00278 << shmResourceTableTimeout_.value_/10000 << std::endl;
00279 LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00280 emergencyStop();
00281 break;
00282 }
00283 }
00284
00285 if (resourceTable_->isReadyToShutDown()) {
00286 LOG4CPLUS_INFO(log_, "Finished stopping!");
00287 fsm_.fireEvent("StopDone",this);
00288 }
00289 }
00290 catch (xcept::Exception &e) {
00291 std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00292 reasonForFailed_ = e.what();
00293 fsm_.fireFailed(msg,this);
00294 }
00295
00296 return false;
00297 }
00298
00299
00300
00301 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00302 {
00303 try {
00304 LOG4CPLUS_INFO(log_, "Start halting ...");
00305 if (resourceTable_->isActive()) {
00306 resourceTable_->halt();
00307 UInt_t count = 0;
00308 while (count<10) {
00309 if (resourceTable_->isReadyToShutDown()) {
00310 lock();
00311 delete resourceTable_;
00312 resourceTable_=0;
00313 unlock();
00314 LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00315 break;
00316 }
00317 else {
00318 count++;
00319 LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00320 ::sleep(1);
00321 }
00322 }
00323 }
00324 else {
00325 lock();
00326 delete resourceTable_;
00327 resourceTable_=0;
00328 unlock();
00329 }
00330
00331 if (0==resourceTable_) {
00332 LOG4CPLUS_INFO(log_,"Finished halting!");
00333 fsm_.fireEvent("HaltDone",this);
00334 }
00335 else {
00336 std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
00337 reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00338 fsm_.fireFailed(msg,this);
00339 }
00340 }
00341 catch (xcept::Exception &e) {
00342 std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
00343 reasonForFailed_ = e.what();
00344 fsm_.fireFailed(msg,this);
00345 }
00346 if(frb_) delete frb_;
00347 return false;
00348 }
00349
00350
00351
00352 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00353 throw (xoap::exception::Exception)
00354 {
00355 return fsm_.commandCallback(msg);
00356 }
00357
00358
00359
00360 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00361 {
00362 nbTakeReceived_.value_++;
00363 if(fsm_.checkIfEnabled()){
00364 bool eventComplete=resourceTable_->buildResource(bufRef);
00365 if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00366 }
00367 else{
00368 LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state "
00369 << fsm_.stateName() << " is being lost");
00370 bufRef->release();
00371 }
00372 }
00373
00374
00375 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00376 {
00377
00378 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00379 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00380 if(msg->lumiSection==0){
00381 LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00382 fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00383 }
00384 resourceTable_->postEndOfLumiSection(bufRef);
00385
00386
00387
00388
00389
00390 }
00391
00392
00393
00394
00395
00396 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00397 {
00398 nbDataDiscardReceived_.value_++;
00399 resourceTable_->discardDataEvent(bufRef);
00400 }
00401
00402
00403
00404 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00405 {
00406 nbDqmDiscardReceived_.value_++;
00407 resourceTable_->discardDqmEvent(bufRef);
00408 }
00409
00410
00411
00412 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00413 {
00414 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00415 typedef AppDescSet_t::iterator AppDescIter_t;
00416
00417
00418 AppDescSet_t setOfBUs=
00419 getApplicationContext()->getDefaultZone()->
00420 getApplicationDescriptors(buClassName_.toString());
00421
00422 if (0!=bu_) { delete bu_; bu_=0; }
00423
00424 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00425 if ((*it)->getInstance()==buInstance_)
00426 bu_=new BUProxy(getApplicationDescriptor(),*it,
00427 getApplicationContext(),i2oPool_);
00428
00429 if (0==bu_) {
00430 string msg=sourceId_+" failed to locate input BU!";
00431 XCEPT_RAISE(evf::Exception,msg);
00432 }
00433
00434
00435 AppDescSet_t setOfSMs=
00436 getApplicationContext()->getDefaultZone()->
00437 getApplicationDescriptors(smClassName_.toString());
00438
00439 if (0!=sm_) { delete sm_; sm_=0; }
00440
00441 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00442 if ((*it)->getInstance()==smInstance_)
00443 sm_=new SMProxy(getApplicationDescriptor(),*it,
00444 getApplicationContext(),i2oPool_);
00445
00446 if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00447 }
00448
00449
00450
00451 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00452 throw (xgi::exception::Exception)
00453 {
00454 string name=in->getenv("PATH_INFO");
00455 if (name.empty()) name="defaultWebPage";
00456 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00457 }
00458
00459
00460
00461 void FUResourceBroker::actionPerformed(xdata::Event& e)
00462 {
00463 lock();
00464
00465 if (0!=resourceTable_) {
00466
00467
00468
00469 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00470 nbClients_ =resourceTable_->nbClients();
00471 clientPrcIds_ =resourceTable_->clientPrcIdsAsString();
00472 nbAllocatedEvents_ =resourceTable_->nbAllocated();
00473 nbPendingRequests_ =resourceTable_->nbPending();
00474 nbReceivedEvents_ =resourceTable_->nbCompleted();
00475 nbSentEvents_ =resourceTable_->nbSent();
00476 nbSentDqmEvents_ =resourceTable_->nbSentDqm();
00477 nbSentErrorEvents_ =resourceTable_->nbSentError();
00478 nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00479 nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00480 nbDiscardedEvents_ =resourceTable_->nbDiscarded();
00481 nbLostEvents_ =resourceTable_->nbLost();
00482 nbDataErrors_ =resourceTable_->nbErrors();
00483 nbCrcErrors_ =resourceTable_->nbCrcErrors();
00484 nbAllocateSent_ =resourceTable_->nbAllocSent();
00485 dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u +
00486 ((nbDataErrors_.value_ != 0u) << 1) +
00487 ((nbLostEvents_.value_ != 0u) << 2) +
00488 ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00489 ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00490 ((nbSentErrorEvents_.value_ != 0u) << 5)
00491 );
00492 }
00493 else if (e.type()=="ItemChangedEvent") {
00494
00495 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00496
00497 if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00498 if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
00499 if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
00500 if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00501 }
00502
00503
00504 }
00505 else {
00506 nbClients_ =0;
00507 clientPrcIds_ ="";
00508 nbAllocatedEvents_ =0;
00509 nbPendingRequests_ =0;
00510 nbReceivedEvents_ =0;
00511 nbSentEvents_ =0;
00512 nbSentDqmEvents_ =0;
00513 nbSentErrorEvents_ =0;
00514 nbPendingSMDiscards_=0;
00515 nbPendingSMDqmDiscards_=0;
00516 nbDiscardedEvents_ =0;
00517 nbLostEvents_ =0;
00518 nbDataErrors_ =0;
00519 nbCrcErrors_ =0;
00520 nbAllocateSent_ =0;
00521 }
00522 unlock();
00523 }
00524
00525
00526
00527 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00528 {
00529 struct timezone timezone;
00530 gettimeofday(&monStartTime_,&timezone);
00531
00532 try {
00533 wlMonitoring_=
00534 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00535 "waiting");
00536 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00537 asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00538 sourceId_+"Monitoring");
00539 wlMonitoring_->submit(asMonitoring_);
00540 }
00541 catch (xcept::Exception& e) {
00542 string msg = "Failed to start workloop 'Monitoring'.";
00543 XCEPT_RETHROW(evf::Exception,msg,e);
00544 }
00545 }
00546
00547
00548
00549 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00550 {
00551 unsigned int nbSent;
00552 uint64_t sumOfSquares;
00553 unsigned int sumOfSizes;
00554 uint64_t deltaSumOfSquares;
00555
00556 lock();
00557 if (0==resourceTable_) {
00558 deltaT_.value_ =0.0;
00559 deltaN_.value_ = 0;
00560 deltaSumOfSquares_.value_=0.0;
00561 deltaSumOfSizes_.value_ = 0;
00562 throughput_ =0.0;
00563 rate_ =0.0;
00564 average_ =0.0;
00565 rms_ =0.0;
00566 unlock();
00567 return false;
00568 }
00569 else {
00570 nbSent =resourceTable_->nbSent();
00571 sumOfSquares=resourceTable_->sumOfSquares();
00572 sumOfSizes =resourceTable_->sumOfSizes();
00573 }
00574 unlock();
00575
00576 struct timeval monEndTime;
00577 struct timezone timezone;
00578
00579 gettimeofday(&monEndTime,&timezone);
00580
00581 xdata::getInfoSpaceFactory()->lock();
00582 gui_->monInfoSpace()->lock();
00583
00584 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00585 monStartTime_=monEndTime;
00586
00587 deltaN_.value_=nbSent-nbSentLast_;
00588 nbSentLast_=nbSent;
00589
00590 deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00591 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00592 sumOfSquaresLast_=sumOfSquares;
00593
00594 deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00595 sumOfSizesLast_=sumOfSizes;
00596
00597 if (deltaT_.value_!=0) {
00598 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00599 rate_ =deltaN_.value_/deltaT_.value_;
00600 }
00601 else {
00602 throughput_=0.0;
00603 rate_ =0.0;
00604 }
00605
00606 double meanOfSquares,mean,squareOfMean,variance;
00607
00608 if(deltaN_.value_!=0) {
00609 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00610 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00611 squareOfMean=mean*mean;
00612 variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00613
00614 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00615 rms_ =std::sqrt(variance);
00616 }
00617 else {
00618 average_=0.0;
00619 rms_ =0.0;
00620 }
00621
00622 gui_->monInfoSpace()->unlock();
00623 xdata::getInfoSpaceFactory()->unlock();
00624
00625 ::sleep(monSleepSec_.value_);
00626
00627 return true;
00628 }
00629
00630
00631
00632 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00633 {
00634 try {
00635 wlWatching_=
00636 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00637 "waiting");
00638 if (!wlWatching_->isActive()) wlWatching_->activate();
00639 asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00640 sourceId_+"Watching");
00641 wlWatching_->submit(asWatching_);
00642 }
00643 catch (xcept::Exception& e) {
00644 string msg = "Failed to start workloop 'Watching'.";
00645 XCEPT_RETHROW(evf::Exception,msg,e);
00646 }
00647 }
00648
00649
00650
00651 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00652 {
00653 lock();
00654
00655 if (0==resourceTable_) {
00656 unlock();
00657 return false;
00658 }
00659
00660 vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
00661 vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00662 vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
00663
00664 time_t tcurr=time(0);
00665 for (UInt_t i=0;i<evt_tstamps.size();i++) {
00666 pid_t pid =evt_prcids[i];
00667 UInt_t evt =evt_numbers[i];
00668 time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00669 double tdiff =difftime(tcurr,tstamp);
00670 if (tdiff>timeOutSec_) {
00671 if(processKillerEnabled_) {
00672 LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00673 kill(pid,9);
00674 nbTimeoutsWithEvent_++;
00675 }
00676 else {
00677 LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00678 <<timeOutSec_<<"sec for process "<<pid);
00679 }
00680 }
00681 }
00682
00683 vector<pid_t> prcids=resourceTable_->clientPrcIds();
00684 for (UInt_t i=0;i<prcids.size();i++) {
00685 pid_t pid =prcids[i];
00686 int status=kill(pid,0);
00687 if (status!=0) {
00688 LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00689 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00690 nbTimeoutsWithoutEvent_++;
00691 }
00692 }
00693
00694 if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00695 std::ostringstream ost;
00696 ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00697 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00698 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00699 XCEPT_DECLARE(evf::Exception,
00700 sentinelException, ost.str());
00701 notifyQualified("error",sentinelException);
00702 shmInconsistent_ = true;
00703 }
00704
00705 unlock();
00706
00707 ::sleep(watchSleepSec_.value_);
00708 return true;
00709 }
00710
00711
00712
00713 void FUResourceBroker::exportParameters()
00714 {
00715 assert(0!=gui_);
00716
00717 gui_->addMonitorParam("url", &url_);
00718 gui_->addMonitorParam("class", &class_);
00719 gui_->addMonitorParam("instance", &instance_);
00720 gui_->addMonitorParam("runNumber", &runNumber_);
00721 gui_->addMonitorParam("stateName", fsm_.stateName());
00722
00723 gui_->addMonitorParam("deltaT", &deltaT_);
00724 gui_->addMonitorParam("deltaN", &deltaN_);
00725 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00726 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00727
00728 gui_->addMonitorParam("throughput", &throughput_);
00729 gui_->addMonitorParam("rate", &rate_);
00730 gui_->addMonitorParam("average", &average_);
00731 gui_->addMonitorParam("rms", &rms_);
00732 gui_->addMonitorParam("dataErrorFlag", &dataErrorFlag_);
00733
00734 gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
00735 gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
00736 gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
00737 gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
00738 gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
00739 gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
00740 gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);
00741
00742 gui_->addMonitorCounter("nbSentDqmEvents", &nbSentDqmEvents_);
00743 gui_->addMonitorCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
00744 gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
00745
00746 gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
00747 gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
00748 gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
00749 gui_->addMonitorCounter("nbTimeoutsWithEvent", &nbTimeoutsWithEvent_);
00750 gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
00751
00752 gui_->addStandardParam("segmentationMode", &segmentationMode_);
00753 gui_->addStandardParam("nbClients", &nbClients_);
00754 gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
00755 gui_->addStandardParam("nbRawCells", &nbRawCells_);
00756 gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
00757 gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
00758 gui_->addStandardParam("rawCellSize", &rawCellSize_);
00759 gui_->addStandardParam("recoCellSize", &recoCellSize_);
00760 gui_->addStandardParam("dqmCellSize", &dqmCellSize_);
00761
00762 gui_->addStandardParam("doDropEvents", &doDropEvents_);
00763 gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
00764 gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
00765 gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
00766 gui_->addStandardParam("buClassName", &buClassName_);
00767 gui_->addStandardParam("buInstance", &buInstance_);
00768 gui_->addStandardParam("smClassName", &smClassName_);
00769 gui_->addStandardParam("smInstance", &smInstance_);
00770 gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
00771 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00772 gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
00773 gui_->addStandardParam("timeOutSec", &timeOutSec_);
00774 gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
00775 gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
00776 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00777 gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
00778 gui_->addStandardParam("reasonForFailed", &reasonForFailed_);
00779
00780 gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
00781 gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
00782 gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);
00783
00784 gui_->exportParameters();
00785
00786 gui_->addItemChangedListener("doFedIdCheck", this);
00787 gui_->addItemChangedListener("useEvmBoard", this);
00788 gui_->addItemChangedListener("doCrcCheck", this);
00789 gui_->addItemChangedListener("doDumpEvents", this);
00790 }
00791
00792
00793
00794 void FUResourceBroker::reset()
00795 {
00796 gui_->resetCounters();
00797
00798 deltaT_ =0.0;
00799 deltaN_ = 0;
00800 deltaSumOfSquares_=0.0;
00801 deltaSumOfSizes_ = 0;
00802
00803 throughput_ =0.0;
00804 rate_ =0.0;
00805 average_ =0.0;
00806 rms_ =0.0;
00807
00808 nbSentLast_ = 0;
00809 sumOfSquaresLast_ = 0;
00810 sumOfSizesLast_ = 0;
00811 }
00812
00813
00814
00815 double FUResourceBroker::deltaT(const struct timeval *start,
00816 const struct timeval *end)
00817 {
00818 unsigned int sec;
00819 unsigned int usec;
00820
00821 sec = end->tv_sec - start->tv_sec;
00822
00823 if(end->tv_usec > start->tv_usec) {
00824 usec = end->tv_usec - start->tv_usec;
00825 }
00826 else {
00827 sec--;
00828 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00829 }
00830
00831 return ((double)sec) + ((double)usec) / 1000000.0;
00832 }
00833
00834
00835
00836
00837 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00838 throw (xgi::exception::Exception)
00839 {
00840 using namespace cgicc;
00841 Cgicc cgi(in);
00842 std::vector<FormEntry> els = cgi.getElements() ;
00843 for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00844 std::cout << "form entry " << (*it).getValue() << std::endl;
00845
00846 std::vector<FormEntry> el1;
00847 cgi.getElement("crcError",el1);
00848 *out<<"<html>"<<endl;
00849 gui_->htmlHead(in,out,sourceId_);
00850 *out<<"<body>"<<endl;
00851 gui_->htmlHeadline(in,out);
00852
00853 lock();
00854
00855 if (0!=resourceTable_) {
00856 if(el1.size()!=0) {
00857 resourceTable_->injectCRCError();
00858 }
00859 *out << "<form method=\"GET\" action=\"customWebPage\" >";
00860 *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00861 *out << "</form>" << endl;
00862 *out << "<hr/>" << std::endl;
00863 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00864 *out<<table().set("frame","void").set("rules","rows")
00865 .set("class","modules").set("width","250")<<endl
00866 <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00867 <<tr()
00868 <<th("client").set("align","left")
00869 <<th("process id").set("align","center")
00870 <<th("status").set("align","center")
00871 <<tr()
00872 <<endl;
00873 for (UInt_t i=0;i<client_prc_ids.size();i++) {
00874
00875 pid_t pid =client_prc_ids[i];
00876 int status=kill(pid,0);
00877
00878 stringstream ssi; ssi<<i+1;
00879 stringstream sspid; sspid<<pid;
00880 stringstream ssstatus; ssstatus<<status;
00881
00882 string bg_status = (status==0) ? "#00ff00" : "ff0000";
00883 *out<<tr()
00884 <<td(ssi.str()).set("align","left")
00885 <<td(sspid.str()).set("align","center")
00886 <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00887 <<tr()<<endl;
00888 }
00889 *out<<table()<<endl;
00890 *out<<"<br><br>"<<endl;
00891
00892 vector<string> states = resourceTable_->cellStates();
00893 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00894 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
00895 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00896
00897 *out<<table().set("frame","void").set("rules","rows")
00898 .set("class","modules").set("width","500")<<endl
00899 <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00900 <<tr()
00901 <<th("cell").set("align","left")
00902 <<th("state").set("align","center")
00903 <<th("event").set("align","center")
00904 <<th("process id").set("align","center")
00905 <<th("timestamp").set("align","center")
00906 <<th("time").set("align","center")
00907 <<tr()
00908 <<endl;
00909 for (UInt_t i=0;i<states.size();i++) {
00910 string state=states[i];
00911 UInt_t evt = evt_numbers[i];
00912 pid_t pid = prc_ids[i];
00913 time_t tstamp= time_stamps[i];
00914 double tdiff = difftime(time(0),tstamp);
00915
00916 stringstream ssi; ssi<<i;
00917 stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00918 stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
00919 stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00920 stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00921
00922 string bg_state = "#ffffff";
00923 if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00924 state=="RAWREADING"||state=="RAWREAD")
00925 bg_state="#99CCff";
00926 else if (state=="PROCESSING")
00927 bg_state="#ff0000";
00928 else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00929 bg_state="#CCff99";
00930 else if (state=="SENDING")
00931 bg_state="#00FF33";
00932 else if (state=="SENT")
00933 bg_state="#006633";
00934 else if (state=="DISCARDING")
00935 bg_state="#FFFF00";
00936 else if (state=="LUMISECTION")
00937 bg_state="#0000FF";
00938
00939 *out<<tr()
00940 <<td(ssi.str()).set("align","left")
00941 <<td(state).set("align","center").set("bgcolor",bg_state)
00942 <<td(ssevt.str()).set("align","center")
00943 <<td(sspid.str()).set("align","center")
00944 <<td(sststamp.str()).set("align","center")
00945 <<td(sstdiff.str()).set("align","center")
00946 <<tr()<<endl;
00947 }
00948 *out<<table()<<endl;
00949 *out<<"<br><br>"<<endl;
00950
00951 vector<string> dqmstates = resourceTable_->dqmCellStates();
00952
00953 *out<<table().set("frame","void").set("rules","rows")
00954 .set("class","modules").set("width","500")<<endl
00955 <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00956 <<tr()
00957 <<th("cell").set("align","left")
00958 <<th("state").set("align","center")
00959 <<tr()
00960 <<endl;
00961 for (UInt_t i=0;i<dqmstates.size();i++) {
00962 string state=dqmstates[i];
00963
00964 string bg_state = "#ffffff";
00965 if (state=="WRITING"||state=="WRITTEN")
00966 bg_state="#99CCff";
00967 else if (state=="SENDING")
00968 bg_state="#00FF33";
00969 else if (state=="SENT")
00970 bg_state="#006633";
00971 else if (state=="DISCARDING")
00972 bg_state="#FFFF00";
00973
00974 *out<<tr()
00975 <<td(state).set("align","center").set("bgcolor",bg_state)
00976 <<tr()<<endl;
00977 }
00978 *out<<table()<<endl;
00979
00980
00981
00982 }
00983 *out<<"</body>"<<endl<<"</html>"<<endl;
00984
00985 unlock();
00986 }
00987
00988 void FUResourceBroker::emergencyStop()
00989 {
00990 LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
00991 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00992 for (UInt_t i=0;i<client_prc_ids.size();i++) {
00993 pid_t pid =client_prc_ids[i];
00994 std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
00995 if(pid!=0){
00996
00997 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00998 nbTimeoutsWithoutEvent_++;
00999 else
01000 nbTimeoutsWithEvent_++;
01001 }
01002 }
01003 resourceTable_->lastResort();
01004 ::sleep(1);
01005 if(!resourceTable_->isReadyToShutDown())
01006 {
01007 reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01008 XCEPT_RAISE(evf::Exception,reasonForFailed_);
01009 }
01010 resourceTable_->printWorkLoopStatus();
01011 lock();
01012 std::cout << "delete resourcetable" <<std::endl;
01013 delete resourceTable_;
01014 resourceTable_=0;
01015 std::cout << "cycle through resourcetable config " << std::endl;
01016 configureResources();
01017 unlock();
01018 if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01019 std::cout << "done with emergency stop" << std::endl;
01020 }
01021
01022 void FUResourceBroker::configureResources()
01023 {
01024 resourceTable_=new FUResourceTable(segmentationMode_.value_,
01025 nbRawCells_.value_,
01026 nbRecoCells_.value_,
01027 nbDqmCells_.value_,
01028 rawCellSize_.value_,
01029 recoCellSize_.value_,
01030 dqmCellSize_.value_,
01031 bu_,sm_,
01032 log_,
01033 shmResourceTableTimeout_.value_,
01034 frb_,
01035 this);
01036 FUResource::doFedIdCheck(doFedIdCheck_);
01037 FUResource::useEvmBoard(useEvmBoard_);
01038 resourceTable_->setDoCrcCheck(doCrcCheck_);
01039 resourceTable_->setDoDumpEvents(doDumpEvents_);
01040 reset();
01041 shmInconsistent_ = false;
01042 if(resourceTable_->nbResources() != nbRawCells_.value_ ||
01043 resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01044 shmInconsistent_ = true;
01045 }
01046
01047
01049
01051
// XDAQ instantiation hook for FUResourceBroker.
// NOTE(review): presumably expands to the factory entry point through which
// XDAQ creates this application — confirm against the XDAQ headers.
XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)