CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/ResourceBroker/src/FUResourceBroker.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceBroker
00004 // ----------------
00005 //
00006 //            10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011 
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015 
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017 
00018 #include "EvffedFillerRB.h"
00019 
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024 
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029 
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036 
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041 
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045 
00046 
00047 using namespace std;
00048 using namespace evf;
00049 
00050 
00052 // construction/destruction
00054 
00055 //______________________________________________________________________________
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057   : xdaq::Application(s)
00058   , fsm_(this)
00059   , gui_(0)
00060   , log_(getApplicationLogger())
00061   , bu_(0)
00062   , sm_(0)
00063   , i2oPool_(0)
00064   , resourceTable_(0)
00065   , wlMonitoring_(0)
00066   , asMonitoring_(0)
00067   , wlWatching_(0)
00068   , asWatching_(0)
00069   , instance_(0)
00070   , runNumber_(0)
00071   , deltaT_(0.0)
00072   , deltaN_(0)
00073   , deltaSumOfSquares_(0)
00074   , deltaSumOfSizes_(0)
00075   , throughput_(0.0)
00076   , rate_(0.0)
00077   , average_(0.0)
00078   , rms_(0.0)
00079   , nbAllocatedEvents_(0)
00080   , nbPendingRequests_(0)
00081   , nbReceivedEvents_(0)
00082   , nbSentEvents_(0)
00083   , nbSentDqmEvents_(0)
00084   , nbSentErrorEvents_(0)
00085   , nbPendingSMDiscards_(0)
00086   , nbPendingSMDqmDiscards_(0)
00087   , nbDiscardedEvents_(0)
00088   , nbLostEvents_(0)
00089   , nbDataErrors_(0)
00090   , nbCrcErrors_(0)
00091   , nbTimeoutsWithEvent_(0)
00092   , nbTimeoutsWithoutEvent_(0)
00093   , dataErrorFlag_(0)
00094   , segmentationMode_(false)
00095   , nbClients_(0)
00096   , clientPrcIds_("")
00097   , nbRawCells_(16)
00098   , nbRecoCells_(8)
00099   , nbDqmCells_(8)
00100   , rawCellSize_(0x400000)  // 4MB
00101   , recoCellSize_(0x800000) // 8MB
00102   , dqmCellSize_(0x800000)  // 8MB
00103   , doDropEvents_(false)
00104   , doFedIdCheck_(true)
00105   , doCrcCheck_(1)
00106   , doDumpEvents_(0)
00107   , buClassName_("BU")
00108   , buInstance_(0)
00109   , smClassName_("StorageManager")
00110   , smInstance_(0)
00111   , shmResourceTableTimeout_(200000)
00112   , monSleepSec_(2)
00113   , watchSleepSec_(10)
00114   , timeOutSec_(30)
00115   , processKillerEnabled_(true)
00116   , useEvmBoard_(true)
00117   , reasonForFailed_("")
00118   , nbAllocateSent_(0)
00119   , nbTakeReceived_(0)
00120   , nbDataDiscardReceived_(0)
00121   , nbDqmDiscardReceived_(0)
00122   , nbSentLast_(0)
00123   , sumOfSquaresLast_(0)
00124   , sumOfSizesLast_(0)
00125   , lock_(toolbox::BSem::FULL)
00126   , frb_(0)
00127   , shmInconsistent_(false)
00128 {
00129   // setup finite state machine (binding relevant callbacks)
00130   fsm_.initialize<evf::FUResourceBroker>(this);
00131   
00132   // set url, class, instance, and sourceId (=class_instance)
00133   url_     =
00134     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00135     getApplicationDescriptor()->getURN();
00136   class_   =getApplicationDescriptor()->getClassName();
00137   instance_=getApplicationDescriptor()->getInstance();
00138   sourceId_=class_.toString()+"_"+instance_.toString();
00139   
00140   // bind i2o callbacks
00141   i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00142             I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00143   i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00144             I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00145   i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00146             I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00147   i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00148             I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00149   
00150   
00151   // bind HyperDAQ web pages
00152   xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00153   gui_=new WebGUI(this,&fsm_);
00154   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00155   vector<toolbox::lang::Method*>::iterator it;
00156   for (it=methods.begin();it!=methods.end();++it) {
00157     if ((*it)->type()=="cgi") {
00158       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00159       xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00160     }
00161   }
00162   xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00163   
00164 
00165   // allocate i2o memery pool
00166   string i2oPoolName=sourceId_+"_i2oPool";
00167   try {
00168     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00169     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00170     toolbox::mem::MemoryPoolFactory* poolFactory=
00171       toolbox::mem::getMemoryPoolFactory();
00172     i2oPool_=poolFactory->createPool(urn,allocator);
00173   }
00174   catch (toolbox::mem::exception::Exception& e) {
00175     string s="Failed to create pool: "+i2oPoolName;
00176     LOG4CPLUS_FATAL(log_,s);
00177     XCEPT_RETHROW(xcept::Exception,s,e);
00178   }
00179   
00180   // publish all parameters to app info space
00181   exportParameters();
00182   
00183   // findRcmsStateListener
00184   fsm_.findRcmsStateListener();
00185   
00186   // set application icon for hyperdaq
00187   getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00188   //FUResource::useEvmBoard_ = useEvmBoard_;
00189 }
00190 
00191 
00192 //______________________________________________________________________________
00193 FUResourceBroker::~FUResourceBroker()
00194 {
00195 
00196 }
00197 
00198 
00199 
00201 // implementation of member functions
00203    
00204 //______________________________________________________________________________
00205 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00206 {
00207   try {
00208     LOG4CPLUS_INFO(log_, "Start configuring ...");
00209     connectToBUandSM();
00210     frb_ = new EvffedFillerRB(this);
00211     configureResources();
00212     if(shmInconsistent_){
00213       std::ostringstream ost;
00214       ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw=" 
00215           << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00216           << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00217       XCEPT_RAISE(evf::Exception,ost.str());
00218     } 
00219     LOG4CPLUS_INFO(log_, "Finished configuring!");
00220     fsm_.fireEvent("ConfigureDone",this);
00221 
00222   }
00223   catch (xcept::Exception &e) {
00224     std::string msg  = "configuring FAILED: " + (string)e.what();
00225     reasonForFailed_ = e.what();
00226     fsm_.fireFailed(msg,this);
00227   }
00228 
00229   return false;
00230 }
00231 
00232 
00233 //______________________________________________________________________________
00234 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00235 {
00236   try {
00237     LOG4CPLUS_INFO(log_, "Start enabling ...");
00238     startMonitoringWorkLoop();
00239     startWatchingWorkLoop();
00240     resourceTable_->setRunNumber(runNumber_);
00241     resourceTable_->resetCounters();
00242     resourceTable_->startDiscardWorkLoop();
00243     resourceTable_->startSendDataWorkLoop();
00244     resourceTable_->startSendDqmWorkLoop();
00245     resourceTable_->sendAllocate();
00246     
00247     nbTimeoutsWithEvent_         = 0;
00248     nbTimeoutsWithoutEvent_      = 0;
00249     dataErrorFlag_               = 0;
00250 
00251     LOG4CPLUS_INFO(log_, "Finished enabling!");
00252     fsm_.fireEvent("EnableDone",this);
00253   }
00254   catch (xcept::Exception &e) {
00255     std::string msg  = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00256     reasonForFailed_ = e.what();
00257     fsm_.fireFailed(msg,this);
00258   }
00259   
00260   return false;
00261 }
00262 
00263 
00264 //______________________________________________________________________________
00265 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00266 {
00267   try {
00268     LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00269     resourceTable_->stop();
00270     timeval now;
00271     timeval then;
00272     gettimeofday(&then,0);
00273     while (!resourceTable_->isReadyToShutDown()) {
00274       ::usleep(shmResourceTableTimeout_.value_*10);
00275       gettimeofday(&now,0);
00276       if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00277         std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " " 
00278                   <<  shmResourceTableTimeout_.value_/10000 << std::endl;
00279         LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00280         emergencyStop();
00281         break;
00282       }
00283     }
00284 
00285     if (resourceTable_->isReadyToShutDown()) {
00286       LOG4CPLUS_INFO(log_, "Finished stopping!");
00287       fsm_.fireEvent("StopDone",this);
00288     }
00289   }
00290   catch (xcept::Exception &e) {
00291     std::string msg  = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00292     reasonForFailed_ = e.what();
00293     fsm_.fireFailed(msg,this);
00294   }
00295   
00296   return false;
00297 }
00298 
00299 
00300 //______________________________________________________________________________
00301 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00302 {
00303   try {
00304     LOG4CPLUS_INFO(log_, "Start halting ...");
00305     if (resourceTable_->isActive()) {
00306       resourceTable_->halt();
00307       UInt_t count = 0;
00308       while (count<10) {
00309         if (resourceTable_->isReadyToShutDown()) {
00310           lock();
00311           delete resourceTable_;
00312           resourceTable_=0;
00313           unlock();
00314           LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00315           break;
00316         }
00317         else {
00318           count++;
00319           LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00320 	  ::sleep(1);
00321         }
00322       }
00323     }
00324     else {
00325       lock();
00326       delete resourceTable_;
00327       resourceTable_=0;
00328       unlock();
00329     }
00330     
00331     if (0==resourceTable_) {
00332       LOG4CPLUS_INFO(log_,"Finished halting!");
00333       fsm_.fireEvent("HaltDone",this);
00334     }
00335     else {
00336       std::string msg  = "halting FAILED: ResourceTable shutdown timed out.";
00337       reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00338       fsm_.fireFailed(msg,this);
00339     }
00340   }
00341   catch (xcept::Exception &e) {
00342     std::string msg  = "halting FAILED: "+xcept::stdformat_exception_history(e);
00343     reasonForFailed_ = e.what();
00344     fsm_.fireFailed(msg,this);
00345   }
00346   if(frb_) delete frb_;
00347   return false;
00348 }
00349 
00350 
00351 //______________________________________________________________________________
00352 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00353   throw (xoap::exception::Exception)
00354 {
00355   return fsm_.commandCallback(msg);
00356 }
00357 
00358 
00359 //______________________________________________________________________________
00360 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00361 {
00362   nbTakeReceived_.value_++;
00363   if(fsm_.checkIfEnabled()){
00364     bool eventComplete=resourceTable_->buildResource(bufRef);
00365     if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00366   }
00367   else{
00368     LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state " 
00369                     << fsm_.stateName() << " is being lost");
00370     bufRef->release();
00371   }
00372 }
00373 
00374 //______________________________________________________________________________
00375 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00376 {
00377 
00378   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00379     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00380   if(msg->lumiSection==0){
00381     LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00382     fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00383   }
00384   resourceTable_->postEndOfLumiSection(bufRef); // this method dummy for now
00385 //   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00386 //     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00387   
00388 //   LOG4CPLUS_WARN(log_, "Received END-OF-LS from EVM for LS " << msg->lumiSection);
00389   
00390 }
00391 
00392 
00393 
00394 
00395 //______________________________________________________________________________
00396 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00397 {
00398   nbDataDiscardReceived_.value_++;
00399   resourceTable_->discardDataEvent(bufRef);
00400 }
00401 
00402 
00403 //______________________________________________________________________________
00404 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00405 {
00406   nbDqmDiscardReceived_.value_++;
00407   resourceTable_->discardDqmEvent(bufRef);
00408 }
00409 
00410 
00411 //______________________________________________________________________________
00412 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00413 {
00414   typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00415   typedef AppDescSet_t::iterator            AppDescIter_t;
00416   
00417   // locate input BU
00418   AppDescSet_t setOfBUs=
00419     getApplicationContext()->getDefaultZone()->
00420     getApplicationDescriptors(buClassName_.toString());
00421   
00422   if (0!=bu_) { delete bu_; bu_=0; }
00423   
00424   for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00425     if ((*it)->getInstance()==buInstance_)
00426       bu_=new BUProxy(getApplicationDescriptor(),*it,
00427                       getApplicationContext(),i2oPool_);
00428   
00429   if (0==bu_) {
00430     string msg=sourceId_+" failed to locate input BU!";
00431     XCEPT_RAISE(evf::Exception,msg);
00432   }
00433   
00434   // locate output SM
00435   AppDescSet_t setOfSMs=
00436     getApplicationContext()->getDefaultZone()->
00437     getApplicationDescriptors(smClassName_.toString());
00438   
00439   if (0!=sm_) { delete sm_; sm_=0; }
00440   
00441   for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00442     if ((*it)->getInstance()==smInstance_)
00443       sm_=new SMProxy(getApplicationDescriptor(),*it,
00444                       getApplicationContext(),i2oPool_);
00445   
00446   if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00447 }
00448 
00449 
00450 //______________________________________________________________________________
00451 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00452   throw (xgi::exception::Exception)
00453 {
00454   string name=in->getenv("PATH_INFO");
00455   if (name.empty()) name="defaultWebPage";
00456   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00457 }
00458 
00459 
00460 //______________________________________________________________________________
00461 void FUResourceBroker::actionPerformed(xdata::Event& e)
00462 {
00463   lock();
00464   
00465   if (0!=resourceTable_) {
00466     
00467     //gui_->monInfoSpace()->lock();
00468     
00469     if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00470       nbClients_          =resourceTable_->nbClients();
00471       clientPrcIds_       =resourceTable_->clientPrcIdsAsString();
00472       nbAllocatedEvents_  =resourceTable_->nbAllocated();
00473       nbPendingRequests_  =resourceTable_->nbPending();
00474       nbReceivedEvents_   =resourceTable_->nbCompleted();
00475       nbSentEvents_       =resourceTable_->nbSent();
00476       nbSentDqmEvents_    =resourceTable_->nbSentDqm();
00477       nbSentErrorEvents_  =resourceTable_->nbSentError();
00478       nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00479       nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00480       nbDiscardedEvents_  =resourceTable_->nbDiscarded();
00481       nbLostEvents_       =resourceTable_->nbLost();
00482       nbDataErrors_       =resourceTable_->nbErrors();
00483       nbCrcErrors_        =resourceTable_->nbCrcErrors();
00484       nbAllocateSent_     =resourceTable_->nbAllocSent();
00485       dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u + 
00486                                ((nbDataErrors_.value_ != 0u) << 1) +
00487                                ((nbLostEvents_.value_ != 0u) << 2) +
00488                                ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00489                                ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00490                                ((nbSentErrorEvents_.value_ != 0u) << 5)
00491                                );
00492     }
00493     else if (e.type()=="ItemChangedEvent") {
00494       
00495       string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00496       
00497       if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00498       if (item=="useEvmBoard")  FUResource::useEvmBoard(useEvmBoard_);
00499       if (item=="doCrcCheck")   resourceTable_->setDoCrcCheck(doCrcCheck_);
00500       if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00501     }
00502     
00503     //gui_->monInfoSpace()->unlock();
00504   }
00505   else {
00506     nbClients_          =0;
00507     clientPrcIds_       ="";
00508     nbAllocatedEvents_  =0;
00509     nbPendingRequests_  =0;
00510     nbReceivedEvents_   =0;
00511     nbSentEvents_       =0;
00512     nbSentDqmEvents_    =0;
00513     nbSentErrorEvents_  =0;
00514     nbPendingSMDiscards_=0;
00515     nbPendingSMDqmDiscards_=0;
00516     nbDiscardedEvents_  =0;
00517     nbLostEvents_       =0;
00518     nbDataErrors_       =0;
00519     nbCrcErrors_        =0;
00520     nbAllocateSent_     =0;
00521   }
00522   unlock();
00523 }
00524 
00525 
00526 //______________________________________________________________________________
00527 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00528 {
00529   struct timezone timezone;
00530   gettimeofday(&monStartTime_,&timezone);
00531   
00532   try {
00533     wlMonitoring_=
00534       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00535                                                        "waiting");
00536     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00537     asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00538                                       sourceId_+"Monitoring");
00539     wlMonitoring_->submit(asMonitoring_);
00540   }
00541   catch (xcept::Exception& e) {
00542     string msg = "Failed to start workloop 'Monitoring'.";
00543     XCEPT_RETHROW(evf::Exception,msg,e);
00544   }
00545 }
00546 
00547 
00548 //______________________________________________________________________________
00549 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00550 {
00551   unsigned int nbSent;
00552   uint64_t     sumOfSquares;
00553   unsigned int sumOfSizes;
00554   uint64_t     deltaSumOfSquares;
00555 
00556   lock();
00557   if (0==resourceTable_) {
00558     deltaT_.value_           =0.0;
00559     deltaN_.value_           =  0;
00560     deltaSumOfSquares_.value_=0.0;
00561     deltaSumOfSizes_.value_  =  0;
00562     throughput_              =0.0;
00563     rate_                    =0.0;
00564     average_                 =0.0;
00565     rms_                     =0.0;
00566     unlock();    
00567     return false;
00568   }
00569   else {
00570     nbSent      =resourceTable_->nbSent();
00571     sumOfSquares=resourceTable_->sumOfSquares();
00572     sumOfSizes  =resourceTable_->sumOfSizes();
00573   }
00574   unlock();
00575   
00576   struct timeval  monEndTime;
00577   struct timezone timezone;
00578   
00579   gettimeofday(&monEndTime,&timezone);
00580   
00581   xdata::getInfoSpaceFactory()->lock();
00582   gui_->monInfoSpace()->lock();
00583   
00584   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00585   monStartTime_=monEndTime;
00586   
00587   deltaN_.value_=nbSent-nbSentLast_;
00588   nbSentLast_=nbSent;
00589   
00590   deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00591   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00592   sumOfSquaresLast_=sumOfSquares;
00593   
00594   deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00595   sumOfSizesLast_=sumOfSizes;
00596   
00597   if (deltaT_.value_!=0) {
00598     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00599     rate_      =deltaN_.value_/deltaT_.value_;
00600   }
00601   else {
00602     throughput_=0.0;
00603     rate_      =0.0;
00604   }
00605   
00606   double meanOfSquares,mean,squareOfMean,variance;
00607   
00608   if(deltaN_.value_!=0) {
00609     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00610     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00611     squareOfMean=mean*mean;
00612     variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00613     
00614     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00615     rms_    =std::sqrt(variance);
00616   }
00617   else {
00618     average_=0.0;
00619     rms_    =0.0;
00620   }
00621   
00622   gui_->monInfoSpace()->unlock();  
00623   xdata::getInfoSpaceFactory()->unlock();
00624     
00625   ::sleep(monSleepSec_.value_);
00626   
00627   return true;
00628 }
00629 
00630 
00631 //______________________________________________________________________________
00632 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00633 {
00634   try {
00635     wlWatching_=
00636       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00637                                                        "waiting");
00638     if (!wlWatching_->isActive()) wlWatching_->activate();
00639     asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00640                                     sourceId_+"Watching");
00641     wlWatching_->submit(asWatching_);
00642   }
00643   catch (xcept::Exception& e) {
00644     string msg = "Failed to start workloop 'Watching'.";
00645     XCEPT_RETHROW(evf::Exception,msg,e);
00646   }
00647 }
00648 
00649 
00650 //______________________________________________________________________________
00651 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00652 {
00653   lock();
00654   
00655   if (0==resourceTable_) {
00656     unlock();
00657     return false;
00658   }
00659 
00660   vector<pid_t>  evt_prcids =resourceTable_->cellPrcIds();
00661   vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00662   vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps(); 
00663   
00664   time_t tcurr=time(0);  
00665   for (UInt_t i=0;i<evt_tstamps.size();i++) {
00666     pid_t  pid   =evt_prcids[i];
00667     UInt_t evt   =evt_numbers[i];
00668     time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00669     double tdiff =difftime(tcurr,tstamp);
00670     if (tdiff>timeOutSec_) {
00671       if(processKillerEnabled_) {
00672         LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00673         kill(pid,9);
00674         nbTimeoutsWithEvent_++;
00675       }
00676       else {
00677         LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00678                        <<timeOutSec_<<"sec for process "<<pid);
00679       }
00680     }
00681   }
00682   
00683   vector<pid_t> prcids=resourceTable_->clientPrcIds();
00684   for (UInt_t i=0;i<prcids.size();i++) {
00685     pid_t pid   =prcids[i];
00686     int   status=kill(pid,0);
00687     if (status!=0) {
00688       LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00689       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00690         nbTimeoutsWithoutEvent_++;
00691     }
00692   }
00693   
00694   if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00695     std::ostringstream ost;
00696     ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw=" 
00697         << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00698         << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00699     XCEPT_DECLARE(evf::Exception,
00700                   sentinelException, ost.str());
00701     notifyQualified("error",sentinelException);
00702     shmInconsistent_ = true;
00703   }
00704 
00705   unlock();
00706   
00707   ::sleep(watchSleepSec_.value_);
00708   return true;
00709 }
00710     
00711 
00712 //______________________________________________________________________________
00713 void FUResourceBroker::exportParameters()
00714 {
00715   assert(0!=gui_);
00716   
00717   gui_->addMonitorParam("url",                      &url_);
00718   gui_->addMonitorParam("class",                    &class_);
00719   gui_->addMonitorParam("instance",                 &instance_);
00720   gui_->addMonitorParam("runNumber",                &runNumber_);
00721   gui_->addMonitorParam("stateName",                 fsm_.stateName());
00722 
00723   gui_->addMonitorParam("deltaT",                   &deltaT_);
00724   gui_->addMonitorParam("deltaN",                   &deltaN_);
00725   gui_->addMonitorParam("deltaSumOfSquares",        &deltaSumOfSquares_);
00726   gui_->addMonitorParam("deltaSumOfSizes",          &deltaSumOfSizes_);
00727     
00728   gui_->addMonitorParam("throughput",               &throughput_);
00729   gui_->addMonitorParam("rate",                     &rate_);
00730   gui_->addMonitorParam("average",                  &average_);
00731   gui_->addMonitorParam("rms",                      &rms_);
00732   gui_->addMonitorParam("dataErrorFlag",            &dataErrorFlag_);
00733   
00734   gui_->addMonitorCounter("nbAllocatedEvents",      &nbAllocatedEvents_);
00735   gui_->addMonitorCounter("nbPendingRequests",      &nbPendingRequests_);
00736   gui_->addMonitorCounter("nbReceivedEvents",       &nbReceivedEvents_);
00737   gui_->addMonitorCounter("nbSentEvents",           &nbSentEvents_);
00738   gui_->addMonitorCounter("nbSentErrorEvents",      &nbSentErrorEvents_);
00739   gui_->addMonitorCounter("nbDiscardedEvents",      &nbDiscardedEvents_);
00740   gui_->addMonitorCounter("nbPendingSMDiscards",    &nbPendingSMDiscards_);
00741 
00742   gui_->addMonitorCounter("nbSentDqmEvents",        &nbSentDqmEvents_);
00743   gui_->addMonitorCounter("nbDqmDiscardReceived",   &nbDqmDiscardReceived_);
00744   gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
00745 
00746   gui_->addMonitorCounter("nbLostEvents",           &nbLostEvents_);
00747   gui_->addMonitorCounter("nbDataErrors",           &nbDataErrors_);
00748   gui_->addMonitorCounter("nbCrcErrors",            &nbCrcErrors_);
00749   gui_->addMonitorCounter("nbTimeoutsWithEvent",    &nbTimeoutsWithEvent_);
00750   gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
00751 
00752   gui_->addStandardParam("segmentationMode",        &segmentationMode_);
00753   gui_->addStandardParam("nbClients",               &nbClients_);
00754   gui_->addStandardParam("clientPrcIds",            &clientPrcIds_);
00755   gui_->addStandardParam("nbRawCells",              &nbRawCells_);
00756   gui_->addStandardParam("nbRecoCells",             &nbRecoCells_);
00757   gui_->addStandardParam("nbDqmCells",              &nbDqmCells_);
00758   gui_->addStandardParam("rawCellSize",             &rawCellSize_);
00759   gui_->addStandardParam("recoCellSize",            &recoCellSize_);
00760   gui_->addStandardParam("dqmCellSize",             &dqmCellSize_);
00761 
00762   gui_->addStandardParam("doDropEvents",            &doDropEvents_);
00763   gui_->addStandardParam("doFedIdCheck",            &doFedIdCheck_);
00764   gui_->addStandardParam("doCrcCheck",              &doCrcCheck_);
00765   gui_->addStandardParam("doDumpEvents",            &doDumpEvents_);
00766   gui_->addStandardParam("buClassName",             &buClassName_);
00767   gui_->addStandardParam("buInstance",              &buInstance_);
00768   gui_->addStandardParam("smClassName",             &smClassName_);
00769   gui_->addStandardParam("smInstance",              &smInstance_);
00770   gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
00771   gui_->addStandardParam("monSleepSec",             &monSleepSec_);
00772   gui_->addStandardParam("watchSleepSec",           &watchSleepSec_);
00773   gui_->addStandardParam("timeOutSec",              &timeOutSec_);
00774   gui_->addStandardParam("processKillerEnabled",    &processKillerEnabled_);
00775   gui_->addStandardParam("useEvmBoard",             &useEvmBoard_);
00776   gui_->addStandardParam("rcmsStateListener",        fsm_.rcmsStateListener());
00777   gui_->addStandardParam("foundRcmsStateListener",   fsm_.foundRcmsStateListener());
00778   gui_->addStandardParam("reasonForFailed",         &reasonForFailed_);
00779   
00780   gui_->addDebugCounter("nbAllocateSent",           &nbAllocateSent_);
00781   gui_->addDebugCounter("nbTakeReceived",           &nbTakeReceived_);
00782   gui_->addDebugCounter("nbDataDiscardReceived",    &nbDataDiscardReceived_);
00783 
00784   gui_->exportParameters();
00785 
00786   gui_->addItemChangedListener("doFedIdCheck",      this);
00787   gui_->addItemChangedListener("useEvmBoard",       this);
00788   gui_->addItemChangedListener("doCrcCheck",        this);
00789   gui_->addItemChangedListener("doDumpEvents",      this);
00790 }
00791 
00792 
00793 //______________________________________________________________________________
00794 void FUResourceBroker::reset()
00795 {
00796   gui_->resetCounters();
00797   
00798   deltaT_           =0.0;
00799   deltaN_           =  0;
00800   deltaSumOfSquares_=0.0;
00801   deltaSumOfSizes_  =  0;
00802   
00803   throughput_       =0.0;
00804   rate_             =0.0;
00805   average_          =0.0;
00806   rms_              =0.0;
00807   
00808   nbSentLast_       =  0;
00809   sumOfSquaresLast_ =  0;
00810   sumOfSizesLast_   =  0;
00811 }
00812 
00813 
00814 //______________________________________________________________________________
00815 double FUResourceBroker::deltaT(const struct timeval *start,
00816                                 const struct timeval *end)
00817 {
00818   unsigned int  sec;
00819   unsigned int  usec;
00820   
00821   sec = end->tv_sec - start->tv_sec;
00822   
00823   if(end->tv_usec > start->tv_usec) {
00824     usec = end->tv_usec - start->tv_usec;
00825   }
00826   else {
00827     sec--;
00828     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00829   }
00830   
00831   return ((double)sec) + ((double)usec) / 1000000.0;
00832 }
00833 
00834 
00835 
00836 //______________________________________________________________________________
00837 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00838   throw (xgi::exception::Exception)
00839 {
00840   using namespace cgicc;
00841   Cgicc cgi(in);
00842   std::vector<FormEntry> els = cgi.getElements() ;
00843   for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00844     std::cout << "form entry " << (*it).getValue() << std::endl;
00845 
00846   std::vector<FormEntry> el1;
00847   cgi.getElement("crcError",el1);
00848   *out<<"<html>"<<endl;
00849   gui_->htmlHead(in,out,sourceId_);
00850   *out<<"<body>"<<endl;
00851   gui_->htmlHeadline(in,out);
00852 
00853   lock();
00854   
00855   if (0!=resourceTable_) {
00856     if(el1.size()!=0) {
00857       resourceTable_->injectCRCError();
00858     }
00859     *out << "<form method=\"GET\" action=\"customWebPage\" >";
00860     *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00861     *out << "</form>" << endl;
00862     *out << "<hr/>" << std::endl;
00863     vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00864     *out<<table().set("frame","void").set("rules","rows")
00865                  .set("class","modules").set("width","250")<<endl
00866         <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00867         <<tr()
00868         <<th("client").set("align","left")
00869         <<th("process id").set("align","center")
00870         <<th("status").set("align","center")
00871         <<tr()
00872         <<endl;
00873     for (UInt_t i=0;i<client_prc_ids.size();i++) {
00874 
00875       pid_t pid   =client_prc_ids[i];
00876       int   status=kill(pid,0);
00877 
00878       stringstream ssi;      ssi<<i+1;
00879       stringstream sspid;    sspid<<pid;
00880       stringstream ssstatus; ssstatus<<status;
00881       
00882       string bg_status = (status==0) ? "#00ff00" : "ff0000";
00883       *out<<tr()
00884           <<td(ssi.str()).set("align","left")
00885           <<td(sspid.str()).set("align","center")
00886           <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00887           <<tr()<<endl;
00888     }
00889     *out<<table()<<endl;
00890     *out<<"<br><br>"<<endl;
00891 
00892     vector<string> states      = resourceTable_->cellStates();
00893     vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00894     vector<pid_t>  prc_ids     = resourceTable_->cellPrcIds();
00895     vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00896 
00897     *out<<table().set("frame","void").set("rules","rows")
00898                  .set("class","modules").set("width","500")<<endl
00899         <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00900         <<tr()
00901         <<th("cell").set("align","left")
00902         <<th("state").set("align","center")
00903         <<th("event").set("align","center")
00904         <<th("process id").set("align","center")
00905         <<th("timestamp").set("align","center")
00906         <<th("time").set("align","center")
00907         <<tr()
00908         <<endl;
00909     for (UInt_t i=0;i<states.size();i++) {
00910       string state=states[i];
00911       UInt_t evt   = evt_numbers[i];
00912       pid_t  pid   = prc_ids[i];
00913       time_t tstamp= time_stamps[i];
00914       double tdiff = difftime(time(0),tstamp);
00915       
00916       stringstream ssi;      ssi<<i;
00917       stringstream ssevt;    if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00918       stringstream sspid;    if (pid!=0) sspid<<pid; else sspid<<" - ";
00919       stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00920       stringstream sstdiff;  if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00921       
00922       string bg_state = "#ffffff";
00923       if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00924           state=="RAWREADING"||state=="RAWREAD")
00925         bg_state="#99CCff";
00926       else if (state=="PROCESSING")
00927         bg_state="#ff0000";
00928       else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00929         bg_state="#CCff99";
00930       else if (state=="SENDING")
00931         bg_state="#00FF33";
00932       else if (state=="SENT")
00933         bg_state="#006633";
00934       else if (state=="DISCARDING")
00935         bg_state="#FFFF00";
00936       else if (state=="LUMISECTION")
00937         bg_state="#0000FF";
00938       
00939       *out<<tr()
00940           <<td(ssi.str()).set("align","left")
00941           <<td(state).set("align","center").set("bgcolor",bg_state)
00942           <<td(ssevt.str()).set("align","center")
00943           <<td(sspid.str()).set("align","center")
00944           <<td(sststamp.str()).set("align","center")
00945           <<td(sstdiff.str()).set("align","center")
00946           <<tr()<<endl;
00947     }
00948     *out<<table()<<endl;
00949     *out<<"<br><br>"<<endl;
00950 
00951     vector<string> dqmstates      = resourceTable_->dqmCellStates();
00952 
00953     *out<<table().set("frame","void").set("rules","rows")
00954                  .set("class","modules").set("width","500")<<endl
00955         <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00956         <<tr()
00957         <<th("cell").set("align","left")
00958         <<th("state").set("align","center")
00959         <<tr()
00960         <<endl;
00961     for (UInt_t i=0;i<dqmstates.size();i++) {
00962       string state=dqmstates[i];
00963       
00964       string bg_state = "#ffffff";
00965       if (state=="WRITING"||state=="WRITTEN")
00966         bg_state="#99CCff";
00967       else if (state=="SENDING")
00968         bg_state="#00FF33";
00969       else if (state=="SENT")
00970         bg_state="#006633";
00971       else if (state=="DISCARDING")
00972         bg_state="#FFFF00";
00973       
00974       *out<<tr()
00975           <<td(state).set("align","center").set("bgcolor",bg_state)
00976           <<tr()<<endl;
00977     }
00978     *out<<table()<<endl;
00979 
00980 
00981     
00982   }
00983   *out<<"</body>"<<endl<<"</html>"<<endl;
00984   
00985   unlock();
00986 }
00987 
00988 void FUResourceBroker::emergencyStop()
00989 {
00990   LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
00991   vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00992   for (UInt_t i=0;i<client_prc_ids.size();i++) {
00993     pid_t  pid   =client_prc_ids[i];
00994     std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
00995     if(pid!=0){
00996       //assume processes are dead by now
00997       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00998         nbTimeoutsWithoutEvent_++;
00999       else
01000         nbTimeoutsWithEvent_++;
01001     }
01002   }
01003   resourceTable_->lastResort();
01004   ::sleep(1);
01005   if(!resourceTable_->isReadyToShutDown())
01006     {
01007       reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01008       XCEPT_RAISE(evf::Exception,reasonForFailed_);
01009     }
01010   resourceTable_->printWorkLoopStatus();
01011   lock();
01012   std::cout << "delete resourcetable" <<std::endl;
01013   delete resourceTable_;
01014   resourceTable_=0;
01015   std::cout << "cycle through resourcetable config " << std::endl;
01016   configureResources();
01017   unlock();
01018   if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01019   std::cout << "done with emergency stop"  << std::endl;
01020 }
01021 
01022 void FUResourceBroker::configureResources()
01023 {
01024   resourceTable_=new FUResourceTable(segmentationMode_.value_,
01025                                      nbRawCells_.value_,
01026                                      nbRecoCells_.value_,
01027                                      nbDqmCells_.value_,
01028                                      rawCellSize_.value_,
01029                                      recoCellSize_.value_,
01030                                      dqmCellSize_.value_,
01031                                      bu_,sm_,
01032                                      log_, 
01033                                      shmResourceTableTimeout_.value_, 
01034                                      frb_,
01035                                      this);
01036   FUResource::doFedIdCheck(doFedIdCheck_);
01037   FUResource::useEvmBoard(useEvmBoard_);
01038   resourceTable_->setDoCrcCheck(doCrcCheck_);
01039   resourceTable_->setDoDumpEvents(doDumpEvents_);
01040   reset();
01041   shmInconsistent_ = false;
01042   if(resourceTable_->nbResources() != nbRawCells_.value_ || 
01043      resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01044     shmInconsistent_ = true;
01045 }
01046 
01047 
01049 // XDAQ instantiator implementation macro, applied to FUResourceBroker. The macro is defined by the XDAQ framework, not in this file; NOTE(review): presumably it emits the hook through which the framework creates instances of this application -- confirm against the XDAQ headers.
01051 
01052 XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)