CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/ResourceBroker/src/FUResourceTable.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceTable
00004 // ---------------
00005 //
00006 //            12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "FWCore/Utilities/interface/CRC16.h"
00012 #include "EvffedFillerRB.h"
00013 
00014 #include "toolbox/task/WorkLoopFactory.h"
00015 #include "interface/evb/i2oEVBMsgs.h"
00016 #include "xcept/tools.h"
00017 
00018 
00019 #include <fstream>
00020 #include <sstream>
00021 #include <iomanip>
00022 #include <unistd.h>
00023 
00024 #include <sys/types.h>
00025 #include <signal.h>
00026 
00027 using namespace evf;
00028 using namespace std;
00029 
00030 
00032 // construction/destruction
00034 
00035 //______________________________________________________________________________
// Construct the resource table: store the BU/SM proxies, logger and
// configuration, then create the shared-memory buffer and one FUResource per
// raw cell via initialize().
// Throws evf::Exception (raised by initialize()) if the shared-memory segment
// cannot be created.
// NOTE(review): keep the initializer list in member-declaration order of
// FUResourceTable.h (not visible here).
00036 FUResourceTable::FUResourceTable(bool              segmentationMode,
00037                                  UInt_t            nbRawCells,
00038                                  UInt_t            nbRecoCells,
00039                                  UInt_t            nbDqmCells,
00040                                  UInt_t            rawCellSize,
00041                                  UInt_t            recoCellSize,
00042                                  UInt_t            dqmCellSize,
00043                                  BUProxy          *bu,
00044                                  SMProxy          *sm,
00045                                  log4cplus::Logger logger,
00046                                  unsigned int      timeout,
00047                                  EvffedFillerRB   *frb,
00048                                  xdaq::Application*app)
00049   throw (evf::Exception)
00050   : bu_(bu)
00051   , sm_(sm)
00052   , log_(logger)
00053   , wlSendData_(0)
00054   , asSendData_(0)
00055   , wlSendDqm_(0)
00056   , asSendDqm_(0)
00057   , wlDiscard_(0)
00058   , asDiscard_(0)
00059   , shmBuffer_(0)
00060   , nbDqmCells_(nbDqmCells)
00061   , nbRawCells_(nbRawCells)
00062   , nbRecoCells_(nbRecoCells)
00063   , acceptSMDataDiscard_(0)
00064   , acceptSMDqmDiscard_(0)
// buildResource() CRC-checks events whose nbAllocated_ is a multiple of
// doCrcCheck_; with 1 every event is checked.
00065   , doCrcCheck_(1)
// Microseconds slept per poll (::usleep) in discard()'s shutdown wait.
00066   , shutdownTimeout_(timeout)
// Must be set before initialize(): its resetCounters() copies nbPending_
// into nbAllocated_.
00067   , nbPending_(0)
00068   , nbClientsToShutDown_(0)
00069   , isReadyToShutDown_(true)
00070   , isActive_(false)
00071   , isHalting_(false)
00072   , isStopping_(false)
// Presumably a "no run set yet" sentinel -- confirm.
00073   , runNumber_(0xffffffff)
00074   , lock_(toolbox::BSem::FULL)
00075   , frb_(frb)
00076   , app_(app)
00077 {
00078   initialize(segmentationMode,
00079              nbRawCells,nbRecoCells,nbDqmCells,
00080              rawCellSize,recoCellSize,dqmCellSize);
00081 }
00082 
00083 
00084 //______________________________________________________________________________
00085 FUResourceTable::~FUResourceTable()
00086 {
00087   clear();
00088   if(wlSendData_){
00089     wlSendData_->cancel();
00090     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
00091   }
00092   if(wlSendDqm_){
00093     wlSendDqm_->cancel();
00094     toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
00095   }
00096   if(wlDiscard_){
00097     wlDiscard_->cancel();
00098     toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
00099   }
00100   shmdt(shmBuffer_);
00101   if (FUShmBuffer::releaseSharedMemory())
00102     LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
00103   if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
00104   if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
00105 }
00106 
00107 
00109 // implementation of member functions
00111 
00112 //______________________________________________________________________________
00113 void FUResourceTable::initialize(bool   segmentationMode,
00114                                  UInt_t nbRawCells,
00115                                  UInt_t nbRecoCells,
00116                                  UInt_t nbDqmCells,
00117                                  UInt_t rawCellSize,
00118                                  UInt_t recoCellSize,
00119                                  UInt_t dqmCellSize)
00120   throw (evf::Exception)
00121 {
00122   clear();
00123   
00124   shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
00125                                           nbRawCells,nbRecoCells,nbDqmCells,
00126                                           rawCellSize,recoCellSize,dqmCellSize);
00127   if (0==shmBuffer_) {
00128     string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00129     LOG4CPLUS_FATAL(log_,msg);
00130     XCEPT_RAISE(evf::Exception,msg);
00131   }
00132   
00133   for (UInt_t i=0;i<nbRawCells_;i++) {
00134     resources_.push_back(new FUResource(i,log_,frb_,app_));
00135     freeResourceIds_.push(i);
00136   }
00137 
00138   acceptSMDataDiscard_ = new bool[nbRecoCells];
00139   acceptSMDqmDiscard_  = new int[nbDqmCells];
00140   
00141   resetCounters();
00142 }
00143 
00144 
00145 //______________________________________________________________________________
00146 void FUResourceTable::startSendDataWorkLoop() throw (evf::Exception)
00147 {
00148   try {
00149     wlSendData_=
00150       toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
00151     if (!wlSendData_->isActive()) wlSendData_->activate();
00152     asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
00153     wlSendData_->submit(asSendData_);
00154   }
00155   catch (xcept::Exception& e) {
00156     string msg = "Failed to start workloop 'SendData'.";
00157     XCEPT_RETHROW(evf::Exception,msg,e);
00158   }
00159 }
00160 
00161 
00162 //______________________________________________________________________________
00163 bool FUResourceTable::sendData(toolbox::task::WorkLoop* /* wl */)
00164 {
00165   bool reschedule=true;
00166 
00167   FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
00168   
00169   if (0==cell->eventSize()) {
00170     LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
00171     UInt_t cellIndex=cell->index();
00172     shmBuffer_->finishReadingRecoCell(cell);
00173     shmBuffer_->discardRecoCell(cellIndex);
00174     reschedule=false;
00175   }
00176   else if (isHalting_) {
00177     LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
00178     UInt_t cellIndex=cell->index();
00179     shmBuffer_->finishReadingRecoCell(cell);
00180     shmBuffer_->discardRecoCell(cellIndex);
00181   }
00182   else {
00183     try {
00184       if (cell->type()==0) {
00185         UInt_t   cellIndex       = cell->index();
00186         UInt_t   cellOutModId    = cell->outModId();
00187         UInt_t   cellFUProcId    = cell->fuProcessId();
00188         UInt_t   cellFUGuid      = cell->fuGuid();
00189         UChar_t* cellPayloadAddr = cell->payloadAddr();
00190         UInt_t   cellEventSize   = cell->eventSize();
00191         shmBuffer_->finishReadingRecoCell(cell);
00192 
00193         lock();
00194         nbPendingSMDiscards_++;
00195         unlock();
00196 
00197         sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
00198                         cellPayloadAddr,cellEventSize);
00199       }
00200       else if (cell->type()==1) {
00201         UInt_t   cellIndex       = cell->index();
00202         UInt_t   cellRawIndex    = cell->rawCellIndex();
00203         UInt_t   cellRunNumber   = cell->runNumber();
00204         UInt_t   cellEvtNumber   = cell->evtNumber();
00205         UInt_t   cellOutModId    = cell->outModId();
00206         UInt_t   cellFUProcId    = cell->fuProcessId();
00207         UInt_t   cellFUGuid      = cell->fuGuid();
00208         UChar_t *cellPayloadAddr = cell->payloadAddr();
00209         UInt_t   cellEventSize   = cell->eventSize();
00210         shmBuffer_->finishReadingRecoCell(cell);        
00211 
00212         lock();
00213         nbPendingSMDiscards_++;
00214         resources_[cellRawIndex]->incNbSent();
00215         if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
00216         unlock();
00217 
00218         sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
00219                       cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00220       }
00221       else if (cell->type()==2) {
00222         UInt_t   cellIndex       = cell->index();
00223         UInt_t   cellRawIndex    = cell->rawCellIndex();
00224         //UInt_t   cellRunNumber   = cell->runNumber();
00225         UInt_t   cellEvtNumber   = cell->evtNumber();
00226         UInt_t   cellFUProcId    = cell->fuProcessId();
00227         UInt_t   cellFUGuid      = cell->fuGuid();
00228         UChar_t *cellPayloadAddr = cell->payloadAddr();
00229         UInt_t   cellEventSize   = cell->eventSize();
00230         shmBuffer_->finishReadingRecoCell(cell);        
00231 
00232         lock();
00233         nbPendingSMDiscards_++;
00234         resources_[cellRawIndex]->incNbSent();
00235         if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
00236         unlock();
00237         
00238         sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
00239                        cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00240       }
00241       else {
00242         string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
00243         XCEPT_RAISE(evf::Exception,errmsg);
00244       }
00245     }
00246     catch (xcept::Exception& e) {
00247       LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
00248                       <<xcept::stdformat_exception_history(e));
00249       reschedule=false;
00250     }
00251   }
00252   
00253   return reschedule;
00254 }
00255 
00256 
00257 //______________________________________________________________________________
00258 void FUResourceTable::startSendDqmWorkLoop() throw (evf::Exception)
00259 {
00260   try {
00261     wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
00262     if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
00263     asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
00264     wlSendDqm_->submit(asSendDqm_);
00265   }
00266   catch (xcept::Exception& e) {
00267     string msg = "Failed to start workloop 'SendDqm'.";
00268     XCEPT_RETHROW(evf::Exception,msg,e);
00269   }
00270 }
00271 
00272 
00273 //______________________________________________________________________________
00274 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* /* wl */)
00275 {
00276   bool reschedule=true;
00277   
00278   FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
00279   dqm::State_t  state=shmBuffer_->dqmState(cell->index());
00280   
00281   if (state==dqm::EMPTY) {
00282     LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
00283     std::cout << "shut down dqm workloop " << std::endl;
00284     UInt_t cellIndex=cell->index();
00285     shmBuffer_->finishReadingDqmCell(cell);
00286     shmBuffer_->discardDqmCell(cellIndex);
00287     reschedule=false;
00288   }
00289   else if (isHalting_) {
00290     UInt_t cellIndex=cell->index();
00291     shmBuffer_->finishReadingDqmCell(cell);
00292     shmBuffer_->discardDqmCell(cellIndex);
00293   }
00294   else {
00295     try {
00296       UInt_t   cellIndex       = cell->index();
00297       UInt_t   cellRunNumber   = cell->runNumber();
00298       UInt_t   cellEvtAtUpdate = cell->evtAtUpdate();
00299       UInt_t   cellFolderId    = cell->folderId();
00300       UInt_t   cellFUProcId    = cell->fuProcessId();
00301       UInt_t   cellFUGuid      = cell->fuGuid();
00302       UChar_t *cellPayloadAddr = cell->payloadAddr();
00303       UInt_t   cellEventSize   = cell->eventSize();
00304       sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
00305                    cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
00306       shmBuffer_->finishReadingDqmCell(cell);
00307     }
00308     catch (xcept::Exception& e) {
00309       LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
00310                       <<xcept::stdformat_exception_history(e));
00311       reschedule=false;
00312     }
00313   }
00314   
00315   return reschedule;
00316 }
00317 
00318 
00319 //______________________________________________________________________________
00320 void FUResourceTable::startDiscardWorkLoop() throw (evf::Exception)
00321 {
00322   try {
00323     LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
00324     wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
00325     if (!wlDiscard_->isActive()) wlDiscard_->activate();
00326     asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
00327     wlDiscard_->submit(asDiscard_);
00328     isActive_=true;
00329   }
00330   catch (xcept::Exception& e) {
00331     string msg = "Failed to start workloop 'Discard'.";
00332     XCEPT_RETHROW(evf::Exception,msg,e);
00333   }
00334   isReadyToShutDown_=false;
00335 }
00336 
00337 
00338 //______________________________________________________________________________
00339 bool FUResourceTable::discard(toolbox::task::WorkLoop* /* wl */)
00340 {
00341   FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
00342   evt::State_t  state=shmBuffer_->evtState(cell->index());
00343 
00344   bool   reschedule  =true;
00345   bool   shutDown    =(state==evt::STOP);
00346   bool   isLumi      =(state==evt::USEDLS);
00347   UInt_t fuResourceId=cell->fuResourceId();
00348   UInt_t buResourceId=cell->buResourceId();
00349 
00350   //  std::cout << "discard loop, state, shutDown, isLumi " << state << " " 
00351   //        << shutDown << " " << isLumi << std::endl;
00352   //  std::cout << "resource ids " << fuResourceId << " " << buResourceId << std::endl;
00353 
00354   if (shutDown) {
00355     LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
00356     if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
00357     if (nbClientsToShutDown_==0) {
00358       LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
00359       isActive_ =false;
00360       reschedule=false;
00361     }
00362   }
00363   
00364   shmBuffer_->discardRawCell(cell);
00365   if(isLumi) nbEolDiscarded_++;
00366 
00367   if (!shutDown && !isLumi) {
00368     if(fuResourceId >= nbResources()){
00369       LOG4CPLUS_WARN(log_,"cell " << cell->index() << " in state " << state 
00370                      << " scheduled for discard has no associated FU resource ");
00371     }
00372     else{
00373       resources_[fuResourceId]->release();
00374       lock();
00375       freeResourceIds_.push(fuResourceId);
00376       assert(freeResourceIds_.size()<=resources_.size());
00377       unlock();
00378     
00379       if (!isHalting_) {
00380         sendDiscard(buResourceId);
00381         if(!isStopping_)sendAllocate();
00382       }
00383     }
00384   }
00385   
00386   if (!reschedule) {
00387     std::cout << " entered shutdown cycle " << std::endl;
00388     shmBuffer_->writeRecoEmptyEvent();
00389     UInt_t count=0;
00390     while (count<100) {
00391       std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
00392                 <<  FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00393       if (shmBuffer_->nClients()==0&&
00394           FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
00395         //      isReadyToShutDown_ = true;
00396         break;
00397       }
00398       else {
00399         count++;
00400         std::cout << " shutdown cycle attempt " << count << std::endl;
00401         LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
00402                         <<" nClients="<<shmBuffer_->nClients()
00403                         <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00404                         <<" ("<<count<<")");
00405         ::usleep(shutdownTimeout_);
00406         if(count*shutdownTimeout_ > 10000000)
00407           LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00408                           <<" nClients="<<shmBuffer_->nClients()
00409                           <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00410                           <<" ("<<count<<")");
00411 
00412       }
00413     }
00414     bool allEmpty = false;
00415     std::cout << "Checking if all dqm cells are empty " << std::endl;
00416     while(!allEmpty){
00417       UInt_t n=nbDqmCells_;
00418       allEmpty = true;
00419       shmBuffer_->lock();
00420       for (UInt_t i=0;i<n;i++) {
00421         dqm::State_t state=shmBuffer_->dqmState(i);
00422         if(state!=dqm::EMPTY) allEmpty = false; 
00423       } 
00424       shmBuffer_->unlock();
00425     }
00426     std::cout << "Making sure there are no dqm pending discards " << std::endl;
00427     if(nbPendingSMDqmDiscards_ != 0)
00428       {
00429         LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
00430                        << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
00431 
00432       }
00433     shmBuffer_->writeDqmEmptyEvent();
00434     isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the 
00435                                // sendDqm loop has been shut down as well
00436   }
00437   
00438   return reschedule;
00439 }
00440 
00441 
00442 
00443 //______________________________________________________________________________
00444 UInt_t FUResourceTable::allocateResource()
00445 {
00446   assert(!freeResourceIds_.empty());
00447 
00448   lock();
00449   UInt_t fuResourceId=freeResourceIds_.front();
00450   freeResourceIds_.pop();
00451   nbPending_++;
00452   nbAllocated_++;
00453   unlock();
00454   
00455   return fuResourceId;
00456 }
00457 
00458 
00459 //______________________________________________________________________________
00460 bool FUResourceTable::buildResource(MemRef_t* bufRef)
00461 {
00462   bool eventComplete=false;
00463   bool lastMsg=isLastMessageOfEvent(bufRef);  
00464   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
00465     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
00466   
00467   UInt_t      fuResourceId=(UInt_t)block->fuTransactionId;
00468   UInt_t      buResourceId=(UInt_t)block->buResourceId;
00469   FUResource* resource    =resources_[fuResourceId];
00470   // allocate resource
00471   if (!resource->fatalError()&&!resource->isAllocated()) {
00472     FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
00473     if(cell==0){bufRef->release(); return eventComplete;}
00474     resource->allocate(cell);
00475     timeval now;
00476     gettimeofday(&now,0);
00477 
00478     frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00479 
00480     frb_->setRBEventCount(nbCompleted_);
00481 
00482     if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_)  resource->doCrcCheck(true);
00483     else                                             resource->doCrcCheck(false);
00484   }
00485   
00486   
00487   // keep building this resource if it is healthy
00488   if (!resource->fatalError()) {
00489     resource->process(bufRef);
00490     lock();
00491     nbErrors_   +=resource->nbErrors();
00492     nbCrcErrors_+=resource->nbCrcErrors();
00493     unlock();
00494         
00495     // make resource available for pick-up
00496     if (resource->isComplete()) {
00497       lock();
00498       nbCompleted_++;
00499       nbPending_--;
00500       unlock();
00501       if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
00502         dumpEvent(resource->shmCell());
00503       shmBuffer_->finishWritingRawCell(resource->shmCell());
00504       eventComplete=true;
00505     }
00506     
00507   }
00508   // bad event, release msg, and the whole resource if this was the last one
00509   //else {
00510   if (resource->fatalError()) {
00511     if (lastMsg) {
00512       shmBuffer_->releaseRawCell(resource->shmCell());
00513       resource->release();
00514       lock();
00515       freeResourceIds_.push(fuResourceId);
00516       nbDiscarded_++;
00517       nbLost_++;
00518       nbPending_--;
00519       unlock();
00520       bu_->sendDiscard(buResourceId);
00521       sendAllocate();
00522     }
00523     //    bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed... 
00524   }
00525   
00526   return eventComplete;
00527 }
00528 
00529 
00530 //______________________________________________________________________________
00531 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef)
00532 {
00533   I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00534   msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00535   UInt_t recoIndex=msg->rbBufferID;
00536   
00537   if (acceptSMDataDiscard_[recoIndex]) {
00538     lock();
00539     nbPendingSMDiscards_--;
00540     unlock();
00541     acceptSMDataDiscard_[recoIndex] = false;
00542     
00543     if (!isHalting_) {
00544       shmBuffer_->discardRecoCell(recoIndex);
00545       bufRef->release();
00546     }
00547   }
00548   else {
00549     LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
00550   }
00551   
00552   if (isHalting_) {
00553     bufRef->release();
00554     return false;
00555   }
00556   
00557   return true;
00558 }
00559 
00560 
00561 //______________________________________________________________________________
00562 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef)
00563 {
00564   I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00565   msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
00566   UInt_t dqmIndex=msg->rbBufferID;
00567   unsigned int ntries = 0;
00568   while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
00569     LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
00570     ::usleep(10000);
00571     if(ntries++>10){
00572       LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex 
00573                       << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
00574       bufRef->release();
00575       return true;
00576     }
00577   }
00578   if (acceptSMDqmDiscard_[dqmIndex]>0) {
00579     acceptSMDqmDiscard_[dqmIndex]--;
00580     if(nbPendingSMDqmDiscards_>0){
00581       nbPendingSMDqmDiscards_--;
00582     }
00583     else {
00584       LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex 
00585                      << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00586     }
00587     
00588     if (!isHalting_) {
00589       shmBuffer_->discardDqmCell(dqmIndex);
00590       bufRef->release();
00591     }
00592 
00593   }
00594   else {
00595     LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex 
00596                     << " from StorageManager while cell is not accepting discards");
00597   }
00598   
00599   if (isHalting_) {
00600     bufRef->release();
00601     return false;
00602   }
00603   
00604   return true;
00605 }
00606 
00607 
00608 //______________________________________________________________________________
00609 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef)
00610 {
00611   I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = 
00612     (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
00613   //make sure to fill up the shmem so no process will miss it
00614   // but processes will have to handle duplicates
00615 
00616   for(unsigned int i = 0; i < nbRawCells_; i++) 
00617     {
00618       nbEolPosted_++;
00619       shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00620     }
00621 }
00622 
00623 
00624 //______________________________________________________________________________
00625 void FUResourceTable::dropEvent()
00626 {
00627   FUShmRawCell* cell=shmBuffer_->rawCellToRead();
00628   UInt_t fuResourceId=cell->fuResourceId();
00629   shmBuffer_->finishReadingRawCell(cell);
00630   shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00631 }
00632 
00633 
00634 //______________________________________________________________________________
00635 bool FUResourceTable::handleCrashedEP(UInt_t runNumber,pid_t pid)
00636 {
00637   bool retval = false;
00638   vector<pid_t> pids=cellPrcIds();
00639   UInt_t iRawCell=pids.size();
00640   for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
00641   
00642   if (iRawCell<pids.size()){
00643     shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
00644     retval = true;
00645   }
00646   else
00647     LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
00648   shmBuffer_->removeClientPrcId(pid);
00649   return retval;
00650 }
00651 
00652 
00653 //______________________________________________________________________________
00654 void FUResourceTable::dumpEvent(FUShmRawCell* cell)
00655 {
00656   ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
00657   ofstream fout(oss.str().c_str());
00658   fout.fill('0');
00659 
00660   fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
00661   for (unsigned int i=0;i<cell->nFed();i++) {
00662     if (cell->fedSize(i)==0) continue;
00663     fout<<"# fedid "<<i<<endl;
00664     unsigned char* addr=cell->fedAddr(i);
00665     for (unsigned int j=0;j<cell->fedSize(i);j++) {
00666       fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
00667       if ((j+1)%8) fout<<" "; else fout<<endl;
00668       ++addr;
00669     }
00670     fout<<endl;
00671   }
00672   fout.close();
00673 }
00674 
00675 
00676 //______________________________________________________________________________
00677 void FUResourceTable::stop()
00678 {
00679   isStopping_ = true;
00680   shutDownClients();
00681 }
00682 
00683 
00684 //______________________________________________________________________________
00685 void FUResourceTable::halt()
00686 {
00687   isHalting_=true;
00688   shutDownClients();
00689 }
00690 
00691 
00692 //______________________________________________________________________________
// Ask all event-processor clients to shut down; called by stop() and halt().
// Sets nbClientsToShutDown_ to the current number of clients and clears
// isReadyToShutDown_. discard() then decrements the counter for every STOP
// cell it processes and sets isReadyToShutDown_ again when it finishes.
// Throws evf::Exception if, after more than 10 checks about 0.5 s apart,
// there are still fewer writable raw cells than clients.
00693 void FUResourceTable::shutDownClients()
00694 {
00695   nbClientsToShutDown_ = nbClients();
00696   isReadyToShutDown_   = false;
00697   
// No clients: every raw cell that is not EMPTY is scheduled for discard
// server side, followed by an empty cell (presumably what makes discard() see
// STOP and end its workloop -- confirm in FUShmBuffer).
00698   if (nbClientsToShutDown_==0) {
00699     LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00700     UInt_t n=nbResources();
00701     for (UInt_t i=0;i<n;i++) {
00702       evt::State_t state=shmBuffer_->evtState(i);
00703       if (state!=evt::EMPTY){
00704         LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state " 
00705                        << state);
// NOTE(review): the equivalent loop in the else-branch calls
// setEvtDiscard(i,1) before this; confirm whether omitting it here is intended.
00706         shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00707       }
00708     }
00709     shmBuffer_->scheduleRawEmptyCellForDiscard();
00710   }
00711   else {
// Clients present: one empty raw event is written per client (presumably read
// by each client as its STOP), so first wait until enough raw cells are
// writable. While waiting, each client pid is probed with kill(pid,0). A
// non-zero result is handled as a crashed process by handleCrashedEP(), which
// also removes the pid from the client list.
00712     int checks=0;
00713 
00714     while(shmBuffer_->nbRawCellsToWrite()<nbClients() && nbClients()!=0)
00715       {
00716         checks++;
00717         vector<pid_t> prcids=clientPrcIds();
00718         for (UInt_t i=0;i<prcids.size();i++) {
00719           pid_t pid   =prcids[i];
00720           int   status=kill(pid,0);
00721           if (status!=0) {
00722             LOG4CPLUS_ERROR(log_,"EP prc "<<pid<<" completed with error.");
00723             handleCrashedEP(runNumber_,pid);
00724           }
00725         }
00726   
00727         LOG4CPLUS_WARN(log_,"no cell to write stop " << shmBuffer_->nbRawCellsToWrite() 
00728                        << " nClients " << nbClients());
00729         if(checks>10){
00730           string msg = "No Raw Cell to Write STOP messages";
00731           XCEPT_RAISE(evf::Exception,msg);
00732         }
00733         ::usleep(500000);
00734       }
// Re-read the client count: crashed clients may have been removed above. If
// none are left, orphaned cells are discarded server side as in the
// no-client branch.
00735     nbClientsToShutDown_ = nbClients();
00736     if(nbClientsToShutDown_==0){
00737       UInt_t n=nbResources();
00738       for (UInt_t i=0;i<n;i++) {
00739         evt::State_t state=shmBuffer_->evtState(i);
00740         if (state!=evt::EMPTY){
00741           LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state " 
00742                          << state);
00743           shmBuffer_->setEvtDiscard(i,1);
00744           shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00745         }
00746       }
00747       shmBuffer_->scheduleRawEmptyCellForDiscard();
00748     }
// Write one empty raw event per remaining client.
00749     UInt_t n=nbClientsToShutDown_;
00750     for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
00751   }
00752 }
00753 
00754 
00755 //______________________________________________________________________________
00756 void FUResourceTable::clear()
00757 {
00758   for (UInt_t i=0;i<resources_.size();i++) {
00759     resources_[i]->release();
00760     delete resources_[i];
00761   }
00762   resources_.clear();
00763   while (!freeResourceIds_.empty()) freeResourceIds_.pop();
00764 }
00765 
00766 
00767 //______________________________________________________________________________
00768 void FUResourceTable::resetCounters()
00769 {
00770   if (0!=shmBuffer_) {
00771     for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
00772     for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++)  acceptSMDqmDiscard_[i] = 0;
00773   }
00774   
00775   nbAllocated_           =nbPending_;
00776   nbCompleted_           =0;
00777   nbSent_                =0;
00778   nbSentError_           =0;
00779   nbSentDqm_             =0;
00780   nbPendingSMDiscards_   =0;
00781   nbPendingSMDqmDiscards_=0;
00782   nbDiscarded_           =0;
00783   nbLost_                =0;
00784   nbEolPosted_           =0;
00785   nbEolDiscarded_        =0;
00786 
00787   nbErrors_              =0;
00788   nbCrcErrors_           =0;
00789   nbAllocSent_           =0;
00790 
00791   sumOfSquares_          =0;
00792   sumOfSizes_            =0;
00793   isStopping_            =false;
00794 }
00795 
00796 
00797 //______________________________________________________________________________
00798 UInt_t FUResourceTable::nbClients() const
00799 {
00800   UInt_t result(0);
00801   if (0!=shmBuffer_) result=shmBuffer_->nClients();
00802   return result;
00803 }
00804 
00805 
00806 //______________________________________________________________________________
00807 vector<pid_t> FUResourceTable::clientPrcIds() const
00808 {
00809   vector<pid_t> result;
00810   if (0!=shmBuffer_) {
00811     UInt_t n=nbClients();
00812     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
00813   }
00814   return result;
00815 }
00816 
00817 
00818 //______________________________________________________________________________
00819 string FUResourceTable::clientPrcIdsAsString() const
00820 {
00821   stringstream ss;
00822   if (0!=shmBuffer_) {
00823     UInt_t n=nbClients();
00824     for (UInt_t i=0;i<n;i++) {
00825       if (i>0) ss<<",";
00826       ss<<shmBuffer_->clientPrcId(i);
00827     }
00828   }
00829   return ss.str();
00830 }
00831 
00832 
00833 //______________________________________________________________________________
00834 vector<string> FUResourceTable::cellStates() const
00835 {
00836   vector<string> result;
00837   if (0!=shmBuffer_) {
00838     UInt_t n=nbResources();
00839     shmBuffer_->lock();
00840     for (UInt_t i=0;i<n;i++) {
00841       evt::State_t state=shmBuffer_->evtState(i);
00842       if      (state==evt::EMPTY)      result.push_back("EMPTY");
00843       else if (state==evt::STOP)       result.push_back("STOP");
00844       else if (state==evt::LUMISECTION)result.push_back("LUMISECTION");
00845       else if (state==evt::USEDLS)     result.push_back("USEDLS");
00846       else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
00847       else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
00848       else if (state==evt::RAWREADING) result.push_back("RAWREADING");
00849       else if (state==evt::RAWREAD)    result.push_back("RAWREAD");
00850       else if (state==evt::PROCESSING) result.push_back("PROCESSING");
00851       else if (state==evt::PROCESSED)  result.push_back("PROCESSED");
00852       else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
00853       else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
00854       else if (state==evt::SENDING)    result.push_back("SENDING");
00855       else if (state==evt::SENT)       result.push_back("SENT");
00856       else if (state==evt::DISCARDING) result.push_back("DISCARDING");
00857     }
00858     shmBuffer_->unlock();
00859   }
00860   return result;
00861 }
00862 
00863 vector<string> FUResourceTable::dqmCellStates() const
00864 {
00865   vector<string> result;
00866   if (0!=shmBuffer_) {
00867     UInt_t n=nbDqmCells_;
00868     shmBuffer_->lock();
00869     for (UInt_t i=0;i<n;i++) {
00870       dqm::State_t state=shmBuffer_->dqmState(i);
00871       if      (state==dqm::EMPTY)      result.push_back("EMPTY");
00872       else if (state==dqm::WRITING) result.push_back("WRITING");
00873       else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
00874       else if (state==dqm::SENDING)    result.push_back("SENDING");
00875       else if (state==dqm::SENT)       result.push_back("SENT");
00876       else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
00877     }
00878     shmBuffer_->unlock();
00879   }
00880   return result;
00881 }
00882 
00883 
00884 //______________________________________________________________________________
00885 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
00886 {
00887   vector<UInt_t> result;
00888   if (0!=shmBuffer_) {
00889     UInt_t n=nbResources();
00890     shmBuffer_->lock();
00891     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
00892     shmBuffer_->unlock();
00893   }
00894   return result;
00895 }
00896 
00897 
00898 //______________________________________________________________________________
00899 vector<pid_t> FUResourceTable::cellPrcIds() const
00900 {
00901   vector<pid_t> result;
00902   if (0!=shmBuffer_) {
00903     UInt_t n=nbResources();
00904     shmBuffer_->lock();
00905     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
00906     shmBuffer_->unlock();
00907   }
00908   return result;
00909 }
00910 
00911 
00912 //______________________________________________________________________________
00913 vector<time_t> FUResourceTable::cellTimeStamps() const
00914 {
00915   vector<time_t> result;
00916   if (0!=shmBuffer_) {
00917     UInt_t n=nbResources();
00918     shmBuffer_->lock();
00919     for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
00920     shmBuffer_->unlock();
00921   }
00922   return result;
00923 }
00924 
00925 
00927 // implementation of private member functions
00929 
00930 //______________________________________________________________________________
00931 void FUResourceTable::sendAllocate()
00932 {
00933   UInt_t nbFreeSlots    = this->nbFreeSlots();
00934   UInt_t nbFreeSlotsMax = resources_.size()/2;
00935   if (nbFreeSlots>nbFreeSlotsMax) {
00936     UIntVec_t fuResourceIds;
00937     for (UInt_t i=0;i<nbFreeSlots;i++)
00938       fuResourceIds.push_back(allocateResource());
00939     bu_->sendAllocate(fuResourceIds);
00940     nbAllocSent_++;
00941   }
00942 }
00943 
00944 
00945 //______________________________________________________________________________
00946 void FUResourceTable::sendDiscard(UInt_t buResourceId)
00947 {
00948   bu_->sendDiscard(buResourceId);
00949   nbDiscarded_++;
00950 }
00951 
00952 
00953 //______________________________________________________________________________
00954 void FUResourceTable::sendInitMessage(UInt_t   fuResourceId,
00955                                       UInt_t   outModId,
00956                                       UInt_t   fuProcessId,
00957                                       UInt_t   fuGuid,
00958                                       UChar_t *data,
00959                                       UInt_t   dataSize)
00960 {
00961   if (0==sm_) {
00962     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
00963   }
00964   else {
00965     acceptSMDataDiscard_[fuResourceId] = true;
00966     UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
00967                                         fuGuid,data,dataSize);
00968     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00969     sumOfSizes_  +=nbBytes;
00970   }
00971 }
00972 
00973 
00974 //______________________________________________________________________________
00975 void FUResourceTable::sendDataEvent(UInt_t   fuResourceId,
00976                                     UInt_t   runNumber,
00977                                     UInt_t   evtNumber,
00978                                     UInt_t   outModId,
00979                                     UInt_t   fuProcessId,
00980                                     UInt_t   fuGuid,
00981                                     UChar_t *data,
00982                                     UInt_t   dataSize)
00983 {
00984   if (0==sm_) {
00985     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
00986   }
00987   else {
00988     acceptSMDataDiscard_[fuResourceId] = true;
00989     UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
00990                                       outModId,fuProcessId,fuGuid,
00991                                       data,dataSize);
00992     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
00993     sumOfSizes_  +=nbBytes;
00994   }
00995 }
00996 
00997 
00998 //______________________________________________________________________________
00999 void FUResourceTable::sendErrorEvent(UInt_t   fuResourceId,
01000                                      UInt_t   runNumber,
01001                                      UInt_t   evtNumber,
01002                                      UInt_t   fuProcessId,
01003                                      UInt_t   fuGuid,
01004                                      UChar_t *data,
01005                                      UInt_t   dataSize)
01006 {
01007   if (0==sm_) {
01008     LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
01009   }
01010   else {
01011     acceptSMDataDiscard_[fuResourceId] = true;
01012     UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
01013                                        fuProcessId,fuGuid,data,dataSize);
01014     sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
01015     sumOfSizes_  +=nbBytes;
01016   }
01017 
01018 //   if (0!=shmBuffer_) {
01019 //     UInt_t n=nbDqmCells_;
01020 
01021 //     for (UInt_t i=0;i<n;i++) {
01022 //       if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
01023 //      {
01024 //        if(shmBuffer_->dqmState(i)!=dqm::SENT){
01025 //          shmBuffer_->setDqmState(i,dqm::SENT);       
01026 //          shmBuffer_->discardDqmCell(i);      
01027 //          acceptSMDqmDiscard_[i] = false;
01028 //        }  
01029 //      }
01030 //     }
01031 //     n=nbRecoCells_;
01032 //     for (UInt_t i=0;i<n;i++) {
01033 //       if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
01034 //      {
01035 //        shmBuffer_->discardOrphanedRecoCell(i);  
01036 //      }
01037 //     }
01038 
01039 //   }
01040 }
01041 
01042 
01043 //______________________________________________________________________________
01044 void FUResourceTable::sendDqmEvent(UInt_t   fuDqmId,
01045                                    UInt_t   runNumber,
01046                                    UInt_t   evtAtUpdate,
01047                                    UInt_t   folderId,
01048                                    UInt_t   fuProcessId,
01049                                    UInt_t   fuGuid,
01050                                    UChar_t* data,
01051                                    UInt_t   dataSize)
01052 {
01053   if (0==sm_) {
01054     LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
01055   }
01056   else {
01057     sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
01058                       fuProcessId,fuGuid,data,dataSize);
01059 
01060     nbPendingSMDqmDiscards_++;
01061 
01062     acceptSMDqmDiscard_[fuDqmId]++;
01063     if(acceptSMDqmDiscard_[fuDqmId]>1)
01064       LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder " 
01065                      << folderId << " process " << fuProcessId << " guid " << fuGuid);
01066     nbSentDqm_++;
01067   }
01068 }
01069 
01070 
01071 //______________________________________________________________________________
01072 bool FUResourceTable::isLastMessageOfEvent(MemRef_t* bufRef)
01073 {
01074   while (0!=bufRef->getNextReference()) {
01075     bufRef=bufRef->getNextReference();
01076   }
01077   I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
01078     (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
01079   
01080   UInt_t iBlock    =block->blockNb;
01081   UInt_t nBlock    =block->nbBlocksInSuperFragment;
01082   UInt_t iSuperFrag=block->superFragmentNb;
01083   UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
01084   return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
01085 }
01086 
01087 //______________________________________________________________________________
01088 void FUResourceTable::injectCRCError()
01089 {
01090   for (UInt_t i=0;i<resources_.size();i++) {
01091     resources_[i]->scheduleCRCError();
01092   }
01093 }
01094 void FUResourceTable::printWorkLoopStatus()
01095 {
01096   std::cout << "Workloop status===============" << std::endl;
01097   std::cout << "==============================" << std::endl;
01098   if(wlSendData_!=0)
01099     std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
01100   if(wlSendDqm_!=0)
01101     std::cout << "SendDqm  -> " << wlSendDqm_->isActive()  << std::endl;
01102   if(wlDiscard_!=0)
01103     std::cout << "Discard  -> " << wlDiscard_->isActive()  << std::endl;
01104   std::cout << "Workloops Active  -> " << isActive_  << std::endl;
01105 
01106 }
01107 
01108 
01109 void FUResourceTable::lastResort()
01110 {
01111   std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() 
01112             << " more rawcells to read " << std::endl;
01113   while(shmBuffer_->nbRawCellsToRead()!=0){
01114     FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
01115     std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
01116     shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01117     std::cout << "lastResort: schedule raw cell for discard" << std::endl;
01118   }
01119   //trigger the shutdown (again?)
01120   shmBuffer_->scheduleRawEmptyCellForDiscard();
01121 }