CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/ResourceBroker/src/FUResourceBroker.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceBroker
00004 // ----------------
00005 //
00006 //            10/20/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceBroker.h"
00011 
00012 #include "EventFilter/ResourceBroker/interface/FUResource.h"
00013 #include "EventFilter/ResourceBroker/interface/BUProxy.h"
00014 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00015 
00016 #include "FWCore/Utilities/interface/CRC16.h"
00017 
00018 #include "EvffedFillerRB.h"
00019 
00020 #include "i2o/Method.h"
00021 #include "interface/shared/i2oXFunctionCodes.h"
00022 #include "interface/evb/i2oEVBMsgs.h"
00023 #include "xcept/tools.h"
00024 
00025 #include "toolbox/mem/HeapAllocator.h"
00026 #include "toolbox/mem/Reference.h"
00027 #include "toolbox/mem/MemoryPoolFactory.h"
00028 #include "toolbox/mem/exception/Exception.h"
00029 
00030 #include "xoap/MessageReference.h"
00031 #include "xoap/MessageFactory.h"
00032 #include "xoap/SOAPEnvelope.h"
00033 #include "xoap/SOAPBody.h"
00034 #include "xoap/domutils.h"
00035 #include "xoap/Method.h"
00036 
00037 #include "cgicc/CgiDefs.h"
00038 #include "cgicc/Cgicc.h"
00039 #include "cgicc/FormEntry.h"
00040 #include "cgicc/HTMLClasses.h"
00041 
00042 #include <signal.h>
00043 #include <iostream>
00044 #include <sstream>
00045 
00046 
00047 using namespace std;
00048 using namespace evf;
00049 
00050 
00052 // construction/destruction
00054 
00055 //______________________________________________________________________________
// FUResourceBroker constructor.
// Gives every exported parameter/counter its default value (initializer list
// below), binds the FSM, the i2o callbacks (TAKE, DATA_DISCARD, DQM_DISCARD,
// EVM_LUMISECTION) and the xgi web pages, creates the i2o memory pool that is
// later handed to the BU/SM proxies, and publishes the parameters via
// exportParameters().
// Throws xcept::Exception (XCEPT_RETHROW) if the i2o memory pool cannot be
// created.
00056 FUResourceBroker::FUResourceBroker(xdaq::ApplicationStub *s)
00057   : xdaq::Application(s)
00058   , fsm_(this)
00059   , gui_(0)
00060   , log_(getApplicationLogger())
00061   , bu_(0)
00062   , sm_(0)
00063   , i2oPool_(0)
00064   , resourceTable_(0)
00065   , wlMonitoring_(0)
00066   , asMonitoring_(0)
00067   , wlWatching_(0)
00068   , asWatching_(0)
00069   , instance_(0)
00070   , runNumber_(0)
00071   , deltaT_(0.0)
00072   , deltaN_(0)
00073   , deltaSumOfSquares_(0)
00074   , deltaSumOfSizes_(0)
00075   , throughput_(0.0)
00076   , rate_(0.0)
00077   , average_(0.0)
00078   , rms_(0.0)
00079   , nbAllocatedEvents_(0)
00080   , nbPendingRequests_(0)
00081   , nbReceivedEvents_(0)
00082   , nbSentEvents_(0)
00083   , nbSentDqmEvents_(0)
00084   , nbSentErrorEvents_(0)
00085   , nbPendingSMDiscards_(0)
00086   , nbPendingSMDqmDiscards_(0)
00087   , nbDiscardedEvents_(0)
00088   , nbReceivedEol_(0)
00089   , highestEolReceived_(0)
00090   , nbEolPosted_(0)
00091   , nbEolDiscarded_(0)
00092   , nbLostEvents_(0)
00093   , nbDataErrors_(0)
00094   , nbCrcErrors_(0)
00095   , nbTimeoutsWithEvent_(0)
00096   , nbTimeoutsWithoutEvent_(0)
00097   , dataErrorFlag_(0)
00098   , segmentationMode_(false)
00099   , nbClients_(0)
00100   , clientPrcIds_("")
00101   , nbRawCells_(16)
00102   , nbRecoCells_(8)
00103   , nbDqmCells_(8)
00104   , rawCellSize_(0x400000)  // 4MB
00105   , recoCellSize_(0x800000) // 8MB
00106   , dqmCellSize_(0x800000)  // 8MB
00107   , doDropEvents_(false)
00108   , doFedIdCheck_(true)
00109   , doCrcCheck_(1)
00110   , doDumpEvents_(0)
00111   , buClassName_("BU")
00112   , buInstance_(0)
00113   , smClassName_("StorageManager")
00114   , smInstance_(0)
00115   , shmResourceTableTimeout_(200000) // stopping(): poll = value*10 us, limit = value/10000 s
00116   , monSleepSec_(2)
00117   , watchSleepSec_(10)
00118   , timeOutSec_(30)
00119   , processKillerEnabled_(true)
00120   , useEvmBoard_(true)
00121   , reasonForFailed_("")
00122   , nbAllocateSent_(0)
00123   , nbTakeReceived_(0)
00124   , nbDataDiscardReceived_(0)
00125   , nbDqmDiscardReceived_(0)
00126   , nbSentLast_(0)
00127   , sumOfSquaresLast_(0)
00128   , sumOfSizesLast_(0)
00129   , lock_(toolbox::BSem::FULL)
00130   , frb_(0)
00131   , shmInconsistent_(false)
00132 {
00133   // setup finite state machine (binding relevant callbacks)
00134   fsm_.initialize<evf::FUResourceBroker>(this);
00135   
00136   // set url, class, instance, and sourceId (=class_instance)
00137   url_     =
00138     getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
00139     getApplicationDescriptor()->getURN();
00140   class_   =getApplicationDescriptor()->getClassName();
00141   instance_=getApplicationDescriptor()->getInstance();
00142   sourceId_=class_.toString()+"_"+instance_.toString();
00143   
00144   // bind i2o callbacks
00145   i2o::bind(this,&FUResourceBroker::I2O_FU_TAKE_Callback,
00146             I2O_FU_TAKE,XDAQ_ORGANIZATION_ID);
00147   i2o::bind(this,&FUResourceBroker::I2O_FU_DATA_DISCARD_Callback,
00148             I2O_FU_DATA_DISCARD,XDAQ_ORGANIZATION_ID);
00149   i2o::bind(this,&FUResourceBroker::I2O_FU_DQM_DISCARD_Callback,
00150             I2O_FU_DQM_DISCARD,XDAQ_ORGANIZATION_ID);
00151   i2o::bind(this,&FUResourceBroker::I2O_EVM_LUMISECTION_Callback,
00152             I2O_EVM_LUMISECTION,XDAQ_ORGANIZATION_ID);
00153   
00154   
00155   // bind HyperDAQ web pages
00156   xgi::bind(this,&evf::FUResourceBroker::webPageRequest,"Default");
00157   gui_=new WebGUI(this,&fsm_);
00158   vector<toolbox::lang::Method*> methods=gui_->getMethods();
00159   vector<toolbox::lang::Method*>::iterator it;
00160   for (it=methods.begin();it!=methods.end();++it) { // route every "cgi" method of the GUI through webPageRequest()
00161     if ((*it)->type()=="cgi") {
00162       string name=static_cast<xgi::MethodSignature*>(*it)->name();
00163       xgi::bind(this,&evf::FUResourceBroker::webPageRequest,name);
00164     }
00165   }
00166   xgi::bind(this,&evf::FUResourceBroker::customWebPage,"customWebPage");
00167   
00168 
00169   // allocate i2o memory pool (passed to the BU/SM proxies in connectToBUandSM())
00170   string i2oPoolName=sourceId_+"_i2oPool";
00171   try {
00172     toolbox::mem::HeapAllocator *allocator=new toolbox::mem::HeapAllocator(); // NOTE(review): not freed here if createPool() throws; ownership on success presumably passes to the pool — confirm
00173     toolbox::net::URN urn("toolbox-mem-pool",i2oPoolName);
00174     toolbox::mem::MemoryPoolFactory* poolFactory=
00175       toolbox::mem::getMemoryPoolFactory();
00176     i2oPool_=poolFactory->createPool(urn,allocator);
00177   }
00178   catch (toolbox::mem::exception::Exception& e) {
00179     string s="Failed to create pool: "+i2oPoolName;
00180     LOG4CPLUS_FATAL(log_,s);
00181     XCEPT_RETHROW(xcept::Exception,s,e);
00182   }
00183   
00184   // publish all parameters to app info space
00185   exportParameters();
00186   
00187   // findRcmsStateListener
00188   fsm_.findRcmsStateListener();
00189   
00190   // set application icon for hyperdaq
00191   getApplicationDescriptor()->setAttribute("icon", "/evf/images/rbicon.jpg");
00192   //FUResource::useEvmBoard_ = useEvmBoard_;
00193 }
00194 
00195 
00196 //______________________________________________________________________________
00197 FUResourceBroker::~FUResourceBroker()
00198 {
00199 
00200 }
00201 
00202 
00203 
00205 // implementation of member functions
00207    
00208 //______________________________________________________________________________
00209 bool FUResourceBroker::configuring(toolbox::task::WorkLoop* wl)
00210 {
00211   try {
00212     LOG4CPLUS_INFO(log_, "Start configuring ...");
00213     connectToBUandSM();
00214     frb_ = new EvffedFillerRB(this);
00215     configureResources();
00216     if(shmInconsistent_){
00217       std::ostringstream ost;
00218       ost << "configuring FAILED: Inconsistency in ResourceTable - nbRaw=" 
00219           << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00220           << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00221       XCEPT_RAISE(evf::Exception,ost.str());
00222     } 
00223     LOG4CPLUS_INFO(log_, "Finished configuring!");
00224     fsm_.fireEvent("ConfigureDone",this);
00225 
00226   }
00227   catch (xcept::Exception &e) {
00228     std::string msg  = "configuring FAILED: " + (string)e.what();
00229     reasonForFailed_ = e.what();
00230     fsm_.fireFailed(msg,this);
00231   }
00232 
00233   return false;
00234 }
00235 
00236 
00237 //______________________________________________________________________________
// Enabling work-loop action. The steps run in this order:
//   1. reset() the broker's monitoring state.
//   2. Start the Monitoring and Watching work loops.
//   3. Pass runNumber_ to the resource table and reset its counters (under lock()).
//   4. Start the table's discard / send-data / send-dqm work loops and call
//      sendAllocate().
//   5. Zero the timeout counters and dataErrorFlag_.
//   6. Fire "EnableDone".
// Any xcept::Exception moves the FSM to Failed, with the exception history in
// the message. Returns false, so the action is not rescheduled.
// NOTE(review): resourceTable_ is dereferenced without a null check; it is
// presumably created by configureResources() while configuring — confirm.
// NOTE(review): the Watching loop is already running when the timeout counters
// are zeroed below, so a timeout it records in that window is discarded.
00238 bool FUResourceBroker::enabling(toolbox::task::WorkLoop* wl)
00239 {
00240   try {
00241     LOG4CPLUS_INFO(log_, "Start enabling ...");
00242     reset();
00243     startMonitoringWorkLoop();
00244     startWatchingWorkLoop();
00245     resourceTable_->setRunNumber(runNumber_);
00246     lock();
00247     resourceTable_->resetCounters();
00248     unlock();
00249     resourceTable_->startDiscardWorkLoop();
00250     resourceTable_->startSendDataWorkLoop();
00251     resourceTable_->startSendDqmWorkLoop();
00252     resourceTable_->sendAllocate();
00253     // clear per-run error bookkeeping
00254     nbTimeoutsWithEvent_         = 0;
00255     nbTimeoutsWithoutEvent_      = 0;
00256     dataErrorFlag_               = 0;
00257 
00258     LOG4CPLUS_INFO(log_, "Finished enabling!");
00259     fsm_.fireEvent("EnableDone",this);
00260   }
00261   catch (xcept::Exception &e) {
00262     std::string msg  = "enabling FAILED: "+xcept::stdformat_exception_history(e);
00263     reasonForFailed_ = e.what();
00264     fsm_.fireFailed(msg,this);
00265   }
00266   
00267   return false;
00268 }
00269 
00270 
00271 //______________________________________________________________________________
00272 bool FUResourceBroker::stopping(toolbox::task::WorkLoop* wl)
00273 {
00274   try {
00275     LOG4CPLUS_INFO(log_, "Start stopping :) ...");
00276     resourceTable_->stop();
00277     timeval now;
00278     timeval then;
00279     gettimeofday(&then,0);
00280     while (!resourceTable_->isReadyToShutDown()) {
00281       ::usleep(shmResourceTableTimeout_.value_*10);
00282       gettimeofday(&now,0);
00283       if ((unsigned int)(now.tv_sec-then.tv_sec) > shmResourceTableTimeout_.value_/10000) {
00284         std::cout << "times: " << now.tv_sec << " " << then.tv_sec << " " 
00285                   <<  shmResourceTableTimeout_.value_/10000 << std::endl;
00286         LOG4CPLUS_WARN(log_, "Some Process did not detach - going to Emergency stop!");
00287         emergencyStop();
00288         break;
00289       }
00290     }
00291 
00292     if (resourceTable_->isReadyToShutDown()) {
00293       LOG4CPLUS_INFO(log_, "Finished stopping!");
00294       fsm_.fireEvent("StopDone",this);
00295     }
00296   }
00297   catch (xcept::Exception &e) {
00298     std::string msg  = "stopping FAILED: "+xcept::stdformat_exception_history(e);
00299     reasonForFailed_ = e.what();
00300     fsm_.fireFailed(msg,this);
00301   }
00302   
00303   return false;
00304 }
00305 
00306 
00307 //______________________________________________________________________________
00308 bool FUResourceBroker::halting(toolbox::task::WorkLoop* wl)
00309 {
00310   try {
00311     LOG4CPLUS_INFO(log_, "Start halting ...");
00312     if (resourceTable_->isActive()) {
00313       resourceTable_->halt();
00314       UInt_t count = 0;
00315       while (count<10) {
00316         if (resourceTable_->isReadyToShutDown()) {
00317           lock();
00318           delete resourceTable_;
00319           resourceTable_=0;
00320           unlock();
00321           LOG4CPLUS_INFO(log_,count+1<<". try to destroy resource table succeeded!");
00322           break;
00323         }
00324         else {
00325           count++;
00326           LOG4CPLUS_DEBUG(log_,count<<". try to destroy resource table failed ...");
00327 	  ::sleep(1);
00328         }
00329       }
00330     }
00331     else {
00332       lock();
00333       delete resourceTable_;
00334       resourceTable_=0;
00335       unlock();
00336     }
00337     
00338     if (0==resourceTable_) {
00339       LOG4CPLUS_INFO(log_,"Finished halting!");
00340       fsm_.fireEvent("HaltDone",this);
00341     }
00342     else {
00343       std::string msg  = "halting FAILED: ResourceTable shutdown timed out.";
00344       reasonForFailed_ = "RESOURCETABLE SHUTDOWN TIMED OUT";
00345       fsm_.fireFailed(msg,this);
00346     }
00347   }
00348   catch (xcept::Exception &e) {
00349     std::string msg  = "halting FAILED: "+xcept::stdformat_exception_history(e);
00350     reasonForFailed_ = e.what();
00351     fsm_.fireFailed(msg,this);
00352   }
00353   if(frb_) delete frb_;
00354   return false;
00355 }
00356 
00357 
00358 //______________________________________________________________________________
00359 xoap::MessageReference FUResourceBroker::fsmCallback(xoap::MessageReference msg)
00360   throw (xoap::exception::Exception)
00361 {
00362   return fsm_.commandCallback(msg);
00363 }
00364 
00365 
00366 //______________________________________________________________________________
00367 void FUResourceBroker::I2O_FU_TAKE_Callback(toolbox::mem::Reference* bufRef)
00368 {
00369   nbTakeReceived_.value_++;
00370   if(fsm_.checkIfEnabled()){
00371     bool eventComplete=resourceTable_->buildResource(bufRef);
00372     if (eventComplete&&doDropEvents_) resourceTable_->dropEvent();
00373   }
00374   else{
00375     LOG4CPLUS_ERROR(log_,"TAKE i2o frame received in state " 
00376                     << fsm_.stateName() << " is being lost");
00377     bufRef->release();
00378   }
00379 }
00380 
00381 //______________________________________________________________________________
00382 void FUResourceBroker::I2O_EVM_LUMISECTION_Callback(toolbox::mem::Reference* bufRef)
00383 {
00384 
00385   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00386     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00387   if(msg->lumiSection==0){
00388     LOG4CPLUS_ERROR(log_,"EOL message received for ls=0!!! ");
00389     fsm_.fireFailed("EOL message received for ls=0!!! ",this);
00390   }
00391   nbReceivedEol_++;
00392   if(highestEolReceived_.value_+100 < msg->lumiSection) 
00393     {
00394       LOG4CPLUS_ERROR(log_,"EOL message not in sequence, expected " 
00395                       << highestEolReceived_.value_+1
00396                       << " received " << msg->lumiSection);
00397       fsm_.fireFailed("EOL message with corrupted LS ",this);
00398     }
00399   if(highestEolReceived_.value_+1 != msg->lumiSection) 
00400     LOG4CPLUS_WARN(log_,"EOL message not in sequence, expected " 
00401                     << highestEolReceived_.value_+1
00402                     << " received " << msg->lumiSection);
00403 
00404   if(highestEolReceived_.value_ < msg->lumiSection) 
00405     highestEolReceived_.value_ = msg->lumiSection;
00406   resourceTable_->postEndOfLumiSection(bufRef); // this method dummy for now
00407 //   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
00408 //     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00409   
00410 //   LOG4CPLUS_WARN(log_, "Received END-OF-LS from EVM for LS " << msg->lumiSection);
00411   
00412 }
00413 
00414 
00415 
00416 
00417 //______________________________________________________________________________
00418 void FUResourceBroker::I2O_FU_DATA_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00419 {
00420   nbDataDiscardReceived_.value_++;
00421   resourceTable_->discardDataEvent(bufRef);
00422 }
00423 
00424 
00425 //______________________________________________________________________________
00426 void FUResourceBroker::I2O_FU_DQM_DISCARD_Callback(toolbox::mem::Reference* bufRef)
00427 {
00428   nbDqmDiscardReceived_.value_++;
00429   resourceTable_->discardDqmEvent(bufRef);
00430 }
00431 
00432 
00433 //______________________________________________________________________________
00434 void FUResourceBroker::connectToBUandSM() throw (evf::Exception)
00435 {
00436   typedef set<xdaq::ApplicationDescriptor*> AppDescSet_t;
00437   typedef AppDescSet_t::iterator            AppDescIter_t;
00438   
00439   // locate input BU
00440   AppDescSet_t setOfBUs=
00441     getApplicationContext()->getDefaultZone()->
00442     getApplicationDescriptors(buClassName_.toString());
00443   
00444   if (0!=bu_) { delete bu_; bu_=0; }
00445   
00446   for (AppDescIter_t it=setOfBUs.begin();it!=setOfBUs.end();++it)
00447     if ((*it)->getInstance()==buInstance_)
00448       bu_=new BUProxy(getApplicationDescriptor(),*it,
00449                       getApplicationContext(),i2oPool_);
00450   
00451   if (0==bu_) {
00452     string msg=sourceId_+" failed to locate input BU!";
00453     XCEPT_RAISE(evf::Exception,msg);
00454   }
00455   
00456   // locate output SM
00457   AppDescSet_t setOfSMs=
00458     getApplicationContext()->getDefaultZone()->
00459     getApplicationDescriptors(smClassName_.toString());
00460   
00461   if (0!=sm_) { delete sm_; sm_=0; }
00462   
00463   for (AppDescIter_t it=setOfSMs.begin();it!=setOfSMs.end();++it)
00464     if ((*it)->getInstance()==smInstance_)
00465       sm_=new SMProxy(getApplicationDescriptor(),*it,
00466                       getApplicationContext(),i2oPool_);
00467   
00468   if (0==sm_) LOG4CPLUS_WARN(log_,sourceId_<<" failed to locate output SM!");
00469 }
00470 
00471 
00472 //______________________________________________________________________________
00473 void FUResourceBroker::webPageRequest(xgi::Input *in,xgi::Output *out)
00474   throw (xgi::exception::Exception)
00475 {
00476   string name=in->getenv("PATH_INFO");
00477   if (name.empty()) name="defaultWebPage";
00478   static_cast<xgi::MethodSignature*>(gui_->getMethod(name))->invoke(in,out);
00479 }
00480 
00481 
00482 //______________________________________________________________________________
00483 void FUResourceBroker::actionPerformed(xdata::Event& e)
00484 {
00485   lock();
00486   
00487   if (0!=resourceTable_) {
00488     
00489     //gui_->monInfoSpace()->lock();
00490     
00491     if (e.type()=="urn:xdata-event:ItemGroupRetrieveEvent") {
00492       nbClients_          =resourceTable_->nbClients();
00493       clientPrcIds_       =resourceTable_->clientPrcIdsAsString();
00494       nbAllocatedEvents_  =resourceTable_->nbAllocated();
00495       nbPendingRequests_  =resourceTable_->nbPending();
00496       nbReceivedEvents_   =resourceTable_->nbCompleted();
00497       nbSentEvents_       =resourceTable_->nbSent();
00498       nbSentDqmEvents_    =resourceTable_->nbSentDqm();
00499       nbSentErrorEvents_  =resourceTable_->nbSentError();
00500       nbPendingSMDiscards_=resourceTable_->nbPendingSMDiscards();
00501       nbPendingSMDqmDiscards_=resourceTable_->nbPendingSMDqmDiscards();
00502       nbDiscardedEvents_  =resourceTable_->nbDiscarded();
00503       nbLostEvents_       =resourceTable_->nbLost();
00504       nbEolPosted_        =resourceTable_->nbEolPosted();
00505       nbEolDiscarded_     =resourceTable_->nbEolDiscarded();
00506       nbDataErrors_       =resourceTable_->nbErrors();
00507       nbCrcErrors_        =resourceTable_->nbCrcErrors();
00508       nbAllocateSent_     =resourceTable_->nbAllocSent();
00509       dataErrorFlag_.value_ = (nbCrcErrors_.value_ != 0u + 
00510                                ((nbDataErrors_.value_ != 0u) << 1) +
00511                                ((nbLostEvents_.value_ != 0u) << 2) +
00512                                ((nbTimeoutsWithEvent_.value_ != 0u) << 3) +
00513                                ((nbTimeoutsWithoutEvent_.value_ != 0u) << 4) +
00514                                ((nbSentErrorEvents_.value_ != 0u) << 5)
00515                                );
00516     }
00517     else if (e.type()=="ItemChangedEvent") {
00518       
00519       string item=dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
00520       
00521       if (item=="doFedIdCheck") FUResource::doFedIdCheck(doFedIdCheck_);
00522       if (item=="useEvmBoard")  FUResource::useEvmBoard(useEvmBoard_);
00523       if (item=="doCrcCheck")   resourceTable_->setDoCrcCheck(doCrcCheck_);
00524       if (item=="doDumpEvents") resourceTable_->setDoDumpEvents(doDumpEvents_);
00525     }
00526     
00527     //gui_->monInfoSpace()->unlock();
00528   }
00529   else {
00530     nbClients_          =0;
00531     clientPrcIds_       ="";
00532     nbAllocatedEvents_  =0;
00533     nbPendingRequests_  =0;
00534     nbReceivedEvents_   =0;
00535     nbSentEvents_       =0;
00536     nbSentDqmEvents_    =0;
00537     nbSentErrorEvents_  =0;
00538     nbPendingSMDiscards_=0;
00539     nbPendingSMDqmDiscards_=0;
00540     nbDiscardedEvents_  =0;
00541     nbLostEvents_       =0;
00542     nbDataErrors_       =0;
00543     nbCrcErrors_        =0;
00544     nbAllocateSent_     =0;
00545   }
00546   unlock();
00547 }
00548 
00549 
00550 //______________________________________________________________________________
00551 void FUResourceBroker::startMonitoringWorkLoop() throw (evf::Exception)
00552 {
00553   struct timezone timezone;
00554   gettimeofday(&monStartTime_,&timezone);
00555   
00556   try {
00557     wlMonitoring_=
00558       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
00559                                                        "waiting");
00560     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
00561     asMonitoring_=toolbox::task::bind(this,&FUResourceBroker::monitoring,
00562                                       sourceId_+"Monitoring");
00563     wlMonitoring_->submit(asMonitoring_);
00564   }
00565   catch (xcept::Exception& e) {
00566     string msg = "Failed to start workloop 'Monitoring'.";
00567     XCEPT_RETHROW(evf::Exception,msg,e);
00568   }
00569 }
00570 
00571 
00572 //______________________________________________________________________________
00573 bool FUResourceBroker::monitoring(toolbox::task::WorkLoop* wl)
00574 {
00575   unsigned int nbSent;
00576   uint64_t     sumOfSquares;
00577   unsigned int sumOfSizes;
00578   uint64_t     deltaSumOfSquares;
00579 
00580   lock();
00581   if (0==resourceTable_) {
00582     deltaT_.value_           =0.0;
00583     deltaN_.value_           =  0;
00584     deltaSumOfSquares_.value_=0.0;
00585     deltaSumOfSizes_.value_  =  0;
00586     throughput_              =0.0;
00587     rate_                    =0.0;
00588     average_                 =0.0;
00589     rms_                     =0.0;
00590     unlock();    
00591     return false;
00592   }
00593   else {
00594     nbSent      =resourceTable_->nbSent();
00595     sumOfSquares=resourceTable_->sumOfSquares();
00596     sumOfSizes  =resourceTable_->sumOfSizes();
00597   }
00598   unlock();
00599   
00600   struct timeval  monEndTime;
00601   struct timezone timezone;
00602   
00603   gettimeofday(&monEndTime,&timezone);
00604   
00605   xdata::getInfoSpaceFactory()->lock();
00606   gui_->monInfoSpace()->lock();
00607   
00608   deltaT_.value_=deltaT(&monStartTime_,&monEndTime);
00609   monStartTime_=monEndTime;
00610   
00611   deltaN_.value_=nbSent-nbSentLast_;
00612   nbSentLast_=nbSent;
00613   
00614   deltaSumOfSquares=sumOfSquares-sumOfSquaresLast_;
00615   deltaSumOfSquares_.value_=(double)deltaSumOfSquares;
00616   sumOfSquaresLast_=sumOfSquares;
00617   
00618   deltaSumOfSizes_.value_=sumOfSizes-sumOfSizesLast_;
00619   sumOfSizesLast_=sumOfSizes;
00620   
00621   if (deltaT_.value_!=0) {
00622     throughput_=deltaSumOfSizes_.value_/deltaT_.value_;
00623     rate_      =deltaN_.value_/deltaT_.value_;
00624   }
00625   else {
00626     throughput_=0.0;
00627     rate_      =0.0;
00628   }
00629   
00630   double meanOfSquares,mean,squareOfMean,variance;
00631   
00632   if(deltaN_.value_!=0) {
00633     meanOfSquares=deltaSumOfSquares_.value_/((double)(deltaN_.value_));
00634     mean=((double)(deltaSumOfSizes_.value_))/((double)(deltaN_.value_));
00635     squareOfMean=mean*mean;
00636     variance=meanOfSquares-squareOfMean; if(variance<0.0) variance=0.0;
00637     
00638     average_=deltaSumOfSizes_.value_/deltaN_.value_;
00639     rms_    =std::sqrt(variance);
00640   }
00641   else {
00642     average_=0.0;
00643     rms_    =0.0;
00644   }
00645   
00646   gui_->monInfoSpace()->unlock();  
00647   xdata::getInfoSpaceFactory()->unlock();
00648     
00649   ::sleep(monSleepSec_.value_);
00650   
00651   return true;
00652 }
00653 
00654 
00655 //______________________________________________________________________________
00656 void FUResourceBroker::startWatchingWorkLoop() throw (evf::Exception)
00657 {
00658   try {
00659     wlWatching_=
00660       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Watching",
00661                                                        "waiting");
00662     if (!wlWatching_->isActive()) wlWatching_->activate();
00663     asWatching_=toolbox::task::bind(this,&FUResourceBroker::watching,
00664                                     sourceId_+"Watching");
00665     wlWatching_->submit(asWatching_);
00666   }
00667   catch (xcept::Exception& e) {
00668     string msg = "Failed to start workloop 'Watching'.";
00669     XCEPT_RETHROW(evf::Exception,msg,e);
00670   }
00671 }
00672 
00673 
00674 //______________________________________________________________________________
00675 bool FUResourceBroker::watching(toolbox::task::WorkLoop* wl)
00676 {
00677   lock();
00678   
00679   if (0==resourceTable_) {
00680     unlock();
00681     return false;
00682   }
00683 
00684   vector<pid_t>  evt_prcids =resourceTable_->cellPrcIds();
00685   vector<UInt_t> evt_numbers=resourceTable_->cellEvtNumbers();
00686   vector<time_t> evt_tstamps=resourceTable_->cellTimeStamps(); 
00687   
00688   time_t tcurr=time(0);  
00689   for (UInt_t i=0;i<evt_tstamps.size();i++) {
00690     pid_t  pid   =evt_prcids[i];
00691     UInt_t evt   =evt_numbers[i];
00692     time_t tstamp=evt_tstamps[i]; if (tstamp==0) continue;
00693     double tdiff =difftime(tcurr,tstamp);
00694     if (tdiff>timeOutSec_) {
00695       if(processKillerEnabled_) {
00696         LOG4CPLUS_ERROR(log_,"evt "<<evt<<" timed out, "<<"kill prc "<<pid);
00697         kill(pid,9);
00698         nbTimeoutsWithEvent_++;
00699       }
00700       else {
00701         LOG4CPLUS_INFO(log_,"evt "<<evt<<" under processing for more than "
00702                        <<timeOutSec_<<"sec for process "<<pid);
00703       }
00704     }
00705   }
00706   
00707   vector<pid_t> prcids=resourceTable_->clientPrcIds();
00708   for (UInt_t i=0;i<prcids.size();i++) {
00709     pid_t pid   =prcids[i];
00710     int   status=kill(pid,0);
00711     if (status!=0) {
00712       LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" died, send to error stream if processing.");
00713       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
00714         nbTimeoutsWithoutEvent_++;
00715     }
00716   }
00717   
00718   if((resourceTable_->nbResources() != nbRawCells_.value_) && !shmInconsistent_){
00719     std::ostringstream ost;
00720     ost << "Watchdog spotted inconsistency in ResourceTable - nbRaw=" 
00721         << nbRawCells_.value_ << " but nbResources=" << resourceTable_->nbResources()
00722         << " and nbFreeSlots=" << resourceTable_->nbFreeSlots();
00723     XCEPT_DECLARE(evf::Exception,
00724                   sentinelException, ost.str());
00725     notifyQualified("error",sentinelException);
00726     shmInconsistent_ = true;
00727   }
00728 
00729   unlock();
00730   
00731   ::sleep(watchSleepSec_.value_);
00732   return true;
00733 }
00734     
00735 
00736 //______________________________________________________________________________
// Registers the broker's variables with the WebGUI. The registration order
// below is the listing order. Four categories are used: monitor params,
// monitor counters, standard params and debug counters. gui_->exportParameters()
// then publishes them, and ItemChanged listeners are installed for the four
// items that actionPerformed() reacts to.
// Requires gui_ to be constructed already (asserted).
00737 void FUResourceBroker::exportParameters()
00738 {
00739   assert(0!=gui_);
00740   // --- monitor parameters: identity and state
00741   gui_->addMonitorParam("url",                      &url_);
00742   gui_->addMonitorParam("class",                    &class_);
00743   gui_->addMonitorParam("instance",                 &instance_);
00744   gui_->addMonitorParam("runNumber",                &runNumber_);
00745   gui_->addMonitorParam("stateName",                 fsm_.stateName());
00746 // --- monitor parameters: per-window statistics filled by monitoring()
00747   gui_->addMonitorParam("deltaT",                   &deltaT_);
00748   gui_->addMonitorParam("deltaN",                   &deltaN_);
00749   gui_->addMonitorParam("deltaSumOfSquares",        &deltaSumOfSquares_);
00750   gui_->addMonitorParam("deltaSumOfSizes",          &deltaSumOfSizes_);
00751     
00752   gui_->addMonitorParam("throughput",               &throughput_);
00753   gui_->addMonitorParam("rate",                     &rate_);
00754   gui_->addMonitorParam("average",                  &average_);
00755   gui_->addMonitorParam("rms",                      &rms_);
00756   gui_->addMonitorParam("dataErrorFlag",            &dataErrorFlag_);
00757   // --- monitor counters: event flow (mostly refreshed in actionPerformed())
00758   gui_->addMonitorCounter("nbAllocatedEvents",      &nbAllocatedEvents_);
00759   gui_->addMonitorCounter("nbPendingRequests",      &nbPendingRequests_);
00760   gui_->addMonitorCounter("nbReceivedEvents",       &nbReceivedEvents_);
00761   gui_->addMonitorCounter("nbSentEvents",           &nbSentEvents_);
00762   gui_->addMonitorCounter("nbSentErrorEvents",      &nbSentErrorEvents_);
00763   gui_->addMonitorCounter("nbDiscardedEvents",      &nbDiscardedEvents_);
00764   gui_->addMonitorCounter("nbReceivedEol",          &nbReceivedEol_);
00765   gui_->addMonitorCounter("highestEolReceived",     &highestEolReceived_);
00766   gui_->addMonitorCounter("nbEolPosted",            &nbEolPosted_);
00767   gui_->addMonitorCounter("nbEolDiscarded",         &nbEolDiscarded_);
00768 
00769   gui_->addMonitorCounter("nbPendingSMDiscards",    &nbPendingSMDiscards_);
00770 // --- monitor counters: DQM stream
00771   gui_->addMonitorCounter("nbSentDqmEvents",        &nbSentDqmEvents_);
00772   gui_->addMonitorCounter("nbDqmDiscardReceived",   &nbDqmDiscardReceived_);
00773   gui_->addMonitorCounter("nbPendingSMDqmDiscards", &nbPendingSMDqmDiscards_);
00774 // --- monitor counters: errors and timeouts (inputs to dataErrorFlag_)
00775   gui_->addMonitorCounter("nbLostEvents",           &nbLostEvents_);
00776   gui_->addMonitorCounter("nbDataErrors",           &nbDataErrors_);
00777   gui_->addMonitorCounter("nbCrcErrors",            &nbCrcErrors_);
00778   gui_->addMonitorCounter("nbTimeoutsWithEvent",    &nbTimeoutsWithEvent_);
00779   gui_->addMonitorCounter("nbTimeoutsWithoutEvent", &nbTimeoutsWithoutEvent_);
00780 // --- standard parameters: shared-memory layout and cell sizes
00781   gui_->addStandardParam("segmentationMode",        &segmentationMode_);
00782   gui_->addStandardParam("nbClients",               &nbClients_);
00783   gui_->addStandardParam("clientPrcIds",            &clientPrcIds_);
00784   gui_->addStandardParam("nbRawCells",              &nbRawCells_);
00785   gui_->addStandardParam("nbRecoCells",             &nbRecoCells_);
00786   gui_->addStandardParam("nbDqmCells",              &nbDqmCells_);
00787   gui_->addStandardParam("rawCellSize",             &rawCellSize_);
00788   gui_->addStandardParam("recoCellSize",            &recoCellSize_);
00789   gui_->addStandardParam("dqmCellSize",             &dqmCellSize_);
00790 // --- standard parameters: behaviour switches, peers, timing, FSM info
00791   gui_->addStandardParam("doDropEvents",            &doDropEvents_);
00792   gui_->addStandardParam("doFedIdCheck",            &doFedIdCheck_);
00793   gui_->addStandardParam("doCrcCheck",              &doCrcCheck_);
00794   gui_->addStandardParam("doDumpEvents",            &doDumpEvents_);
00795   gui_->addStandardParam("buClassName",             &buClassName_);
00796   gui_->addStandardParam("buInstance",              &buInstance_);
00797   gui_->addStandardParam("smClassName",             &smClassName_);
00798   gui_->addStandardParam("smInstance",              &smInstance_);
00799   gui_->addStandardParam("shmResourceTableTimeout", &shmResourceTableTimeout_);
00800   gui_->addStandardParam("monSleepSec",             &monSleepSec_);
00801   gui_->addStandardParam("watchSleepSec",           &watchSleepSec_);
00802   gui_->addStandardParam("timeOutSec",              &timeOutSec_);
00803   gui_->addStandardParam("processKillerEnabled",    &processKillerEnabled_);
00804   gui_->addStandardParam("useEvmBoard",             &useEvmBoard_);
00805   gui_->addStandardParam("rcmsStateListener",        fsm_.rcmsStateListener());
00806   gui_->addStandardParam("foundRcmsStateListener",   fsm_.foundRcmsStateListener());
00807   gui_->addStandardParam("reasonForFailed",         &reasonForFailed_);
00808   // --- debug counters: raw i2o message counts
00809   gui_->addDebugCounter("nbAllocateSent",           &nbAllocateSent_);
00810   gui_->addDebugCounter("nbTakeReceived",           &nbTakeReceived_);
00811   gui_->addDebugCounter("nbDataDiscardReceived",    &nbDataDiscardReceived_);
00812 
00813   gui_->exportParameters();
00814 // runtime changes to these items are applied in actionPerformed()
00815   gui_->addItemChangedListener("doFedIdCheck",      this);
00816   gui_->addItemChangedListener("useEvmBoard",       this);
00817   gui_->addItemChangedListener("doCrcCheck",        this);
00818   gui_->addItemChangedListener("doDumpEvents",      this);
00819 }
00820 
00821 
00822 //______________________________________________________________________________
00823 void FUResourceBroker::reset()
00824 {
00825   gui_->resetCounters();
00826   
00827   deltaT_           =0.0;
00828   deltaN_           =  0;
00829   deltaSumOfSquares_=0.0;
00830   deltaSumOfSizes_  =  0;
00831   
00832   throughput_       =0.0;
00833   rate_             =0.0;
00834   average_          =0.0;
00835   rms_              =0.0;
00836   
00837   nbSentLast_       =  0;
00838   sumOfSquaresLast_ =  0;
00839   sumOfSizesLast_   =  0;
00840 }
00841 
00842 
00843 //______________________________________________________________________________
00844 double FUResourceBroker::deltaT(const struct timeval *start,
00845                                 const struct timeval *end)
00846 {
00847   unsigned int  sec;
00848   unsigned int  usec;
00849   
00850   sec = end->tv_sec - start->tv_sec;
00851   
00852   if(end->tv_usec > start->tv_usec) {
00853     usec = end->tv_usec - start->tv_usec;
00854   }
00855   else {
00856     sec--;
00857     usec = 1000000 - ((unsigned int )(start->tv_usec - end->tv_usec));
00858   }
00859   
00860   return ((double)sec) + ((double)usec) / 1000000.0;
00861 }
00862 
00863 
00864 
00865 //______________________________________________________________________________
00866 void FUResourceBroker::customWebPage(xgi::Input*in,xgi::Output*out)
00867   throw (xgi::exception::Exception)
00868 {
00869   using namespace cgicc;
00870   Cgicc cgi(in);
00871   std::vector<FormEntry> els = cgi.getElements() ;
00872   for(std::vector<FormEntry>::iterator it = els.begin(); it != els.end(); it++)
00873     std::cout << "form entry " << (*it).getValue() << std::endl;
00874 
00875   std::vector<FormEntry> el1;
00876   cgi.getElement("crcError",el1);
00877   *out<<"<html>"<<endl;
00878   gui_->htmlHead(in,out,sourceId_);
00879   *out<<"<body>"<<endl;
00880   gui_->htmlHeadline(in,out);
00881 
00882   lock();
00883   
00884   if (0!=resourceTable_) {
00885     if(el1.size()!=0) {
00886       resourceTable_->injectCRCError();
00887     }
00888     *out << "<form method=\"GET\" action=\"customWebPage\" >";
00889     *out << "<button name=\"crcError\" type=\"submit\" value=\"injCRC\">Inject CRC</button>" << endl;
00890     *out << "</form>" << endl;
00891     *out << "<hr/>" << std::endl;
00892     vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
00893     *out<<table().set("frame","void").set("rules","rows")
00894                  .set("class","modules").set("width","250")<<endl
00895         <<tr()<<th("Client Processes").set("colspan","3")<<tr()<<endl
00896         <<tr()
00897         <<th("client").set("align","left")
00898         <<th("process id").set("align","center")
00899         <<th("status").set("align","center")
00900         <<tr()
00901         <<endl;
00902     for (UInt_t i=0;i<client_prc_ids.size();i++) {
00903 
00904       pid_t pid   =client_prc_ids[i];
00905       int   status=kill(pid,0);
00906 
00907       stringstream ssi;      ssi<<i+1;
00908       stringstream sspid;    sspid<<pid;
00909       stringstream ssstatus; ssstatus<<status;
00910       
00911       string bg_status = (status==0) ? "#00ff00" : "ff0000";
00912       *out<<tr()
00913           <<td(ssi.str()).set("align","left")
00914           <<td(sspid.str()).set("align","center")
00915           <<td(ssstatus.str()).set("align","center").set("bgcolor",bg_status)
00916           <<tr()<<endl;
00917     }
00918     *out<<table()<<endl;
00919     *out<<"<br><br>"<<endl;
00920 
00921     vector<string> states      = resourceTable_->cellStates();
00922     vector<UInt_t> evt_numbers = resourceTable_->cellEvtNumbers();
00923     vector<pid_t>  prc_ids     = resourceTable_->cellPrcIds();
00924     vector<time_t> time_stamps = resourceTable_->cellTimeStamps();
00925 
00926     *out<<table().set("frame","void").set("rules","rows")
00927                  .set("class","modules").set("width","500")<<endl
00928         <<tr()<<th("Shared Memory Cells").set("colspan","6")<<tr()<<endl
00929         <<tr()
00930         <<th("cell").set("align","left")
00931         <<th("state").set("align","center")
00932         <<th("event").set("align","center")
00933         <<th("process id").set("align","center")
00934         <<th("timestamp").set("align","center")
00935         <<th("time").set("align","center")
00936         <<tr()
00937         <<endl;
00938     for (UInt_t i=0;i<states.size();i++) {
00939       string state=states[i];
00940       UInt_t evt   = evt_numbers[i];
00941       pid_t  pid   = prc_ids[i];
00942       time_t tstamp= time_stamps[i];
00943       double tdiff = difftime(time(0),tstamp);
00944       
00945       stringstream ssi;      ssi<<i;
00946       stringstream ssevt;    if (evt!=0xffffffff) ssevt<<evt; else ssevt<<" - ";
00947       stringstream sspid;    if (pid!=0) sspid<<pid; else sspid<<" - ";
00948       stringstream sststamp; if (tstamp!=0) sststamp<<tstamp; else sststamp<<" - ";
00949       stringstream sstdiff;  if (tstamp!=0) sstdiff<<tdiff; else sstdiff<<" - ";
00950       
00951       string bg_state = "#ffffff";
00952       if (state=="RAWWRITING"||state=="RAWWRITTEN"||
00953           state=="RAWREADING"||state=="RAWREAD")
00954         bg_state="#99CCff";
00955       else if (state=="PROCESSING")
00956         bg_state="#ff0000";
00957       else if (state=="PROCESSED"||state=="RECOWRITING"||state=="RECOWRITTEN")
00958         bg_state="#CCff99";
00959       else if (state=="SENDING")
00960         bg_state="#00FF33";
00961       else if (state=="SENT")
00962         bg_state="#006633";
00963       else if (state=="DISCARDING")
00964         bg_state="#FFFF00";
00965       else if (state=="LUMISECTION")
00966         bg_state="#0000FF";
00967       
00968       *out<<tr()
00969           <<td(ssi.str()).set("align","left")
00970           <<td(state).set("align","center").set("bgcolor",bg_state)
00971           <<td(ssevt.str()).set("align","center")
00972           <<td(sspid.str()).set("align","center")
00973           <<td(sststamp.str()).set("align","center")
00974           <<td(sstdiff.str()).set("align","center")
00975           <<tr()<<endl;
00976     }
00977     *out<<table()<<endl;
00978     *out<<"<br><br>"<<endl;
00979 
00980     vector<string> dqmstates      = resourceTable_->dqmCellStates();
00981 
00982     *out<<table().set("frame","void").set("rules","rows")
00983                  .set("class","modules").set("width","500")<<endl
00984         <<tr()<<th("Shared Memory DQM Cells").set("colspan","6")<<tr()<<endl
00985         <<tr()
00986         <<th("cell").set("align","left")
00987         <<th("state").set("align","center")
00988         <<tr()
00989         <<endl;
00990     for (UInt_t i=0;i<dqmstates.size();i++) {
00991       string state=dqmstates[i];
00992       
00993       string bg_state = "#ffffff";
00994       if (state=="WRITING"||state=="WRITTEN")
00995         bg_state="#99CCff";
00996       else if (state=="SENDING")
00997         bg_state="#00FF33";
00998       else if (state=="SENT")
00999         bg_state="#006633";
01000       else if (state=="DISCARDING")
01001         bg_state="#FFFF00";
01002       
01003       *out<<tr()<<"<td>"<<i<<"</td>"
01004           <<td(state).set("align","center").set("bgcolor",bg_state)
01005           <<tr()<<endl;
01006     }
01007     *out<<table()<<endl;
01008 
01009 
01010     
01011   }
01012   *out<<"</body>"<<endl<<"</html>"<<endl;
01013   
01014   unlock();
01015 }
01016 
01017 void FUResourceBroker::emergencyStop()
01018 {
01019   LOG4CPLUS_WARN(log_, "in Emergency stop - handle non-clean stops");
01020   vector<pid_t> client_prc_ids = resourceTable_->clientPrcIds();
01021   for (UInt_t i=0;i<client_prc_ids.size();i++) {
01022     pid_t  pid   =client_prc_ids[i];
01023     std::cout << "B: killing process " << i << "pid=" << pid << std::endl;
01024     if(pid!=0){
01025       //assume processes are dead by now
01026       if(!resourceTable_->handleCrashedEP(runNumber_,pid))
01027         nbTimeoutsWithoutEvent_++;
01028       else
01029         nbTimeoutsWithEvent_++;
01030     }
01031   }
01032   resourceTable_->lastResort();
01033   ::sleep(1);
01034   if(!resourceTable_->isReadyToShutDown())
01035     {
01036       reasonForFailed_ = "EmergencyStop: failed to shut down ResourceTable";
01037       XCEPT_RAISE(evf::Exception,reasonForFailed_);
01038     }
01039   resourceTable_->printWorkLoopStatus();
01040   lock();
01041   std::cout << "delete resourcetable" <<std::endl;
01042   delete resourceTable_;
01043   resourceTable_=0;
01044   std::cout << "cycle through resourcetable config " << std::endl;
01045   configureResources();
01046   unlock();
01047   if(shmInconsistent_) XCEPT_RAISE(evf::Exception,"Inconsistent shm state");
01048   std::cout << "done with emergency stop"  << std::endl;
01049 }
01050 
01051 void FUResourceBroker::configureResources()
01052 {
01053   resourceTable_=new FUResourceTable(segmentationMode_.value_,
01054                                      nbRawCells_.value_,
01055                                      nbRecoCells_.value_,
01056                                      nbDqmCells_.value_,
01057                                      rawCellSize_.value_,
01058                                      recoCellSize_.value_,
01059                                      dqmCellSize_.value_,
01060                                      bu_,sm_,
01061                                      log_, 
01062                                      shmResourceTableTimeout_.value_, 
01063                                      frb_,
01064                                      this);
01065   FUResource::doFedIdCheck(doFedIdCheck_);
01066   FUResource::useEvmBoard(useEvmBoard_);
01067   resourceTable_->setDoCrcCheck(doCrcCheck_);
01068   resourceTable_->setDoDumpEvents(doDumpEvents_);
01069   reset();
01070   shmInconsistent_ = false;
01071   if(resourceTable_->nbResources() != nbRawCells_.value_ || 
01072      resourceTable_->nbFreeSlots() != nbRawCells_.value_)
01073     shmInconsistent_ = true;
01074 }
01075 
01076 
// XDAQ instantiator implementation macro
// Emits the XDAQ instantiation code for FUResourceBroker. The argument must
// be the bare class name. NOTE(review): presumably this is the hook the XDAQ
// executive uses to create the application; confirm in the xdaq headers.
XDAQ_INSTANTIATOR_IMPL(FUResourceBroker)