CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/ResourceBroker/src/FUResourceBroker.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceBroker
00004 // ----------------
00005 //
00006 //            10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011 
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015 
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017 
00018 #include "EvffedFillerRB.h"
00019 
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024 
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029 
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036 
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041 
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045 
00046 
00047 using namespace std;
00048 using namespace evf;
00049 
00050 
00052 // construction/destruction
00054 
00055 //______________________________________________________________________________
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057   : xdaq::Application(s)
00058   , fsm_(this)
00059   , gui_(0)
00060   , log_(getApplicationLogger())
00061   , bu_(0)
00062   , sm_(0)
00063   , i2oPool_(0)
00064   , resourceTable_(0)
00065   , wlMonitoring_(0)
00066   , asMonitoring_(0)
00067   , wlWatching_(0)
00068   , asWatching_(0)
00069   , instance_(0)
00070   , runNumber_(0)
00071   , deltaT_(0.0)
00072   , deltaN_(0)
00073   , deltaSumOfSquares_(0)
00074   , deltaSumOfSizes_(0)
00075   , throughput_(0.0)
00076   , rate_(0.0)
00077   , average_(0.0)
00078   , rms_(0.0)
00079   , nbAllocatedEvents_(0)
00080   , nbPendingRequests_(0)
00081   , nbReceivedEvents_(0)
00082   , nbSentEvents_(0)
00083   , nbSentDqmEvents_(0)
00084   , nbSentErrorEvents_(0)
00085   , nbPendingSMDiscards_(0)
00086   , nbPendingSMDqmDiscards_(0)
00087   , nbDiscardedEvents_(0)
00088   , nbReceivedEol_(0)
00089   , highestEolReceived_(0)
00090   , nbEolPosted_(0)
00091   , nbEolDiscarded_(0)
00092   , nbLostEvents_(0)
00093   , nbDataErrors_(0)
00094   , nbCrcErrors_(0)
00095   , nbTimeoutsWithEvent_(0)
00096   , nbTimeoutsWithoutEvent_(0)
00097   , dataErrorFlag_(0)
00098   , segmentationMode_(false)
00099   , nbClients_(0)
00100   , clientPrcIds_("")
00101   , nbRawCells_(16)
00102   , nbRecoCells_(8)
00103   , nbDqmCells_(8)
00104   , rawCellSize_(0x400000)  // 4MB
00105   , recoCellSize_(0x800000) // 8MB
00106   , dqmCellSize_(0x800000)  // 8MB
00107   , doDropEvents_(false)
00108   , doFedIdCheck_(true)
00109   , doCrcCheck_(1)
00110   , doDumpEvents_(0)
00111   , buClassName_("BU")
00112   , buInstance_(0)
00113   , smClassName_("StorageManager")
00114   , smInstance_(0)
00115   , shmResourceTableTimeout_(200000)
00116   , monSleepSec_(2)
00117   , watchSleepSec_(10)
00118   , timeOutSec_(30)
00119   , processKillerEnabled_(true)
00120   , useEvmBoard_(true)
00121   , reasonForFailed_("")
00122   , nbAllocateSent_(0)
00123   , nbTakeReceived_(0)
00124   , nbDataDiscardReceived_(0)
00125   , nbDqmDiscardReceived_(0)
00126   , nbSentLast_(0)
00127   , sumOfSquaresLast_(0)
00128   , sumOfSizesLast_(0)
00129   , lock_(toolbox::BSem::FULL)
00130   , frb_(0)
00131   , shmInconsistent_(false)
00132 {
00133   // setup finite state machine (binding relevant callbacks)
00134   fsm_.initialize<evf::FUResourceBroker>(this);
00135   
00136   // set url, class, instance, and sourceId (=class_instance)
00137   url_     =
00138     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00139     getApplicationDescriptor()->getURN();
00140   class_   =getApplicationDescriptor()->getClassName();
00141   instance_=getApplicationDescriptor()->getInstance();
00142   sourceId_=class_.toString()+"_"+instance_.toString();
00143   
00144   // bind i2o callbacks
00145   i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00146             I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00147   i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00148             I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00149   i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00150             I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00151   i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00152             I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00153   
00154   
00155   // bind HyperDAQ web pages
00156   xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00157   gui_=new WebGUI(this,&fsm_);
00158   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00159   vector<toolbox::lang::Method*>::iterator it;
00160   for (it=methods.begin();it!=methods.end();++it) {
00161     if ((*it)->type()=="cgi") {
00162       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00163       xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00164     }
00165   }
00166   xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00167   
00168 
00169   // allocate i2o memery pool
00170   string i2oPoolName=sourceId_+"_i2oPool";
00171   try {
00172     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator();
00173     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00174     toolbox::mem::MemoryPoolFactory* poolFactory=
00175       toolbox::mem::getMemoryPoolFactory();
00176     i2oPool_=poolFactory->createPool(urn,allocator);
00177   }
00178   catch (toolbox::mem::exception::Exception& e) {
00179     string s="Failed to create pool: "+i2oPoolName;
00180     LOG4CPLUS_FATAL(log_,s);
00181     XCEPT_RETHROW(xcept::Exception,s,e);
00182   }
00183   
00184   // publish all parameters to app info space
00185   exportParameters();
00186   
00187   // findRcmsStateListener
00188   fsm_.findRcmsStateListener();
00189   
00190   // set application icon for hyperdaq
00191   getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00192   //FUResource::useEvmBoard_ = useEvmBoard_;
00193 }
00194 
00195 
00196 //______________________________________________________________________________
00197 FUResourceBroker::~FUResourceBroker()
00198 {
00199 
00200 }
00201 
00202 
00203 
00205 // implementation of member functions
00207    
00208 //______________________________________________________________________________
00209 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00210 {
00211   try {
00212     LOG4CPLUS_INFO(log_, "Start configuring ...");
00213     connectToBUandSM();
00214     frb_ = new EvffedFillerRB(this);
00215     configureResources();
00216     if(shmInconsistent_){
00217       std::ostringstream ost;
00218       ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw=" 
00219           << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00220           << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00221       XCEPT_RAISE(evf::Exception,ost.str());
00222     } 
00223     LOG4CPLUS_INFO(log_, "Finished configuring!");
00224     fsm_.fireEvent("ConfigureDone",this);
00225 
00226   }
00227   catch (xcept::Exception &e) {
00228     std::string msg  = "configuring FAILED: " + (string)e.what();
00229     reasonForFailed_ = e.what();
00230     fsm_.fireFailed(msg,this);
00231   }
00232 
00233   return false;
00234 }
00235 
00236 
00237 //______________________________________________________________________________
00238 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00239 {
00240   try {
00241     LOG4CPLUS_INFO(log_, "Start enabling ...");
00242     reset();
00243     startMonitoringWorkLoop();
00244     startWatchingWorkLoop();
00245     resourceTable_->setRunNumber(runNumber_);
00246     lock();
00247     resourceTable_->resetCounters();
00248     unlock();
00249     resourceTable_->startDiscardWorkLoop();
00250     resourceTable_->startSendDataWorkLoop();
00251     resourceTable_->startSendDqmWorkLoop();
00252     resourceTable_->sendAllocate();
00253     
00254     nbTimeoutsWithEvent_         = 0;
00255     nbTimeoutsWithoutEvent_      = 0;
00256     dataErrorFlag_               = 0;
00257 
00258     LOG4CPLUS_INFO(log_, "Finished enabling!");
00259     fsm_.fireEvent("EnableDone",this);
00260   }
00261   catch (xcept::Exception &e) {
00262     std::string msg  = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00263     reasonForFailed_ = e.what();
00264     fsm_.fireFailed(msg,this);
00265   }
00266   
00267   return false;
00268 }
00269 
00270 
00271 //______________________________________________________________________________
00272 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00273 {
00274   try {
00275     LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00276     resourceTable_->stop();
00277     timeval now;
00278     timeval then;
00279     gettimeofday(&then,0);
00280     while (!resourceTable_->isReadyToShutDown()) {
00281       ::usleep(shmResourceTableTimeout_.value_*10);
00282       gettimeofday(&now,0);
00283       if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00284         std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " " 
00285                   <<  shmResourceTableTimeout_.value_/10000 << std::endl;
00286         LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00287         emergencyStop();
00288         break;
00289       }
00290     }
00291 
00292     if (resourceTable_->isReadyToShutDown()) {
00293       LOG4CPLUS_INFO(log_, "Finished stopping!");
00294       fsm_.fireEvent("StopDone",this);
00295     }
00296   }
00297   catch (xcept::Exception &e) {
00298     std::string msg  = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00299     reasonForFailed_ = e.what();
00300     fsm_.fireFailed(msg,this);
00301   }
00302   
00303   return false;
00304 }
00305 
00306 
00307 //______________________________________________________________________________
00308 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00309 {
00310   try {
00311     LOG4CPLUS_INFO(log_, "Start halting ...");
00312     if (resourceTable_->isActive()) {
00313       resourceTable_->halt();
00314       UInt_t count = 0;
00315       while (count<10) {
00316         if (resourceTable_->isReadyToShutDown()) {
00317           lock();
00318           delete resourceTable_;
00319           resourceTable_=0;
00320           unlock();
00321           LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00322           break;
00323         }
00324         else {
00325           count++;
00326           LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00327 	  ::sleep(1);
00328         }
00329       }
00330     }
00331     else {
00332       lock();
00333       delete resourceTable_;
00334       resourceTable_=0;
00335       unlock();
00336     }
00337     
00338     if (0==resourceTable_) {
00339       LOG4CPLUS_INFO(log_,"Finished halting!");
00340       fsm_.fireEvent("HaltDone",this);
00341     }
00342     else {
00343       std::string msg  = "halting FAILED: ResourceTable shutdown timed out.";
00344       reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00345       fsm_.fireFailed(msg,this);
00346     }
00347   }
00348   catch (xcept::Exception &e) {
00349     std::string msg  = "halting FAILED: "+xcept::stdformat_exception_history(e);
00350     reasonForFailed_ = e.what();
00351     fsm_.fireFailed(msg,this);
00352   }
00353   if(frb_) delete frb_;
00354   return false;
00355 }
00356 
00357 
00358 //______________________________________________________________________________
00359 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00360   throw (xoap::exception::Exception)
00361 {
00362   return fsm_.commandCallback(msg);
00363 }
00364 
00365 
00366 //______________________________________________________________________________
00367 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00368 {
00369   nbTakeReceived_.value_++;
00370   if(fsm_.checkIfEnabled()){
00371     bool eventComplete=resourceTable_->buildResource(bufRef);
00372     if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00373   }
00374   else{
00375     LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state " 
00376                     << fsm_.stateName() << " is being lost");
00377     bufRef->release();
00378   }
00379 }
00380 
00381 //______________________________________________________________________________
00382 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00383 {
00384   if(fsm_.checkIfEnabled()){
00385 
00386     I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00387     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00388     if(msg->lumiSection==0){
00389       LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00390       fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00391     }
00392     nbReceivedEol_++;
00393     if(highestEolReceived_.value_+100 < msg->lumiSection) 
00394       {
00395         LOG4CPLUS_ERROR(log_,"EOL message not in sequence, expected " 
00396                         << highestEolReceived_.value_+1
00397                         << " received " << msg->lumiSection);
00398         fsm_.fireFailed("EOL message with corrupted LS ",this);
00399       }
00400     if(highestEolReceived_.value_+1 != msg->lumiSection) 
00401       LOG4CPLUS_WARN(log_,"EOL message not in sequence, expected " 
00402                      << highestEolReceived_.value_+1
00403                      << " received " << msg->lumiSection);
00404     
00405     if(highestEolReceived_.value_ < msg->lumiSection) 
00406       highestEolReceived_.value_ = msg->lumiSection;
00407     resourceTable_->postEndOfLumiSection(bufRef); 
00408   }
00409   else{
00410     LOG4CPLUS_ERROR(log_,"EOL i2o frame received in state " 
00411                     << fsm_.stateName() << " is being lost");
00412   }
00413   bufRef->release();
00414 
00415 //   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00416 //     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00417   
00418 //   LOG4CPLUS_WARN(log_, "Received END-OF-LS from EVM for LS " << msg->lumiSection);
00419   
00420 }
00421 
00422 
00423 
00424 
00425 //______________________________________________________________________________
00426 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00427 {
00428   nbDataDiscardReceived_.value_++;
00429   resourceTable_->discardDataEvent(bufRef);
00430 }
00431 
00432 
00433 //______________________________________________________________________________
00434 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00435 {
00436   nbDqmDiscardReceived_.value_++;
00437   resourceTable_->discardDqmEvent(bufRef);
00438 }
00439 
00440 
00441 //______________________________________________________________________________
00442 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00443 {
00444   typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00445   typedef AppDescSet_t::iterator            AppDescIter_t;
00446   
00447   // locate input BU
00448   AppDescSet_t setOfBUs=
00449     getApplicationContext()->getDefaultZone()->
00450     getApplicationDescriptors(buClassName_.toString());
00451   
00452   if (0!=bu_) { delete bu_; bu_=0; }
00453   
00454   for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00455     if ((*it)->getInstance()==buInstance_)
00456       bu_=new BUProxy(getApplicationDescriptor(),*it,
00457                       getApplicationContext(),i2oPool_);
00458   
00459   if (0==bu_) {
00460     string msg=sourceId_+" failed to locate input BU!";
00461     XCEPT_RAISE(evf::Exception,msg);
00462   }
00463   
00464   // locate output SM
00465   AppDescSet_t setOfSMs=
00466     getApplicationContext()->getDefaultZone()->
00467     getApplicationDescriptors(smClassName_.toString());
00468   
00469   if (0!=sm_) { delete sm_; sm_=0; }
00470   
00471   for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00472     if ((*it)->getInstance()==smInstance_)
00473       sm_=new SMProxy(getApplicationDescriptor(),*it,
00474                       getApplicationContext(),i2oPool_);
00475   
00476   if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00477 }
00478 
00479 
00480 //______________________________________________________________________________
00481 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00482   throw (xgi::exception::Exception)
00483 {
00484   string name=in->getenv("PATH_INFO");
00485   if (name.empty()) name="defaultWebPage";
00486   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00487 }
00488 
00489 
00490 //______________________________________________________________________________
00491 void FUResourceBroker::actionPerformed(xdata::Event& e)
00492 {
00493   lock();
00494   
00495   if (0!=resourceTable_) {
00496     
00497     //gui_->monInfoSpace()->lock();
00498     
00499     if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00500       nbClients_          =resourceTable_->nbClients();
00501       clientPrcIds_       =resourceTable_->clientPrcIdsAsString();
00502       nbAllocatedEvents_  =resourceTable_->nbAllocated();
00503       nbPendingRequests_  =resourceTable_->nbPending();
00504       nbReceivedEvents_   =resourceTable_->nbCompleted();
00505       nbSentEvents_       =resourceTable_->nbSent();
00506       nbSentDqmEvents_    =resourceTable_->nbSentDqm();
00507       nbSentErrorEvents_  =resourceTable_->nbSentError();
00508       nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00509       nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00510       nbDiscardedEvents_  =resourceTable_->nbDiscarded();
00511       nbLostEvents_       =resourceTable_->nbLost();
00512       nbEolPosted_        =resourceTable_->nbEolPosted();
00513       nbEolDiscarded_     =resourceTable_->nbEolDiscarded();
00514       nbDataErrors_       =resourceTable_->nbErrors();
00515       nbCrcErrors_        =resourceTable_->nbCrcErrors();
00516       nbAllocateSent_     =resourceTable_->nbAllocSent();
00517       dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u + 
00518                                ((nbDataErrors_.value_ != 0u) << 1) +
00519                                ((nbLostEvents_.value_ != 0u) << 2) +
00520                                ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00521                                ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00522                                ((nbSentErrorEvents_.value_ != 0u) << 5)
00523                                );
00524     }
00525     else if (e.type()=="ItemChangedEvent") {
00526       
00527       string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00528       
00529       if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00530       if (item=="useEvmBoard")  FUResource::useEvmBoard(useEvmBoard_);
00531       if (item=="doCrcCheck")   resourceTable_->setDoCrcCheck(doCrcCheck_);
00532       if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00533     }
00534     
00535     //gui_->monInfoSpace()->unlock();
00536   }
00537   else {
00538     nbClients_          =0;
00539     clientPrcIds_       ="";
00540     nbAllocatedEvents_  =0;
00541     nbPendingRequests_  =0;
00542     nbReceivedEvents_   =0;
00543     nbSentEvents_       =0;
00544     nbSentDqmEvents_    =0;
00545     nbSentErrorEvents_  =0;
00546     nbPendingSMDiscards_=0;
00547     nbPendingSMDqmDiscards_=0;
00548     nbDiscardedEvents_  =0;
00549     nbLostEvents_       =0;
00550     nbDataErrors_       =0;
00551     nbCrcErrors_        =0;
00552     nbAllocateSent_     =0;
00553   }
00554   unlock();
00555 }
00556 
00557 
00558 //______________________________________________________________________________
00559 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00560 {
00561   struct timezone timezone;
00562   gettimeofday(&monStartTime_,&timezone);
00563   
00564   try {
00565     wlMonitoring_=
00566       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00567                                                        "waiting");
00568     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00569     asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00570                                       sourceId_+"Monitoring");
00571     wlMonitoring_->submit(asMonitoring_);
00572   }
00573   catch (xcept::Exception& e) {
00574     string msg = "Failed to start workloop 'Monitoring'.";
00575     XCEPT_RETHROW(evf::Exception,msg,e);
00576   }
00577 }
00578 
00579 
00580 //______________________________________________________________________________
00581 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00582 {
00583   unsigned int nbSent;
00584   uint64_t     sumOfSquares;
00585   unsigned int sumOfSizes;
00586   uint64_t     deltaSumOfSquares;
00587 
00588   lock();
00589   if (0==resourceTable_) {
00590     deltaT_.value_           =0.0;
00591     deltaN_.value_           =  0;
00592     deltaSumOfSquares_.value_=0.0;
00593     deltaSumOfSizes_.value_  =  0;
00594     throughput_              =0.0;
00595     rate_                    =0.0;
00596     average_                 =0.0;
00597     rms_                     =0.0;
00598     unlock();    
00599     return false;
00600   }
00601   else {
00602     nbSent      =resourceTable_->nbSent();
00603     sumOfSquares=resourceTable_->sumOfSquares();
00604     sumOfSizes  =resourceTable_->sumOfSizes();
00605   }
00606   unlock();
00607   
00608   struct timeval  monEndTime;
00609   struct timezone timezone;
00610   
00611   gettimeofday(&monEndTime,&timezone);
00612   
00613   xdata::getInfoSpaceFactory()->lock();
00614   gui_->monInfoSpace()->lock();
00615   
00616   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00617   monStartTime_=monEndTime;
00618   
00619   deltaN_.value_=nbSent-nbSentLast_;
00620   nbSentLast_=nbSent;
00621   
00622   deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00623   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00624   sumOfSquaresLast_=sumOfSquares;
00625   
00626   deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00627   sumOfSizesLast_=sumOfSizes;
00628   
00629   if (deltaT_.value_!=0) {
00630     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00631     rate_      =deltaN_.value_/deltaT_.value_;
00632   }
00633   else {
00634     throughput_=0.0;
00635     rate_      =0.0;
00636   }
00637   
00638   double meanOfSquares,mean,squareOfMean,variance;
00639   
00640   if(deltaN_.value_!=0) {
00641     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00642     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00643     squareOfMean=mean*mean;
00644     variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00645     
00646     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00647     rms_    =std::sqrt(variance);
00648   }
00649   else {
00650     average_=0.0;
00651     rms_    =0.0;
00652   }
00653   
00654   gui_->monInfoSpace()->unlock();  
00655   xdata::getInfoSpaceFactory()->unlock();
00656     
00657   ::sleep(monSleepSec_.value_);
00658   
00659   return true;
00660 }
00661 
00662 
00663 //______________________________________________________________________________
00664 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00665 {
00666   try {
00667     wlWatching_=
00668       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00669                                                        "waiting");
00670     if (!wlWatching_->isActive()) wlWatching_->activate();
00671     asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00672                                     sourceId_+"Watching");
00673     wlWatching_->submit(asWatching_);
00674   }
00675   catch (xcept::Exception& e) {
00676     string msg = "Failed to start workloop 'Watching'.";
00677     XCEPT_RETHROW(evf::Exception,msg,e);
00678   }
00679 }
00680 
00681 
00682 //______________________________________________________________________________
00683 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00684 {
00685   lock();
00686   
00687   if (0==resourceTable_) {
00688     unlock();
00689     return false;
00690   }
00691 
00692   vector<pid_t>  evt_prcids =resourceTable_->cellPrcIds();
00693   vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00694   vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps(); 
00695   
00696   time_t tcurr=time(0);  
00697   for (UInt_t i=0;i<evt_tstamps.size();i++) {
00698     pid_t  pid   =evt_prcids[i];
00699     UInt_t evt   =evt_numbers[i];
00700     time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00701     double tdiff =difftime(tcurr,tstamp);
00702     if (tdiff>timeOutSec_) {
00703       if(processKillerEnabled_) {
00704         kill(pid,9);
00705         nbTimeoutsWithEvent_++;
00706       }
00707       LOG4CPLUS_ERROR(log_,"evt "<<evt<<" under processing for more than "
00708                         <<timeOutSec_<<"sec for process "<<pid);
00709     }
00710   }
00711   
00712   vector<pid_t> prcids=resourceTable_->clientPrcIds();
00713   for (UInt_t i=0;i<prcids.size();i++) {
00714     pid_t pid   =prcids[i];
00715     int   status=kill(pid,0);
00716     if (status!=0) {
00717       LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00718       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00719         nbTimeoutsWithoutEvent_++;
00720     }
00721   }
00722   
00723   if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00724     std::ostringstream ost;
00725     ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw=" 
00726         << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00727         << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00728     XCEPT_DECLARE(evf::Exception,
00729                   sentinelException, ost.str());
00730     notifyQualified("error",sentinelException);
00731     shmInconsistent_ = true;
00732   }
00733 
00734   unlock();
00735   
00736   ::sleep(watchSleepSec_.value_);
00737   return true;
00738 }
00739     
00740 
00741 //______________________________________________________________________________
00742 void FUResourceBroker::exportParameters()
00743 {
00744   assert(0!=gui_);
00745   
00746   gui_->addMonitorParam("url",                      &url_);
00747   gui_->addMonitorParam("class",                    &class_);
00748   gui_->addMonitorParam("instance",                 &instance_);
00749   gui_->addMonitorParam("runNumber",                &runNumber_);
00750   gui_->addMonitorParam("stateName",                 fsm_.stateName());
00751 
00752   gui_->addMonitorParam("deltaT",                   &deltaT_);
00753   gui_->addMonitorParam("deltaN",                   &deltaN_);
00754   gui_->addMonitorParam("deltaSumOfSquares",        &deltaSumOfSquares_);
00755   gui_->addMonitorParam("deltaSumOfSizes",          &deltaSumOfSizes_);
00756     
00757   gui_->addMonitorParam("throughput",               &throughput_);
00758   gui_->addMonitorParam("rate",                     &rate_);
00759   gui_->addMonitorParam("average",                  &average_);
00760   gui_->addMonitorParam("rms",                      &rms_);
00761   gui_->addMonitorParam("dataErrorFlag",            &dataErrorFlag_);
00762   
00763   gui_->addMonitorCounter("nbAllocatedEvents",      &nbAllocatedEvents_);
00764   gui_->addMonitorCounter("nbPendingRequests",      &nbPendingRequests_);
00765   gui_->addMonitorCounter("nbReceivedEvents",       &nbReceivedEvents_);
00766   gui_->addMonitorCounter("nbSentEvents",           &nbSentEvents_);
00767   gui_->addMonitorCounter("nbSentErrorEvents",      &nbSentErrorEvents_);
00768   gui_->addMonitorCounter("nbDiscardedEvents",      &nbDiscardedEvents_);
00769   gui_->addMonitorCounter("nbReceivedEol",          &nbReceivedEol_);
00770   gui_->addMonitorCounter("highestEolReceived",     &highestEolReceived_);
00771   gui_->addMonitorCounter("nbEolPosted",            &nbEolPosted_);
00772   gui_->addMonitorCounter("nbEolDiscarded",         &nbEolDiscarded_);
00773 
00774   gui_->addMonitorCounter("nbPendingSMDiscards",    &nbPendingSMDiscards_);
00775 
00776   gui_->addMonitorCounter("nbSentDqmEvents",        &nbSentDqmEvents_);
00777   gui_->addMonitorCounter("nbDqmDiscardReceived",   &nbDqmDiscardReceived_);
00778   gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
00779 
00780   gui_->addMonitorCounter("nbLostEvents",           &nbLostEvents_);
00781   gui_->addMonitorCounter("nbDataErrors",           &nbDataErrors_);
00782   gui_->addMonitorCounter("nbCrcErrors",            &nbCrcErrors_);
00783   gui_->addMonitorCounter("nbTimeoutsWithEvent",    &nbTimeoutsWithEvent_);
00784   gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
00785 
00786   gui_->addStandardParam("segmentationMode",        &segmentationMode_);
00787   gui_->addStandardParam("nbClients",               &nbClients_);
00788   gui_->addStandardParam("clientPrcIds",            &clientPrcIds_);
00789   gui_->addStandardParam("nbRawCells",              &nbRawCells_);
00790   gui_->addStandardParam("nbRecoCells",             &nbRecoCells_);
00791   gui_->addStandardParam("nbDqmCells",              &nbDqmCells_);
00792   gui_->addStandardParam("rawCellSize",             &rawCellSize_);
00793   gui_->addStandardParam("recoCellSize",            &recoCellSize_);
00794   gui_->addStandardParam("dqmCellSize",             &dqmCellSize_);
00795 
00796   gui_->addStandardParam("doDropEvents",            &doDropEvents_);
00797   gui_->addStandardParam("doFedIdCheck",            &doFedIdCheck_);
00798   gui_->addStandardParam("doCrcCheck",              &doCrcCheck_);
00799   gui_->addStandardParam("doDumpEvents",            &doDumpEvents_);
00800   gui_->addStandardParam("buClassName",             &buClassName_);
00801   gui_->addStandardParam("buInstance",              &buInstance_);
00802   gui_->addStandardParam("smClassName",             &smClassName_);
00803   gui_->addStandardParam("smInstance",              &smInstance_);
00804   gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
00805   gui_->addStandardParam("monSleepSec",             &monSleepSec_);
00806   gui_->addStandardParam("watchSleepSec",           &watchSleepSec_);
00807   gui_->addStandardParam("timeOutSec",              &timeOutSec_);
00808   gui_->addStandardParam("processKillerEnabled",    &processKillerEnabled_);
00809   gui_->addStandardParam("useEvmBoard",             &useEvmBoard_);
00810   gui_->addStandardParam("rcmsStateListener",        fsm_.rcmsStateListener());
00811   gui_->addStandardParam("foundRcmsStateListener",   fsm_.foundRcmsStateListener());
00812   gui_->addStandardParam("reasonForFailed",         &reasonForFailed_);
00813   
00814   gui_->addDebugCounter("nbAllocateSent",           &nbAllocateSent_);
00815   gui_->addDebugCounter("nbTakeReceived",           &nbTakeReceived_);
00816   gui_->addDebugCounter("nbDataDiscardReceived",    &nbDataDiscardReceived_);
00817 
00818   gui_->exportParameters();
00819 
00820   gui_->addItemChangedListener("doFedIdCheck",      this);
00821   gui_->addItemChangedListener("useEvmBoard",       this);
00822   gui_->addItemChangedListener("doCrcCheck",        this);
00823   gui_->addItemChangedListener("doDumpEvents",      this);
00824 }
00825 
00826 
00827 //______________________________________________________________________________
00828 void FUResourceBroker::reset()
00829 {
00830   gui_->resetCounters();
00831   
00832   deltaT_           =0.0;
00833   deltaN_           =  0;
00834   deltaSumOfSquares_=0.0;
00835   deltaSumOfSizes_  =  0;
00836   
00837   throughput_       =0.0;
00838   rate_             =0.0;
00839   average_          =0.0;
00840   rms_              =0.0;
00841   
00842   nbSentLast_       =  0;
00843   sumOfSquaresLast_ =  0;
00844   sumOfSizesLast_   =  0;
00845 }
00846 
00847 
00848 //______________________________________________________________________________
00849 double FUResourceBroker::deltaT(const struct timeval *start,
00850                                 const struct timeval *end)
00851 {
00852   unsigned int  sec;
00853   unsigned int  usec;
00854   
00855   sec = end->tv_sec - start->tv_sec;
00856   
00857   if(end->tv_usec > start->tv_usec) {
00858     usec = end->tv_usec - start->tv_usec;
00859   }
00860   else {
00861     sec--;
00862     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00863   }
00864   
00865   return ((double)sec) + ((double)usec) / 1000000.0;
00866 }
00867 
00868 
00869 
00870 //______________________________________________________________________________
00871 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00872   throw (xgi::exception::Exception)
00873 {
00874   using namespace cgicc;
00875   Cgicc cgi(in);
00876   std::vector<FormEntry> els = cgi.getElements() ;
00877   for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00878     std::cout << "form entry " << (*it).getValue() << std::endl;
00879 
00880   std::vector<FormEntry> el1;
00881   cgi.getElement("crcError",el1);
00882   *out<<"<html>"<<endl;
00883   gui_->htmlHead(in,out,sourceId_);
00884   *out<<"<body>"<<endl;
00885   gui_->htmlHeadline(in,out);
00886 
00887   lock();
00888   
00889   if (0!=resourceTable_) {
00890     if(el1.size()!=0) {
00891       resourceTable_->injectCRCError();
00892     }
00893     *out << "<form method=\"GET\" action=\"customWebPage\" >";
00894     *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00895     *out << "</form>" << endl;
00896     *out << "<hr/>" << std::endl;
00897     vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00898     *out<<table().set("frame","void").set("rules","rows")
00899                  .set("class","modules").set("width","250")<<endl
00900         <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00901         <<tr()
00902         <<th("client").set("align","left")
00903         <<th("process id").set("align","center")
00904         <<th("status").set("align","center")
00905         <<tr()
00906         <<endl;
00907     for (UInt_t i=0;i<client_prc_ids.size();i++) {
00908 
00909       pid_t pid   =client_prc_ids[i];
00910       int   status=kill(pid,0);
00911 
00912       stringstream ssi;      ssi<<i+1;
00913       stringstream sspid;    sspid<<pid;
00914       stringstream ssstatus; ssstatus<<status;
00915       
00916       string bg_status = (status==0) ? "#00ff00" : "ff0000";
00917       *out<<tr()
00918           <<td(ssi.str()).set("align","left")
00919           <<td(sspid.str()).set("align","center")
00920           <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00921           <<tr()<<endl;
00922     }
00923     *out<<table()<<endl;
00924     *out<<"<br><br>"<<endl;
00925 
00926     vector<string> states      = resourceTable_->cellStates();
00927     vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00928     vector<pid_t>  prc_ids     = resourceTable_->cellPrcIds();
00929     vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00930 
00931     *out<<table().set("frame","void").set("rules","rows")
00932                  .set("class","modules").set("width","500")<<endl
00933         <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00934         <<tr()
00935         <<th("cell").set("align","left")
00936         <<th("state").set("align","center")
00937         <<th("event").set("align","center")
00938         <<th("process id").set("align","center")
00939         <<th("timestamp").set("align","center")
00940         <<th("time").set("align","center")
00941         <<tr()
00942         <<endl;
00943     for (UInt_t i=0;i<states.size();i++) {
00944       string state=states[i];
00945       UInt_t evt   = evt_numbers[i];
00946       pid_t  pid   = prc_ids[i];
00947       time_t tstamp= time_stamps[i];
00948       double tdiff = difftime(time(0),tstamp);
00949       
00950       stringstream ssi;      ssi<<i;
00951       stringstream ssevt;    if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00952       stringstream sspid;    if (pid!=0) sspid<<pid; else sspid<<" - ";
00953       stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00954       stringstream sstdiff;  if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00955       
00956       string bg_state = "#ffffff";
00957       if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00958           state=="RAWREADING"||state=="RAWREAD")
00959         bg_state="#99CCff";
00960       else if (state=="PROCESSING")
00961         bg_state="#ff0000";
00962       else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00963         bg_state="#CCff99";
00964       else if (state=="SENDING")
00965         bg_state="#00FF33";
00966       else if (state=="SENT")
00967         bg_state="#006633";
00968       else if (state=="DISCARDING")
00969         bg_state="#FFFF00";
00970       else if (state=="LUMISECTION")
00971         bg_state="#0000FF";
00972       
00973       *out<<tr()
00974           <<td(ssi.str()).set("align","left")
00975           <<td(state).set("align","center").set("bgcolor",bg_state)
00976           <<td(ssevt.str()).set("align","center")
00977           <<td(sspid.str()).set("align","center")
00978           <<td(sststamp.str()).set("align","center")
00979           <<td(sstdiff.str()).set("align","center")
00980           <<tr()<<endl;
00981     }
00982     *out<<table()<<endl;
00983     *out<<"<br><br>"<<endl;
00984 
00985     vector<string> dqmstates      = resourceTable_->dqmCellStates();
00986 
00987     *out<<table().set("frame","void").set("rules","rows")
00988                  .set("class","modules").set("width","500")<<endl
00989         <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00990         <<tr()
00991         <<th("cell").set("align","left")
00992         <<th("state").set("align","center")
00993         <<tr()
00994         <<endl;
00995     for (UInt_t i=0;i<dqmstates.size();i++) {
00996       string state=dqmstates[i];
00997       
00998       string bg_state = "#ffffff";
00999       if (state=="WRITING"||state=="WRITTEN")
01000         bg_state="#99CCff";
01001       else if (state=="SENDING")
01002         bg_state="#00FF33";
01003       else if (state=="SENT")
01004         bg_state="#006633";
01005       else if (state=="DISCARDING")
01006         bg_state="#FFFF00";
01007       
01008       *out<<tr()<<"<td>"<<i<<"</td>"
01009           <<td(state).set("align","center").set("bgcolor",bg_state)
01010           <<tr()<<endl;
01011     }
01012     *out<<table()<<endl;
01013 
01014 
01015     
01016   }
01017   *out<<"</body>"<<endl<<"</html>"<<endl;
01018   
01019   unlock();
01020 }
01021 
01022 void FUResourceBroker::emergencyStop()
01023 {
01024   LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
01025   vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
01026   for (UInt_t i=0;i<client_prc_ids.size();i++) {
01027     pid_t  pid   =client_prc_ids[i];
01028     std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
01029     if(pid!=0){
01030       //assume processes are dead by now
01031       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
01032         nbTimeoutsWithoutEvent_++;
01033       else
01034         nbTimeoutsWithEvent_++;
01035     }
01036   }
01037   resourceTable_->lastResort();
01038   ::sleep(1);
01039   if(!resourceTable_->isReadyToShutDown())
01040     {
01041       reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01042       XCEPT_RAISE(evf::Exception,reasonForFailed_);
01043     }
01044   resourceTable_->printWorkLoopStatus();
01045   lock();
01046   std::cout << "delete resourcetable" <<std::endl;
01047   delete resourceTable_;
01048   resourceTable_=0;
01049   std::cout << "cycle through resourcetable config " << std::endl;
01050   configureResources();
01051   unlock();
01052   if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01053   std::cout << "done with emergency stop"  << std::endl;
01054 }
01055 
01056 void FUResourceBroker::configureResources()
01057 {
01058   resourceTable_=new FUResourceTable(segmentationMode_.value_,
01059                                      nbRawCells_.value_,
01060                                      nbRecoCells_.value_,
01061                                      nbDqmCells_.value_,
01062                                      rawCellSize_.value_,
01063                                      recoCellSize_.value_,
01064                                      dqmCellSize_.value_,
01065                                      bu_,sm_,
01066                                      log_, 
01067                                      shmResourceTableTimeout_.value_, 
01068                                      frb_,
01069                                      this);
01070   FUResource::doFedIdCheck(doFedIdCheck_);
01071   FUResource::useEvmBoard(useEvmBoard_);
01072   resourceTable_->setDoCrcCheck(doCrcCheck_);
01073   resourceTable_->setDoDumpEvents(doDumpEvents_);
01074   reset();
01075   shmInconsistent_ = false;
01076   if(resourceTable_->nbResources() != nbRawCells_.value_ || 
01077      resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01078     shmInconsistent_ = true;
01079 }
01080 
01081 
01083 // XDAQ instantiator implementation macro
01085 
// Single file-scope invocation for the FUResourceBroker class implemented in
// this file.
// NOTE(review): presumably expands to the factory hook through which the XDAQ
// framework creates the application -- confirm against the XDAQ headers.
01086 XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)