CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_6/src/EventFilter/ResourceBroker/src/FUResourceTable.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUResourceTable
00004 // ---------------
00005 //
00006 //            12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00007 //            20/01/2012 Andrei Spataru <aspataru@cern.ch>
00009 
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "EvffedFillerRB.h"
00012 
00013 #include "interface/evb/i2oEVBMsgs.h"
00014 #include "xcept/tools.h"
00015 
00016 #include <sys/types.h>
00017 #include <signal.h>
00018 
00019 #ifdef linux
00020 #include <thread>
00021 #endif
00022 //#define DEBUG_RES_TAB
00023 
00024 using namespace evf;
00025 using namespace std;
00027 // construction/destruction
00029 
00030 //______________________________________________________________________________
00031 FUResourceTable::FUResourceTable(bool segmentationMode, UInt_t nbRawCells,
00032                 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00033                 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00034                 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00035                 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00036 
00037         // call super constructor
00038                         IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00039                                         rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
00040                                         logger, timeout, frb, app), shmBuffer_(0)
00041 
00042 {
00043         initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00044                         rawCellSize, recoCellSize, dqmCellSize);
00045 }
00046 
00047 //______________________________________________________________________________
00048 FUResourceTable::~FUResourceTable() {
00049         clear();
00050         //workloop cancels used to be here in the previous version
00051         shmdt( shmBuffer_);
00052         if (FUShmBuffer::releaseSharedMemory())
00053                 LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
00054         if (0 != acceptSMDataDiscard_)
00055                 delete[] acceptSMDataDiscard_;
00056         if (0 != acceptSMDqmDiscard_)
00057                 delete[] acceptSMDqmDiscard_;
00058 }
00059 
00061 // implementation of member functions
00063 
00064 //______________________________________________________________________________
00065 void FUResourceTable::initialize(bool segmentationMode, UInt_t nbRawCells,
00066                 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00067                 UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
00068         clear();
00069 
00070         shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
00071                         nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00072         if (0 == shmBuffer_) {
00073                 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00074                 LOG4CPLUS_FATAL(log_, msg);
00075                 XCEPT_RAISE(evf::Exception, msg);
00076         }
00077 
00078         for (UInt_t i = 0; i < nbRawCells_; i++) {
00079                 FUResource* newResource = new FUResource(i, log_, frb_, app_);
00080                 newResource->release(true);
00081                 resources_.push_back(newResource);
00082                 freeResourceIds_.push(i);
00083         }
00084 
00085         acceptSMDataDiscard_ = new bool[nbRecoCells];
00086         acceptSMDqmDiscard_ = new int[nbDqmCells];
00087 
00088         resetCounters();
00089         stopFlag_=false;
00090 }
00091 
00092 //______________________________________________________________________________
00093 bool FUResourceTable::sendData() {
00094         bool reschedule = true;
00095         FUShmRecoCell* cell = 0;
00096         try {
00097                 cell = shmBuffer_->recoCellToRead();
00098         } catch (evf::Exception& e) {
00099                 rethrowShmBufferException(e, "FUResourceTable:sendData:recoCellToRead");
00100         }
00101 
00102         if (0 == cell->eventSize()) {
00103                 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00104                 UInt_t cellIndex = cell->index();
00105                 try {
00106                         shmBuffer_->finishReadingRecoCell(cell);
00107                         shmBuffer_->discardRecoCell(cellIndex);
00108                 } catch (evf::Exception& e) {
00109                         rethrowShmBufferException(e,
00110                                         "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
00111                 }
00112                 shutdownStatus_|=1<<7;
00113                 reschedule = false;
00114         } else {
00115                 try {
00116                         if (cell->type() == 0) {
00117                                 UInt_t cellIndex = cell->index();
00118                                 UInt_t cellOutModId = cell->outModId();
00119                                 UInt_t cellFUProcId = cell->fuProcessId();
00120                                 UInt_t cellFUGuid = cell->fuGuid();
00121                                 UChar_t* cellPayloadAddr = cell->payloadAddr();
00122                                 UInt_t cellEventSize = cell->eventSize();
00123                                 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00124                                 try {
00125                                         shmBuffer_->finishReadingRecoCell(cell);
00126                                 } catch (evf::Exception& e) {
00127                                         rethrowShmBufferException(e,
00128                                                         "FUResourceTable:sendData:finishReadingRecoCell");
00129                                 }
00130 
00131                                 lock();
00132                                 nbPendingSMDiscards_++;
00133                                 unlock();
00134 
00135                                 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00136                                                 cellFUGuid, cellPayloadAddr, cellEventSize,
00137                                                 cellExpectedEPs);
00138                         } else if (cell->type() == 1) {
00139                                 UInt_t cellIndex = cell->index();
00140                                 UInt_t cellRawIndex = cell->rawCellIndex();
00141                                 UInt_t cellRunNumber = cell->runNumber();
00142                                 UInt_t cellEvtNumber = cell->evtNumber();
00143                                 UInt_t cellOutModId = cell->outModId();
00144                                 UInt_t cellFUProcId = cell->fuProcessId();
00145                                 UInt_t cellFUGuid = cell->fuGuid();
00146                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00147                                 UInt_t cellEventSize = cell->eventSize();
00148                                 try {
00149                                         shmBuffer_->finishReadingRecoCell(cell);
00150                                 } catch (evf::Exception& e) {
00151                                         rethrowShmBufferException(e,
00152                                                         "FUResourceTable:sendData:finishReadingRecoCell");
00153                                 }
00154 
00155                                 lock();
00156                                 nbPendingSMDiscards_++;
00157                                 resources_[cellRawIndex]->incNbSent();
00158                                 if (resources_[cellRawIndex]->nbSent() == 1)
00159                                         nbSent_++;
00160                                 unlock();
00161 
00162                                 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00163                                                 cellOutModId, cellFUProcId, cellFUGuid,
00164                                                 cellPayloadAddr, cellEventSize);
00165                         } else if (cell->type() == 2) {
00166                                 UInt_t cellIndex = cell->index();
00167                                 UInt_t cellRawIndex = cell->rawCellIndex();
00168                                 //UInt_t   cellRunNumber   = cell->runNumber();
00169                                 UInt_t cellEvtNumber = cell->evtNumber();
00170                                 UInt_t cellFUProcId = cell->fuProcessId();
00171                                 UInt_t cellFUGuid = cell->fuGuid();
00172                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00173                                 UInt_t cellEventSize = cell->eventSize();
00174                                 try {
00175                                         shmBuffer_->finishReadingRecoCell(cell);
00176                                 } catch (evf::Exception& e) {
00177                                         rethrowShmBufferException(e,
00178                                                         "FUResourceTable:sendData:recoCellToRead");
00179                                 }
00180 
00181                                 lock();
00182                                 nbPendingSMDiscards_++;
00183                                 resources_[cellRawIndex]->incNbSent();
00184                                 if (resources_[cellRawIndex]->nbSent() == 1) {
00185                                         nbSent_++;
00186                                         nbSentError_++;
00187                                 }
00188                                 unlock();
00189 
00190                                 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00191                                                 cellFUProcId, cellFUGuid, cellPayloadAddr,
00192                                                 cellEventSize);
00193                         } else {
00194                                 string errmsg =
00195                                                 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00196                                 XCEPT_RAISE(evf::Exception, errmsg);
00197                         }
00198                 } catch (xcept::Exception& e) {
00199                         LOG4CPLUS_FATAL(
00200                                         log_,
00201                                         "Failed to send EVENT DATA to StorageManager: "
00202                                                         << xcept::stdformat_exception_history(e));
00203                         reschedule = false;
00204                 }
00205         }
00206 
00207         sDataActive_=reschedule;
00208         return reschedule;
00209 }
00210 
00211 //______________________________________________________________________________
00212 bool FUResourceTable::sendDataWhileHalting() {
00213         bool reschedule = true;
00214         FUShmRecoCell* cell = 0;
00215         try {
00216                 cell = shmBuffer_->recoCellToRead();
00217         } catch (evf::Exception& e) {
00218                 rethrowShmBufferException(e,
00219                                 "FUResourceTable:sendDataWhileHalting:recoCellToRead");
00220         }
00221 
00222         if (0 == cell->eventSize()) {
00223                 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00224                 UInt_t cellIndex = cell->index();
00225                 try {
00226                         shmBuffer_->finishReadingRecoCell(cell);
00227                         shmBuffer_->discardRecoCell(cellIndex);
00228                 } catch (evf::Exception& e) {
00229                         rethrowShmBufferException(e,
00230                                         "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
00231                 }
00232                 shutdownStatus_|=1<<8;
00233                 reschedule = false;
00234         } else {
00235                 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00236                 UInt_t cellIndex = cell->index();
00237                 try {
00238                         shmBuffer_->finishReadingRecoCell(cell);
00239                         shmBuffer_->discardRecoCell(cellIndex);
00240                 } catch (evf::Exception& e) {
00241                         rethrowShmBufferException(e,
00242                                         "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
00243                 }
00244         }
00245 
00246         sDataActive_=reschedule;
00247         return reschedule;
00248 }
00249 
00250 //______________________________________________________________________________
00251 bool FUResourceTable::sendDqm() {
00252         bool reschedule = true;
00253         FUShmDqmCell* cell = 0;
00254         // initialize to a value to avoid warnings
00255         dqm::State_t state = dqm::EMPTY;
00256         try {
00257                 cell = shmBuffer_->dqmCellToRead();
00258                 state = shmBuffer_->dqmState(cell->index());
00259         } catch (evf::Exception& e) {
00260                 rethrowShmBufferException(e,
00261                                 "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
00262         }
00263 
00264         if (state == dqm::EMPTY) {
00265                 LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
00266                 std::cout << "shut down dqm workloop " << std::endl;
00267                 UInt_t cellIndex = cell->index();
00268                 try {
00269                         shmBuffer_->finishReadingDqmCell(cell);
00270                         shmBuffer_->discardDqmCell(cellIndex);
00271                 } catch (evf::Exception& e) {
00272                         rethrowShmBufferException(e,
00273                                         "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
00274                 }
00275                 shutdownStatus_|=1<<9;
00276                 reschedule = false;
00277         } else {
00278                 try {
00279                         UInt_t cellIndex = cell->index();
00280                         UInt_t cellRunNumber = cell->runNumber();
00281                         UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00282                         UInt_t cellFolderId = cell->folderId();
00283                         UInt_t cellFUProcId = cell->fuProcessId();
00284                         UInt_t cellFUGuid = cell->fuGuid();
00285                         UChar_t *cellPayloadAddr = cell->payloadAddr();
00286                         UInt_t cellEventSize = cell->eventSize();
00287                         sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00288                                         cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00289                                         cellEventSize);
00290                         try {
00291                                 shmBuffer_->finishReadingDqmCell(cell);
00292                         } catch (evf::Exception& e) {
00293                                 rethrowShmBufferException(e,
00294                                                 "FUResourceTable:sendDqm:finishReadingDqmCell");
00295                         }
00296                 } catch (xcept::Exception& e) {
00297                         LOG4CPLUS_FATAL(
00298                                         log_,
00299                                         "Failed to send DQM DATA to StorageManager: "
00300                                                         << xcept::stdformat_exception_history(e));
00301                         reschedule = false;
00302                 }
00303         }
00304 
00305         sDqmActive_=reschedule;
00306         return reschedule;
00307 }
00308 
00309 //______________________________________________________________________________
00310 bool FUResourceTable::sendDqmWhileHalting() {
00311         bool reschedule = true;
00312         FUShmDqmCell* cell = 0;
00313         // initialize to a value to avoid warnings
00314         dqm::State_t state = dqm::EMPTY;
00315         try {
00316                 cell = shmBuffer_->dqmCellToRead();
00317                 state = shmBuffer_->dqmState(cell->index());
00318         } catch (evf::Exception& e) {
00319                 rethrowShmBufferException(e,
00320                                 "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
00321         }
00322 
00323         if (state == dqm::EMPTY) {
00324                 LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
00325                 std::cout << "shut down dqm workloop " << std::endl;
00326                 UInt_t cellIndex = cell->index();
00327                 try {
00328                         shmBuffer_->finishReadingDqmCell(cell);
00329                         shmBuffer_->discardDqmCell(cellIndex);
00330                 } catch (evf::Exception& e) {
00331                         rethrowShmBufferException(e,
00332                                         "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
00333                 }
00334                 shutdownStatus_|=1<<10;
00335                 reschedule = false;
00336         } else {
00337                 UInt_t cellIndex = cell->index();
00338                 try {
00339                         shmBuffer_->finishReadingDqmCell(cell);
00340                         shmBuffer_->discardDqmCell(cellIndex);
00341                 } catch (evf::Exception& e) {
00342                         rethrowShmBufferException(e,
00343                                         "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
00344                 }
00345         }
00346 
00347         sDqmActive_=reschedule;
00348         return reschedule;
00349 }
00350 
00351 // common procedure for discard() and discardWhileHalting()
00352 // when the workloop should not be rescheduled
00353 //______________________________________________________________________________
00354 void FUResourceTable::discardNoReschedule() {
00355         std::cout << " entered shutdown cycle " << std::endl;
00356         shutdownStatus_|=1<<11;
00357         try {
00358                 shmBuffer_->writeRecoEmptyEvent();
00359         } catch (evf::Exception& e) {
00360                 rethrowShmBufferException(e,
00361                                 "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
00362         }
00363 
00364         UInt_t count = 0;
00365         while (count < 100) {
00366                 std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
00367                                 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00368                 if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
00369                                 shmBuffer_->shmid()) == 1) {
00370                         shutdownStatus_|=1<<12;
00371                         //isReadyToShutDown_ = true;
00372                         break;
00373                 } else {
00374                         count++;
00375                         std::cout << " shutdown cycle attempt " << count << std::endl;
00376                         LOG4CPLUS_DEBUG(
00377                                         log_,
00378                                         "FUResourceTable: Wait for all clients to detach,"
00379                                                         << " nClients=" << shmBuffer_->nClients()
00380                                                         << " nattch=" << FUShmBuffer::shm_nattch(
00381                                                         shmBuffer_->shmid()) << " (" << count << ")");
00382                         ::usleep( shutdownTimeout_);
00383                         if (count * shutdownTimeout_ > 10000000)
00384                                 LOG4CPLUS_WARN(
00385                                                 log_,
00386                                                 "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00387                                                                 << " nClients=" << shmBuffer_->nClients()
00388                                                                 << " nattch=" << FUShmBuffer::shm_nattch(
00389                                                                 shmBuffer_->shmid()) << " (" << count << ")");
00390 
00391                 }
00392         }
00393         
00394         bool allEmpty = false;
00395         std::cout << "Checking if all dqm cells are empty " << std::endl;
00396         while (!allEmpty) {
00397                 UInt_t n = nbDqmCells_;
00398                 allEmpty = true;
00399                 shmBuffer_->lock();
00400                 for (UInt_t i = 0; i < n; i++) {
00401                         // initialize to a value to avoid warnings
00402                         dqm::State_t state = dqm::EMPTY;
00403                         try {
00404                                 state = shmBuffer_->dqmState(i);
00405                         } catch (evf::Exception& e) {
00406                                 rethrowShmBufferException(e,
00407                                                 "FUResourceTable:discardNoReschedule:dqmState");
00408                         }
00409                         if (state != dqm::EMPTY)
00410                                 allEmpty = false;
00411                 }
00412                 shmBuffer_->unlock();
00413         }
00414         shutdownStatus_|=1<<13;
00415 
00416         std::cout << "Number of  pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
00417         if (nbPendingSMDqmDiscards_ != 0) {
00418                 LOG4CPLUS_WARN(
00419                                 log_,
00420                                 "FUResourceTable: pending DQM discards not zero: ="
00421                                                 << nbPendingSMDqmDiscards_
00422                                                 << " while cells are all empty. This may cause problems at next start ");
00423 
00424         }
00425 
00426         try {
00427                 shmBuffer_->writeDqmEmptyEvent();
00428         } catch (evf::Exception& e) {
00429                 rethrowShmBufferException(e,
00430                                 "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
00431         }
00432 
00433         isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
00434         // sendDqm loop has been shut down as well
00435 }
00436 
00437 //______________________________________________________________________________
00438 bool FUResourceTable::discard() {
00439         FUShmRawCell* cell = 0;
00440         // initialize to a value to avoid warnings
00441         evt::State_t state = evt::EMPTY;
00442         try {
00443                 cell = shmBuffer_->rawCellToDiscard();
00444                 state = shmBuffer_->evtState(cell->index());
00445         } catch (evf::Exception& e) {
00446                 rethrowShmBufferException(e,
00447                                 "FUResourceTable:discard:rawCellToRead/evtState");
00448         }
00449 
00450         bool reschedule = true;
00451         bool shutDown = (state == evt::STOP);
00452         bool isLumi = (state == evt::USEDLS);
00453         UInt_t fuResourceId = cell->fuResourceId();
00454         UInt_t buResourceId = cell->buResourceId();
00455 
00456         if (state == evt::EMPTY) {
00457                 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00458                 return true;
00459         }
00460 
00461         if (shutDown) {
00462                 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00463                 if (nbClientsToShutDown_ > 0)
00464                         --nbClientsToShutDown_;
00465                 if (nbClientsToShutDown_ == 0) {
00466                         LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00467                         isActive_ = false;
00468                         reschedule = false;
00469                 }
00470         }
00471 
00472         try {
00473                 shmBuffer_->discardRawCell(cell);
00474         } catch (evf::Exception& e) {
00475                 rethrowShmBufferException(e, "FUResourceTable:discard:discardRawCell");
00476         }
00477         // UPDATED
00478         if (isLumi)
00479                 nbEolDiscarded_++;
00480 
00481         if (!shutDown && !isLumi) {
00482                 if (fuResourceId >= nbResources()) {
00483                         LOG4CPLUS_WARN(
00484                                         log_,
00485                                         "cell " << cell->index() << " in state " << state
00486                                                         << " scheduled for discard has no associated FU resource ");
00487                 } else {
00488                         resources_[fuResourceId]->release(true);
00489                         lock();
00490                         freeResourceIds_.push(fuResourceId);
00491                         assert(freeResourceIds_.size() <= resources_.size());
00492                         unlock();
00493 
00494                         sendDiscard(buResourceId);
00495                         sendAllocate();
00496                 }
00497         }
00498 
00499         if (!reschedule) {
00500                 discardNoReschedule();
00501         }
00502 
00503         return reschedule;
00504 }
00505 
00506 //______________________________________________________________________________
00507 bool FUResourceTable::discardWhileHalting(bool sendDiscards) {
00508         FUShmRawCell* cell = 0;
00509         // initialize to a value to avoid warnings
00510         evt::State_t state = evt::EMPTY;
00511         try {
00512                 cell = shmBuffer_->rawCellToDiscard();
00513                 state = shmBuffer_->evtState(cell->index());
00514         } catch (evf::Exception& e) {
00515                 rethrowShmBufferException(e,
00516                                 "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
00517         }
00518 
00519         bool reschedule = true;
00520         bool shutDown = (state == evt::STOP);
00521         bool isLumi = (state == evt::USEDLS);
00522         UInt_t fuResourceId = cell->fuResourceId();
00523         UInt_t buResourceId = cell->buResourceId();
00524 
00525         if (state == evt::EMPTY) {
00526                 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00527                 return true;
00528         }
00529 
00530         if (shutDown) {
00531                 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00532                 if (nbClientsToShutDown_ > 0)
00533                         --nbClientsToShutDown_;
00534                 if (nbClientsToShutDown_ == 0) {
00535                         LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00536                         isActive_ = false;
00537                         reschedule = false;
00538                 }
00539         }
00540 
00541         try {
00542                 shmBuffer_->discardRawCell(cell);
00543         } catch (evf::Exception& e) {
00544                 rethrowShmBufferException(e,
00545                                 "FUResourceTable:discardWhileHalting:discardRawCell");
00546         }
00547         // UPDATED
00548         if (isLumi)
00549                 nbEolDiscarded_++;
00550 
00551         if (!shutDown && !isLumi) {
00552                 if (fuResourceId >= nbResources()) {
00553                         LOG4CPLUS_WARN(
00554                                         log_,
00555                                         "cell " << cell->index() << " in state " << state
00556                                                         << " scheduled for discard has no associated FU resource ");
00557                 } else {
00558                         resources_[fuResourceId]->release(true);
00559                         lock();
00560                         freeResourceIds_.push(fuResourceId);
00561                         assert(freeResourceIds_.size() <= resources_.size());
00562                         unlock();
00563 
00564                         /*
00565                          sendDiscard(buResourceId);
00566                          sendAllocate();
00567                          */
00568                         if (sendDiscards)
00569                                 sendDiscard(buResourceId);
00570                 }
00571         }
00572 
00573         if (!reschedule) {
00574                 discardNoReschedule();
00575         }
00576 
00577         return reschedule;
00578 }
00579 
00580 //______________________________________________________________________________
00581 bool FUResourceTable::buildResource(MemRef_t* bufRef) {
00582         bool eventComplete = false;
00583         // UPDATED
00584         bool lastMsg = isLastMessageOfEvent(bufRef);
00585         I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00586                         (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00587 
00588         UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
00589         UInt_t buResourceId = (UInt_t) block->buResourceId;
00590         // Check input
00591         if ((int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
00592                         || (int) block->buResourceId < 0) {
00593                 stringstream failureStr;
00594                 failureStr << "Received TAKE message with invalid bu/fu resource id:"
00595                                 << " fuResourceId: " << fuResourceId << " buResourceId: "
00596                                 << buResourceId;
00597                 LOG4CPLUS_ERROR(log_, failureStr.str());
00598                 XCEPT_RAISE(evf::Exception, failureStr.str());
00599         }
00600         FUResource* resource = resources_[fuResourceId];
00601 
00602         // allocate resource
00603         if (!resource->fatalError() && !resource->isAllocated()) {
00604                 FUShmRawCell* cell = 0;
00605                 try {
00606                         cell = shmBuffer_->rawCellToWrite();
00607                 } catch (evf::Exception& e) {
00608                         rethrowShmBufferException(e,
00609                                         "FUResourceTable:buildResource:rawCellToWrite");
00610                 }
00611                 if (cell == 0) {
00612                         bufRef->release();
00613                         return eventComplete;
00614                 }
00615                 resource->allocate(cell);
00616                 timeval now;
00617                 gettimeofday(&now, 0);
00618 
00619                 frb_->setRBTimeStamp(
00620                                 ((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00621 
00622                 frb_->setRBEventCount(nbCompleted_);
00623 
00624                 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
00625                         resource->doCrcCheck(true);
00626                 else
00627                         resource->doCrcCheck(false);
00628         }
00629 
00630 #ifdef DEBUG_RES_TAB
00631         std::cout << "Received frame for resource " << buResourceId << std::endl;
00632 #endif
00633         // keep building this resource if it is healthy
00634         if (!resource->fatalError()) {
00635 #ifdef DEBUG_RES_TAB
00636                 std::cout << "No fatal error for  " << buResourceId << ", keep building..."<< std::endl;
00637 #endif
00638                 resource->process(bufRef);
00639                 lock();
00640                 nbErrors_ += resource->nbErrors();
00641                 nbCrcErrors_ += resource->nbCrcErrors();
00642                 unlock();
00643 #ifdef DEBUG_RES_TAB
00644                 std::cout << "Checking if resource is complete " << buResourceId << std::endl;
00645 #endif
00646                 // make resource available for pick-up
00647                 if (resource->isComplete()) {
00648 #ifdef DEBUG_RES_TAB
00649                         std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
00650 #endif
00651                         lock();
00652                         nbCompleted_++;
00653                         nbPending_--;
00654                         unlock();
00655                         if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
00656                                 dumpEvent(resource->shmCell());
00657                         try {
00658                                 shmBuffer_->finishWritingRawCell(resource->shmCell());
00659                         } catch (evf::Exception& e) {
00660                                 rethrowShmBufferException(e,
00661                                                 "FUResourceTable:buildResource:finishWritingRawCell");
00662                         }
00663                         eventComplete = true;
00664                 }
00665 
00666         }
00667         // bad event, release msg, and the whole resource if this was the last one
00668         if (resource->fatalError()) {
00669                 if (lastMsg) {
00670                         try {
00671                                 shmBuffer_->releaseRawCell(resource->shmCell());
00672                         } catch (evf::Exception& e) {
00673                                 rethrowShmBufferException(e,
00674                                                 "FUResourceTable:buildResource:releaseRawCell");
00675                         }
00676                         resource->release(true);
00677                         lock();
00678                         freeResourceIds_.push(fuResourceId);
00679                         nbDiscarded_++;
00680                         nbLost_++;
00681                         nbPending_--;
00682                         unlock();
00683                         bu_->sendDiscard(buResourceId);
00684                         sendAllocate();
00685                 }
00686                 //bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
00687         }
00688 
00689         return eventComplete;
00690 }
00691 
00692 //______________________________________________________________________________
00693 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef) {
00694         I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00695         msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00696         UInt_t recoIndex = msg->rbBufferID;
00697 
00698         // Check input
00699         if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
00700                 LOG4CPLUS_ERROR(
00701                                 log_,
00702                                 "Received DISCARD DATA message with invalid recoIndex:"
00703                                                 << recoIndex);
00704 
00705         if (acceptSMDataDiscard_[recoIndex]) {
00706                 lock();
00707                 nbPendingSMDiscards_--;
00708                 unlock();
00709                 acceptSMDataDiscard_[recoIndex] = false;
00710 
00711                 try {
00712                         shmBuffer_->discardRecoCell(recoIndex);
00713                 } catch (evf::Exception& e) {
00714                         rethrowShmBufferException(e,
00715                                         "FUResourceTable:discardDataEvent:discardRecoCell");
00716                 }
00717 
00718         } else {
00719                 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00720         }
00721 
00722         bufRef->release();
00723         return true;
00724 }
00725 
00726 //______________________________________________________________________________
00727 bool FUResourceTable::discardDataEventWhileHalting(MemRef_t* bufRef) {
00728         I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00729         msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00730         UInt_t recoIndex = msg->rbBufferID;
00731 
00732         // Check input
00733         if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
00734                 LOG4CPLUS_ERROR(
00735                                 log_,
00736                                 "Received DISCARD DATA message with invalid recoIndex:"
00737                                                 << recoIndex);
00738 
00739         if (acceptSMDataDiscard_[recoIndex]) {
00740                 lock();
00741                 nbPendingSMDiscards_--;
00742                 unlock();
00743                 acceptSMDataDiscard_[recoIndex] = false;
00744 
00745         } else {
00746                 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00747         }
00748 
00749         bufRef->release();
00750         return false;
00751 }
00752 
00753 //______________________________________________________________________________
00754 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef) {
00755         I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00756         msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00757         UInt_t dqmIndex = msg->rbBufferID;
00758 
00759         // Check input
00760         if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
00761                 LOG4CPLUS_ERROR(
00762                                 log_,
00763                                 "Received DISCARD DQM message with invalid dqmIndex:"
00764                                                 << dqmIndex);
00765 
00766         unsigned int ntries = 0;
00767         try {
00768                 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00769                         if (ntries)//tolerate one attempt
00770                         LOG4CPLUS_WARN(
00771                                         log_,
00772                                         "DQM discard for cell " << dqmIndex
00773                                                         << " which is not yet in SENT state - waiting");
00774                         ::usleep(10000);
00775                         if (ntries++ > 10) {
00776                                 LOG4CPLUS_ERROR(
00777                                                 log_,
00778                                                 "DQM cell " << dqmIndex
00779                                                                 << " discard timed out while cell still in state "
00780                                                                 << shmBuffer_->dqmState(dqmIndex));
00781                                 bufRef->release();
00782                                 return true;
00783                         }
00784                 }
00785         } catch (evf::Exception& e) {
00786                 rethrowShmBufferException(e, "FUResourceTable:discardDqmEvent:dqmState");
00787         }
00788         if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00789                 acceptSMDqmDiscard_[dqmIndex]--;
00790                 if (--nbPendingSMDqmDiscards_ < 0) {
00791                         LOG4CPLUS_WARN(
00792                                         log_,
00793                                         "Spurious??? DQM discard by StorageManager, index "
00794                                                         << dqmIndex << " cell state "
00795                                                         << shmBuffer_->dqmState(dqmIndex)
00796                                                         << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
00797                 }
00798                 try {
00799                         shmBuffer_->discardDqmCell(dqmIndex);
00800                 } catch (evf::Exception& e) {
00801                         rethrowShmBufferException(e,
00802                                         "FUResourceTable:discardDqmEvent:discardDqmCell");
00803                 }
00804 
00805         } else {
00806                 LOG4CPLUS_ERROR(
00807                                 log_,
00808                                 "Spurious DQM discard for cell " << dqmIndex
00809                                                 << " from StorageManager while cell is not accepting discards");
00810         }
00811 
00812         bufRef->release();
00813         return true;
00814 }
00815 
00816 //______________________________________________________________________________
00817 bool FUResourceTable::discardDqmEventWhileHalting(MemRef_t* bufRef) {
00818         I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00819         msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00820         UInt_t dqmIndex = msg->rbBufferID;
00821 
00822         // Check input
00823         if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
00824                 LOG4CPLUS_ERROR(
00825                                 log_,
00826                                 "Received DISCARD DQM message with invalid dqmIndex:"
00827                                                 << dqmIndex);
00828 
00829         unsigned int ntries = 0;
00830         try {
00831                 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00832                         if (ntries)//tolerate one attempt
00833                         LOG4CPLUS_WARN(
00834                                         log_,
00835                                         "DQM discard for cell " << dqmIndex
00836                                                         << " which is not yet in SENT state - waiting");
00837                         ::usleep(10000);
00838                         if (ntries++ > 10) {
00839                                 LOG4CPLUS_ERROR(
00840                                                 log_,
00841                                                 "DQM cell " << dqmIndex
00842                                                                 << " discard timed out while cell still in state "
00843                                                                 << shmBuffer_->dqmState(dqmIndex));
00844                                 bufRef->release();
00845                                 return true;
00846                         }
00847                 }
00848         } catch (evf::Exception& e) {
00849                 rethrowShmBufferException(e,
00850                                 "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
00851         }
00852         if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00853                 acceptSMDqmDiscard_[dqmIndex]--;
00854                 if (--nbPendingSMDqmDiscards_ < 0) {
00855                         try {
00856                                 LOG4CPLUS_WARN(
00857                                                 log_,
00858                                                 "Spurious??? DQM discard by StorageManager, index "
00859                                                                 << dqmIndex << " cell state "
00860                                                                 << shmBuffer_->dqmState(dqmIndex)
00861                                                                 << " accept flag "
00862                                                                 << acceptSMDqmDiscard_[dqmIndex]);
00863                         } catch (evf::Exception& e) {
00864                                 rethrowShmBufferException(e,
00865                                                 "FUResourceTable:discardDqmEventWhileHalting:dqmState");
00866                         }
00867                 }
00868 
00869         } else {
00870                 LOG4CPLUS_ERROR(
00871                                 log_,
00872                                 "Spurious DQM discard for cell " << dqmIndex
00873                                                 << " from StorageManager while cell is not accepting discards");
00874         }
00875 
00876         bufRef->release();
00877         return false;
00878 }
00879 
00880 //______________________________________________________________________________
00881 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef) {
00882         I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
00883                         *msg =
00884                                         (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
00885         //make sure to fill up the shmem so no process will miss it
00886         // but processes will have to handle duplicates
00887 
00888         // Check input
00889         int lumiCheck = (int) msg->lumiSection;
00890         if (lumiCheck < 0)
00891                 LOG4CPLUS_ERROR(log_,
00892                                 "Received EOL message with invalid index:" << lumiCheck);
00893 
00894         for (unsigned int i = 0; i < nbRawCells_; i++) {
00895                 // UPDATED
00896                 if (stopFlag_) break;
00897                 nbEolPosted_++;
00898                 try {
00899                         shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00900                 } catch (evf::Exception& e) {
00901                         rethrowShmBufferException(e,
00902                                         "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
00903                 }
00904         }
00905 }
00906 
00907 //______________________________________________________________________________
00908 void FUResourceTable::dropEvent() {
00909         FUShmRawCell* cell = 0;
00910         try {
00911                 cell = shmBuffer_->rawCellToRead();
00912         } catch (evf::Exception& e) {
00913                 rethrowShmBufferException(e, "FUResourceTable:dropEvent:rawCellToRead");
00914         }
00915         UInt_t fuResourceId = cell->fuResourceId();
00916         try {
00917                 shmBuffer_->finishReadingRawCell(cell);
00918                 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00919         } catch (evf::Exception& e) {
00920                 rethrowShmBufferException(e,
00921                                 "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
00922         }
00923 }
00924 
00925 //______________________________________________________________________________
00926 bool FUResourceTable::handleCrashedEP(UInt_t runNumber, pid_t pid) {
00927         bool retval = false;
00928         vector < pid_t > pids = cellPrcIds();
00929         UInt_t iRawCell = pids.size();
00930         for (UInt_t i = 0; i < pids.size(); i++) {
00931                 if (pid == pids[i]) {
00932                         iRawCell = i;
00933                         break;
00934                 }
00935         }
00936 
00937         if (iRawCell < pids.size()) {
00938                 try {
00939                         bool shmret = shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell, true);
00940                         if (!shmret)
00941                                 LOG4CPLUS_WARN(log_,"Problem writing to the error stream.");
00942                 } catch (evf::Exception& e) {
00943                         rethrowShmBufferException(e,
00944                                         "FUResourceTable:handleCrashedEP:writeErrorEventData");
00945                 }
00946                 retval = true;
00947         } else
00948                 LOG4CPLUS_WARN(log_,
00949                                 "No raw data to send to error stream for process " << pid);
00950         try {
00951                 bool success = shmBuffer_->removeClientPrcId(pid);
00952                 if (!success)
00953                   LOG4CPLUS_WARN(log_,
00954                                 "removeClientPrcId: " << pid << " not in shared memory index, was in raw cell " << iRawCell);
00955         } catch (evf::Exception& e) {
00956                 rethrowShmBufferException(e,
00957                                 "FUResourceTable:handleCrashedEP:removeClientPrcId");
00958         }
00959         return retval;
00960 }
00961 
00962 //______________________________________________________________________________
00963 void FUResourceTable::shutdownWatchdog(unsigned int timeout)
00964 {
00965         unsigned int timeoutUs=timeout*1000000+1;
00966         bool warned=false;
00967         while (!watchDogEnd_) {
00968 
00969                 usleep(50000);
00970                 timeoutUs-=50000;
00971                 if (timeoutUs<=50000) {
00972                         LOG4CPLUS_ERROR(log_,"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
00973                         watchDogSetFailed_=true;
00974                         break;
00975                 }
00976                 if (timeoutUs<=1000000*timeout/2 && !warned) {
00977                         warned=true;
00978                         LOG4CPLUS_WARN(log_,"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
00979                 }
00980         }
00981 }
00982 
00983 //______________________________________________________________________________
00984 void FUResourceTable::shutDownClients() {
00985         nbClientsToShutDown_ = nbClients();
00986         isReadyToShutDown_ = false;
00987 
00988         shutdownStatus_=1;
00989 
00990         //start watchdog thread
00991         watchDogEnd_=false;
00992         watchDogSetFailed_=false;
00993         #ifdef linux
00994         std::thread watch(&FUResourceTable::shutdownWatchdog,this,20);
00995         #endif
00996         if (nbClientsToShutDown_ == 0) {
00997                 shutdownStatus_|=1<<1;
00998                 LOG4CPLUS_INFO(
00999                                 log_,
01000                                 "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
01001                 UInt_t n = nbResources();
01002                 try {
01003                         for (UInt_t i = 0; i < n; i++) {
01004                                 evt::State_t state = shmBuffer_->evtState(i);
01005                                 if (state != evt::EMPTY) {
01006                                         LOG4CPLUS_WARN(
01007                                                         log_,
01008                                                         "Schedule discard at STOP for orphaned event in state "
01009                                                                         << state);
01010                                         shmBuffer_->scheduleRawCellForDiscardServerSide(i);
01011                                 }
01012                         }
01013                         shmBuffer_->scheduleRawEmptyCellForDiscard();
01014                 } catch (evf::Exception& e) {
01015                         rethrowShmBufferException(e,
01016                                         "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
01017                 }
01018         } else {
01019                 // UPDATED
01020                 int checks = 0;
01021                 try {
01022                         while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
01023                                         != 0) {
01024                                 shutdownStatus_|=1<<2;
01025                                 checks++;
01026                                 {
01027                                         #ifdef linux
01028                                         auto lk = lockCrashHandlerTimed(10);
01029                                         #else
01030                                         bool lk=true;
01031                                         #endif
01032                                         if (lk) {
01033                                                 vector < pid_t > prcids = clientPrcIds();
01034                                                 for (UInt_t i = 0; i < prcids.size(); i++) {
01035                                                         pid_t pid = prcids[i];
01036                                                         int status = kill(pid, 0);
01037                                                         if (status != 0) {
01038                                                                 LOG4CPLUS_ERROR(log_,
01039                                                                                 "EP prc " << pid << " completed with error.");
01040                                                                 handleCrashedEP(runNumber_, pid);
01041                                                         }
01042                                                 }
01043                                         }
01044                                         else {
01045                                                 XCEPT_RAISE(evf::Exception, 
01046                                                         "Timed out access to the Crash Handler in stop. SM discards not arriving?");
01047 
01048                                         }
01049                                 }
01050 
01051                                 LOG4CPLUS_WARN(
01052                                                 log_,
01053                                                 "no cell to write stop "
01054                                                                 << shmBuffer_->nbRawCellsToWrite()
01055                                                                 << " nClients " << nbClients());
01056                                 if (checks > 15) {
01057                                         string msg = "No Raw Cell to Write STOP messages";
01058                                         XCEPT_RAISE(evf::Exception, msg);
01059                                 }
01060                                 ::usleep(500000);
01061                         }
01062                         shutdownStatus_|=1<<3;
01063 
01064                 } catch (evf::Exception& e) {
01065                         watchDogEnd_=true;
01066                         #ifdef linux
01067                         watch.join();
01068                         #endif
01069                         rethrowShmBufferException(e,
01070                                         "FUResourceTable:shutDownClients:nbRawCellsToWrite");
01071                 }
01072                 nbClientsToShutDown_ = nbClients();
01073                 if (nbClientsToShutDown_ == 0) {
01074                         shutdownStatus_|=1<<4;
01075                         UInt_t n = nbResources();
01076                         for (UInt_t i = 0; i < n; i++) {
01077                                 // initialize to a value to avoid warnings
01078                                 evt::State_t state = evt::EMPTY;
01079                                 try {
01080                                         state = shmBuffer_->evtState(i);
01081                                 } catch (evf::Exception& e) {
01082                                         watchDogEnd_=true;
01083                                         #ifdef linux
01084                                         watch.join();
01085                                         #endif
01086                                         rethrowShmBufferException(e,
01087                                                         "FUResourceTable:shutDownClients:evtState");
01088                                 }
01089                                 if (state != evt::EMPTY) {
01090                                         LOG4CPLUS_WARN(
01091                                                         log_,
01092                                                         "Schedule discard at STOP for orphaned event in state "
01093                                                                         << state);
01094                                         try {
01095                                                 shmBuffer_->setEvtDiscard(i, 1, true);
01096                                                 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
01097                                         } catch (evf::Exception& e) {
01098                                                 watchDogEnd_=true;
01099                                                 #ifdef linux
01100                                                 watch.join();
01101                                                 #endif
01102                                                 rethrowShmBufferException(e,
01103                                                                 "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
01104                                         }
01105                                 }
01106                         }
01107                         try {
01108                                 shmBuffer_->scheduleRawEmptyCellForDiscard();
01109                         } catch (evf::Exception& e) {
01110                                 watchDogEnd_=true;
01111                                 #ifdef linux
01112                                 watch.join();
01113                                 #endif
01114                                 rethrowShmBufferException(e,
01115                                                 "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
01116                         }
01117                 }
01118                 UInt_t n = nbClientsToShutDown_;
01119                 shutdownStatus_|=1<<5;
01120                 try {
01121                         for (UInt_t i = 0; i < n; ++i)
01122                                 shmBuffer_->writeRawEmptyEvent();
01123                 } catch (evf::Exception& e) {
01124                         watchDogEnd_=true;
01125                         #ifdef linux
01126                         watch.join();
01127                         #endif
01128                         rethrowShmBufferException(e,
01129                                         "FUResourceTable:shutDownClients:writeRawEmptyEvent");
01130                 }
01131                 shutdownStatus_|=1<<6;
01132         }
01133         watchDogEnd_=true;
01134         #ifdef linux
01135         watch.join();
01136         if (watchDogSetFailed_)
01137           XCEPT_RAISE(evf::Exception, "Failed (timed out) shutdown of clients");
01138         #endif
01139 }
01140 
01141 //______________________________________________________________________________
01142 void FUResourceTable::clear() {
01143         for (UInt_t i = 0; i < resources_.size(); i++) {
01144                 resources_[i]->release(true);
01145                 delete resources_[i];
01146         }
01147         resources_.clear();
01148         while (!freeResourceIds_.empty())
01149                 freeResourceIds_.pop();
01150 }
01151 
01153 // implementation of private member functions
01155 
01156 //______________________________________________________________________________
01157 void FUResourceTable::resetCounters() {
01158         if (0 != shmBuffer_) {
01159                 try {
01160                         for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
01161                                 acceptSMDataDiscard_[i] = false;
01162                         for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
01163                                 acceptSMDqmDiscard_[i] = 0;
01164                 } catch (evf::Exception& e) {
01165                         rethrowShmBufferException(e,
01166                                         "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
01167                 }
01168         }
01169 
01170         // UPDATE: reset pending allocate's
01171         nbAllocated_ = 0;
01172         nbPending_ = 0;
01173         nbCompleted_ = 0;
01174         nbSent_ = 0;
01175         nbSentError_ = 0;
01176         nbSentDqm_ = 0;
01177         nbPendingSMDiscards_ = 0;
01178         nbPendingSMDqmDiscards_ = 0;
01179         nbDiscarded_ = 0;
01180         nbLost_ = 0;
01181         // UPDATED
01182         nbEolPosted_ = 0;
01183         nbEolDiscarded_ = 0;
01184 
01185         nbErrors_ = 0;
01186         nbCrcErrors_ = 0;
01187         nbAllocSent_ = 0;
01188 
01189         sumOfSquares_ = 0;
01190         sumOfSizes_ = 0;
01191 
01192         //"send" workloop states
01193         sDqmActive_=true;
01194         sDataActive_=true;
01195 
01196 }
01197 
01198 //______________________________________________________________________________
01199 UInt_t FUResourceTable::nbClients() const {
01200         UInt_t result(0);
01201         try {
01202                 if (0 != shmBuffer_)
01203                         result = shmBuffer_->nClients();
01204         } catch (evf::Exception& e) {
01205                 rethrowShmBufferException(e, "FUResourceTable:nbClients:nClients");
01206         }
01207         return result;
01208 }
01209 
01210 //______________________________________________________________________________
01211 vector<pid_t> FUResourceTable::clientPrcIds() const {
01212         vector < pid_t > result;
01213         try {
01214                 if (0 != shmBuffer_) {
01215                         UInt_t n = nbClients();
01216                         for (UInt_t i = 0; i < n; i++)
01217                                 result.push_back(shmBuffer_->clientPrcId(i));
01218                 }
01219         } catch (evf::Exception& e) {
01220                 rethrowShmBufferException(e,
01221                                 "FUResourceTable:clientPrcIds:clientPrcIds");
01222         }
01223         return result;
01224 }
01225 
01226 //______________________________________________________________________________
01227 string FUResourceTable::clientPrcIdsAsString() const {
01228         stringstream ss;
01229         try {
01230                 if (0 != shmBuffer_) {
01231                         UInt_t n = nbClients();
01232                         for (UInt_t i = 0; i < n; i++) {
01233                                 if (i > 0)
01234                                         ss << ",";
01235                                 ss << shmBuffer_->clientPrcId(i);
01236                         }
01237                 }
01238         } catch (evf::Exception& e) {
01239                 rethrowShmBufferException(e,
01240                                 "FUResourceTable:clientPrcIdsAsString:clientPrcId");
01241         }
01242         return ss.str();
01243 }
01244 
01245 //______________________________________________________________________________
01246 vector<string> FUResourceTable::cellStates() const {
01247         vector < string > result;
01248         if (0 != shmBuffer_) {
01249                 UInt_t n = nbResources();
01250                 shmBuffer_->lock();
01251                 try {
01252                         for (UInt_t i = 0; i < n; i++) {
01253                                 evt::State_t state = shmBuffer_->evtState(i);
01254                                 if (state == evt::EMPTY)
01255                                         result.push_back("EMPTY");
01256                                 else if (state == evt::STOP)
01257                                         result.push_back("STOP");
01258                                 else if (state == evt::LUMISECTION)
01259                                         result.push_back("LUMISECTION");
01260                                 // UPDATED
01261                                 else if (state == evt::USEDLS)
01262                                         result.push_back("USEDLS");
01263                                 else if (state == evt::RAWWRITING)
01264                                         result.push_back("RAWWRITING");
01265                                 else if (state == evt::RAWWRITTEN)
01266                                         result.push_back("RAWWRITTEN");
01267                                 else if (state == evt::RAWREADING)
01268                                         result.push_back("RAWREADING");
01269                                 else if (state == evt::RAWREAD)
01270                                         result.push_back("RAWREAD");
01271                                 else if (state == evt::PROCESSING)
01272                                         result.push_back("PROCESSING");
01273                                 else if (state == evt::PROCESSED)
01274                                         result.push_back("PROCESSED");
01275                                 else if (state == evt::RECOWRITING)
01276                                         result.push_back("RECOWRITING");
01277                                 else if (state == evt::RECOWRITTEN)
01278                                         result.push_back("RECOWRITTEN");
01279                                 else if (state == evt::SENDING)
01280                                         result.push_back("SENDING");
01281                                 else if (state == evt::SENT)
01282                                         result.push_back("SENT");
01283                                 else if (state == evt::DISCARDING)
01284                                         result.push_back("DISCARDING");
01285                         }
01286                 } catch (evf::Exception& e) {
01287                         rethrowShmBufferException(e, "FUResourceTable:cellStates:evtState");
01288                 }
01289                 shmBuffer_->unlock();
01290         }
01291         return result;
01292 }
01293 
01294 vector<string> FUResourceTable::dqmCellStates() const {
01295         vector < string > result;
01296         if (0 != shmBuffer_) {
01297                 UInt_t n = nbDqmCells_;
01298                 shmBuffer_->lock();
01299                 try {
01300                         for (UInt_t i = 0; i < n; i++) {
01301                                 dqm::State_t state = shmBuffer_->dqmState(i);
01302                                 if (state == dqm::EMPTY)
01303                                         result.push_back("EMPTY");
01304                                 else if (state == dqm::WRITING)
01305                                         result.push_back("WRITING");
01306                                 else if (state == dqm::WRITTEN)
01307                                         result.push_back("WRITTEN");
01308                                 else if (state == dqm::SENDING)
01309                                         result.push_back("SENDING");
01310                                 else if (state == dqm::SENT)
01311                                         result.push_back("SENT");
01312                                 else if (state == dqm::DISCARDING)
01313                                         result.push_back("DISCARDING");
01314                         }
01315                 } catch (evf::Exception& e) {
01316                         rethrowShmBufferException(e,
01317                                         "FUResourceTable:dqmCellStates:dqmState");
01318                 }
01319                 shmBuffer_->unlock();
01320         }
01321         return result;
01322 }
01323 
01324 //______________________________________________________________________________
01325 vector<UInt_t> FUResourceTable::cellEvtNumbers() const {
01326         vector < UInt_t > result;
01327         if (0 != shmBuffer_) {
01328                 UInt_t n = nbResources();
01329                 shmBuffer_->lock();
01330                 try {
01331                         for (UInt_t i = 0; i < n; i++)
01332                                 result.push_back(shmBuffer_->evtNumber(i));
01333                 } catch (evf::Exception& e) {
01334                         rethrowShmBufferException(e,
01335                                         "FUResourceTable:cellEvtNumbers:evtNumber");
01336                 }
01337                 shmBuffer_->unlock();
01338         }
01339         return result;
01340 }
01341 
01342 //______________________________________________________________________________
01343 vector<pid_t> FUResourceTable::cellPrcIds() const {
01344         vector < pid_t > result;
01345         if (0 != shmBuffer_) {
01346                 UInt_t n = nbResources();
01347                 shmBuffer_->lock();
01348                 try {
01349                         for (UInt_t i = 0; i < n; i++)
01350                                 result.push_back(shmBuffer_->evtPrcId(i));
01351                 } catch (evf::Exception& e) {
01352                         rethrowShmBufferException(e, "FUResourceTable:cellPrcIds:evtPrcId");
01353                 }
01354                 shmBuffer_->unlock();
01355         }
01356         return result;
01357 }
01358 
01359 //______________________________________________________________________________
01360 vector<time_t> FUResourceTable::cellTimeStamps() const {
01361         vector < time_t > result;
01362         try {
01363                 if (0 != shmBuffer_) {
01364                         UInt_t n = nbResources();
01365                         shmBuffer_->lock();
01366                         for (UInt_t i = 0; i < n; i++)
01367                                 result.push_back(shmBuffer_->evtTimeStamp(i));
01368                         shmBuffer_->unlock();
01369                 }
01370         } catch (evf::Exception& e) {
01371                 rethrowShmBufferException(e,
01372                                 "FUResourceTable:cellTimeStamps:evtTimeStamp");
01373         }
01374         return result;
01375 }
01376 
01378 // implementation of private member functions
01380 
01381 void FUResourceTable::lastResort() {
01382         try {
01383                 ostringstream ost;
01384                 ost << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01385                                 << " more rawcells to read ";
01386                 LOG4CPLUS_WARN(log_,ost.str());
01387                 std::cout << ost.str() << std::endl;
01388 
01389                 while (shmBuffer_->nbRawCellsToRead() != 0) {
01390                         FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
01391                         std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01392                                         << std::endl;
01393                         // UPDATED
01394                         LOG4CPLUS_WARN(log_,"lastResort: Scheduling raw cell (server side) "<< newCell->index());
01395                         shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01396 
01397                         std::cout << "lastResort: schedule raw cell for discard "
01398                                         << newCell->index() << std::endl;
01399                 }
01400                 //trigger the shutdown (again?)
01401                 LOG4CPLUS_WARN(log_,"lastResort: scheduling empty raw cell (server side) ");
01402                 shmBuffer_->scheduleRawEmptyCellForDiscard();
01403                 LOG4CPLUS_WARN(log_,"lastResort: Finished. cells remaining: "   << shmBuffer_->nbRawCellsToRead());
01404         } catch (evf::Exception& e) {
01405                 rethrowShmBufferException(
01406                                 e,
01407                                 "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
01408         }
01409                 LOG4CPLUS_WARN(log_,"Last resort finished ");
01410 }
01411 
01412 void FUResourceTable::resetIPC() {
01413         if (shmBuffer_ != 0) {
01414                 //waiting for sendData and sendDqm workloops to finish
01415                 int countdown_=60;
01416                 while (countdown_-- && (sDataActive_ || sDqmActive_)) ::usleep(50000);
01417                 if (countdown_<=0) {
01418                   std::ostringstream ostr;
01419                   ostr << "Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
01420                        << " sendDqm:"<<sDqmActive_ << " sendData:" << sDataActive_;
01421                   LOG4CPLUS_ERROR(log_,ostr.str());
01422                   std::cout << ostr.str() << std::endl;
01423                 }
01424                 //resetting shm buffer
01425                 shmBuffer_->reset(false);
01426                 LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
01427         }
01428 }
01429 
01430 std::string FUResourceTable::printStatus() {
01431         if (shmBuffer_) return shmBuffer_->sem_print_s();
01432         else return std::string("ShmBuffer not initialized");
01433 }
01434 
01435 void FUResourceTable::rethrowShmBufferException(evf::Exception& e, string where) const
01436                 throw (evf::Exception) {
01437         stringstream details;
01438         vector < string > dataStates = cellStates();
01439         vector < string > dqmStates = dqmCellStates();
01440         details << "Exception raised: " << e.what() << " (in module: "
01441                         << e.module() << " in function: " << e.function() << " at line: "
01442                         << e.line() << ")";
01443         details << "   Dumping cell state...   ";
01444         details << "data cells --> ";
01445         for (unsigned int i = 0; i < dataStates.size(); i++)
01446                 details << dataStates[i] << " ";
01447         details << "dqm cells --> ";
01448         for (unsigned int i = 0; i < dqmStates.size(); i++)
01449                 details << dqmStates[i] << " ";
01450         details << " ... originated in: " << where;
01451         XCEPT_RETHROW(evf::Exception, details.str(), e);
01452 }