00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017
00018 #include "EvffedFillerRB.h"
00019
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045
00046
00047 using namespace std;
00048 using namespace evf;
00049
00050
00052
00054
00055
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057 : xdaq::Application(s)
00058 , fsm_(this)
00059 , gui_(0)
00060 , log_(getApplicationLogger())
00061 , bu_(0)
00062 , sm_(0)
00063 , i2oPool_(0)
00064 , resourceTable_(0)
00065 , wlMonitoring_(0)
00066 , asMonitoring_(0)
00067 , wlWatching_(0)
00068 , asWatching_(0)
00069 , instance_(0)
00070 , runNumber_(0)
00071 , deltaT_(0.0)
00072 , deltaN_(0)
00073 , deltaSumOfSquares_(0)
00074 , deltaSumOfSizes_(0)
00075 , throughput_(0.0)
00076 , rate_(0.0)
00077 , average_(0.0)
00078 , rms_(0.0)
00079 , nbAllocatedEvents_(0)
00080 , nbPendingRequests_(0)
00081 , nbReceivedEvents_(0)
00082 , nbSentEvents_(0)
00083 , nbSentDqmEvents_(0)
00084 , nbSentErrorEvents_(0)
00085 , nbPendingSMDiscards_(0)
00086 , nbPendingSMDqmDiscards_(0)
00087 , nbDiscardedEvents_(0)
00088 , nbReceivedEol_(0)
00089 , highestEolReceived_(0)
00090 , nbEolPosted_(0)
00091 , nbEolDiscarded_(0)
00092 , nbLostEvents_(0)
00093 , nbDataErrors_(0)
00094 , nbCrcErrors_(0)
00095 , nbTimeoutsWithEvent_(0)
00096 , nbTimeoutsWithoutEvent_(0)
00097 , dataErrorFlag_(0)
00098 , segmentationMode_(false)
00099 , nbClients_(0)
00100 , clientPrcIds_("")
00101 , nbRawCells_(16)
00102 , nbRecoCells_(8)
00103 , nbDqmCells_(8)
00104 , rawCellSize_(0x400000)
00105 , recoCellSize_(0x800000)
00106 , dqmCellSize_(0x800000)
00107 , doDropEvents_(false)
00108 , doFedIdCheck_(true)
00109 , doCrcCheck_(1)
00110 , doDumpEvents_(0)
00111 , buClassName_("BU")
00112 , buInstance_(0)
00113 , smClassName_("StorageManager")
00114 , smInstance_(0)
00115 , shmResourceTableTimeout_(200000)
00116 , monSleepSec_(2)
00117 , watchSleepSec_(10)
00118 , timeOutSec_(30)
00119 , processKillerEnabled_(true)
00120 , useEvmBoard_(true)
00121 , reasonForFailed_("")
00122 , nbAllocateSent_(0)
00123 , nbTakeReceived_(0)
00124 , nbDataDiscardReceived_(0)
00125 , nbDqmDiscardReceived_(0)
00126 , nbSentLast_(0)
00127 , sumOfSquaresLast_(0)
00128 , sumOfSizesLast_(0)
00129 , lock_(toolbox::BSem::FULL)
00130 , frb_(0)
00131 , shmInconsistent_(false)
00132 {
00133
00134 fsm_.initialize<evf::FUResourceBroker>(this);
00135
00136
00137 url_ =
00138 getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00139 getApplicationDescriptor()->getURN();
00140 class_ =getApplicationDescriptor()->getClassName();
00141 instance_=getApplicationDescriptor()->getInstance();
00142 sourceId_=class_.toString()+"_"+instance_.toString();
00143
00144
00145 i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00146 I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00147 i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00148 I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00149 i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00150 I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00151 i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00152 I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00153
00154
00155
00156 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00157 gui_=new WebGUI(this,&fsm_);
00158 vector<toolbox::lang::Method*> methods=gui_->getMethods();
00159 vector<toolbox::lang::Method*>::iterator it;
00160 for (it=methods.begin();it!=methods.end();++it) {
00161 if ((*it)->type()=="cgi") {
00162 string name=static_cast<xgi::MethodSignature*>(*it)->name();
00163 xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00164 }
00165 }
00166 xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00167
00168
00169
00170 string i2oPoolName=sourceId_+"_i2oPool";
00171 try {
00172 toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00173 toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00174 toolbox::mem::MemoryPoolFactory* poolFactory=
00175 toolbox::mem::getMemoryPoolFactory();
00176 i2oPool_=poolFactory->createPool(urn,allocator);
00177 }
00178 catch (toolbox::mem::exception::Exception& e) {
00179 string s="Failed to create pool: "+i2oPoolName;
00180 LOG4CPLUS_FATAL(log_,s);
00181 XCEPT_RETHROW(xcept::Exception,s,e);
00182 }
00183
00184
00185 exportParameters();
00186
00187
00188 fsm_.findRcmsStateListener();
00189
00190
00191 getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00192
00193 }
00194
00195
00196
00197 FUResourceBroker::~FUResourceBroker()
00198 {
00199
00200 }
00201
00202
00203
00205
00207
00208
00209 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00210 {
00211 try {
00212 LOG4CPLUS_INFO(log_, "Start configuring ...");
00213 connectToBUandSM();
00214 frb_ = new EvffedFillerRB(this);
00215 configureResources();
00216 if(shmInconsistent_){
00217 std::ostringstream ost;
00218 ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw="
00219 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00220 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00221 XCEPT_RAISE(evf::Exception,ost.str());
00222 }
00223 LOG4CPLUS_INFO(log_, "Finished configuring!");
00224 fsm_.fireEvent("ConfigureDone",this);
00225
00226 }
00227 catch (xcept::Exception &e) {
00228 std::string msg = "configuring FAILED: " + (string)e.what();
00229 reasonForFailed_ = e.what();
00230 fsm_.fireFailed(msg,this);
00231 }
00232
00233 return false;
00234 }
00235
00236
00237
00238 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00239 {
00240 try {
00241 LOG4CPLUS_INFO(log_, "Start enabling ...");
00242 reset();
00243 startMonitoringWorkLoop();
00244 startWatchingWorkLoop();
00245 resourceTable_->setRunNumber(runNumber_);
00246 lock();
00247 resourceTable_->resetCounters();
00248 unlock();
00249 resourceTable_->startDiscardWorkLoop();
00250 resourceTable_->startSendDataWorkLoop();
00251 resourceTable_->startSendDqmWorkLoop();
00252 resourceTable_->sendAllocate();
00253
00254 nbTimeoutsWithEvent_ = 0;
00255 nbTimeoutsWithoutEvent_ = 0;
00256 dataErrorFlag_ = 0;
00257
00258 LOG4CPLUS_INFO(log_, "Finished enabling!");
00259 fsm_.fireEvent("EnableDone",this);
00260 }
00261 catch (xcept::Exception &e) {
00262 std::string msg = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00263 reasonForFailed_ = e.what();
00264 fsm_.fireFailed(msg,this);
00265 }
00266
00267 return false;
00268 }
00269
00270
00271
00272 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00273 {
00274 try {
00275 LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00276 resourceTable_->stop();
00277 timeval now;
00278 timeval then;
00279 gettimeofday(&then,0);
00280 while (!resourceTable_->isReadyToShutDown()) {
00281 ::usleep(shmResourceTableTimeout_.value_*10);
00282 gettimeofday(&now,0);
00283 if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00284 std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " "
00285 << shmResourceTableTimeout_.value_/10000 << std::endl;
00286 LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00287 emergencyStop();
00288 break;
00289 }
00290 }
00291
00292 if (resourceTable_->isReadyToShutDown()) {
00293 LOG4CPLUS_INFO(log_, "Finished stopping!");
00294 fsm_.fireEvent("StopDone",this);
00295 }
00296 }
00297 catch (xcept::Exception &e) {
00298 std::string msg = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00299 reasonForFailed_ = e.what();
00300 fsm_.fireFailed(msg,this);
00301 }
00302
00303 return false;
00304 }
00305
00306
00307
00308 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00309 {
00310 try {
00311 LOG4CPLUS_INFO(log_, "Start halting ...");
00312 if (resourceTable_->isActive()) {
00313 resourceTable_->halt();
00314 UInt_t count = 0;
00315 while (count<10) {
00316 if (resourceTable_->isReadyToShutDown()) {
00317 lock();
00318 delete resourceTable_;
00319 resourceTable_=0;
00320 unlock();
00321 LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00322 break;
00323 }
00324 else {
00325 count++;
00326 LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00327 ::sleep(1);
00328 }
00329 }
00330 }
00331 else {
00332 lock();
00333 delete resourceTable_;
00334 resourceTable_=0;
00335 unlock();
00336 }
00337
00338 if (0==resourceTable_) {
00339 LOG4CPLUS_INFO(log_,"Finished halting!");
00340 fsm_.fireEvent("HaltDone",this);
00341 }
00342 else {
00343 std::string msg = "halting FAILED: ResourceTable shutdown timed out.";
00344 reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00345 fsm_.fireFailed(msg,this);
00346 }
00347 }
00348 catch (xcept::Exception &e) {
00349 std::string msg = "halting FAILED: "+xcept::stdformat_exception_history(e);
00350 reasonForFailed_ = e.what();
00351 fsm_.fireFailed(msg,this);
00352 }
00353 if(frb_) delete frb_;
00354 return false;
00355 }
00356
00357
00358
00359 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00360 throw (xoap::exception::Exception)
00361 {
00362 return fsm_.commandCallback(msg);
00363 }
00364
00365
00366
00367 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00368 {
00369 nbTakeReceived_.value_++;
00370 if(fsm_.checkIfEnabled()){
00371 bool eventComplete=resourceTable_->buildResource(bufRef);
00372 if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00373 }
00374 else{
00375 LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state "
00376 << fsm_.stateName() << " is being lost");
00377 bufRef->release();
00378 }
00379 }
00380
00381
00382 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00383 {
00384 if(fsm_.checkIfEnabled()){
00385
00386 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00387 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00388 if(msg->lumiSection==0){
00389 LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00390 fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00391 }
00392 nbReceivedEol_++;
00393 if(highestEolReceived_.value_+100 < msg->lumiSection)
00394 {
00395 LOG4CPLUS_ERROR(log_,"EOL message not in sequence, expected "
00396 << highestEolReceived_.value_+1
00397 << " received " << msg->lumiSection);
00398 fsm_.fireFailed("EOL message with corrupted LS ",this);
00399 }
00400 if(highestEolReceived_.value_+1 != msg->lumiSection)
00401 LOG4CPLUS_WARN(log_,"EOL message not in sequence, expected "
00402 << highestEolReceived_.value_+1
00403 << " received " << msg->lumiSection);
00404
00405 if(highestEolReceived_.value_ < msg->lumiSection)
00406 highestEolReceived_.value_ = msg->lumiSection;
00407 resourceTable_->postEndOfLumiSection(bufRef);
00408 }
00409 else{
00410 LOG4CPLUS_ERROR(log_,"EOL i2o frame received in state "
00411 << fsm_.stateName() << " is being lost");
00412 }
00413 bufRef->release();
00414
00415
00416
00417
00418
00419
00420 }
00421
00422
00423
00424
00425
00426 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00427 {
00428 nbDataDiscardReceived_.value_++;
00429 resourceTable_->discardDataEvent(bufRef);
00430 }
00431
00432
00433
00434 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00435 {
00436 nbDqmDiscardReceived_.value_++;
00437 resourceTable_->discardDqmEvent(bufRef);
00438 }
00439
00440
00441
00442 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00443 {
00444 typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00445 typedef AppDescSet_t::iterator AppDescIter_t;
00446
00447
00448 AppDescSet_t setOfBUs=
00449 getApplicationContext()->getDefaultZone()->
00450 getApplicationDescriptors(buClassName_.toString());
00451
00452 if (0!=bu_) { delete bu_; bu_=0; }
00453
00454 for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00455 if ((*it)->getInstance()==buInstance_)
00456 bu_=new BUProxy(getApplicationDescriptor(),*it,
00457 getApplicationContext(),i2oPool_);
00458
00459 if (0==bu_) {
00460 string msg=sourceId_+" failed to locate input BU!";
00461 XCEPT_RAISE(evf::Exception,msg);
00462 }
00463
00464
00465 AppDescSet_t setOfSMs=
00466 getApplicationContext()->getDefaultZone()->
00467 getApplicationDescriptors(smClassName_.toString());
00468
00469 if (0!=sm_) { delete sm_; sm_=0; }
00470
00471 for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00472 if ((*it)->getInstance()==smInstance_)
00473 sm_=new SMProxy(getApplicationDescriptor(),*it,
00474 getApplicationContext(),i2oPool_);
00475
00476 if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00477 }
00478
00479
00480
00481 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00482 throw (xgi::exception::Exception)
00483 {
00484 string name=in->getenv("PATH_INFO");
00485 if (name.empty()) name="defaultWebPage";
00486 static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00487 }
00488
00489
00490
00491 void FUResourceBroker::actionPerformed(xdata::Event& e)
00492 {
00493 lock();
00494
00495 if (0!=resourceTable_) {
00496
00497
00498
00499 if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00500 nbClients_ =resourceTable_->nbClients();
00501 clientPrcIds_ =resourceTable_->clientPrcIdsAsString();
00502 nbAllocatedEvents_ =resourceTable_->nbAllocated();
00503 nbPendingRequests_ =resourceTable_->nbPending();
00504 nbReceivedEvents_ =resourceTable_->nbCompleted();
00505 nbSentEvents_ =resourceTable_->nbSent();
00506 nbSentDqmEvents_ =resourceTable_->nbSentDqm();
00507 nbSentErrorEvents_ =resourceTable_->nbSentError();
00508 nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00509 nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00510 nbDiscardedEvents_ =resourceTable_->nbDiscarded();
00511 nbLostEvents_ =resourceTable_->nbLost();
00512 nbEolPosted_ =resourceTable_->nbEolPosted();
00513 nbEolDiscarded_ =resourceTable_->nbEolDiscarded();
00514 nbDataErrors_ =resourceTable_->nbErrors();
00515 nbCrcErrors_ =resourceTable_->nbCrcErrors();
00516 nbAllocateSent_ =resourceTable_->nbAllocSent();
00517 dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u +
00518 ((nbDataErrors_.value_ != 0u) << 1) +
00519 ((nbLostEvents_.value_ != 0u) << 2) +
00520 ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00521 ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00522 ((nbSentErrorEvents_.value_ != 0u) << 5)
00523 );
00524 }
00525 else if (e.type()=="ItemChangedEvent") {
00526
00527 string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00528
00529 if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00530 if (item=="useEvmBoard") FUResource::useEvmBoard(useEvmBoard_);
00531 if (item=="doCrcCheck") resourceTable_->setDoCrcCheck(doCrcCheck_);
00532 if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00533 }
00534
00535
00536 }
00537 else {
00538 nbClients_ =0;
00539 clientPrcIds_ ="";
00540 nbAllocatedEvents_ =0;
00541 nbPendingRequests_ =0;
00542 nbReceivedEvents_ =0;
00543 nbSentEvents_ =0;
00544 nbSentDqmEvents_ =0;
00545 nbSentErrorEvents_ =0;
00546 nbPendingSMDiscards_=0;
00547 nbPendingSMDqmDiscards_=0;
00548 nbDiscardedEvents_ =0;
00549 nbLostEvents_ =0;
00550 nbDataErrors_ =0;
00551 nbCrcErrors_ =0;
00552 nbAllocateSent_ =0;
00553 }
00554 unlock();
00555 }
00556
00557
00558
00559 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00560 {
00561 struct timezone timezone;
00562 gettimeofday(&monStartTime_,&timezone);
00563
00564 try {
00565 wlMonitoring_=
00566 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00567 "waiting");
00568 if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00569 asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00570 sourceId_+"Monitoring");
00571 wlMonitoring_->submit(asMonitoring_);
00572 }
00573 catch (xcept::Exception& e) {
00574 string msg = "Failed to start workloop 'Monitoring'.";
00575 XCEPT_RETHROW(evf::Exception,msg,e);
00576 }
00577 }
00578
00579
00580
00581 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00582 {
00583 unsigned int nbSent;
00584 uint64_t sumOfSquares;
00585 unsigned int sumOfSizes;
00586 uint64_t deltaSumOfSquares;
00587
00588 lock();
00589 if (0==resourceTable_) {
00590 deltaT_.value_ =0.0;
00591 deltaN_.value_ = 0;
00592 deltaSumOfSquares_.value_=0.0;
00593 deltaSumOfSizes_.value_ = 0;
00594 throughput_ =0.0;
00595 rate_ =0.0;
00596 average_ =0.0;
00597 rms_ =0.0;
00598 unlock();
00599 return false;
00600 }
00601 else {
00602 nbSent =resourceTable_->nbSent();
00603 sumOfSquares=resourceTable_->sumOfSquares();
00604 sumOfSizes =resourceTable_->sumOfSizes();
00605 }
00606 unlock();
00607
00608 struct timeval monEndTime;
00609 struct timezone timezone;
00610
00611 gettimeofday(&monEndTime,&timezone);
00612
00613 xdata::getInfoSpaceFactory()->lock();
00614 gui_->monInfoSpace()->lock();
00615
00616 deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00617 monStartTime_=monEndTime;
00618
00619 deltaN_.value_=nbSent-nbSentLast_;
00620 nbSentLast_=nbSent;
00621
00622 deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00623 deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00624 sumOfSquaresLast_=sumOfSquares;
00625
00626 deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00627 sumOfSizesLast_=sumOfSizes;
00628
00629 if (deltaT_.value_!=0) {
00630 throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00631 rate_ =deltaN_.value_/deltaT_.value_;
00632 }
00633 else {
00634 throughput_=0.0;
00635 rate_ =0.0;
00636 }
00637
00638 double meanOfSquares,mean,squareOfMean,variance;
00639
00640 if(deltaN_.value_!=0) {
00641 meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00642 mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00643 squareOfMean=mean*mean;
00644 variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00645
00646 average_=deltaSumOfSizes_.value_/deltaN_.value_;
00647 rms_ =std::sqrt(variance);
00648 }
00649 else {
00650 average_=0.0;
00651 rms_ =0.0;
00652 }
00653
00654 gui_->monInfoSpace()->unlock();
00655 xdata::getInfoSpaceFactory()->unlock();
00656
00657 ::sleep(monSleepSec_.value_);
00658
00659 return true;
00660 }
00661
00662
00663
00664 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00665 {
00666 try {
00667 wlWatching_=
00668 toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00669 "waiting");
00670 if (!wlWatching_->isActive()) wlWatching_->activate();
00671 asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00672 sourceId_+"Watching");
00673 wlWatching_->submit(asWatching_);
00674 }
00675 catch (xcept::Exception& e) {
00676 string msg = "Failed to start workloop 'Watching'.";
00677 XCEPT_RETHROW(evf::Exception,msg,e);
00678 }
00679 }
00680
00681
00682
00683 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00684 {
00685 lock();
00686
00687 if (0==resourceTable_) {
00688 unlock();
00689 return false;
00690 }
00691
00692 vector<pid_t> evt_prcids =resourceTable_->cellPrcIds();
00693 vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00694 vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps();
00695
00696 time_t tcurr=time(0);
00697 for (UInt_t i=0;i<evt_tstamps.size();i++) {
00698 pid_t pid =evt_prcids[i];
00699 UInt_t evt =evt_numbers[i];
00700 time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00701 double tdiff =difftime(tcurr,tstamp);
00702 if (tdiff>timeOutSec_) {
00703 if(processKillerEnabled_) {
00704 kill(pid,9);
00705 nbTimeoutsWithEvent_++;
00706 }
00707 LOG4CPLUS_ERROR(log_,"evt "<<evt<<" under processing for more than "
00708 <<timeOutSec_<<"sec for process "<<pid);
00709 }
00710 }
00711
00712 vector<pid_t> prcids=resourceTable_->clientPrcIds();
00713 for (UInt_t i=0;i<prcids.size();i++) {
00714 pid_t pid =prcids[i];
00715 int status=kill(pid,0);
00716 if (status!=0) {
00717 LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00718 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00719 nbTimeoutsWithoutEvent_++;
00720 }
00721 }
00722
00723 if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00724 std::ostringstream ost;
00725 ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw="
00726 << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00727 << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00728 XCEPT_DECLARE(evf::Exception,
00729 sentinelException, ost.str());
00730 notifyQualified("error",sentinelException);
00731 shmInconsistent_ = true;
00732 }
00733
00734 unlock();
00735
00736 ::sleep(watchSleepSec_.value_);
00737 return true;
00738 }
00739
00740
00741
// Register this broker's variables with the WebGUI, then call
// gui_->exportParameters() and subscribe to changes of the flags that
// actionPerformed() pushes into FUResource / the resource table.
// Variables are registered in four groups:
//   - monitoring parameters,
//   - monitoring counters,
//   - standard parameters,
//   - debug counters.
// NOTE(review): the registration order is kept as-is; it may determine how
// the GUI lists the items (unconfirmed).
void FUResourceBroker::exportParameters()
{
  // gui_ is created in the constructor before this function is called.
  assert(0!=gui_);

  // Identity and state.
  gui_->addMonitorParam("url", &url_);
  gui_->addMonitorParam("class", &class_);
  gui_->addMonitorParam("instance", &instance_);
  gui_->addMonitorParam("runNumber", &runNumber_);
  gui_->addMonitorParam("stateName", fsm_.stateName());

  // Per-interval values computed by monitoring().
  gui_->addMonitorParam("deltaT", &deltaT_);
  gui_->addMonitorParam("deltaN", &deltaN_);
  gui_->addMonitorParam("deltaSumOfSquares", &deltaSumOfSquares_);
  gui_->addMonitorParam("deltaSumOfSizes", &deltaSumOfSizes_);

  gui_->addMonitorParam("throughput", &throughput_);
  gui_->addMonitorParam("rate", &rate_);
  gui_->addMonitorParam("average", &average_);
  gui_->addMonitorParam("rms", &rms_);
  gui_->addMonitorParam("dataErrorFlag", &dataErrorFlag_);

  // Event-flow counters (mostly refreshed from the resource table in actionPerformed()).
  gui_->addMonitorCounter("nbAllocatedEvents", &nbAllocatedEvents_);
  gui_->addMonitorCounter("nbPendingRequests", &nbPendingRequests_);
  gui_->addMonitorCounter("nbReceivedEvents", &nbReceivedEvents_);
  gui_->addMonitorCounter("nbSentEvents", &nbSentEvents_);
  gui_->addMonitorCounter("nbSentErrorEvents", &nbSentErrorEvents_);
  gui_->addMonitorCounter("nbDiscardedEvents", &nbDiscardedEvents_);
  gui_->addMonitorCounter("nbReceivedEol", &nbReceivedEol_);
  gui_->addMonitorCounter("highestEolReceived", &highestEolReceived_);
  gui_->addMonitorCounter("nbEolPosted", &nbEolPosted_);
  gui_->addMonitorCounter("nbEolDiscarded", &nbEolDiscarded_);

  gui_->addMonitorCounter("nbPendingSMDiscards", &nbPendingSMDiscards_);

  gui_->addMonitorCounter("nbSentDqmEvents", &nbSentDqmEvents_);
  gui_->addMonitorCounter("nbDqmDiscardReceived", &nbDqmDiscardReceived_);
  gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);

  // Error counters.
  gui_->addMonitorCounter("nbLostEvents", &nbLostEvents_);
  gui_->addMonitorCounter("nbDataErrors", &nbDataErrors_);
  gui_->addMonitorCounter("nbCrcErrors", &nbCrcErrors_);
  gui_->addMonitorCounter("nbTimeoutsWithEvent", &nbTimeoutsWithEvent_);
  gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);

  // Shared-memory layout used by configureResources().
  gui_->addStandardParam("segmentationMode", &segmentationMode_);
  gui_->addStandardParam("nbClients", &nbClients_);
  gui_->addStandardParam("clientPrcIds", &clientPrcIds_);
  gui_->addStandardParam("nbRawCells", &nbRawCells_);
  gui_->addStandardParam("nbRecoCells", &nbRecoCells_);
  gui_->addStandardParam("nbDqmCells", &nbDqmCells_);
  gui_->addStandardParam("rawCellSize", &rawCellSize_);
  gui_->addStandardParam("recoCellSize", &recoCellSize_);
  gui_->addStandardParam("dqmCellSize", &dqmCellSize_);

  // Behaviour switches, peer lookup and timing.
  gui_->addStandardParam("doDropEvents", &doDropEvents_);
  gui_->addStandardParam("doFedIdCheck", &doFedIdCheck_);
  gui_->addStandardParam("doCrcCheck", &doCrcCheck_);
  gui_->addStandardParam("doDumpEvents", &doDumpEvents_);
  gui_->addStandardParam("buClassName", &buClassName_);
  gui_->addStandardParam("buInstance", &buInstance_);
  gui_->addStandardParam("smClassName", &smClassName_);
  gui_->addStandardParam("smInstance", &smInstance_);
  gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
  gui_->addStandardParam("monSleepSec", &monSleepSec_);
  gui_->addStandardParam("watchSleepSec", &watchSleepSec_);
  gui_->addStandardParam("timeOutSec", &timeOutSec_);
  gui_->addStandardParam("processKillerEnabled", &processKillerEnabled_);
  gui_->addStandardParam("useEvmBoard", &useEvmBoard_);
  gui_->addStandardParam("rcmsStateListener", fsm_.rcmsStateListener());
  gui_->addStandardParam("foundRcmsStateListener", fsm_.foundRcmsStateListener());
  gui_->addStandardParam("reasonForFailed", &reasonForFailed_);

  // I2O message counters incremented directly by the callbacks / table.
  gui_->addDebugCounter("nbAllocateSent", &nbAllocateSent_);
  gui_->addDebugCounter("nbTakeReceived", &nbTakeReceived_);
  gui_->addDebugCounter("nbDataDiscardReceived", &nbDataDiscardReceived_);

  gui_->exportParameters();

  // Changes to these items are handled in actionPerformed() (ItemChangedEvent).
  gui_->addItemChangedListener("doFedIdCheck", this);
  gui_->addItemChangedListener("useEvmBoard", this);
  gui_->addItemChangedListener("doCrcCheck", this);
  gui_->addItemChangedListener("doDumpEvents", this);
}
00825
00826
00827
00828 void FUResourceBroker::reset()
00829 {
00830 gui_->resetCounters();
00831
00832 deltaT_ =0.0;
00833 deltaN_ = 0;
00834 deltaSumOfSquares_=0.0;
00835 deltaSumOfSizes_ = 0;
00836
00837 throughput_ =0.0;
00838 rate_ =0.0;
00839 average_ =0.0;
00840 rms_ =0.0;
00841
00842 nbSentLast_ = 0;
00843 sumOfSquaresLast_ = 0;
00844 sumOfSizesLast_ = 0;
00845 }
00846
00847
00848
00849 double FUResourceBroker::deltaT(const struct timeval *start,
00850 const struct timeval *end)
00851 {
00852 unsigned int sec;
00853 unsigned int usec;
00854
00855 sec = end->tv_sec - start->tv_sec;
00856
00857 if(end->tv_usec > start->tv_usec) {
00858 usec = end->tv_usec - start->tv_usec;
00859 }
00860 else {
00861 sec--;
00862 usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00863 }
00864
00865 return ((double)sec) + ((double)usec) / 1000000.0;
00866 }
00867
00868
00869
00870
00871 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00872 throw (xgi::exception::Exception)
00873 {
00874 using namespace cgicc;
00875 Cgicc cgi(in);
00876 std::vector<FormEntry> els = cgi.getElements() ;
00877 for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00878 std::cout << "form entry " << (*it).getValue() << std::endl;
00879
00880 std::vector<FormEntry> el1;
00881 cgi.getElement("crcError",el1);
00882 *out<<"<html>"<<endl;
00883 gui_->htmlHead(in,out,sourceId_);
00884 *out<<"<body>"<<endl;
00885 gui_->htmlHeadline(in,out);
00886
00887 lock();
00888
00889 if (0!=resourceTable_) {
00890 if(el1.size()!=0) {
00891 resourceTable_->injectCRCError();
00892 }
00893 *out << "<form method=\"GET\" action=\"customWebPage\" >";
00894 *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00895 *out << "</form>" << endl;
00896 *out << "<hr/>" << std::endl;
00897 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00898 *out<<table().set("frame","void").set("rules","rows")
00899 .set("class","modules").set("width","250")<<endl
00900 <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00901 <<tr()
00902 <<th("client").set("align","left")
00903 <<th("process id").set("align","center")
00904 <<th("status").set("align","center")
00905 <<tr()
00906 <<endl;
00907 for (UInt_t i=0;i<client_prc_ids.size();i++) {
00908
00909 pid_t pid =client_prc_ids[i];
00910 int status=kill(pid,0);
00911
00912 stringstream ssi; ssi<<i+1;
00913 stringstream sspid; sspid<<pid;
00914 stringstream ssstatus; ssstatus<<status;
00915
00916 string bg_status = (status==0) ? "#00ff00" : "ff0000";
00917 *out<<tr()
00918 <<td(ssi.str()).set("align","left")
00919 <<td(sspid.str()).set("align","center")
00920 <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00921 <<tr()<<endl;
00922 }
00923 *out<<table()<<endl;
00924 *out<<"<br><br>"<<endl;
00925
00926 vector<string> states = resourceTable_->cellStates();
00927 vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00928 vector<pid_t> prc_ids = resourceTable_->cellPrcIds();
00929 vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00930
00931 *out<<table().set("frame","void").set("rules","rows")
00932 .set("class","modules").set("width","500")<<endl
00933 <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00934 <<tr()
00935 <<th("cell").set("align","left")
00936 <<th("state").set("align","center")
00937 <<th("event").set("align","center")
00938 <<th("process id").set("align","center")
00939 <<th("timestamp").set("align","center")
00940 <<th("time").set("align","center")
00941 <<tr()
00942 <<endl;
00943 for (UInt_t i=0;i<states.size();i++) {
00944 string state=states[i];
00945 UInt_t evt = evt_numbers[i];
00946 pid_t pid = prc_ids[i];
00947 time_t tstamp= time_stamps[i];
00948 double tdiff = difftime(time(0),tstamp);
00949
00950 stringstream ssi; ssi<<i;
00951 stringstream ssevt; if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00952 stringstream sspid; if (pid!=0) sspid<<pid; else sspid<<" - ";
00953 stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00954 stringstream sstdiff; if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00955
00956 string bg_state = "#ffffff";
00957 if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00958 state=="RAWREADING"||state=="RAWREAD")
00959 bg_state="#99CCff";
00960 else if (state=="PROCESSING")
00961 bg_state="#ff0000";
00962 else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00963 bg_state="#CCff99";
00964 else if (state=="SENDING")
00965 bg_state="#00FF33";
00966 else if (state=="SENT")
00967 bg_state="#006633";
00968 else if (state=="DISCARDING")
00969 bg_state="#FFFF00";
00970 else if (state=="LUMISECTION")
00971 bg_state="#0000FF";
00972
00973 *out<<tr()
00974 <<td(ssi.str()).set("align","left")
00975 <<td(state).set("align","center").set("bgcolor",bg_state)
00976 <<td(ssevt.str()).set("align","center")
00977 <<td(sspid.str()).set("align","center")
00978 <<td(sststamp.str()).set("align","center")
00979 <<td(sstdiff.str()).set("align","center")
00980 <<tr()<<endl;
00981 }
00982 *out<<table()<<endl;
00983 *out<<"<br><br>"<<endl;
00984
00985 vector<string> dqmstates = resourceTable_->dqmCellStates();
00986
00987 *out<<table().set("frame","void").set("rules","rows")
00988 .set("class","modules").set("width","500")<<endl
00989 <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00990 <<tr()
00991 <<th("cell").set("align","left")
00992 <<th("state").set("align","center")
00993 <<tr()
00994 <<endl;
00995 for (UInt_t i=0;i<dqmstates.size();i++) {
00996 string state=dqmstates[i];
00997
00998 string bg_state = "#ffffff";
00999 if (state=="WRITING"||state=="WRITTEN")
01000 bg_state="#99CCff";
01001 else if (state=="SENDING")
01002 bg_state="#00FF33";
01003 else if (state=="SENT")
01004 bg_state="#006633";
01005 else if (state=="DISCARDING")
01006 bg_state="#FFFF00";
01007
01008 *out<<tr()<<"<td>"<<i<<"</td>"
01009 <<td(state).set("align","center").set("bgcolor",bg_state)
01010 <<tr()<<endl;
01011 }
01012 *out<<table()<<endl;
01013
01014
01015
01016 }
01017 *out<<"</body>"<<endl<<"</html>"<<endl;
01018
01019 unlock();
01020 }
01021
01022 void FUResourceBroker::emergencyStop()
01023 {
01024 LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
01025 vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
01026 for (UInt_t i=0;i<client_prc_ids.size();i++) {
01027 pid_t pid =client_prc_ids[i];
01028 std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
01029 if(pid!=0){
01030
01031 if(!resourceTable_->handleCrashedEP(runNumber_,pid))
01032 nbTimeoutsWithoutEvent_++;
01033 else
01034 nbTimeoutsWithEvent_++;
01035 }
01036 }
01037 resourceTable_->lastResort();
01038 ::sleep(1);
01039 if(!resourceTable_->isReadyToShutDown())
01040 {
01041 reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01042 XCEPT_RAISE(evf::Exception,reasonForFailed_);
01043 }
01044 resourceTable_->printWorkLoopStatus();
01045 lock();
01046 std::cout << "delete resourcetable" <<std::endl;
01047 delete resourceTable_;
01048 resourceTable_=0;
01049 std::cout << "cycle through resourcetable config " << std::endl;
01050 configureResources();
01051 unlock();
01052 if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01053 std::cout << "done with emergency stop" << std::endl;
01054 }
01055
01056 void FUResourceBroker::configureResources()
01057 {
01058 resourceTable_=new FUResourceTable(segmentationMode_.value_,
01059 nbRawCells_.value_,
01060 nbRecoCells_.value_,
01061 nbDqmCells_.value_,
01062 rawCellSize_.value_,
01063 recoCellSize_.value_,
01064 dqmCellSize_.value_,
01065 bu_,sm_,
01066 log_,
01067 shmResourceTableTimeout_.value_,
01068 frb_,
01069 this);
01070 FUResource::doFedIdCheck(doFedIdCheck_);
01071 FUResource::useEvmBoard(useEvmBoard_);
01072 resourceTable_->setDoCrcCheck(doCrcCheck_);
01073 resourceTable_->setDoDumpEvents(doDumpEvents_);
01074 reset();
01075 shmInconsistent_ = false;
01076 if(resourceTable_->nbResources() != nbRawCells_.value_ ||
01077 resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01078 shmInconsistent_ = true;
01079 }
01080
01081
01083
01085
// XDAQ instantiator definition for FUResourceBroker.
XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)