CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/ResourceBroker/src/FUResourceTable.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceTable
00004 // ---------------
00005 //
00006 //            12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013 
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017 
00018 
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023 
00024 
00025 using namespace evf;
00026 using namespace std;
00027 
00028 
00030 // construction/destruction
00032 
00033 //______________________________________________________________________________
00034 FUResourceTable::FUResourceTable(bool              segmentationMode,
00035                                  UInt_t            nbRawCells,
00036                                  UInt_t            nbRecoCells,
00037                                  UInt_t            nbDqmCells,
00038                                  UInt_t            rawCellSize,
00039                                  UInt_t            recoCellSize,
00040                                  UInt_t            dqmCellSize,
00041                                  BUProxy          *bu,
00042                                  SMProxy          *sm,
00043                                  log4cplus::Logger logger,
00044                                  unsigned int      timeout,
00045                                  EvffedFillerRB   *frb,
00046                                  xdaq::Application*app)
00047   throw (evf::Exception)
00048   : bu_(bu)
00049   , sm_(sm)
00050   , log_(logger)
00051   , wlSendData_(0)
00052   , asSendData_(0)
00053   , wlSendDqm_(0)
00054   , asSendDqm_(0)
00055   , wlDiscard_(0)
00056   , asDiscard_(0)
00057   , shmBuffer_(0)
00058   , nbDqmCells_(nbDqmCells)
00059   , nbRawCells_(nbRawCells)
00060   , nbRecoCells_(nbRecoCells)
00061   , acceptSMDataDiscard_(0)
00062   , acceptSMDqmDiscard_(0)
00063   , doCrcCheck_(1)
00064   , shutdownTimeout_(timeout)
00065   , nbPending_(0)
00066   , nbClientsToShutDown_(0)
00067   , isReadyToShutDown_(true)
00068   , isActive_(false)
00069   , isHalting_(false)
00070   , isStopping_(false)
00071   , runNumber_(0xffffffff)
00072   , lock_(toolbox::BSem::FULL)
00073   , frb_(frb)
00074   , app_(app)
00075 {
00076   initialize(segmentationMode,
00077              nbRawCells,nbRecoCells,nbDqmCells,
00078              rawCellSize,recoCellSize,dqmCellSize);
00079 }
00080 
00081 
00082 //______________________________________________________________________________
00083 FUResourceTable::~FUResourceTable()
00084 {
00085   clear();
00086   if(wlSendData_){
00087     wlSendData_->cancel();
00088     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00089   }
00090   if(wlSendDqm_){
00091     wlSendDqm_->cancel();
00092     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00093   }
00094   if(wlDiscard_){
00095     wlDiscard_->cancel();
00096     toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00097   }
00098   shmdt(shmBuffer_);
00099   if (FUShmBuffer::releaseSharedMemory())
00100     LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00101   if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00102   if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00103 }
00104 
00105 
00107 // implementation of member functions
00109 
00110 //______________________________________________________________________________
00111 void FUResourceTable::initialize(bool   segmentationMode,
00112                                  UInt_t nbRawCells,
00113                                  UInt_t nbRecoCells,
00114                                  UInt_t nbDqmCells,
00115                                  UInt_t rawCellSize,
00116                                  UInt_t recoCellSize,
00117                                  UInt_t dqmCellSize)
00118   throw (evf::Exception)
00119 {
00120   clear();
00121   
00122   shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00123                                           nbRawCells,nbRecoCells,nbDqmCells,
00124                                           rawCellSize,recoCellSize,dqmCellSize);
00125   if (0==shmBuffer_) {
00126     string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00127     LOG4CPLUS_FATAL(log_,msg);
00128     XCEPT_RAISE(evf::Exception,msg);
00129   }
00130   
00131   for (UInt_t i=0;i<nbRawCells_;i++) {
00132     resources_.push_back(new FUResource(i,log_,frb_,app_));
00133     freeResourceIds_.push(i);
00134   }
00135 
00136   acceptSMDataDiscard_ = new bool[nbRecoCells];
00137   acceptSMDqmDiscard_  = new int[nbDqmCells];
00138   
00139   resetCounters();
00140 }
00141 
00142 
00143 //______________________________________________________________________________
00144 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00145 {
00146   try {
00147     wlSendData_=
00148       toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00149     if (!wlSendData_->isActive()) wlSendData_->activate();
00150     asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00151     wlSendData_->submit(asSendData_);
00152   }
00153   catch (xcept::Exception& e) {
00154     string msg = "Failed to start workloop 'SendData'.";
00155     XCEPT_RETHROW(evf::Exception,msg,e);
00156   }
00157 }
00158 
00159 
00160 //______________________________________________________________________________
// Work-loop action submitted by startSendDataWorkLoop().
//
// Takes the next reco cell to read from the shared memory buffer and, based
// on the cell type, forwards it via sendInitMessage() (type 0),
// sendDataEvent() (type 1) or sendErrorEvent() (type 2).
//
// Returns the 'reschedule' flag: false after a cell with event size 0, or
// after a failed send / unknown cell type; true otherwise.
00161 bool FUResourceTable::sendData(toolbox::task::WorkLoop* /* wl */)
00162 {
00163   bool reschedule=true;
00164 
00165   FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00166   
        // An event size of 0 ends this work loop: the cell is handed straight
        // back (finish reading + discard) and the action is not rescheduled.
00167   if (0==cell->eventSize()) {
00168     LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00169     UInt_t cellIndex=cell->index();
00170     shmBuffer_->finishReadingRecoCell(cell);
00171     shmBuffer_->discardRecoCell(cellIndex);
00172     reschedule=false;
00173   }
        // While halting nothing is sent any more; cells are discarded locally
        // but the work loop keeps running.
00174   else if (isHalting_) {
00175     LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00176     UInt_t cellIndex=cell->index();
00177     shmBuffer_->finishReadingRecoCell(cell);
00178     shmBuffer_->discardRecoCell(cellIndex);
00179   }
00180   else {
00181     try {
            // In every branch below the cell fields are copied into locals
            // and finishReadingRecoCell() is called before the send.
            // type 0: sent with sendInitMessage()
00182       if (cell->type()==0) {
00183         UInt_t   cellIndex       = cell->index();
00184         UInt_t   cellOutModId    = cell->outModId();
00185         UInt_t   cellFUProcId    = cell->fuProcessId();
00186         UInt_t   cellFUGuid      = cell->fuGuid();
00187         UChar_t* cellPayloadAddr = cell->payloadAddr();
00188         UInt_t   cellEventSize   = cell->eventSize();
00189         shmBuffer_->finishReadingRecoCell(cell);
00190 
00191         lock();
00192         nbPendingSMDiscards_++;
00193         unlock();
00194 
00195         sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00196                         cellPayloadAddr,cellEventSize);
00197       }
            // type 1: sent with sendDataEvent(); nbSent_ is only incremented
            // for the first send of the associated raw cell's resource.
00198       else if (cell->type()==1) {
00199         UInt_t   cellIndex       = cell->index();
00200         UInt_t   cellRawIndex    = cell->rawCellIndex();
00201         UInt_t   cellRunNumber   = cell->runNumber();
00202         UInt_t   cellEvtNumber   = cell->evtNumber();
00203         UInt_t   cellOutModId    = cell->outModId();
00204         UInt_t   cellFUProcId    = cell->fuProcessId();
00205         UInt_t   cellFUGuid      = cell->fuGuid();
00206         UChar_t *cellPayloadAddr = cell->payloadAddr();
00207         UInt_t   cellEventSize   = cell->eventSize();
00208         shmBuffer_->finishReadingRecoCell(cell);        
00209 
00210         lock();
00211         nbPendingSMDiscards_++;
00212         resources_[cellRawIndex]->incNbSent();
00213         if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00214         unlock();
00215 
00216         sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00217                       cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00218       }
            // type 2: sent with sendErrorEvent(), using the table's
            // runNumber_ member rather than the run number stored in the cell.
00219       else if (cell->type()==2) {
00220         UInt_t   cellIndex       = cell->index();
00221         UInt_t   cellRawIndex    = cell->rawCellIndex();
00222         //UInt_t   cellRunNumber   = cell->runNumber();
00223         UInt_t   cellEvtNumber   = cell->evtNumber();
00224         UInt_t   cellFUProcId    = cell->fuProcessId();
00225         UInt_t   cellFUGuid      = cell->fuGuid();
00226         UChar_t *cellPayloadAddr = cell->payloadAddr();
00227         UInt_t   cellEventSize   = cell->eventSize();
00228         shmBuffer_->finishReadingRecoCell(cell);        
00229 
00230         lock();
00231         nbPendingSMDiscards_++;
00232         resources_[cellRawIndex]->incNbSent();
00233         if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00234         unlock();
00235         
00236         sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00237                        cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00238       }
00239       else {
              // NOTE(review): on this path finishReadingRecoCell() is never
              // called for the cell - confirm that this is intended.
00240         string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00241         XCEPT_RAISE(evf::Exception,errmsg);
00242       }
00243     }
          // Any failure while sending (or an unknown cell type) is logged as
          // fatal and stops the work loop.
00244     catch (xcept::Exception& e) {
00245       LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00246                       <<xcept::stdformat_exception_history(e));
00247       reschedule=false;
00248     }
00249   }
00250   
00251   return reschedule;
00252 }
00253 
00254 
00255 //______________________________________________________________________________
00256 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00257 {
00258   try {
00259     wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00260     if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00261     asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00262     wlSendDqm_->submit(asSendDqm_);
00263   }
00264   catch (xcept::Exception& e) {
00265     string msg = "Failed to start workloop 'SendDqm'.";
00266     XCEPT_RETHROW(evf::Exception,msg,e);
00267   }
00268 }
00269 
00270 
00271 //______________________________________________________________________________
00272 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* /* wl */)
00273 {
00274   bool reschedule=true;
00275   
00276   FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00277   dqm::State_t  state=shmBuffer_->dqmState(cell->index());
00278   
00279   if (state==dqm::EMPTY) {
00280     LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00281     std::cout << "shut down dqm workloop " << std::endl;
00282     UInt_t cellIndex=cell->index();
00283     shmBuffer_->finishReadingDqmCell(cell);
00284     shmBuffer_->discardDqmCell(cellIndex);
00285     reschedule=false;
00286   }
00287   else if (isHalting_) {
00288     UInt_t cellIndex=cell->index();
00289     shmBuffer_->finishReadingDqmCell(cell);
00290     shmBuffer_->discardDqmCell(cellIndex);
00291   }
00292   else {
00293     try {
00294       UInt_t   cellIndex       = cell->index();
00295       UInt_t   cellRunNumber   = cell->runNumber();
00296       UInt_t   cellEvtAtUpdate = cell->evtAtUpdate();
00297       UInt_t   cellFolderId    = cell->folderId();
00298       UInt_t   cellFUProcId    = cell->fuProcessId();
00299       UInt_t   cellFUGuid      = cell->fuGuid();
00300       UChar_t *cellPayloadAddr = cell->payloadAddr();
00301       UInt_t   cellEventSize   = cell->eventSize();
00302       sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00303                    cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00304       shmBuffer_->finishReadingDqmCell(cell);
00305     }
00306     catch (xcept::Exception& e) {
00307       LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00308                       <<xcept::stdformat_exception_history(e));
00309       reschedule=false;
00310     }
00311   }
00312   
00313   return reschedule;
00314 }
00315 
00316 
00317 //______________________________________________________________________________
00318 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00319 {
00320   try {
00321     LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00322     wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00323     if (!wlDiscard_->isActive()) wlDiscard_->activate();
00324     asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00325     wlDiscard_->submit(asDiscard_);
00326     isActive_=true;
00327   }
00328   catch (xcept::Exception& e) {
00329     string msg = "Failed to start workloop 'Discard'.";
00330     XCEPT_RETHROW(evf::Exception,msg,e);
00331   }
00332   isReadyToShutDown_=false;
00333 }
00334 
00335 
00336 //______________________________________________________________________________
// Work-loop action submitted by startDiscardWorkLoop().
//
// Takes the next raw cell scheduled for discard, discards it in shared
// memory and - for ordinary events (neither STOP nor LUMISECTION) - releases
// the associated FUResource, returns its id to the free list and notifies
// the BU (discard, plus a new allocate unless stopping). STOP cells count
// down nbClientsToShutDown_; once it reaches zero the shutdown sequence at
// the end of this function runs and the action is no longer rescheduled.
//
// Returns true if the work loop should call this action again.
00337 bool FUResourceTable::discard(toolbox::task::WorkLoop* /* wl */)
00338 {
00339   FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00340   evt::State_t  state=shmBuffer_->evtState(cell->index());
00341 
00342   bool   reschedule  =true;
00343   bool   shutDown    =(state==evt::STOP);
00344   bool   isLumi      =(state==evt::LUMISECTION);
00345   UInt_t fuResourceId=cell->fuResourceId();
00346   UInt_t buResourceId=cell->buResourceId();
00347 
00348   //  std::cout << "discard loop, state, shutDown, isLumi " << state << " " 
00349   //        << shutDown << " " << isLumi << std::endl;
00350   //  std::cout << "resource ids " << fuResourceId << " " << buResourceId << std::endl;
00351 
        // A STOP cell decrements the number of clients still to be shut down;
        // the last one deactivates the table and ends the work loop.
00352   if (shutDown) {
00353     LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00354     if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00355     if (nbClientsToShutDown_==0) {
00356       LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00357       isActive_ =false;
00358       reschedule=false;
00359     }
00360   }
00361   
00362   shmBuffer_->discardRawCell(cell);
00363   
        // Ordinary event: recycle the resource and, unless halting, tell the
        // BU. fuResourceId/buResourceId were copied above, before the cell
        // was discarded.
00364   if (!shutDown && !isLumi) {
00365     resources_[fuResourceId]->release();
00366     lock();
00367     freeResourceIds_.push(fuResourceId);
00368     assert(freeResourceIds_.size()<=resources_.size());
00369     unlock();
00370     
00371     if (!isHalting_) {
00372       sendDiscard(buResourceId);
00373       if(!isStopping_)sendAllocate();
00374     }
00375   }
00376   
        // Shutdown sequence, entered only after the last STOP cell.
00377   if (!reschedule) {
00378     std::cout << " entered shutdown cycle " << std::endl;
          // Write an empty reco event; sendData() stops rescheduling itself
          // when it reads a cell with event size 0.
00379     shmBuffer_->writeRecoEmptyEvent();
          // Wait (at most 100 iterations of shutdownTimeout_ microseconds)
          // until no client is registered and this process is the only one
          // attached to the segment.
00380     UInt_t count=0;
00381     while (count<100) {
00382       std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00383                 <<  FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00384       if (shmBuffer_->nClients()==0&&
00385           FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00386         //      isReadyToShutDown_ = true;
00387         break;
00388       }
00389       else {
00390         count++;
00391         std::cout << " shutdown cycle attempt " << count << std::endl;
00392         LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00393                         <<" nClients="<<shmBuffer_->nClients()
00394                         <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00395                         <<" ("<<count<<")");
00396         ::usleep(shutdownTimeout_);
00397         if(count*shutdownTimeout_ > 10000000)
00398           LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00399                           <<" nClients="<<shmBuffer_->nClients()
00400                           <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00401                           <<" ("<<count<<")");
00402 
00403       }
00404     }
          // Poll until every dqm cell is in state EMPTY.
          // NOTE(review): this loop has no sleep and no iteration limit; it
          // spins, re-taking the shm lock, until all cells are empty.
00405     bool allEmpty = false;
00406     std::cout << "Checking if all dqm cells are empty " << std::endl;
00407     while(!allEmpty){
00408       UInt_t n=nbDqmCells_;
00409       allEmpty = true;
00410       shmBuffer_->lock();
00411       for (UInt_t i=0;i<n;i++) {
00412         dqm::State_t state=shmBuffer_->dqmState(i);
00413         if(state!=dqm::EMPTY) allEmpty = false; 
00414       } 
00415       shmBuffer_->unlock();
00416     }
00417     std::cout << "Making sure there are no dqm pending discards " << std::endl;
00418     if(nbPendingSMDqmDiscards_ != 0)
00419       {
00420         LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00421                        << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00422 
00423       }
          // Write an empty dqm event; sendDqm() stops rescheduling itself
          // when it reads a cell in state EMPTY.
00424     shmBuffer_->writeDqmEmptyEvent();
00425     isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the 
00426                                // sendDqm loop has been shut down as well
00427   }
00428   
00429   return reschedule;
00430 }
00431 
00432 
00433 
00434 //______________________________________________________________________________
00435 UInt_t FUResourceTable::allocateResource()
00436 {
00437   assert(!freeResourceIds_.empty());
00438 
00439   lock();
00440   UInt_t fuResourceId=freeResourceIds_.front();
00441   freeResourceIds_.pop();
00442   nbPending_++;
00443   nbAllocated_++;
00444   unlock();
00445   
00446   return fuResourceId;
00447 }
00448 
00449 
00450 //______________________________________________________________________________
// Feed one I2O event data block message into the resource it belongs to.
//
// The target resource is selected by the block's fuTransactionId. On the
// first block of an event the resource is bound to a raw cell obtained from
// shared memory. Healthy resources process the block; when the resource
// reports completion the raw cell is marked as written. Resources in fatal
// error state drop the message and, on the last message of the event, are
// released and reported to the BU.
//
// Returns true if this block completed the event.
00451 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00452 {
00453   bool eventComplete=false;
00454   
00455   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00456     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00457   
        // NOTE(review): fuResourceId comes from the message and is used as an
        // index into resources_ without a range check.
00458   UInt_t      fuResourceId=(UInt_t)block->fuTransactionId;
00459   UInt_t      buResourceId=(UInt_t)block->buResourceId;
00460   FUResource* resource    =resources_[fuResourceId];
00461   
00462   // allocate resource
00463   if (!resource->fatalError()&&!resource->isAllocated()) {
00464     FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00465     resource->allocate(cell);
00466     timeval now;
00467     gettimeofday(&now,0);
00468 
          // Time stamp passed to the filler: seconds in the upper 32 bits,
          // microseconds added in the lower part.
00469     frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00470 
00471     frb_->setRBEventCount(nbCompleted_);
00472 
          // Enable the CRC check when nbAllocated_ is a multiple of
          // doCrcCheck_; a value of 0 disables the check.
00473     if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_)  resource->doCrcCheck(true);
00474     else                                             resource->doCrcCheck(false);
00475   }
00476   
00477   
00478   // keep building this resource if it is healthy
00479   if (!resource->fatalError()) {
00480     resource->process(bufRef);
00481     lock();
00482     nbErrors_   +=resource->nbErrors();
00483     nbCrcErrors_+=resource->nbCrcErrors();
00484     unlock();
00485         
00486     // make resource available for pick-up
00487     if (resource->isComplete()) {
00488       lock();
00489       nbCompleted_++;
00490       nbPending_--;
00491       unlock();
            // Optionally dump the event to disk when nbCompleted_ is a
            // multiple of doDumpEvents_ (0 disables dumping).
00492       if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00493         dumpEvent(resource->shmCell());
00494       shmBuffer_->finishWritingRawCell(resource->shmCell());
00495       eventComplete=true;
00496     }
00497     
00498   }
00499   // bad event, release msg, and the whole resource if this was the last one
00500   //else {
        // Deliberately a second 'if' rather than an 'else': fatalError() is
        // re-evaluated after process(), which runs in the block above.
00501   if (resource->fatalError()) {
00502     bool lastMsg=isLastMessageOfEvent(bufRef);
00503     if (lastMsg) {
00504       shmBuffer_->releaseRawCell(resource->shmCell());
00505       resource->release();
00506       lock();
00507       freeResourceIds_.push(fuResourceId);
00508       nbDiscarded_++;
00509       nbLost_++;
00510       nbPending_--;
00511       unlock();
00512       bu_->sendDiscard(buResourceId);
00513       sendAllocate();
00514     }
00515     bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed... 
00516   }
00517   
00518   return eventComplete;
00519 }
00520 
00521 
00522 //______________________________________________________________________________
00523 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00524 {
00525   I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00526   msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00527   UInt_t recoIndex=msg->rbBufferID;
00528   
00529   if (acceptSMDataDiscard_[recoIndex]) {
00530     lock();
00531     nbPendingSMDiscards_--;
00532     unlock();
00533     acceptSMDataDiscard_[recoIndex] = false;
00534     
00535     if (!isHalting_) {
00536       shmBuffer_->discardRecoCell(recoIndex);
00537       bufRef->release();
00538     }
00539   }
00540   else {
00541     LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00542   }
00543   
00544   if (isHalting_) {
00545     bufRef->release();
00546     return false;
00547   }
00548   
00549   return true;
00550 }
00551 
00552 
00553 //______________________________________________________________________________
00554 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00555 {
00556   I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00557   msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00558   UInt_t dqmIndex=msg->rbBufferID;
00559   unsigned int ntries = 0;
00560   while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00561     LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00562     ::usleep(10000);
00563     if(ntries++>10){
00564       LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex 
00565                       << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00566       bufRef->release();
00567       return true;
00568     }
00569   }
00570   if (acceptSMDqmDiscard_[dqmIndex]>0) {
00571     acceptSMDqmDiscard_[dqmIndex]--;
00572     if(nbPendingSMDqmDiscards_>0){
00573       nbPendingSMDqmDiscards_--;
00574     }
00575     else {
00576       LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex 
00577                      << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00578     }
00579     
00580     if (!isHalting_) {
00581       shmBuffer_->discardDqmCell(dqmIndex);
00582       bufRef->release();
00583     }
00584 
00585   }
00586   else {
00587     LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex 
00588                     << " from StorageManager while cell is not accepting discards");
00589   }
00590   
00591   if (isHalting_) {
00592     bufRef->release();
00593     return false;
00594   }
00595   
00596   return true;
00597 }
00598 
00599 
00600 //______________________________________________________________________________
00601 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00602 {
00603   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00604     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00605   //make sure to fill up the shmem so no process will miss it
00606   // but processes will have to handle duplicates
00607 
00608   for(unsigned int i = 0; i < nbRawCells_; i++) 
00609     shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00610 }
00611 
00612 
00613 //______________________________________________________________________________
00614 void FUResourceTable::dropEvent()
00615 {
00616   FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00617   UInt_t fuResourceId=cell->fuResourceId();
00618   shmBuffer_->finishReadingRawCell(cell);
00619   shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00620 }
00621 
00622 
00623 //______________________________________________________________________________
00624 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00625 {
00626   bool retval = false;
00627   vector<pid_t> pids=cellPrcIds();
00628   UInt_t iRawCell=pids.size();
00629   for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00630   
00631   if (iRawCell<pids.size()){
00632     shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00633     retval = true;
00634   }
00635   else
00636     LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00637   shmBuffer_->removeClientPrcId(pid);
00638   return retval;
00639 }
00640 
00641 
00642 //______________________________________________________________________________
00643 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00644 {
00645   ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00646   ofstream fout(oss.str().c_str());
00647   fout.fill('0');
00648 
00649   fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00650   for (unsigned int i=0;i<cell->nFed();i++) {
00651     if (cell->fedSize(i)==0) continue;
00652     fout<<"# fedid "<<i<<endl;
00653     unsigned char* addr=cell->fedAddr(i);
00654     for (unsigned int j=0;j<cell->fedSize(i);j++) {
00655       fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00656       if ((j+1)%8) fout<<" "; else fout<<endl;
00657       ++addr;
00658     }
00659     fout<<endl;
00660   }
00661   fout.close();
00662 }
00663 
00664 
00665 //______________________________________________________________________________
00666 void FUResourceTable::stop()
00667 {
00668   isStopping_ = true;
00669   shutDownClients();
00670 }
00671 
00672 
00673 //______________________________________________________________________________
00674 void FUResourceTable::halt()
00675 {
00676   isHalting_=true;
00677   shutDownClients();
00678 }
00679 
00680 
00681 //______________________________________________________________________________
00682 void FUResourceTable::shutDownClients()
00683 {
00684   nbClientsToShutDown_ = nbClients();
00685   isReadyToShutDown_   = false;
00686   
00687   if (nbClientsToShutDown_==0) {
00688     LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00689     UInt_t n=nbResources();
00690     for (UInt_t i=0;i<n;i++) {
00691       evt::State_t state=shmBuffer_->evtState(i);
00692       if (state!=evt::EMPTY){
00693         LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state " 
00694                        << state);
00695         shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00696       }
00697     }
00698     shmBuffer_->scheduleRawEmptyCellForDiscard();
00699   }
00700   else {
00701     UInt_t n=nbClientsToShutDown_;
00702     for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00703   }
00704 }
00705 
00706 
00707 //______________________________________________________________________________
00708 void FUResourceTable::clear()
00709 {
00710   for (UInt_t i=0;i<resources_.size();i++) {
00711     resources_[i]->release();
00712     delete resources_[i];
00713   }
00714   resources_.clear();
00715   while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00716 }
00717 
00718 
00719 //______________________________________________________________________________
00720 void FUResourceTable::resetCounters()
00721 {
00722   if (0!=shmBuffer_) {
00723     for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00724     for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++)  acceptSMDqmDiscard_[i] = 0;
00725   }
00726   
00727   nbAllocated_           =nbPending_;
00728   nbCompleted_           =0;
00729   nbSent_                =0;
00730   nbSentError_           =0;
00731   nbSentDqm_             =0;
00732   nbPendingSMDiscards_   =0;
00733   nbPendingSMDqmDiscards_=0;
00734   nbDiscarded_           =0;
00735   nbLost_                =0;
00736 
00737   nbErrors_              =0;
00738   nbCrcErrors_           =0;
00739   nbAllocSent_           =0;
00740 
00741   sumOfSquares_          =0;
00742   sumOfSizes_            =0;
00743   isStopping_            =false;
00744 }
00745 
00746 
00747 //______________________________________________________________________________
00748 UInt_t FUResourceTable::nbClients() const
00749 {
00750   UInt_t result(0);
00751   if (0!=shmBuffer_) result=shmBuffer_->nClients();
00752   return result;
00753 }
00754 
00755 
00756 //______________________________________________________________________________
00757 vector<pid_t> FUResourceTable::clientPrcIds() const
00758 {
00759   vector<pid_t> result;
00760   if (0!=shmBuffer_) {
00761     UInt_t n=nbClients();
00762     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00763   }
00764   return result;
00765 }
00766 
00767 
00768 //______________________________________________________________________________
00769 string FUResourceTable::clientPrcIdsAsString() const
00770 {
00771   stringstream ss;
00772   if (0!=shmBuffer_) {
00773     UInt_t n=nbClients();
00774     for (UInt_t i=0;i<n;i++) {
00775       if (i>0) ss<<",";
00776       ss<<shmBuffer_->clientPrcId(i);
00777     }
00778   }
00779   return ss.str();
00780 }
00781 
00782 
00783 //______________________________________________________________________________
00784 vector<string> FUResourceTable::cellStates() const
00785 {
00786   vector<string> result;
00787   if (0!=shmBuffer_) {
00788     UInt_t n=nbResources();
00789     shmBuffer_->lock();
00790     for (UInt_t i=0;i<n;i++) {
00791       evt::State_t state=shmBuffer_->evtState(i);
00792       if      (state==evt::EMPTY)      result.push_back("EMPTY");
00793       else if (state==evt::STOP)       result.push_back("STOP");
00794       else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00795       else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00796       else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00797       else if (state==evt::RAWREAD)    result.push_back("RAWREAD");
00798       else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00799       else if (state==evt::PROCESSED)  result.push_back("PROCESSED");
00800       else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00801       else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00802       else if (state==evt::SENDING)    result.push_back("SENDING");
00803       else if (state==evt::SENT)       result.push_back("SENT");
00804       else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00805     }
00806     shmBuffer_->unlock();
00807   }
00808   return result;
00809 }
00810 
00811 vector<string> FUResourceTable::dqmCellStates() const
00812 {
00813   vector<string> result;
00814   if (0!=shmBuffer_) {
00815     UInt_t n=nbDqmCells_;
00816     shmBuffer_->lock();
00817     for (UInt_t i=0;i<n;i++) {
00818       dqm::State_t state=shmBuffer_->dqmState(i);
00819       if      (state==dqm::EMPTY)      result.push_back("EMPTY");
00820       else if (state==dqm::WRITING) result.push_back("WRITING");
00821       else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00822       else if (state==dqm::SENDING)    result.push_back("SENDING");
00823       else if (state==dqm::SENT)       result.push_back("SENT");
00824       else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00825     }
00826     shmBuffer_->unlock();
00827   }
00828   return result;
00829 }
00830 
00831 
00832 //______________________________________________________________________________
00833 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00834 {
00835   vector<UInt_t> result;
00836   if (0!=shmBuffer_) {
00837     UInt_t n=nbResources();
00838     shmBuffer_->lock();
00839     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00840     shmBuffer_->unlock();
00841   }
00842   return result;
00843 }
00844 
00845 
00846 //______________________________________________________________________________
00847 vector<pid_t> FUResourceTable::cellPrcIds() const
00848 {
00849   vector<pid_t> result;
00850   if (0!=shmBuffer_) {
00851     UInt_t n=nbResources();
00852     shmBuffer_->lock();
00853     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00854     shmBuffer_->unlock();
00855   }
00856   return result;
00857 }
00858 
00859 
00860 //______________________________________________________________________________
00861 vector<time_t> FUResourceTable::cellTimeStamps() const
00862 {
00863   vector<time_t> result;
00864   if (0!=shmBuffer_) {
00865     UInt_t n=nbResources();
00866     shmBuffer_->lock();
00867     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00868     shmBuffer_->unlock();
00869   }
00870   return result;
00871 }
00872 
00873 
00875 // implementation of private member functions
00877 
00878 //______________________________________________________________________________
00879 void FUResourceTable::sendAllocate()
00880 {
00881   UInt_t nbFreeSlots    = this->nbFreeSlots();
00882   UInt_t nbFreeSlotsMax = resources_.size()/2;
00883   if (nbFreeSlots>nbFreeSlotsMax) {
00884     UIntVec_t fuResourceIds;
00885     for (UInt_t i=0;i<nbFreeSlots;i++)
00886       fuResourceIds.push_back(allocateResource());
00887     bu_->sendAllocate(fuResourceIds);
00888     nbAllocSent_++;
00889   }
00890 }
00891 
00892 
00893 //______________________________________________________________________________
00894 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00895 {
00896   bu_->sendDiscard(buResourceId);
00897   nbDiscarded_++;
00898 }
00899 
00900 
00901 //______________________________________________________________________________
00902 void FUResourceTable::sendInitMessage(UInt_t   fuResourceId,
00903                                       UInt_t   outModId,
00904                                       UInt_t   fuProcessId,
00905                                       UInt_t   fuGuid,
00906                                       UChar_t *data,
00907                                       UInt_t   dataSize)
00908 {
00909   if (0==sm_) {
00910     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00911   }
00912   else {
00913     acceptSMDataDiscard_[fuResourceId] = true;
00914     UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00915                                         fuGuid,data,dataSize);
00916     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00917     sumOfSizes_  +=nbBytes;
00918   }
00919 }
00920 
00921 
00922 //______________________________________________________________________________
00923 void FUResourceTable::sendDataEvent(UInt_t   fuResourceId,
00924                                     UInt_t   runNumber,
00925                                     UInt_t   evtNumber,
00926                                     UInt_t   outModId,
00927                                     UInt_t   fuProcessId,
00928                                     UInt_t   fuGuid,
00929                                     UChar_t *data,
00930                                     UInt_t   dataSize)
00931 {
00932   if (0==sm_) {
00933     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00934   }
00935   else {
00936     acceptSMDataDiscard_[fuResourceId] = true;
00937     UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00938                                       outModId,fuProcessId,fuGuid,
00939                                       data,dataSize);
00940     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00941     sumOfSizes_  +=nbBytes;
00942   }
00943 }
00944 
00945 
00946 //______________________________________________________________________________
00947 void FUResourceTable::sendErrorEvent(UInt_t   fuResourceId,
00948                                      UInt_t   runNumber,
00949                                      UInt_t   evtNumber,
00950                                      UInt_t   fuProcessId,
00951                                      UInt_t   fuGuid,
00952                                      UChar_t *data,
00953                                      UInt_t   dataSize)
00954 {
00955   if (0==sm_) {
00956     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
00957   }
00958   else {
00959     acceptSMDataDiscard_[fuResourceId] = true;
00960     UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
00961                                        fuProcessId,fuGuid,data,dataSize);
00962     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00963     sumOfSizes_  +=nbBytes;
00964   }
00965 
00966 //   if (0!=shmBuffer_) {
00967 //     UInt_t n=nbDqmCells_;
00968 
00969 //     for (UInt_t i=0;i<n;i++) {
00970 //       if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
00971 //      {
00972 //        if(shmBuffer_->dqmState(i)!=dqm::SENT){
00973 //          shmBuffer_->setDqmState(i,dqm::SENT);       
00974 //          shmBuffer_->discardDqmCell(i);      
00975 //          acceptSMDqmDiscard_[i] = false;
00976 //        }  
00977 //      }
00978 //     }
00979 //     n=nbRecoCells_;
00980 //     for (UInt_t i=0;i<n;i++) {
00981 //       if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
00982 //      {
00983 //        shmBuffer_->discardOrphanedRecoCell(i);  
00984 //      }
00985 //     }
00986 
00987 //   }
00988 }
00989 
00990 
00991 //______________________________________________________________________________
00992 void FUResourceTable::sendDqmEvent(UInt_t   fuDqmId,
00993                                    UInt_t   runNumber,
00994                                    UInt_t   evtAtUpdate,
00995                                    UInt_t   folderId,
00996                                    UInt_t   fuProcessId,
00997                                    UInt_t   fuGuid,
00998                                    UChar_t* data,
00999                                    UInt_t   dataSize)
01000 {
01001   if (0==sm_) {
01002     LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01003   }
01004   else {
01005     sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01006                       fuProcessId,fuGuid,data,dataSize);
01007 
01008     nbPendingSMDqmDiscards_++;
01009 
01010     acceptSMDqmDiscard_[fuDqmId]++;
01011     if(acceptSMDqmDiscard_[fuDqmId]>1)
01012       LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder " 
01013                      << folderId << " process " << fuProcessId << " guid " << fuGuid);
01014     nbSentDqm_++;
01015   }
01016 }
01017 
01018 
01019 //______________________________________________________________________________
01020 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01021 {
01022   while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
01023   
01024   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01025     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01026   
01027   UInt_t iBlock    =block->blockNb;
01028   UInt_t nBlock    =block->nbBlocksInSuperFragment;
01029   UInt_t iSuperFrag=block->superFragmentNb;
01030   UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01031 
01032   return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01033 }
01034 
01035 //______________________________________________________________________________
01036 void FUResourceTable::injectCRCError()
01037 {
01038   for (UInt_t i=0;i<resources_.size();i++) {
01039     resources_[i]->scheduleCRCError();
01040   }
01041 }
01042 void FUResourceTable::printWorkLoopStatus()
01043 {
01044   std::cout << "Workloop status===============" << std::endl;
01045   std::cout << "==============================" << std::endl;
01046   if(wlSendData_!=0)
01047     std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01048   if(wlSendDqm_!=0)
01049     std::cout << "SendDqm  -> " << wlSendDqm_->isActive()  << std::endl;
01050   if(wlDiscard_!=0)
01051     std::cout << "Discard  -> " << wlDiscard_->isActive()  << std::endl;
01052   std::cout << "Workloops Active  -> " << isActive_  << std::endl;
01053 
01054 }
01055 
01056 
01057 void FUResourceTable::lastResort()
01058 {
01059   std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() 
01060             << " more rawcells to read " << std::endl;
01061   while(shmBuffer_->nbRawCellsToRead()!=0){
01062     FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01063     std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01064     shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
01065     std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01066   }
01067 }