CMS 3D CMS Logo

FUResourceBroker.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceBroker
00004 // ----------------
00005 //
00006 //            10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00012 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00013 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00014 
00015 #include "FWCore/Utilities/interface/CRC16.h"
00016 
00017 #include "i2o/Method.h"
00018 #include "interface/shared/i2oXFunctionCodes.h"
00019 #include "xcept/tools.h"
00020 
00021 #include "toolbox/mem/HeapAllocator.h"
00022 #include "toolbox/mem/Reference.h"
00023 #include "toolbox/mem/MemoryPoolFactory.h"
00024 #include "toolbox/mem/exception/Exception.h"
00025 
00026 #include "xoap/MessageReference.h"
00027 #include "xoap/MessageFactory.h"
00028 #include "xoap/SOAPEnvelope.h"
00029 #include "xoap/SOAPBody.h"
00030 #include "xoap/domutils.h"
00031 #include "xoap/Method.h"
00032 
00033 #include "cgicc/CgiDefs.h"
00034 #include "cgicc/Cgicc.h"
00035 #include "cgicc/HTMLClasses.h"
00036 
00037 #include <signal.h>
00038 #include <iostream>
00039 #include <sstream>
00040 
00041 
00042 using namespace std;
00043 using namespace evf;
00044 
00045 
00047 // construction/destruction
00049 
00050 //______________________________________________________________________________
00051 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00052   : xdaq::Application(s)
00053   , fsm_(this)
00054   , gui_(0)
00055   , log_(getApplicationLogger())
00056   , bu_(0)
00057   , sm_(0)
00058   , i2oPool_(0)
00059   , resourceTable_(0)
00060   , wlMonitoring_(0)
00061   , asMonitoring_(0)
00062   , wlWatching_(0)
00063   , asWatching_(0)
00064   , instance_(0)
00065   , runNumber_(0)
00066   , deltaT_(0.0)
00067   , deltaN_(0)
00068   , deltaSumOfSquares_(0)
00069   , deltaSumOfSizes_(0)
00070   , throughput_(0.0)
00071   , rate_(0.0)
00072   , average_(0.0)
00073   , rms_(0.0)
00074   , nbAllocatedEvents_(0)
00075   , nbPendingRequests_(0)
00076   , nbReceivedEvents_(0)
00077   , nbSentEvents_(0)
00078   , nbSentErrorEvents_(0)
00079   , nbPendingSMDiscards_(0)
00080   , nbDiscardedEvents_(0)
00081   , nbLostEvents_(0)
00082   , nbDataErrors_(0)
00083   , nbCrcErrors_(0)
00084   , segmentationMode_(false)
00085   , nbClients_(0)
00086   , clientPrcIds_("")
00087   , nbRawCells_(16)
00088   , nbRecoCells_(8)
00089   , nbDqmCells_(8)
00090   , rawCellSize_(0x400000)  // 4MB
00091   , recoCellSize_(0x800000) // 8MB
00092   , dqmCellSize_(0x800000)  // 8MB
00093   , doDropEvents_(false)
00094   , doFedIdCheck_(true)
00095   , doCrcCheck_(1)
00096   , doDumpEvents_(0)
00097   , buClassName_("BU")
00098   , buInstance_(0)
00099   , smClassName_("StorageManager")
00100   , smInstance_(0)
00101   , monSleepSec_(2)
00102   , watchSleepSec_(10)
00103   , timeOutSec_(30)
00104   , processKillerEnabled_(true)
00105   , useEvmBoard_(true)
00106   , reasonForFailed_("")
00107   , nbAllocateSent_(0)
00108   , nbTakeReceived_(0)
00109   , nbDataDiscardReceived_(0)
00110   , nbDqmDiscardReceived_(0)
00111   , nbSentLast_(0)
00112   , sumOfSquaresLast_(0)
00113   , sumOfSizesLast_(0)
00114   , lock_(toolbox::BSem::FULL)
00115 {
00116   // setup finite state machine (binding relevant callbacks)
00117   fsm_.initialize<evf::FUResourceBroker>(this);
00118   
00119   // set url, class, instance, and sourceId (=class_instance)
00120   url_     =
00121     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00122     getApplicationDescriptor()->getURN();
00123   class_   =getApplicationDescriptor()->getClassName();
00124   instance_=getApplicationDescriptor()->getInstance();
00125   sourceId_=class_.toString()+"_"+instance_.toString();
00126   
00127   // bind i2o callbacks
00128   i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00129             I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00130   i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00131             I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00132   i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00133             I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00134   
00135   
00136   // bind HyperDAQ web pages
00137   xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00138   gui_=new WebGUI(this,&fsm_);
00139   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00140   vector<toolbox::lang::Method*>::iterator it;
00141   for (it=methods.begin();it!=methods.end();++it) {
00142     if ((*it)->type()=="cgi") {
00143       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00144       xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00145     }
00146   }
00147   xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00148   
00149 
00150   // allocate i2o memery pool
00151   string i2oPoolName=sourceId_+"_i2oPool";
00152   try {
00153     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00154     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00155     toolbox::mem::MemoryPoolFactory* poolFactory=
00156       toolbox::mem::getMemoryPoolFactory();
00157     i2oPool_=poolFactory->createPool(urn,allocator);
00158   }
00159   catch (toolbox::mem::exception::Exception& e) {
00160     string s="Failed to create pool: "+i2oPoolName;
00161     LOG4CPLUS_FATAL(log_,s);
00162     XCEPT_RETHROW(xcept::Exception,s,e);
00163   }
00164   
00165   // publish all parameters to app info space
00166   exportParameters();
00167   
00168   // findRcmsStateListener
00169   fsm_.findRcmsStateListener();
00170   
00171   // set application icon for hyperdaq
00172   getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00173   //FUResource::useEvmBoard_ = useEvmBoard_;
00174 }
00175 
00176 
00177 //______________________________________________________________________________
// Destructor; performs no explicit cleanup.
// NOTE(review): gui_, bu_, sm_ and resourceTable_ are allocated with new
// elsewhere in this file and are not released here -- confirm that the
// application object is never destroyed while they are still allocated.
FUResourceBroker::~FUResourceBroker()
{

}
00182 
00183 
00184 
00186 // implementation of member functions
00188    
00189 //______________________________________________________________________________
00190 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00191 {
00192   try {
00193     LOG4CPLUS_INFO(log_, "Start configuring ...");
00194     connectToBUandSM();
00195     resourceTable_=new FUResourceTable(segmentationMode_.value_,
00196                                        nbRawCells_.value_,
00197                                        nbRecoCells_.value_,
00198                                        nbDqmCells_.value_,
00199                                        rawCellSize_.value_,
00200                                        recoCellSize_.value_,
00201                                        dqmCellSize_.value_,
00202                                        bu_,sm_,
00203                                        log_);
00204     FUResource::doFedIdCheck(doFedIdCheck_);
00205     FUResource::useEvmBoard(useEvmBoard_);
00206     resourceTable_->setDoCrcCheck(doCrcCheck_);
00207     resourceTable_->setDoDumpEvents(doDumpEvents_);
00208     reset();
00209     LOG4CPLUS_INFO(log_, "Finished configuring!");
00210     
00211     fsm_.fireEvent("ConfigureDone",this);
00212   }
00213   catch (xcept::Exception &e) {
00214     std::string msg  = "configuring FAILED: " + (string)e.what();
00215     reasonForFailed_ = e.what();
00216     fsm_.fireFailed(msg,this);
00217   }
00218   
00219   return false;
00220 }
00221 
00222 
00223 //______________________________________________________________________________
00224 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00225 {
00226   try {
00227     LOG4CPLUS_INFO(log_, "Start enabling ...");
00228     startMonitoringWorkLoop();
00229     startWatchingWorkLoop();
00230     resourceTable_->setRunNumber(runNumber_);
00231     resourceTable_->resetCounters();
00232     resourceTable_->startDiscardWorkLoop();
00233     resourceTable_->startSendDataWorkLoop();
00234     resourceTable_->startSendDqmWorkLoop();
00235     resourceTable_->sendAllocate();
00236     LOG4CPLUS_INFO(log_, "Finished enabling!");
00237     fsm_.fireEvent("EnableDone",this);
00238   }
00239   catch (xcept::Exception &e) {
00240     std::string msg  = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00241     reasonForFailed_ = e.what();
00242     fsm_.fireFailed(msg,this);
00243   }
00244   
00245   return false;
00246 }
00247 
00248 
00249 //______________________________________________________________________________
00250 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00251 {
00252   try {
00253     LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00254     resourceTable_->stop();
00255     UInt_t count = 0;
00256     while (count<10) {
00257       if (resourceTable_->isReadyToShutDown()) {
00258         LOG4CPLUS_INFO(log_,"ResourceTable successfully shutdown ("<<count+1<<").");
00259         break;
00260       }
00261       else {
00262         count++;
00263         LOG4CPLUS_DEBUG(log_,"Waiting for ResourceTable to shutdown ("<<count<<")");
00264         ::sleep(1);
00265       }
00266     }
00267     
00268     if (count<10) {
00269       LOG4CPLUS_INFO(log_, "Finished stopping!");
00270       fsm_.fireEvent("StopDone",this);
00271     }
00272     else {
00273       std::string msg  = "stopping FAILED: ResourceTable shutdown timed out.";
00274       reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT.";
00275       fsm_.fireFailed(msg,this);
00276     }
00277   }
00278   catch (xcept::Exception &e) {
00279     std::string msg  = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00280     reasonForFailed_ = e.what();
00281     fsm_.fireFailed(msg,this);
00282   }
00283   
00284   return false;
00285 }
00286 
00287 
00288 //______________________________________________________________________________
00289 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00290 {
00291   try {
00292     LOG4CPLUS_INFO(log_, "Start halting ...");
00293     if (resourceTable_->isActive()) {
00294       resourceTable_->halt();
00295       UInt_t count = 0;
00296       while (count<10) {
00297         if (resourceTable_->isReadyToShutDown()) {
00298           lock();
00299           delete resourceTable_;
00300           resourceTable_=0;
00301           unlock();
00302           LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00303           break;
00304         }
00305         else {
00306           count++;
00307           LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00308           ::sleep(1);
00309         }
00310       }
00311     }
00312     else {
00313       lock();
00314       delete resourceTable_;
00315       resourceTable_=0;
00316       unlock();
00317     }
00318     
00319     if (0==resourceTable_) {
00320       LOG4CPLUS_INFO(log_,"Finished halting!");
00321       fsm_.fireEvent("HaltDone",this);
00322     }
00323     else {
00324       std::string msg  = "halting FAILED: ResourceTable shutdown timed out.";
00325       reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00326       fsm_.fireFailed(msg,this);
00327     }
00328   }
00329   catch (xcept::Exception &e) {
00330     std::string msg  = "halting FAILED: "+xcept::stdformat_exception_history(e);
00331     reasonForFailed_ = e.what();
00332     fsm_.fireFailed(msg,this);
00333   }
00334   
00335   return false;
00336 }
00337 
00338 
00339 //______________________________________________________________________________
00340 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00341   throw (xoap::exception::Exception)
00342 {
00343   return fsm_.commandCallback(msg);
00344 }
00345 
00346 
00347 //______________________________________________________________________________
00348 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00349 {
00350   nbTakeReceived_.value_++;
00351   bool eventComplete=resourceTable_->buildResource(bufRef);
00352   if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00353   
00354 }
00355 
00356 
00357 //______________________________________________________________________________
00358 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00359 {
00360   nbDataDiscardReceived_.value_++;
00361   resourceTable_->discardDataEvent(bufRef);
00362 }
00363 
00364 
00365 //______________________________________________________________________________
00366 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00367 {
00368   nbDqmDiscardReceived_.value_++;
00369   resourceTable_->discardDqmEvent(bufRef);
00370 }
00371 
00372 
00373 //______________________________________________________________________________
00374 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00375 {
00376   typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00377   typedef AppDescSet_t::iterator            AppDescIter_t;
00378   
00379   // locate input BU
00380   AppDescSet_t setOfBUs=
00381     getApplicationContext()->getDefaultZone()->
00382     getApplicationDescriptors(buClassName_.toString());
00383   
00384   if (0!=bu_) { delete bu_; bu_=0; }
00385   
00386   for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00387     if ((*it)->getInstance()==buInstance_)
00388       bu_=new BUProxy(getApplicationDescriptor(),*it,
00389                       getApplicationContext(),i2oPool_);
00390   
00391   if (0==bu_) {
00392     string msg=sourceId_+" failed to locate input BU!";
00393     XCEPT_RAISE(evf::Exception,msg);
00394   }
00395   
00396   // locate output SM
00397   AppDescSet_t setOfSMs=
00398     getApplicationContext()->getDefaultZone()->
00399     getApplicationDescriptors(smClassName_.toString());
00400   
00401   if (0!=sm_) { delete sm_; sm_=0; }
00402   
00403   for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00404     if ((*it)->getInstance()==smInstance_)
00405       sm_=new SMProxy(getApplicationDescriptor(),*it,
00406                       getApplicationContext(),i2oPool_);
00407   
00408   if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00409 }
00410 
00411 
00412 //______________________________________________________________________________
00413 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00414   throw (xgi::exception::Exception)
00415 {
00416   string name=in->getenv("PATH_INFO");
00417   if (name.empty()) name="defaultWebPage";
00418   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00419 }
00420 
00421 
00422 //______________________________________________________________________________
00423 void FUResourceBroker::actionPerformed(xdata::Event& e)
00424 {
00425   lock();
00426   
00427   if (0!=resourceTable_) {
00428     
00429     //gui_->monInfoSpace()->lock();
00430     
00431     if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00432       nbClients_          =resourceTable_->nbClients();
00433       clientPrcIds_       =resourceTable_->clientPrcIdsAsString();
00434       nbAllocatedEvents_  =resourceTable_->nbAllocated();
00435       nbPendingRequests_  =resourceTable_->nbPending();
00436       nbReceivedEvents_   =resourceTable_->nbCompleted();
00437       nbSentEvents_       =resourceTable_->nbSent();
00438       nbSentErrorEvents_  =resourceTable_->nbSentError();
00439       nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00440       nbDiscardedEvents_  =resourceTable_->nbDiscarded();
00441       nbLostEvents_       =resourceTable_->nbLost();
00442       nbDataErrors_       =resourceTable_->nbErrors();
00443       nbCrcErrors_        =resourceTable_->nbCrcErrors();
00444       nbAllocateSent_     =resourceTable_->nbAllocSent();
00445     }
00446     else if (e.type()=="ItemChangedEvent") {
00447       
00448       string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00449       
00450       if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00451       if (item=="useEvmBoard")  FUResource::useEvmBoard(useEvmBoard_);
00452       if (item=="doCrcCheck")   resourceTable_->setDoCrcCheck(doCrcCheck_);
00453       if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00454     }
00455     
00456     //gui_->monInfoSpace()->unlock();
00457   }
00458   else {
00459     nbClients_          =0;
00460     clientPrcIds_       ="";
00461     nbAllocatedEvents_  =0;
00462     nbPendingRequests_  =0;
00463     nbReceivedEvents_   =0;
00464     nbSentEvents_       =0;
00465     nbSentErrorEvents_  =0;
00466     nbPendingSMDiscards_=0;
00467     nbDiscardedEvents_  =0;
00468     nbLostEvents_       =0;
00469     nbDataErrors_       =0;
00470     nbCrcErrors_        =0;
00471     nbAllocateSent_     =0;
00472   }
00473   unlock();
00474 }
00475 
00476 
00477 //______________________________________________________________________________
00478 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00479 {
00480   struct timezone timezone;
00481   gettimeofday(&monStartTime_,&timezone);
00482   
00483   try {
00484     wlMonitoring_=
00485       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00486                                                        "waiting");
00487     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00488     asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00489                                       sourceId_+"Monitoring");
00490     wlMonitoring_->submit(asMonitoring_);
00491   }
00492   catch (xcept::Exception& e) {
00493     string msg = "Failed to start workloop 'Monitoring'.";
00494     XCEPT_RETHROW(evf::Exception,msg,e);
00495   }
00496 }
00497 
00498 
00499 //______________________________________________________________________________
00500 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00501 {
00502   unsigned int nbSent;
00503   uint64_t     sumOfSquares;
00504   unsigned int sumOfSizes;
00505   uint64_t     deltaSumOfSquares;
00506 
00507   lock();
00508   if (0==resourceTable_) {
00509     deltaT_.value_           =0.0;
00510     deltaN_.value_           =  0;
00511     deltaSumOfSquares_.value_=0.0;
00512     deltaSumOfSizes_.value_  =  0;
00513     throughput_              =0.0;
00514     rate_                    =0.0;
00515     average_                 =0.0;
00516     rms_                     =0.0;
00517     unlock();    
00518     return false;
00519   }
00520   else {
00521     nbSent      =resourceTable_->nbSent();
00522     sumOfSquares=resourceTable_->sumOfSquares();
00523     sumOfSizes  =resourceTable_->sumOfSizes();
00524   }
00525   unlock();
00526   
00527   struct timeval  monEndTime;
00528   struct timezone timezone;
00529   
00530   gettimeofday(&monEndTime,&timezone);
00531   
00532   xdata::getInfoSpaceFactory()->lock();
00533   gui_->monInfoSpace()->lock();
00534   
00535   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00536   monStartTime_=monEndTime;
00537   
00538   deltaN_.value_=nbSent-nbSentLast_;
00539   nbSentLast_=nbSent;
00540   
00541   deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00542   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00543   sumOfSquaresLast_=sumOfSquares;
00544   
00545   deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00546   sumOfSizesLast_=sumOfSizes;
00547   
00548   if (deltaT_.value_!=0) {
00549     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00550     rate_      =deltaN_.value_/deltaT_.value_;
00551   }
00552   else {
00553     throughput_=0.0;
00554     rate_      =0.0;
00555   }
00556   
00557   double meanOfSquares,mean,squareOfMean,variance;
00558   
00559   if(deltaN_.value_!=0) {
00560     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00561     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00562     squareOfMean=mean*mean;
00563     variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00564     
00565     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00566     rms_    =std::sqrt(variance);
00567   }
00568   else {
00569     average_=0.0;
00570     rms_    =0.0;
00571   }
00572   
00573   gui_->monInfoSpace()->unlock();  
00574   xdata::getInfoSpaceFactory()->unlock();
00575     
00576   ::sleep(monSleepSec_.value_);
00577   
00578   return true;
00579 }
00580 
00581 
00582 //______________________________________________________________________________
00583 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00584 {
00585   try {
00586     wlWatching_=
00587       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00588                                                        "waiting");
00589     if (!wlWatching_->isActive()) wlWatching_->activate();
00590     asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00591                                     sourceId_+"Watching");
00592     wlWatching_->submit(asWatching_);
00593   }
00594   catch (xcept::Exception& e) {
00595     string msg = "Failed to start workloop 'Watching'.";
00596     XCEPT_RETHROW(evf::Exception,msg,e);
00597   }
00598 }
00599 
00600 
00601 //______________________________________________________________________________
00602 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00603 {
00604   lock();
00605   
00606   if (0==resourceTable_) {
00607     unlock();
00608     return false;
00609   }
00610   
00611   vector<pid_t> prcids=resourceTable_->clientPrcIds();
00612   for (UInt_t i=0;i<prcids.size();i++) {
00613     pid_t pid   =prcids[i];
00614     int   status=kill(pid,0);
00615     if (status!=0) {
00616       LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send raw data to err stream.");
00617       resourceTable_->handleCrashedEP(runNumber_,pid);
00618     }
00619   }
00620   
00621   vector<pid_t>  evt_prcids =resourceTable_->cellPrcIds();
00622   vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00623   vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps(); 
00624   
00625   time_t tcurr=time(0);  
00626   for (UInt_t i=0;i<evt_tstamps.size();i++) {
00627     pid_t  pid   =evt_prcids[i];
00628     UInt_t evt   =evt_numbers[i];
00629     time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00630     double tdiff =difftime(tcurr,tstamp);
00631     if (tdiff>timeOutSec_) {
00632       if(processKillerEnabled_) {
00633         LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00634         kill(pid,9);
00635       }
00636       else {
00637         LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00638                        <<timeOutSec_<<"sec for process "<<pid);
00639       }
00640     }
00641   }
00642   
00643   unlock();
00644   
00645   ::sleep(watchSleepSec_.value_);
00646   
00647   return true;
00648 }
00649     
00650 
00651 //______________________________________________________________________________
00652 void FUResourceBroker::exportParameters()
00653 {
00654   assert(0!=gui_);
00655   
00656   gui_->addMonitorParam("url",                      &url_);
00657   gui_->addMonitorParam("class",                    &class_);
00658   gui_->addMonitorParam("instance",                 &instance_);
00659   gui_->addMonitorParam("runNumber",                &runNumber_);
00660   gui_->addMonitorParam("stateName",                 fsm_.stateName());
00661 
00662   gui_->addMonitorParam("deltaT",                   &deltaT_);
00663   gui_->addMonitorParam("deltaN",                   &deltaN_);
00664   gui_->addMonitorParam("deltaSumOfSquares",        &deltaSumOfSquares_);
00665   gui_->addMonitorParam("deltaSumOfSizes",          &deltaSumOfSizes_);
00666     
00667   gui_->addMonitorParam("throughput",               &throughput_);
00668   gui_->addMonitorParam("rate",                     &rate_);
00669   gui_->addMonitorParam("average",                  &average_);
00670   gui_->addMonitorParam("rms",                      &rms_);
00671   
00672   gui_->addMonitorCounter("nbAllocatedEvents",      &nbAllocatedEvents_);
00673   gui_->addMonitorCounter("nbPendingRequests",      &nbPendingRequests_);
00674   gui_->addMonitorCounter("nbReceivedEvents",       &nbReceivedEvents_);
00675   gui_->addMonitorCounter("nbSentEvents",           &nbSentEvents_);
00676   gui_->addMonitorCounter("nbSentErrorEvents",      &nbSentErrorEvents_);
00677   gui_->addMonitorCounter("nbPendingSMDiscards",    &nbPendingSMDiscards_);
00678   gui_->addMonitorCounter("nbDiscardedEvents",      &nbDiscardedEvents_);
00679   gui_->addMonitorCounter("nbLostEvents",           &nbLostEvents_);
00680   gui_->addMonitorCounter("nbDataErrors",           &nbDataErrors_);
00681   gui_->addMonitorCounter("nbCrcErrors",            &nbCrcErrors_);
00682 
00683   gui_->addStandardParam("segmentationMode",        &segmentationMode_);
00684   gui_->addStandardParam("nbClients",               &nbClients_);
00685   gui_->addStandardParam("clientPrcIds",            &clientPrcIds_);
00686   gui_->addStandardParam("nbRawCells",              &nbRawCells_);
00687   gui_->addStandardParam("nbRecoCells",             &nbRecoCells_);
00688   gui_->addStandardParam("nbDqmCells",              &nbDqmCells_);
00689   gui_->addStandardParam("rawCellSize",             &rawCellSize_);
00690   gui_->addStandardParam("recoCellSize",            &recoCellSize_);
00691   gui_->addStandardParam("dqmCellSize",             &dqmCellSize_);
00692 
00693   gui_->addStandardParam("doDropEvents",            &doDropEvents_);
00694   gui_->addStandardParam("doFedIdCheck",            &doFedIdCheck_);
00695   gui_->addStandardParam("doCrcCheck",              &doCrcCheck_);
00696   gui_->addStandardParam("doDumpEvents",            &doDumpEvents_);
00697   gui_->addStandardParam("buClassName",             &buClassName_);
00698   gui_->addStandardParam("buInstance",              &buInstance_);
00699   gui_->addStandardParam("smClassName",             &smClassName_);
00700   gui_->addStandardParam("smInstance",              &smInstance_);
00701   gui_->addStandardParam("monSleepSec",             &monSleepSec_);
00702   gui_->addStandardParam("watchSleepSec",           &watchSleepSec_);
00703   gui_->addStandardParam("timeOutSec",              &timeOutSec_);
00704   gui_->addStandardParam("processKillerEnabled",    &processKillerEnabled_);
00705   gui_->addStandardParam("useEvmBoard",             &useEvmBoard_);
00706   gui_->addStandardParam("rcmsStateListener",        fsm_.rcmsStateListener());
00707   gui_->addStandardParam("foundRcmsStateListener",   fsm_.foundRcmsStateListener());
00708   gui_->addStandardParam("reasonForFailed",         &reasonForFailed_);
00709   
00710   gui_->addDebugCounter("nbAllocateSent",           &nbAllocateSent_);
00711   gui_->addDebugCounter("nbTakeReceived",           &nbTakeReceived_);
00712   gui_->addDebugCounter("nbDataDiscardReceived",    &nbDataDiscardReceived_);
00713   gui_->addDebugCounter("nbDqmDiscardReceived",     &nbDqmDiscardReceived_);
00714 
00715   gui_->exportParameters();
00716 
00717   gui_->addItemChangedListener("doFedIdCheck",      this);
00718   gui_->addItemChangedListener("useEvmBoard",       this);
00719   gui_->addItemChangedListener("doCrcCheck",        this);
00720   gui_->addItemChangedListener("doDumpEvents",      this);
00721 }
00722 
00723 
00724 //______________________________________________________________________________
00725 void FUResourceBroker::reset()
00726 {
00727   gui_->resetCounters();
00728   
00729   deltaT_           =0.0;
00730   deltaN_           =  0;
00731   deltaSumOfSquares_=0.0;
00732   deltaSumOfSizes_  =  0;
00733   
00734   throughput_       =0.0;
00735   rate_             =0.0;
00736   average_          =0.0;
00737   rms_              =0.0;
00738   
00739   nbSentLast_       =  0;
00740   sumOfSquaresLast_ =  0;
00741   sumOfSizesLast_   =  0;
00742 }
00743 
00744 
00745 //______________________________________________________________________________
00746 double FUResourceBroker::deltaT(const struct timeval *start,
00747                                 const struct timeval *end)
00748 {
00749   unsigned int  sec;
00750   unsigned int  usec;
00751   
00752   sec = end->tv_sec - start->tv_sec;
00753   
00754   if(end->tv_usec > start->tv_usec) {
00755     usec = end->tv_usec - start->tv_usec;
00756   }
00757   else {
00758     sec--;
00759     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00760   }
00761   
00762   return ((double)sec) + ((double)usec) / 1000000.0;
00763 }
00764 
00765 
00766 
00767 //______________________________________________________________________________
00768 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00769   throw (xgi::exception::Exception)
00770 {
00771   using namespace cgicc;
00772   
00773   *out<<"<html>"<<endl;
00774   gui_->htmlHead(in,out,sourceId_);
00775   *out<<"<body>"<<endl;
00776   gui_->htmlHeadline(in,out);
00777 
00778   lock();
00779   
00780   if (0!=resourceTable_) {
00781 
00782     vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00783     *out<<table().set("frame","void").set("rules","rows")
00784                  .set("class","modules").set("width","250")<<endl
00785         <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00786         <<tr()
00787         <<th("client").set("align","left")
00788         <<th("process id").set("align","center")
00789         <<th("status").set("align","center")
00790         <<tr()
00791         <<endl;
00792     for (UInt_t i=0;i<client_prc_ids.size();i++) {
00793 
00794       pid_t pid   =client_prc_ids[i];
00795       int   status=kill(pid,0);
00796 
00797       stringstream ssi;      ssi<<i+1;
00798       stringstream sspid;    sspid<<pid;
00799       stringstream ssstatus; ssstatus<<status;
00800       
00801       string bg_status = (status==0) ? "#00ff00" : "ff0000";
00802       *out<<tr()
00803           <<td(ssi.str()).set("align","left")
00804           <<td(sspid.str()).set("align","center")
00805           <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00806           <<tr()<<endl;
00807     }
00808     *out<<table()<<endl;
00809     *out<<"<br><br>"<<endl;
00810 
00811     vector<string> states      = resourceTable_->cellStates();
00812     vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00813     vector<pid_t>  prc_ids     = resourceTable_->cellPrcIds();
00814     vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00815 
00816     *out<<table().set("frame","void").set("rules","rows")
00817                  .set("class","modules").set("width","500")<<endl
00818         <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00819         <<tr()
00820         <<th("cell").set("align","left")
00821         <<th("state").set("align","center")
00822         <<th("event").set("align","center")
00823         <<th("process id").set("align","center")
00824         <<th("timestamp").set("align","center")
00825         <<th("time").set("align","center")
00826         <<tr()
00827         <<endl;
00828     for (UInt_t i=0;i<states.size();i++) {
00829       string state=states[i];
00830       UInt_t evt   = evt_numbers[i];
00831       pid_t  pid   = prc_ids[i];
00832       time_t tstamp= time_stamps[i];
00833       double tdiff = difftime(time(0),tstamp);
00834       
00835       stringstream ssi;      ssi<<i;
00836       stringstream ssevt;    if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00837       stringstream sspid;    if (pid!=0) sspid<<pid; else sspid<<" - ";
00838       stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00839       stringstream sstdiff;  if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00840       
00841       string bg_state = "#ffffff";
00842       if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00843           state=="RAWREADING"||state=="RAWREAD")
00844         bg_state="#99CCff";
00845       else if (state=="PROCESSING")
00846         bg_state="#ff0000";
00847       else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00848         bg_state="#CCff99";
00849       else if (state=="SENDING")
00850         bg_state="#00FF33";
00851       else if (state=="SENT")
00852         bg_state="#006633";
00853       else if (state=="DISCARDING")
00854         bg_state="#FFFF00";
00855       
00856       *out<<tr()
00857           <<td(ssi.str()).set("align","left")
00858           <<td(state).set("align","center").set("bgcolor",bg_state)
00859           <<td(ssevt.str()).set("align","center")
00860           <<td(sspid.str()).set("align","center")
00861           <<td(sststamp.str()).set("align","center")
00862           <<td(sstdiff.str()).set("align","center")
00863           <<tr()<<endl;
00864     }
00865     *out<<table()<<endl;
00866 
00867     
00868   }
00869   *out<<"</body>"<<endl<<"</html>"<<endl;
00870   
00871   unlock();
00872 }
00873 
00874 
00875 
00877 // XDAQ instantiator implementation macro
00879 
// Invokes the XDAQ instantiator implementation macro for FUResourceBroker
// (the macro itself is defined outside this file).
XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)

Generated on Tue Jun 9 17:34:46 2009 for CMSSW by  doxygen 1.5.4