00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017
00018 #include "EvffedFillerRB.h"
00019
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045
00046
00047 using namespace std;
00048 using namespace evf;
00049
00050
00052
00054
00055
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057 : xdaq::Application(s)
00058 , fsm_(this)
00059 , gui_(0)
00060 , log_(getApplicationLogger())
00061 , bu_(0)
00062 , sm_(0)
00063 , i2oPool_(0)
00064 , resourceTable_(0)
00065 , wlMonitoring_(0)
00066 , asMonitoring_(0)
00067 , wlWatching_(0)
00068 , asWatching_(0)
00069 , instance_(0)
00070 , runNumber_(0)
00071 , deltaT_(0.0)
00072 , deltaN_(0)
00073 , deltaSumOfSquares_(0)
00074 , deltaSumOfSizes_(0)
00075 , throughput_(0.0)
00076 , rate_(0.0)
00077 , average_(0.0)
00078 , rms_(0.0)
00079 , nbAllocatedEvents_(0)
00080 , nbPendingRequests_(0)
00081 , nbReceivedEvents_(0)
00082 , nbSentEvents_(0)
00083 , nbSentDqmEvents_(0)
00084 , nbSentErrorEvents_(0)
00085 , nbPendingSMDiscards_(0)
00086 , nbPendingSMDqmDiscards_(0)
00087 , nbDiscardedEvents_(0)
00088 , nbReceivedEol_(0)
00089 , highestEolReceived_(0)
00090 , nbEolPosted_(0)
00091 , nbEolDiscarded_(0)
00092 , nbLostEvents_(0)
00093 , nbDataErrors_(0)
00094 , nbCrcErrors_(0)
00095 , nbTimeoutsWithEvent_(0)
00096 , nbTimeoutsWithoutEvent_(0)
00097 , dataErrorFlag_(0)
00098 , segmentationMode_(false)
00099 , nbClients_(0)
00100 , clientPrcIds_("")
00101 , nbRawCells_(16)
00102 , nbRecoCells_(8)
00103 , nbDqmCells_(8)
00104 , rawCellSize_(0x400000)
00105 , recoCellSize_(0x800000)
00106 , dqmCellSize_(0x800000)
00107 , doDropEvents_(false)
00108 , doFedIdCheck_(true)
00109 , doCrcCheck_(1)
00110 , doDumpEvents_(0)
00111 , buClassName_("BU")
00112 , buInstance_(0)
00113 , smClassName_("StorageManager")
00114 , smInstance_(0)
00115 , shmResourceTableTimeout_(200000)
00116 , monSleepSec_(2)
00117 , watchSleepSec_(10)
00118 , timeOutSec_(30)
00119 , processKillerEnabled_(true)
00120 , useEvmBoard_(true)
00121 , reasonForFailed_("")
00122 , nbAllocateSent_(0)
00123 , nbTakeReceived_(0)
00124 , nbDataDiscardReceived_(0)
00125 , nbDqmDiscardReceived_(0)
00126 , nbSentLast_(0)
00127 , sumOfSquaresLast_(0)
00128 , sumOfSizesLast_(0)
00129 , lock_(toolbox::BSem::FULL)
00130 , frb_(0)
00131 , shmInconsistent_(false)
00132 {
00133
00134 fsm_.initialize<evf::FUResourceBroker>(this);
00135
00136
00137 url_ =
00138 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00139 getApplicationDescriptor()->getURN();
00140 class_ =getApplicationDescriptor()->getClassName();
00141 instance_=getApplicationDescriptor()->getInstance();
00142 sourceId_=class_.toString()+"_"+instance_.toString();
00143
00144
00145 i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00146 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00147 i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00148 I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00149 i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00150 I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00151 i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00152 I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00153
00154
00155
00156 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00157 gui_=new WebGUI(this,&fsm_);
00158 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00159 vector<toolbox::lang::Method*>::iterator it;
00160 for (it=methods.begin();it!=methods.end();++it) {
00161 if ((*it)->type()=="cgi") {
00162 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00163 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00164 }
00165 }
00166 xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00167
00168
00169
00170 string i2oPoolName=sourceId_+"_i2oPool";
00171 try {
00172 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00173 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00174 toolbox::mem::MemoryPoolFactory* poolFactory=
00175 toolbox::mem::getMemoryPoolFactory();
00176 i2oPool_=poolFactory->createPool(urn,allocator);
00177 }
00178 catch (toolbox::mem::exception::Exception& e) {
00179 string s="Failed to create pool: "+i2oPoolName;
00180 LOG4CPLUS_FATAL(log_,s);
00181 XCEPT_RETHROW(xcept::Exception,s,e);
00182 }
00183
00184
00185 exportParameters();
00186
00187
00188 fsm_.findRcmsStateListener();
00189
00190
00191 getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00192
00193 }
00194
00195
00196
00197 FUResourceBroker::~FUResourceBroker()
00198 {
00199
00200 }
00201
00202
00203
00205
00207
00208
00209 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00210 {
00211 try {
00212 LOG4CPLUS_INFO(log_, "Start configuring ...");
00213 connectToBUandSM();
00214 frb_ = new EvffedFillerRB(this);
00215 configureResources();
00216 if(shmInconsistent_){
00217 std::ostringstream ost;
00218 ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw="
00219 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00220 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00221 XCEPT_RAISE(evf::Exception,ost.str());
00222 }
00223 LOG4CPLUS_INFO(log_, "Finished configuring!");
00224 fsm_.fireEvent("ConfigureDone",this);
00225
00226 }
00227 catch (xcept::Exception &e) {
00228 std::string msg = "configuring FAILED: " + (string)e.what();
00229 reasonForFailed_ = e.what();
00230 fsm_.fireFailed(msg,this);
00231 }
00232
00233 return false;
00234 }
00235
00236
00237
00238 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00239 {
00240 try {
00241 LOG4CPLUS_INFO(log_, "Start enabling ...");
00242 reset();
00243 startMonitoringWorkLoop();
00244 startWatchingWorkLoop();
00245 resourceTable_->setRunNumber(runNumber_);
00246 lock();
00247 resourceTable_->resetCounters();
00248 unlock();
00249 resourceTable_->startDiscardWorkLoop();
00250 resourceTable_->startSendDataWorkLoop();
00251 resourceTable_->startSendDqmWorkLoop();
00252 resourceTable_->sendAllocate();
00253
00254 nbTimeoutsWithEvent_ = 0;
00255 nbTimeoutsWithoutEvent_ = 0;
00256 dataErrorFlag_ = 0;
00257
00258 LOG4CPLUS_INFO(log_, "Finished enabling!");
00259 fsm_.fireEvent("EnableDone",this);
00260 }
00261 catch (xcept::Exception &e) {
00262 std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00263 reasonForFailed_ = e.what();
00264 fsm_.fireFailed(msg,this);
00265 }
00266
00267 return false;
00268 }
00269
00270
00271
00272 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00273 {
00274 try {
00275 LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00276 resourceTable_->stop();
00277 timeval now;
00278 timeval then;
00279 gettimeofday(&then,0);
00280 while (!resourceTable_->isReadyToShutDown()) {
00281 ::usleep(shmResourceTableTimeout_.value_*10);
00282 gettimeofday(&now,0);
00283 if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00284 std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
00285 << shmResourceTableTimeout_.value_/10000 << std::endl;
00286 LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00287 emergencyStop();
00288 break;
00289 }
00290 }
00291
00292 if (resourceTable_->isReadyToShutDown()) {
00293 LOG4CPLUS_INFO(log_, "Finished stopping!");
00294 fsm_.fireEvent("StopDone",this);
00295 }
00296 }
00297 catch (xcept::Exception &e) {
00298 std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00299 reasonForFailed_ = e.what();
00300 fsm_.fireFailed(msg,this);
00301 }
00302
00303 return false;
00304 }
00305
00306
00307
00308 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00309 {
00310 try {
00311 LOG4CPLUS_INFO(log_, "Start halting ...");
00312 if (resourceTable_->isActive()) {
00313 resourceTable_->halt();
00314 UInt_t count = 0;
00315 while (count<10) {
00316 if (resourceTable_->isReadyToShutDown()) {
00317 lock();
00318 delete resourceTable_;
00319 resourceTable_=0;
00320 unlock();
00321 LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00322 break;
00323 }
00324 else {
00325 count++;
00326 LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00327 ::sleep(1);
00328 }
00329 }
00330 }
00331 else {
00332 lock();
00333 delete resourceTable_;
00334 resourceTable_=0;
00335 unlock();
00336 }
00337
00338 if (0==resourceTable_) {
00339 LOG4CPLUS_INFO(log_,"Finished halting!");
00340 fsm_.fireEvent("HaltDone",this);
00341 }
00342 else {
00343 std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
00344 reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00345 fsm_.fireFailed(msg,this);
00346 }
00347 }
00348 catch (xcept::Exception &e) {
00349 std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
00350 reasonForFailed_ = e.what();
00351 fsm_.fireFailed(msg,this);
00352 }
00353 if(frb_) delete frb_;
00354 return false;
00355 }
00356
00357
00358
00359 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00360 throw (xoap::exception::Exception)
00361 {
00362 return fsm_.commandCallback(msg);
00363 }
00364
00365
00366
00367 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00368 {
00369 nbTakeReceived_.value_++;
00370 if(fsm_.checkIfEnabled()){
00371 bool eventComplete=resourceTable_->buildResource(bufRef);
00372 if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00373 }
00374 else{
00375 LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state "
00376 << fsm_.stateName() << " is being lost");
00377 bufRef->release();
00378 }
00379 }
00380
00381
00382 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00383 {
00384
00385 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00386 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00387 if(msg->lumiSection==0){
00388 LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00389 fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00390 }
00391 nbReceivedEol_++;
00392 if(highestEolReceived_.value_+100 < msg->lumiSection)
00393 {
00394 LOG4CPLUS_ERROR(log_,"EOL message not in sequence, expected "
00395 << highestEolReceived_.value_+1
00396 << " received " << msg->lumiSection);
00397 fsm_.fireFailed("EOL message with corrupted LS ",this);
00398 }
00399 if(highestEolReceived_.value_+1 != msg->lumiSection)
00400 LOG4CPLUS_WARN(log_,"EOL message not in sequence, expected "
00401 << highestEolReceived_.value_+1
00402 << " received " << msg->lumiSection);
00403
00404 if(highestEolReceived_.value_ < msg->lumiSection)
00405 highestEolReceived_.value_ = msg->lumiSection;
00406 resourceTable_->postEndOfLumiSection(bufRef);
00407
00408
00409
00410
00411
00412 }
00413
00414
00415
00416
00417
00418 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00419 {
00420 nbDataDiscardReceived_.value_++;
00421 resourceTable_->discardDataEvent(bufRef);
00422 }
00423
00424
00425
00426 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00427 {
00428 nbDqmDiscardReceived_.value_++;
00429 resourceTable_->discardDqmEvent(bufRef);
00430 }
00431
00432
00433
00434 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00435 {
00436 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00437 typedef AppDescSet_t::iterator AppDescIter_t;
00438
00439
00440 AppDescSet_t setOfBUs=
00441 getApplicationContext()->getDefaultZone()->
00442 getApplicationDescriptors(buClassName_.toString());
00443
00444 if (0!=bu_) { delete bu_; bu_=0; }
00445
00446 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00447 if ((*it)->getInstance()==buInstance_)
00448 bu_=new BUProxy(getApplicationDescriptor(),*it,
00449 getApplicationContext(),i2oPool_);
00450
00451 if (0==bu_) {
00452 string msg=sourceId_+" failed to locate input BU!";
00453 XCEPT_RAISE(evf::Exception,msg);
00454 }
00455
00456
00457 AppDescSet_t setOfSMs=
00458 getApplicationContext()->getDefaultZone()->
00459 getApplicationDescriptors(smClassName_.toString());
00460
00461 if (0!=sm_) { delete sm_; sm_=0; }
00462
00463 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00464 if ((*it)->getInstance()==smInstance_)
00465 sm_=new SMProxy(getApplicationDescriptor(),*it,
00466 getApplicationContext(),i2oPool_);
00467
00468 if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00469 }
00470
00471
00472
00473 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00474 throw (xgi::exception::Exception)
00475 {
00476 string name=in->getenv("PATH_INFO");
00477 if (name.empty()) name="defaultWebPage";
00478 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00479 }
00480
00481
00482
00483 void FUResourceBroker::actionPerformed(xdata::Event& e)
00484 {
00485 lock();
00486
00487 if (0!=resourceTable_) {
00488
00489
00490
00491 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00492 nbClients_ =resourceTable_->nbClients();
00493 clientPrcIds_ =resourceTable_->clientPrcIdsAsString();
00494 nbAllocatedEvents_ =resourceTable_->nbAllocated();
00495 nbPendingRequests_ =resourceTable_->nbPending();
00496 nbReceivedEvents_ =resourceTable_->nbCompleted();
00497 nbSentEvents_ =resourceTable_->nbSent();
00498 nbSentDqmEvents_ =resourceTable_->nbSentDqm();
00499 nbSentErrorEvents_ =resourceTable_->nbSentError();
00500 nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00501 nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00502 nbDiscardedEvents_ =resourceTable_->nbDiscarded();
00503 nbLostEvents_ =resourceTable_->nbLost();
00504 nbEolPosted_ =resourceTable_->nbEolPosted();
00505 nbEolDiscarded_ =resourceTable_->nbEolDiscarded();
00506 nbDataErrors_ =resourceTable_->nbErrors();
00507 nbCrcErrors_ =resourceTable_->nbCrcErrors();
00508 nbAllocateSent_ =resourceTable_->nbAllocSent();
00509 dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u +
00510 ((nbDataErrors_.value_ != 0u) << 1) +
00511 ((nbLostEvents_.value_ != 0u) << 2) +
00512 ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00513 ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00514 ((nbSentErrorEvents_.value_ != 0u) << 5)
00515 );
00516 }
00517 else if (e.type()=="ItemChangedEvent") {
00518
00519 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00520
00521 if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00522 if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
00523 if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
00524 if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00525 }
00526
00527
00528 }
00529 else {
00530 nbClients_ =0;
00531 clientPrcIds_ ="";
00532 nbAllocatedEvents_ =0;
00533 nbPendingRequests_ =0;
00534 nbReceivedEvents_ =0;
00535 nbSentEvents_ =0;
00536 nbSentDqmEvents_ =0;
00537 nbSentErrorEvents_ =0;
00538 nbPendingSMDiscards_=0;
00539 nbPendingSMDqmDiscards_=0;
00540 nbDiscardedEvents_ =0;
00541 nbLostEvents_ =0;
00542 nbDataErrors_ =0;
00543 nbCrcErrors_ =0;
00544 nbAllocateSent_ =0;
00545 }
00546 unlock();
00547 }
00548
00549
00550
00551 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00552 {
00553 struct timezone timezone;
00554 gettimeofday(&monStartTime_,&timezone);
00555
00556 try {
00557 wlMonitoring_=
00558 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00559 "waiting");
00560 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00561 asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00562 sourceId_+"Monitoring");
00563 wlMonitoring_->submit(asMonitoring_);
00564 }
00565 catch (xcept::Exception& e) {
00566 string msg = "Failed to start workloop 'Monitoring'.";
00567 XCEPT_RETHROW(evf::Exception,msg,e);
00568 }
00569 }
00570
00571
00572
00573 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00574 {
00575 unsigned int nbSent;
00576 uint64_t sumOfSquares;
00577 unsigned int sumOfSizes;
00578 uint64_t deltaSumOfSquares;
00579
00580 lock();
00581 if (0==resourceTable_) {
00582 deltaT_.value_ =0.0;
00583 deltaN_.value_ = 0;
00584 deltaSumOfSquares_.value_=0.0;
00585 deltaSumOfSizes_.value_ = 0;
00586 throughput_ =0.0;
00587 rate_ =0.0;
00588 average_ =0.0;
00589 rms_ =0.0;
00590 unlock();
00591 return false;
00592 }
00593 else {
00594 nbSent =resourceTable_->nbSent();
00595 sumOfSquares=resourceTable_->sumOfSquares();
00596 sumOfSizes =resourceTable_->sumOfSizes();
00597 }
00598 unlock();
00599
00600 struct timeval monEndTime;
00601 struct timezone timezone;
00602
00603 gettimeofday(&monEndTime,&timezone);
00604
00605 xdata::getInfoSpaceFactory()->lock();
00606 gui_->monInfoSpace()->lock();
00607
00608 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00609 monStartTime_=monEndTime;
00610
00611 deltaN_.value_=nbSent-nbSentLast_;
00612 nbSentLast_=nbSent;
00613
00614 deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00615 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00616 sumOfSquaresLast_=sumOfSquares;
00617
00618 deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00619 sumOfSizesLast_=sumOfSizes;
00620
00621 if (deltaT_.value_!=0) {
00622 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00623 rate_ =deltaN_.value_/deltaT_.value_;
00624 }
00625 else {
00626 throughput_=0.0;
00627 rate_ =0.0;
00628 }
00629
00630 double meanOfSquares,mean,squareOfMean,variance;
00631
00632 if(deltaN_.value_!=0) {
00633 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00634 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00635 squareOfMean=mean*mean;
00636 variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00637
00638 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00639 rms_ =std::sqrt(variance);
00640 }
00641 else {
00642 average_=0.0;
00643 rms_ =0.0;
00644 }
00645
00646 gui_->monInfoSpace()->unlock();
00647 xdata::getInfoSpaceFactory()->unlock();
00648
00649 ::sleep(monSleepSec_.value_);
00650
00651 return true;
00652 }
00653
00654
00655
00656 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00657 {
00658 try {
00659 wlWatching_=
00660 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00661 "waiting");
00662 if (!wlWatching_->isActive()) wlWatching_->activate();
00663 asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00664 sourceId_+"Watching");
00665 wlWatching_->submit(asWatching_);
00666 }
00667 catch (xcept::Exception& e) {
00668 string msg = "Failed to start workloop 'Watching'.";
00669 XCEPT_RETHROW(evf::Exception,msg,e);
00670 }
00671 }
00672
00673
00674
00675 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00676 {
00677 lock();
00678
00679 if (0==resourceTable_) {
00680 unlock();
00681 return false;
00682 }
00683
00684 vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
00685 vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00686 vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
00687
00688 time_t tcurr=time(0);
00689 for (UInt_t i=0;i<evt_tstamps.size();i++) {
00690 pid_t pid =evt_prcids[i];
00691 UInt_t evt =evt_numbers[i];
00692 time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00693 double tdiff =difftime(tcurr,tstamp);
00694 if (tdiff>timeOutSec_) {
00695 if(processKillerEnabled_) {
00696 LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00697 kill(pid,9);
00698 nbTimeoutsWithEvent_++;
00699 }
00700 else {
00701 LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00702 <<timeOutSec_<<"sec for process "<<pid);
00703 }
00704 }
00705 }
00706
00707 vector<pid_t> prcids=resourceTable_->clientPrcIds();
00708 for (UInt_t i=0;i<prcids.size();i++) {
00709 pid_t pid =prcids[i];
00710 int status=kill(pid,0);
00711 if (status!=0) {
00712 LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00713 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00714 nbTimeoutsWithoutEvent_++;
00715 }
00716 }
00717
00718 if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00719 std::ostringstream ost;
00720 ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00721 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00722 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00723 XCEPT_DECLARE(evf::Exception,
00724 sentinelException, ost.str());
00725 notifyQualified("error",sentinelException);
00726 shmInconsistent_ = true;
00727 }
00728
00729 unlock();
00730
00731 ::sleep(watchSleepSec_.value_);
00732 return true;
00733 }
00734
00735
00736
00737 void FUResourceBroker::exportParameters()
00738 {
00739 assert(0!=gui_);
00740
00741 gui_->addMonitorParam("url", &url_);
00742 gui_->addMonitorParam("class", &class_);
00743 gui_->addMonitorParam("instance", &instance_);
00744 gui_->addMonitorParam("runNumber", &runNumber_);
00745 gui_->addMonitorParam("stateName", fsm_.stateName());
00746
00747 gui_->addMonitorParam("deltaT", &deltaT_);
00748 gui_->addMonitorParam("deltaN", &deltaN_);
00749 gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
00750 gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);
00751
00752 gui_->addMonitorParam("throughput", &throughput_);
00753 gui_->addMonitorParam("rate", &rate_);
00754 gui_->addMonitorParam("average", &average_);
00755 gui_->addMonitorParam("rms", &rms_);
00756 gui_->addMonitorParam("dataErrorFlag", &dataErrorFlag_);
00757
00758 gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
00759 gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
00760 gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
00761 gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
00762 gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
00763 gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
00764 gui_->addMonitorCounter("nbReceivedEol", &nbReceivedEol_);
00765 gui_->addMonitorCounter("highestEolReceived", &highestEolReceived_);
00766 gui_->addMonitorCounter("nbEolPosted", &nbEolPosted_);
00767 gui_->addMonitorCounter("nbEolDiscarded", &nbEolDiscarded_);
00768
00769 gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);
00770
00771 gui_->addMonitorCounter("nbSentDqmEvents", &nbSentDqmEvents_);
00772 gui_->addMonitorCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
00773 gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
00774
00775 gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
00776 gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
00777 gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
00778 gui_->addMonitorCounter("nbTimeoutsWithEvent", &nbTimeoutsWithEvent_);
00779 gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
00780
00781 gui_->addStandardParam("segmentationMode", &segmentationMode_);
00782 gui_->addStandardParam("nbClients", &nbClients_);
00783 gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
00784 gui_->addStandardParam("nbRawCells", &nbRawCells_);
00785 gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
00786 gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
00787 gui_->addStandardParam("rawCellSize", &rawCellSize_);
00788 gui_->addStandardParam("recoCellSize", &recoCellSize_);
00789 gui_->addStandardParam("dqmCellSize", &dqmCellSize_);
00790
00791 gui_->addStandardParam("doDropEvents", &doDropEvents_);
00792 gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
00793 gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
00794 gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
00795 gui_->addStandardParam("buClassName", &buClassName_);
00796 gui_->addStandardParam("buInstance", &buInstance_);
00797 gui_->addStandardParam("smClassName", &smClassName_);
00798 gui_->addStandardParam("smInstance", &smInstance_);
00799 gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
00800 gui_->addStandardParam("monSleepSec", &monSleepSec_);
00801 gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
00802 gui_->addStandardParam("timeOutSec", &timeOutSec_);
00803 gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
00804 gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
00805 gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
00806 gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
00807 gui_->addStandardParam("reasonForFailed", &reasonForFailed_);
00808
00809 gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
00810 gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
00811 gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);
00812
00813 gui_->exportParameters();
00814
00815 gui_->addItemChangedListener("doFedIdCheck", this);
00816 gui_->addItemChangedListener("useEvmBoard", this);
00817 gui_->addItemChangedListener("doCrcCheck", this);
00818 gui_->addItemChangedListener("doDumpEvents", this);
00819 }
00820
00821
00822
00823 void FUResourceBroker::reset()
00824 {
00825 gui_->resetCounters();
00826
00827 deltaT_ =0.0;
00828 deltaN_ = 0;
00829 deltaSumOfSquares_=0.0;
00830 deltaSumOfSizes_ = 0;
00831
00832 throughput_ =0.0;
00833 rate_ =0.0;
00834 average_ =0.0;
00835 rms_ =0.0;
00836
00837 nbSentLast_ = 0;
00838 sumOfSquaresLast_ = 0;
00839 sumOfSizesLast_ = 0;
00840 }
00841
00842
00843
00844 double FUResourceBroker::deltaT(const struct timeval *start,
00845 const struct timeval *end)
00846 {
00847 unsigned int sec;
00848 unsigned int usec;
00849
00850 sec = end->tv_sec - start->tv_sec;
00851
00852 if(end->tv_usec > start->tv_usec) {
00853 usec = end->tv_usec - start->tv_usec;
00854 }
00855 else {
00856 sec--;
00857 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00858 }
00859
00860 return ((double)sec) + ((double)usec) / 1000000.0;
00861 }
00862
00863
00864
00865
00866 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00867 throw (xgi::exception::Exception)
00868 {
00869 using namespace cgicc;
00870 Cgicc cgi(in);
00871 std::vector<FormEntry> els = cgi.getElements() ;
00872 for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00873 std::cout << "form entry " << (*it).getValue() << std::endl;
00874
00875 std::vector<FormEntry> el1;
00876 cgi.getElement("crcError",el1);
00877 *out<<"<html>"<<endl;
00878 gui_->htmlHead(in,out,sourceId_);
00879 *out<<"<body>"<<endl;
00880 gui_->htmlHeadline(in,out);
00881
00882 lock();
00883
00884 if (0!=resourceTable_) {
00885 if(el1.size()!=0) {
00886 resourceTable_->injectCRCError();
00887 }
00888 *out << "<form method=\"GET\" action=\"customWebPage\" >";
00889 *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00890 *out << "</form>" << endl;
00891 *out << "<hr/>" << std::endl;
00892 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00893 *out<<table().set("frame","void").set("rules","rows")
00894 .set("class","modules").set("width","250")<<endl
00895 <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00896 <<tr()
00897 <<th("client").set("align","left")
00898 <<th("process id").set("align","center")
00899 <<th("status").set("align","center")
00900 <<tr()
00901 <<endl;
00902 for (UInt_t i=0;i<client_prc_ids.size();i++) {
00903
00904 pid_t pid =client_prc_ids[i];
00905 int status=kill(pid,0);
00906
00907 stringstream ssi; ssi<<i+1;
00908 stringstream sspid; sspid<<pid;
00909 stringstream ssstatus; ssstatus<<status;
00910
00911 string bg_status = (status==0) ? "#00ff00" : "ff0000";
00912 *out<<tr()
00913 <<td(ssi.str()).set("align","left")
00914 <<td(sspid.str()).set("align","center")
00915 <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00916 <<tr()<<endl;
00917 }
00918 *out<<table()<<endl;
00919 *out<<"<br><br>"<<endl;
00920
00921 vector<string> states = resourceTable_->cellStates();
00922 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00923 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
00924 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00925
00926 *out<<table().set("frame","void").set("rules","rows")
00927 .set("class","modules").set("width","500")<<endl
00928 <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00929 <<tr()
00930 <<th("cell").set("align","left")
00931 <<th("state").set("align","center")
00932 <<th("event").set("align","center")
00933 <<th("process id").set("align","center")
00934 <<th("timestamp").set("align","center")
00935 <<th("time").set("align","center")
00936 <<tr()
00937 <<endl;
00938 for (UInt_t i=0;i<states.size();i++) {
00939 string state=states[i];
00940 UInt_t evt = evt_numbers[i];
00941 pid_t pid = prc_ids[i];
00942 time_t tstamp= time_stamps[i];
00943 double tdiff = difftime(time(0),tstamp);
00944
00945 stringstream ssi; ssi<<i;
00946 stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00947 stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
00948 stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00949 stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00950
00951 string bg_state = "#ffffff";
00952 if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00953 state=="RAWREADING"||state=="RAWREAD")
00954 bg_state="#99CCff";
00955 else if (state=="PROCESSING")
00956 bg_state="#ff0000";
00957 else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00958 bg_state="#CCff99";
00959 else if (state=="SENDING")
00960 bg_state="#00FF33";
00961 else if (state=="SENT")
00962 bg_state="#006633";
00963 else if (state=="DISCARDING")
00964 bg_state="#FFFF00";
00965 else if (state=="LUMISECTION")
00966 bg_state="#0000FF";
00967
00968 *out<<tr()
00969 <<td(ssi.str()).set("align","left")
00970 <<td(state).set("align","center").set("bgcolor",bg_state)
00971 <<td(ssevt.str()).set("align","center")
00972 <<td(sspid.str()).set("align","center")
00973 <<td(sststamp.str()).set("align","center")
00974 <<td(sstdiff.str()).set("align","center")
00975 <<tr()<<endl;
00976 }
00977 *out<<table()<<endl;
00978 *out<<"<br><br>"<<endl;
00979
00980 vector<string> dqmstates = resourceTable_->dqmCellStates();
00981
00982 *out<<table().set("frame","void").set("rules","rows")
00983 .set("class","modules").set("width","500")<<endl
00984 <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00985 <<tr()
00986 <<th("cell").set("align","left")
00987 <<th("state").set("align","center")
00988 <<tr()
00989 <<endl;
00990 for (UInt_t i=0;i<dqmstates.size();i++) {
00991 string state=dqmstates[i];
00992
00993 string bg_state = "#ffffff";
00994 if (state=="WRITING"||state=="WRITTEN")
00995 bg_state="#99CCff";
00996 else if (state=="SENDING")
00997 bg_state="#00FF33";
00998 else if (state=="SENT")
00999 bg_state="#006633";
01000 else if (state=="DISCARDING")
01001 bg_state="#FFFF00";
01002
01003 *out<<tr()<<"<td>"<<i<<"</td>"
01004 <<td(state).set("align","center").set("bgcolor",bg_state)
01005 <<tr()<<endl;
01006 }
01007 *out<<table()<<endl;
01008
01009
01010
01011 }
01012 *out<<"</body>"<<endl<<"</html>"<<endl;
01013
01014 unlock();
01015 }
01016
01017 void FUResourceBroker::emergencyStop()
01018 {
01019 LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
01020 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
01021 for (UInt_t i=0;i<client_prc_ids.size();i++) {
01022 pid_t pid =client_prc_ids[i];
01023 std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
01024 if(pid!=0){
01025
01026 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
01027 nbTimeoutsWithoutEvent_++;
01028 else
01029 nbTimeoutsWithEvent_++;
01030 }
01031 }
01032 resourceTable_->lastResort();
01033 ::sleep(1);
01034 if(!resourceTable_->isReadyToShutDown())
01035 {
01036 reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01037 XCEPT_RAISE(evf::Exception,reasonForFailed_);
01038 }
01039 resourceTable_->printWorkLoopStatus();
01040 lock();
01041 std::cout << "delete resourcetable" <<std::endl;
01042 delete resourceTable_;
01043 resourceTable_=0;
01044 std::cout << "cycle through resourcetable config " << std::endl;
01045 configureResources();
01046 unlock();
01047 if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01048 std::cout << "done with emergency stop" << std::endl;
01049 }
01050
01051 void FUResourceBroker::configureResources()
01052 {
01053 resourceTable_=new FUResourceTable(segmentationMode_.value_,
01054 nbRawCells_.value_,
01055 nbRecoCells_.value_,
01056 nbDqmCells_.value_,
01057 rawCellSize_.value_,
01058 recoCellSize_.value_,
01059 dqmCellSize_.value_,
01060 bu_,sm_,
01061 log_,
01062 shmResourceTableTimeout_.value_,
01063 frb_,
01064 this);
01065 FUResource::doFedIdCheck(doFedIdCheck_);
01066 FUResource::useEvmBoard(useEvmBoard_);
01067 resourceTable_->setDoCrcCheck(doCrcCheck_);
01068 resourceTable_->setDoDumpEvents(doDumpEvents_);
01069 reset();
01070 shmInconsistent_ = false;
01071 if(resourceTable_->nbResources() != nbRawCells_.value_ ||
01072 resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01073 shmInconsistent_ = true;
01074 }
01075
01076
01078
01080
// XDAQ instantiator for the FUResourceBroker application class.
// NOTE(review): presumably expands to the factory hook the XDAQ executive uses
// to create FUResourceBroker instances by class name — confirm against the
// XDAQ headers.
01081 XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)