CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/ResourceBroker/src/FUResourceTable.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceTable
00004 // ---------------
00005 //
00006 //            12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013 
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017 
00018 
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023 
00024 
00025 using namespace evf;
00026 using namespace std;
00027 
00028 
00030 // construction/destruction
00032 
00033 //______________________________________________________________________________
00034 FUResourceTable::FUResourceTable(bool              segmentationMode,
00035                                  UInt_t            nbRawCells,
00036                                  UInt_t            nbRecoCells,
00037                                  UInt_t            nbDqmCells,
00038                                  UInt_t            rawCellSize,
00039                                  UInt_t            recoCellSize,
00040                                  UInt_t            dqmCellSize,
00041                                  BUProxy          *bu,
00042                                  SMProxy          *sm,
00043                                  log4cplus::Logger logger,
00044                                  unsigned int      timeout,
00045                                  EvffedFillerRB   *frb,
00046                                  xdaq::Application*app)
00047   throw (evf::Exception)
00048   : bu_(bu)
00049   , sm_(sm)
00050   , log_(logger)
00051   , wlSendData_(0)
00052   , asSendData_(0)
00053   , wlSendDqm_(0)
00054   , asSendDqm_(0)
00055   , wlDiscard_(0)
00056   , asDiscard_(0)
00057   , shmBuffer_(0)
00058   , nbDqmCells_(nbDqmCells)
00059   , nbRawCells_(nbRawCells)
00060   , nbRecoCells_(nbRecoCells)
00061   , acceptSMDataDiscard_(0)
00062   , acceptSMDqmDiscard_(0)
00063   , doCrcCheck_(1)
00064   , shutdownTimeout_(timeout)
00065   , nbPending_(0)
00066   , nbClientsToShutDown_(0)
00067   , isReadyToShutDown_(true)
00068   , isActive_(false)
00069   , isHalting_(false)
00070   , isStopping_(false)
00071   , runNumber_(0xffffffff)
00072   , lock_(toolbox::BSem::FULL)
00073   , frb_(frb)
00074   , app_(app)
00075 {
00076   initialize(segmentationMode,
00077              nbRawCells,nbRecoCells,nbDqmCells,
00078              rawCellSize,recoCellSize,dqmCellSize);
00079 }
00080 
00081 
00082 //______________________________________________________________________________
00083 FUResourceTable::~FUResourceTable()
00084 {
00085   clear();
00086   if(wlSendData_){
00087     wlSendData_->cancel();
00088     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00089   }
00090   if(wlSendDqm_){
00091     wlSendDqm_->cancel();
00092     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00093   }
00094   if(wlDiscard_){
00095     wlDiscard_->cancel();
00096     toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00097   }
00098   shmdt(shmBuffer_);
00099   if (FUShmBuffer::releaseSharedMemory())
00100     LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00101   if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00102   if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00103 }
00104 
00105 
00107 // implementation of member functions
00109 
00110 //______________________________________________________________________________
00111 void FUResourceTable::initialize(bool   segmentationMode,
00112                                  UInt_t nbRawCells,
00113                                  UInt_t nbRecoCells,
00114                                  UInt_t nbDqmCells,
00115                                  UInt_t rawCellSize,
00116                                  UInt_t recoCellSize,
00117                                  UInt_t dqmCellSize)
00118   throw (evf::Exception)
00119 {
00120   clear();
00121   
00122   shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00123                                           nbRawCells,nbRecoCells,nbDqmCells,
00124                                           rawCellSize,recoCellSize,dqmCellSize);
00125   if (0==shmBuffer_) {
00126     string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00127     LOG4CPLUS_FATAL(log_,msg);
00128     XCEPT_RAISE(evf::Exception,msg);
00129   }
00130   
00131   for (UInt_t i=0;i<nbRawCells_;i++) {
00132     resources_.push_back(new FUResource(i,log_,frb_,app_));
00133     freeResourceIds_.push(i);
00134   }
00135 
00136   acceptSMDataDiscard_ = new bool[nbRecoCells];
00137   acceptSMDqmDiscard_  = new int[nbDqmCells];
00138   
00139   resetCounters();
00140 }
00141 
00142 
00143 //______________________________________________________________________________
00144 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00145 {
00146   try {
00147     wlSendData_=
00148       toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00149     if (!wlSendData_->isActive()) wlSendData_->activate();
00150     asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00151     wlSendData_->submit(asSendData_);
00152   }
00153   catch (xcept::Exception& e) {
00154     string msg = "Failed to start workloop 'SendData'.";
00155     XCEPT_RETHROW(evf::Exception,msg,e);
00156   }
00157 }
00158 
00159 
00160 //______________________________________________________________________________
00161 bool FUResourceTable::sendData(toolbox::task::WorkLoop* /* wl */)
00162 {
00163   bool reschedule=true;
00164 
00165   FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00166   
00167   if (0==cell->eventSize()) {
00168     LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00169     UInt_t cellIndex=cell->index();
00170     shmBuffer_->finishReadingRecoCell(cell);
00171     shmBuffer_->discardRecoCell(cellIndex);
00172     reschedule=false;
00173   }
00174   else if (isHalting_) {
00175     LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00176     UInt_t cellIndex=cell->index();
00177     shmBuffer_->finishReadingRecoCell(cell);
00178     shmBuffer_->discardRecoCell(cellIndex);
00179   }
00180   else {
00181     try {
00182       if (cell->type()==0) {
00183         UInt_t   cellIndex       = cell->index();
00184         UInt_t   cellOutModId    = cell->outModId();
00185         UInt_t   cellFUProcId    = cell->fuProcessId();
00186         UInt_t   cellFUGuid      = cell->fuGuid();
00187         UChar_t* cellPayloadAddr = cell->payloadAddr();
00188         UInt_t   cellEventSize   = cell->eventSize();
00189         shmBuffer_->finishReadingRecoCell(cell);
00190 
00191         lock();
00192         nbPendingSMDiscards_++;
00193         unlock();
00194 
00195         sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00196                         cellPayloadAddr,cellEventSize);
00197       }
00198       else if (cell->type()==1) {
00199         UInt_t   cellIndex       = cell->index();
00200         UInt_t   cellRawIndex    = cell->rawCellIndex();
00201         UInt_t   cellRunNumber   = cell->runNumber();
00202         UInt_t   cellEvtNumber   = cell->evtNumber();
00203         UInt_t   cellOutModId    = cell->outModId();
00204         UInt_t   cellFUProcId    = cell->fuProcessId();
00205         UInt_t   cellFUGuid      = cell->fuGuid();
00206         UChar_t *cellPayloadAddr = cell->payloadAddr();
00207         UInt_t   cellEventSize   = cell->eventSize();
00208         shmBuffer_->finishReadingRecoCell(cell);        
00209 
00210         lock();
00211         nbPendingSMDiscards_++;
00212         resources_[cellRawIndex]->incNbSent();
00213         if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00214         unlock();
00215 
00216         sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00217                       cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00218       }
00219       else if (cell->type()==2) {
00220         UInt_t   cellIndex       = cell->index();
00221         UInt_t   cellRawIndex    = cell->rawCellIndex();
00222         //UInt_t   cellRunNumber   = cell->runNumber();
00223         UInt_t   cellEvtNumber   = cell->evtNumber();
00224         UInt_t   cellFUProcId    = cell->fuProcessId();
00225         UInt_t   cellFUGuid      = cell->fuGuid();
00226         UChar_t *cellPayloadAddr = cell->payloadAddr();
00227         UInt_t   cellEventSize   = cell->eventSize();
00228         shmBuffer_->finishReadingRecoCell(cell);        
00229 
00230         lock();
00231         nbPendingSMDiscards_++;
00232         resources_[cellRawIndex]->incNbSent();
00233         if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00234         unlock();
00235         
00236         sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00237                        cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00238       }
00239       else {
00240         string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00241         XCEPT_RAISE(evf::Exception,errmsg);
00242       }
00243     }
00244     catch (xcept::Exception& e) {
00245       LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00246                       <<xcept::stdformat_exception_history(e));
00247       reschedule=false;
00248     }
00249   }
00250   
00251   return reschedule;
00252 }
00253 
00254 
00255 //______________________________________________________________________________
00256 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00257 {
00258   try {
00259     wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00260     if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00261     asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00262     wlSendDqm_->submit(asSendDqm_);
00263   }
00264   catch (xcept::Exception& e) {
00265     string msg = "Failed to start workloop 'SendDqm'.";
00266     XCEPT_RETHROW(evf::Exception,msg,e);
00267   }
00268 }
00269 
00270 
00271 //______________________________________________________________________________
00272 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* /* wl */)
00273 {
00274   bool reschedule=true;
00275   
00276   FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00277   dqm::State_t  state=shmBuffer_->dqmState(cell->index());
00278   
00279   if (state==dqm::EMPTY) {
00280     LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00281     std::cout << "shut down dqm workloop " << std::endl;
00282     UInt_t cellIndex=cell->index();
00283     shmBuffer_->finishReadingDqmCell(cell);
00284     shmBuffer_->discardDqmCell(cellIndex);
00285     reschedule=false;
00286   }
00287   else if (isHalting_) {
00288     UInt_t cellIndex=cell->index();
00289     shmBuffer_->finishReadingDqmCell(cell);
00290     shmBuffer_->discardDqmCell(cellIndex);
00291   }
00292   else {
00293     try {
00294       UInt_t   cellIndex       = cell->index();
00295       UInt_t   cellRunNumber   = cell->runNumber();
00296       UInt_t   cellEvtAtUpdate = cell->evtAtUpdate();
00297       UInt_t   cellFolderId    = cell->folderId();
00298       UInt_t   cellFUProcId    = cell->fuProcessId();
00299       UInt_t   cellFUGuid      = cell->fuGuid();
00300       UChar_t *cellPayloadAddr = cell->payloadAddr();
00301       UInt_t   cellEventSize   = cell->eventSize();
00302       sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00303                    cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00304       shmBuffer_->finishReadingDqmCell(cell);
00305     }
00306     catch (xcept::Exception& e) {
00307       LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00308                       <<xcept::stdformat_exception_history(e));
00309       reschedule=false;
00310     }
00311   }
00312   
00313   return reschedule;
00314 }
00315 
00316 
00317 //______________________________________________________________________________
00318 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00319 {
00320   try {
00321     LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00322     wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00323     if (!wlDiscard_->isActive()) wlDiscard_->activate();
00324     asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00325     wlDiscard_->submit(asDiscard_);
00326     isActive_=true;
00327   }
00328   catch (xcept::Exception& e) {
00329     string msg = "Failed to start workloop 'Discard'.";
00330     XCEPT_RETHROW(evf::Exception,msg,e);
00331   }
00332   isReadyToShutDown_=false;
00333 }
00334 
00335 
00336 //______________________________________________________________________________
00337 bool FUResourceTable::discard(toolbox::task::WorkLoop* /* wl */)
00338 {
00339   FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00340   evt::State_t  state=shmBuffer_->evtState(cell->index());
00341 
00342   bool   reschedule  =true;
00343   bool   shutDown    =(state==evt::STOP);
00344   bool   isLumi      =(state==evt::USEDLS);
00345   UInt_t fuResourceId=cell->fuResourceId();
00346   UInt_t buResourceId=cell->buResourceId();
00347 
00348   //  std::cout << "discard loop, state, shutDown, isLumi " << state << " " 
00349   //        << shutDown << " " << isLumi << std::endl;
00350   //  std::cout << "resource ids " << fuResourceId << " " << buResourceId << std::endl;
00351 
00352   if (shutDown) {
00353     LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00354     if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00355     if (nbClientsToShutDown_==0) {
00356       LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00357       isActive_ =false;
00358       reschedule=false;
00359     }
00360   }
00361   
00362   shmBuffer_->discardRawCell(cell);
00363   if(isLumi) nbEolDiscarded_++;
00364 
00365   if (!shutDown && !isLumi) {
00366     if(fuResourceId >= nbResources()){
00367       LOG4CPLUS_WARN(log_,"cell " << cell->index() << " in state " << state 
00368                      << " scheduled for discard has no associated FU resource ");
00369     }
00370     else{
00371       resources_[fuResourceId]->release();
00372       lock();
00373       freeResourceIds_.push(fuResourceId);
00374       assert(freeResourceIds_.size()<=resources_.size());
00375       unlock();
00376     
00377       if (!isHalting_) {
00378         sendDiscard(buResourceId);
00379         if(!isStopping_)sendAllocate();
00380       }
00381     }
00382   }
00383   
00384   if (!reschedule) {
00385     std::cout << " entered shutdown cycle " << std::endl;
00386     shmBuffer_->writeRecoEmptyEvent();
00387     UInt_t count=0;
00388     while (count<100) {
00389       std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00390                 <<  FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00391       if (shmBuffer_->nClients()==0&&
00392           FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00393         //      isReadyToShutDown_ = true;
00394         break;
00395       }
00396       else {
00397         count++;
00398         std::cout << " shutdown cycle attempt " << count << std::endl;
00399         LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00400                         <<" nClients="<<shmBuffer_->nClients()
00401                         <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00402                         <<" ("<<count<<")");
00403         ::usleep(shutdownTimeout_);
00404         if(count*shutdownTimeout_ > 10000000)
00405           LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00406                           <<" nClients="<<shmBuffer_->nClients()
00407                           <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00408                           <<" ("<<count<<")");
00409 
00410       }
00411     }
00412     bool allEmpty = false;
00413     std::cout << "Checking if all dqm cells are empty " << std::endl;
00414     while(!allEmpty){
00415       UInt_t n=nbDqmCells_;
00416       allEmpty = true;
00417       shmBuffer_->lock();
00418       for (UInt_t i=0;i<n;i++) {
00419         dqm::State_t state=shmBuffer_->dqmState(i);
00420         if(state!=dqm::EMPTY) allEmpty = false; 
00421       } 
00422       shmBuffer_->unlock();
00423     }
00424     std::cout << "Making sure there are no dqm pending discards " << std::endl;
00425     if(nbPendingSMDqmDiscards_ != 0)
00426       {
00427         LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00428                        << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00429 
00430       }
00431     shmBuffer_->writeDqmEmptyEvent();
00432     isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the 
00433                                // sendDqm loop has been shut down as well
00434   }
00435   
00436   return reschedule;
00437 }
00438 
00439 
00440 
00441 //______________________________________________________________________________
00442 UInt_t FUResourceTable::allocateResource()
00443 {
00444   assert(!freeResourceIds_.empty());
00445 
00446   lock();
00447   UInt_t fuResourceId=freeResourceIds_.front();
00448   freeResourceIds_.pop();
00449   nbPending_++;
00450   nbAllocated_++;
00451   unlock();
00452   
00453   return fuResourceId;
00454 }
00455 
00456 
00457 //______________________________________________________________________________
00458 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00459 {
00460   bool eventComplete=false;
00461   
00462   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00463     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00464   
00465   UInt_t      fuResourceId=(UInt_t)block->fuTransactionId;
00466   UInt_t      buResourceId=(UInt_t)block->buResourceId;
00467   FUResource* resource    =resources_[fuResourceId];
00468   
00469   // allocate resource
00470   if (!resource->fatalError()&&!resource->isAllocated()) {
00471     FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00472     resource->allocate(cell);
00473     timeval now;
00474     gettimeofday(&now,0);
00475 
00476     frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00477 
00478     frb_->setRBEventCount(nbCompleted_);
00479 
00480     if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_)  resource->doCrcCheck(true);
00481     else                                             resource->doCrcCheck(false);
00482   }
00483   
00484   
00485   // keep building this resource if it is healthy
00486   if (!resource->fatalError()) {
00487     resource->process(bufRef);
00488     lock();
00489     nbErrors_   +=resource->nbErrors();
00490     nbCrcErrors_+=resource->nbCrcErrors();
00491     unlock();
00492         
00493     // make resource available for pick-up
00494     if (resource->isComplete()) {
00495       lock();
00496       nbCompleted_++;
00497       nbPending_--;
00498       unlock();
00499       if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00500         dumpEvent(resource->shmCell());
00501       shmBuffer_->finishWritingRawCell(resource->shmCell());
00502       eventComplete=true;
00503     }
00504     
00505   }
00506   // bad event, release msg, and the whole resource if this was the last one
00507   //else {
00508   if (resource->fatalError()) {
00509     bool lastMsg=isLastMessageOfEvent(bufRef);
00510     if (lastMsg) {
00511       shmBuffer_->releaseRawCell(resource->shmCell());
00512       resource->release();
00513       lock();
00514       freeResourceIds_.push(fuResourceId);
00515       nbDiscarded_++;
00516       nbLost_++;
00517       nbPending_--;
00518       unlock();
00519       bu_->sendDiscard(buResourceId);
00520       sendAllocate();
00521     }
00522     bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed... 
00523   }
00524   
00525   return eventComplete;
00526 }
00527 
00528 
00529 //______________________________________________________________________________
00530 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00531 {
00532   I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00533   msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00534   UInt_t recoIndex=msg->rbBufferID;
00535   
00536   if (acceptSMDataDiscard_[recoIndex]) {
00537     lock();
00538     nbPendingSMDiscards_--;
00539     unlock();
00540     acceptSMDataDiscard_[recoIndex] = false;
00541     
00542     if (!isHalting_) {
00543       shmBuffer_->discardRecoCell(recoIndex);
00544       bufRef->release();
00545     }
00546   }
00547   else {
00548     LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00549   }
00550   
00551   if (isHalting_) {
00552     bufRef->release();
00553     return false;
00554   }
00555   
00556   return true;
00557 }
00558 
00559 
00560 //______________________________________________________________________________
00561 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00562 {
00563   I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00564   msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00565   UInt_t dqmIndex=msg->rbBufferID;
00566   unsigned int ntries = 0;
00567   while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00568     LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00569     ::usleep(10000);
00570     if(ntries++>10){
00571       LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex 
00572                       << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00573       bufRef->release();
00574       return true;
00575     }
00576   }
00577   if (acceptSMDqmDiscard_[dqmIndex]>0) {
00578     acceptSMDqmDiscard_[dqmIndex]--;
00579     if(nbPendingSMDqmDiscards_>0){
00580       nbPendingSMDqmDiscards_--;
00581     }
00582     else {
00583       LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex 
00584                      << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00585     }
00586     
00587     if (!isHalting_) {
00588       shmBuffer_->discardDqmCell(dqmIndex);
00589       bufRef->release();
00590     }
00591 
00592   }
00593   else {
00594     LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex 
00595                     << " from StorageManager while cell is not accepting discards");
00596   }
00597   
00598   if (isHalting_) {
00599     bufRef->release();
00600     return false;
00601   }
00602   
00603   return true;
00604 }
00605 
00606 
00607 //______________________________________________________________________________
00608 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00609 {
00610   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00611     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00612   //make sure to fill up the shmem so no process will miss it
00613   // but processes will have to handle duplicates
00614 
00615   for(unsigned int i = 0; i < nbRawCells_; i++) 
00616     {
00617       nbEolPosted_++;
00618       shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00619     }
00620 }
00621 
00622 
00623 //______________________________________________________________________________
00624 void FUResourceTable::dropEvent()
00625 {
00626   FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00627   UInt_t fuResourceId=cell->fuResourceId();
00628   shmBuffer_->finishReadingRawCell(cell);
00629   shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00630 }
00631 
00632 
00633 //______________________________________________________________________________
00634 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00635 {
00636   bool retval = false;
00637   vector<pid_t> pids=cellPrcIds();
00638   UInt_t iRawCell=pids.size();
00639   for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00640   
00641   if (iRawCell<pids.size()){
00642     shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00643     retval = true;
00644   }
00645   else
00646     LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00647   shmBuffer_->removeClientPrcId(pid);
00648   return retval;
00649 }
00650 
00651 
00652 //______________________________________________________________________________
00653 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00654 {
00655   ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00656   ofstream fout(oss.str().c_str());
00657   fout.fill('0');
00658 
00659   fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00660   for (unsigned int i=0;i<cell->nFed();i++) {
00661     if (cell->fedSize(i)==0) continue;
00662     fout<<"# fedid "<<i<<endl;
00663     unsigned char* addr=cell->fedAddr(i);
00664     for (unsigned int j=0;j<cell->fedSize(i);j++) {
00665       fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00666       if ((j+1)%8) fout<<" "; else fout<<endl;
00667       ++addr;
00668     }
00669     fout<<endl;
00670   }
00671   fout.close();
00672 }
00673 
00674 
00675 //______________________________________________________________________________
00676 void FUResourceTable::stop()
00677 {
00678   isStopping_ = true;
00679   shutDownClients();
00680 }
00681 
00682 
00683 //______________________________________________________________________________
00684 void FUResourceTable::halt()
00685 {
00686   isHalting_=true;
00687   shutDownClients();
00688 }
00689 
00690 
00691 //______________________________________________________________________________
00692 void FUResourceTable::shutDownClients()
00693 {
00694   nbClientsToShutDown_ = nbClients();
00695   isReadyToShutDown_   = false;
00696   
00697   if (nbClientsToShutDown_==0) {
00698     LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00699     UInt_t n=nbResources();
00700     for (UInt_t i=0;i<n;i++) {
00701       evt::State_t state=shmBuffer_->evtState(i);
00702       if (state!=evt::EMPTY){
00703         LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state " 
00704                        << state);
00705         shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00706       }
00707     }
00708     shmBuffer_->scheduleRawEmptyCellForDiscard();
00709   }
00710   else {
00711     UInt_t n=nbClientsToShutDown_;
00712     for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00713   }
00714 }
00715 
00716 
00717 //______________________________________________________________________________
00718 void FUResourceTable::clear()
00719 {
00720   for (UInt_t i=0;i<resources_.size();i++) {
00721     resources_[i]->release();
00722     delete resources_[i];
00723   }
00724   resources_.clear();
00725   while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00726 }
00727 
00728 
00729 //______________________________________________________________________________
00730 void FUResourceTable::resetCounters()
00731 {
00732   if (0!=shmBuffer_) {
00733     for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00734     for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++)  acceptSMDqmDiscard_[i] = 0;
00735   }
00736   
00737   nbAllocated_           =nbPending_;
00738   nbCompleted_           =0;
00739   nbSent_                =0;
00740   nbSentError_           =0;
00741   nbSentDqm_             =0;
00742   nbPendingSMDiscards_   =0;
00743   nbPendingSMDqmDiscards_=0;
00744   nbDiscarded_           =0;
00745   nbLost_                =0;
00746   nbEolPosted_           =0;
00747   nbEolDiscarded_        =0;
00748 
00749   nbErrors_              =0;
00750   nbCrcErrors_           =0;
00751   nbAllocSent_           =0;
00752 
00753   sumOfSquares_          =0;
00754   sumOfSizes_            =0;
00755   isStopping_            =false;
00756 }
00757 
00758 
00759 //______________________________________________________________________________
00760 UInt_t FUResourceTable::nbClients() const
00761 {
00762   UInt_t result(0);
00763   if (0!=shmBuffer_) result=shmBuffer_->nClients();
00764   return result;
00765 }
00766 
00767 
00768 //______________________________________________________________________________
00769 vector<pid_t> FUResourceTable::clientPrcIds() const
00770 {
00771   vector<pid_t> result;
00772   if (0!=shmBuffer_) {
00773     UInt_t n=nbClients();
00774     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00775   }
00776   return result;
00777 }
00778 
00779 
00780 //______________________________________________________________________________
00781 string FUResourceTable::clientPrcIdsAsString() const
00782 {
00783   stringstream ss;
00784   if (0!=shmBuffer_) {
00785     UInt_t n=nbClients();
00786     for (UInt_t i=0;i<n;i++) {
00787       if (i>0) ss<<",";
00788       ss<<shmBuffer_->clientPrcId(i);
00789     }
00790   }
00791   return ss.str();
00792 }
00793 
00794 
00795 //______________________________________________________________________________
00796 vector<string> FUResourceTable::cellStates() const
00797 {
00798   vector<string> result;
00799   if (0!=shmBuffer_) {
00800     UInt_t n=nbResources();
00801     shmBuffer_->lock();
00802     for (UInt_t i=0;i<n;i++) {
00803       evt::State_t state=shmBuffer_->evtState(i);
00804       if      (state==evt::EMPTY)      result.push_back("EMPTY");
00805       else if (state==evt::STOP)       result.push_back("STOP");
00806       else if (state==evt::LUMISECTION)result.push_back("LUMISECTION");
00807       else if (state==evt::USEDLS)     result.push_back("USEDLS");
00808       else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00809       else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00810       else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00811       else if (state==evt::RAWREAD)    result.push_back("RAWREAD");
00812       else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00813       else if (state==evt::PROCESSED)  result.push_back("PROCESSED");
00814       else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00815       else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00816       else if (state==evt::SENDING)    result.push_back("SENDING");
00817       else if (state==evt::SENT)       result.push_back("SENT");
00818       else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00819     }
00820     shmBuffer_->unlock();
00821   }
00822   return result;
00823 }
00824 
00825 vector<string> FUResourceTable::dqmCellStates() const
00826 {
00827   vector<string> result;
00828   if (0!=shmBuffer_) {
00829     UInt_t n=nbDqmCells_;
00830     shmBuffer_->lock();
00831     for (UInt_t i=0;i<n;i++) {
00832       dqm::State_t state=shmBuffer_->dqmState(i);
00833       if      (state==dqm::EMPTY)      result.push_back("EMPTY");
00834       else if (state==dqm::WRITING) result.push_back("WRITING");
00835       else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00836       else if (state==dqm::SENDING)    result.push_back("SENDING");
00837       else if (state==dqm::SENT)       result.push_back("SENT");
00838       else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00839     }
00840     shmBuffer_->unlock();
00841   }
00842   return result;
00843 }
00844 
00845 
00846 //______________________________________________________________________________
00847 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00848 {
00849   vector<UInt_t> result;
00850   if (0!=shmBuffer_) {
00851     UInt_t n=nbResources();
00852     shmBuffer_->lock();
00853     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00854     shmBuffer_->unlock();
00855   }
00856   return result;
00857 }
00858 
00859 
00860 //______________________________________________________________________________
00861 vector<pid_t> FUResourceTable::cellPrcIds() const
00862 {
00863   vector<pid_t> result;
00864   if (0!=shmBuffer_) {
00865     UInt_t n=nbResources();
00866     shmBuffer_->lock();
00867     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00868     shmBuffer_->unlock();
00869   }
00870   return result;
00871 }
00872 
00873 
00874 //______________________________________________________________________________
00875 vector<time_t> FUResourceTable::cellTimeStamps() const
00876 {
00877   vector<time_t> result;
00878   if (0!=shmBuffer_) {
00879     UInt_t n=nbResources();
00880     shmBuffer_->lock();
00881     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00882     shmBuffer_->unlock();
00883   }
00884   return result;
00885 }
00886 
00887 
00889 // implementation of private member functions
00891 
00892 //______________________________________________________________________________
00893 void FUResourceTable::sendAllocate()
00894 {
00895   UInt_t nbFreeSlots    = this->nbFreeSlots();
00896   UInt_t nbFreeSlotsMax = resources_.size()/2;
00897   if (nbFreeSlots>nbFreeSlotsMax) {
00898     UIntVec_t fuResourceIds;
00899     for (UInt_t i=0;i<nbFreeSlots;i++)
00900       fuResourceIds.push_back(allocateResource());
00901     bu_->sendAllocate(fuResourceIds);
00902     nbAllocSent_++;
00903   }
00904 }
00905 
00906 
00907 //______________________________________________________________________________
00908 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00909 {
00910   bu_->sendDiscard(buResourceId);
00911   nbDiscarded_++;
00912 }
00913 
00914 
00915 //______________________________________________________________________________
00916 void FUResourceTable::sendInitMessage(UInt_t   fuResourceId,
00917                                       UInt_t   outModId,
00918                                       UInt_t   fuProcessId,
00919                                       UInt_t   fuGuid,
00920                                       UChar_t *data,
00921                                       UInt_t   dataSize)
00922 {
00923   if (0==sm_) {
00924     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00925   }
00926   else {
00927     acceptSMDataDiscard_[fuResourceId] = true;
00928     UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00929                                         fuGuid,data,dataSize);
00930     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00931     sumOfSizes_  +=nbBytes;
00932   }
00933 }
00934 
00935 
00936 //______________________________________________________________________________
00937 void FUResourceTable::sendDataEvent(UInt_t   fuResourceId,
00938                                     UInt_t   runNumber,
00939                                     UInt_t   evtNumber,
00940                                     UInt_t   outModId,
00941                                     UInt_t   fuProcessId,
00942                                     UInt_t   fuGuid,
00943                                     UChar_t *data,
00944                                     UInt_t   dataSize)
00945 {
00946   if (0==sm_) {
00947     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00948   }
00949   else {
00950     acceptSMDataDiscard_[fuResourceId] = true;
00951     UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00952                                       outModId,fuProcessId,fuGuid,
00953                                       data,dataSize);
00954     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00955     sumOfSizes_  +=nbBytes;
00956   }
00957 }
00958 
00959 
00960 //______________________________________________________________________________
00961 void FUResourceTable::sendErrorEvent(UInt_t   fuResourceId,
00962                                      UInt_t   runNumber,
00963                                      UInt_t   evtNumber,
00964                                      UInt_t   fuProcessId,
00965                                      UInt_t   fuGuid,
00966                                      UChar_t *data,
00967                                      UInt_t   dataSize)
00968 {
00969   if (0==sm_) {
00970     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
00971   }
00972   else {
00973     acceptSMDataDiscard_[fuResourceId] = true;
00974     UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
00975                                        fuProcessId,fuGuid,data,dataSize);
00976     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00977     sumOfSizes_  +=nbBytes;
00978   }
00979 
00980 //   if (0!=shmBuffer_) {
00981 //     UInt_t n=nbDqmCells_;
00982 
00983 //     for (UInt_t i=0;i<n;i++) {
00984 //       if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
00985 //      {
00986 //        if(shmBuffer_->dqmState(i)!=dqm::SENT){
00987 //          shmBuffer_->setDqmState(i,dqm::SENT);       
00988 //          shmBuffer_->discardDqmCell(i);      
00989 //          acceptSMDqmDiscard_[i] = false;
00990 //        }  
00991 //      }
00992 //     }
00993 //     n=nbRecoCells_;
00994 //     for (UInt_t i=0;i<n;i++) {
00995 //       if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
00996 //      {
00997 //        shmBuffer_->discardOrphanedRecoCell(i);  
00998 //      }
00999 //     }
01000 
01001 //   }
01002 }
01003 
01004 
01005 //______________________________________________________________________________
01006 void FUResourceTable::sendDqmEvent(UInt_t   fuDqmId,
01007                                    UInt_t   runNumber,
01008                                    UInt_t   evtAtUpdate,
01009                                    UInt_t   folderId,
01010                                    UInt_t   fuProcessId,
01011                                    UInt_t   fuGuid,
01012                                    UChar_t* data,
01013                                    UInt_t   dataSize)
01014 {
01015   if (0==sm_) {
01016     LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01017   }
01018   else {
01019     sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01020                       fuProcessId,fuGuid,data,dataSize);
01021 
01022     nbPendingSMDqmDiscards_++;
01023 
01024     acceptSMDqmDiscard_[fuDqmId]++;
01025     if(acceptSMDqmDiscard_[fuDqmId]>1)
01026       LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder " 
01027                      << folderId << " process " << fuProcessId << " guid " << fuGuid);
01028     nbSentDqm_++;
01029   }
01030 }
01031 
01032 
01033 //______________________________________________________________________________
01034 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01035 {
01036   while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
01037   
01038   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01039     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01040   
01041   UInt_t iBlock    =block->blockNb;
01042   UInt_t nBlock    =block->nbBlocksInSuperFragment;
01043   UInt_t iSuperFrag=block->superFragmentNb;
01044   UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01045 
01046   return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01047 }
01048 
01049 //______________________________________________________________________________
01050 void FUResourceTable::injectCRCError()
01051 {
01052   for (UInt_t i=0;i<resources_.size();i++) {
01053     resources_[i]->scheduleCRCError();
01054   }
01055 }
01056 void FUResourceTable::printWorkLoopStatus()
01057 {
01058   std::cout << "Workloop status===============" << std::endl;
01059   std::cout << "==============================" << std::endl;
01060   if(wlSendData_!=0)
01061     std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01062   if(wlSendDqm_!=0)
01063     std::cout << "SendDqm  -> " << wlSendDqm_->isActive()  << std::endl;
01064   if(wlDiscard_!=0)
01065     std::cout << "Discard  -> " << wlDiscard_->isActive()  << std::endl;
01066   std::cout << "Workloops Active  -> " << isActive_  << std::endl;
01067 
01068 }
01069 
01070 
01071 void FUResourceTable::lastResort()
01072 {
01073   std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() 
01074             << " more rawcells to read " << std::endl;
01075   while(shmBuffer_->nbRawCellsToRead()!=0){
01076     FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01077     std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01078     shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01079     std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01080   }
01081 }