CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/ResourceBroker/src/FUResourceQueue.cc

Go to the documentation of this file.
00001 
00002 //
00003 //                                                      >> UNDER DEVELOPMENT <<
00004 //
00005 // FUResourceQueue
00006 // ---------------
00007 //
00008 // Main class for Message Queue interprocess communication.
00009 //
00010 //                   28/10/2011 Andrei Spataru <andrei.cristian.spataru@cern.ch>
00012 
00013 #include "EventFilter/ResourceBroker/interface/FUResourceQueue.h"
00014 #include "EventFilter/ResourceBroker/interface/RecoMsgBuf.h"
00015 #include "EventFilter/ResourceBroker/interface/DQMMsgBuf.h"
00016 #include "EventFilter/ResourceBroker/interface/msq_constants.h"
00017 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00018 
00019 #include "EvffedFillerRB.h"
00020 #include "interface/evb/i2oEVBMsgs.h"
00021 #include "xcept/tools.h"
00022 
00023 #include <fstream>
00024 #include <sstream>
00025 #include <iomanip>
00026 #include <unistd.h>
00027 
00028 using std::cout;
00029 using std::endl;
00030 using std::string;
00031 using std::stringstream;
00032 using std::vector;
00033 using namespace evf;
00034 
00036 // construction/destruction
00038 
00039 //______________________________________________________________________________
00040 FUResourceQueue::FUResourceQueue(bool segmentationMode, UInt_t nbRawCells,
00041                 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00042                 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00043                 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00044                 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00045                         IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00046                                         rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
00047                                         logger, timeout, frb, app), msq_(99) {
00048         //improve fix UInt_t and msq_(99)
00049 
00050         initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00051                         rawCellSize, recoCellSize, dqmCellSize);
00052 
00053 }
00054 
00055 //______________________________________________________________________________
00056 FUResourceQueue::~FUResourceQueue() {
00057         clear();
00058 
00059         // disconnect from queue
00060         if (msq_.disconnect() == 0)
00061                 LOG4CPLUS_INFO(log_, "MESSAGE QUEUE SUCCESSFULLY RELEASED.");
00062 
00063         // needed??
00064         /*
00065          if (0 != acceptSMDataDiscard_)
00066          delete[] acceptSMDataDiscard_;
00067          if (0 != acceptSMDqmDiscard_)
00068          delete[] acceptSMDqmDiscard_;
00069          */
00070 }
00071 
00073 // implementation of member functions
00075 
00076 //______________________________________________________________________________
00077 void FUResourceQueue::initialize(bool segmentationMode, UInt_t nbRawCells,
00078                 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00079                 UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
00080 
00081         rawCellSize_ = rawCellSize;
00082         recoCellSize_ = recoCellSize;
00083         dqmCellSize_ = dqmCellSize;
00084 
00085         clear();
00086 
00087         if (0 == &msq_ || 0 == msq_.id()) {
00088                 string msg = "CREATION OF MESSAGE QUEUE FAILED!";
00089                 LOG4CPLUS_FATAL(log_, msg);
00090                 XCEPT_RAISE(evf::Exception, msg);
00091         }
00092 
00093         cache_ = RawCache::getInstance();
00094         cache_->initialise(nbRawCells, rawCellSize);
00095 
00096         // SEC need cap on max resources
00097         for (UInt_t i = 0; i < nbRawCells_; i++) {
00098                 FUResource* newResource = new FUResource(i, log_, frb_, app_);
00099                 newResource->release(false);
00100                 resources_.push_back(newResource);
00101                 freeResourceIds_.push(i);
00102         }
00103 
00104         //acceptSMDataDiscard_ = new bool[nbRecoCells];
00105         //acceptSMDqmDiscard_ = new int[nbDqmCells];
00106 
00107         resetCounters();
00108 }
00109 
00110 // work loop to send data events to storage manager
00111 //______________________________________________________________________________
00112 bool FUResourceQueue::sendData() {
00113         bool reschedule = true;
00114 
00115         //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
00116         RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);
00117 
00118         bool rcvSuccess = msq_.rcvQuiet(recoMsg);
00119         if (!rcvSuccess) {
00120                 cout << "RCV failed!" << endl;
00121 		::sleep(5);
00122                 return reschedule;
00123         }
00124 
00125         FUShmRecoCell* cell = recoMsg.recoCell();
00126 
00127         // event size 0 -> stop
00128 
00129         if (0 == cell->eventSize()) {
00130                 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00131                 //UInt_t cellIndex = cell->index();
00132                 /*
00133                  shmBuffer_->finishReadingRecoCell(cell);
00134                  shmBuffer_->discardRecoCell(cellIndex);
00135                  */
00136                 reschedule = false;
00137 
00138                 // halting
00139         } else if (/*isHalting_*/false) {
00140                 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00141                 //UInt_t cellIndex = cell->index();
00142                 /*
00143                  shmBuffer_->finishReadingRecoCell(cell);
00144                  shmBuffer_->discardRecoCell(cellIndex);
00145                  */
00146 
00147         } else {
00148                 try {
00149                         //init message
00150                         if (cell->type() == 0) {
00151                                 UInt_t cellIndex = cell->index();
00152                                 UInt_t cellOutModId = cell->outModId();
00153                                 UInt_t cellFUProcId = cell->fuProcessId();
00154                                 UInt_t cellFUGuid = cell->fuGuid();
00155                                 UChar_t* cellPayloadAddr = cell->payloadAddr();
00156                                 UInt_t cellEventSize = cell->eventSize();
00157                                 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00158                                 //shmBuffer_->finishReadingRecoCell(cell);
00159 
00160                                 lock();
00161                                 nbPendingSMDiscards_++;
00162                                 unlock();
00163 
00164                                 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00165                                                 cellFUGuid, cellPayloadAddr, cellEventSize,
00166                                                 cellExpectedEPs);
00167 
00168                                 //
00169                                 // DATA event message
00170                                 //
00171                         } else if (cell->type() == 1) {
00172                                 UInt_t cellIndex = cell->index();
00173                                 UInt_t cellRawIndex = cell->rawCellIndex();
00174                                 UInt_t cellRunNumber = cell->runNumber();
00175                                 UInt_t cellEvtNumber = cell->evtNumber();
00176                                 UInt_t cellOutModId = cell->outModId();
00177                                 UInt_t cellFUProcId = cell->fuProcessId();
00178                                 UInt_t cellFUGuid = cell->fuGuid();
00179                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00180                                 UInt_t cellEventSize = cell->eventSize();
00181                                 //shmBuffer_->finishReadingRecoCell(cell);
00182                                 lock();
00183                                 nbPendingSMDiscards_++;
00184                                 resources_[cellRawIndex]->incNbSent();
00185                                 if (resources_[cellRawIndex]->nbSent() == 1)
00186                                         nbSent_++;
00187                                 unlock();
00188 
00189                                 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00190                                                 cellOutModId, cellFUProcId, cellFUGuid,
00191                                                 cellPayloadAddr, cellEventSize);
00192                                 //
00193                                 // ERROR event message
00194                                 //
00195                         } else if (cell->type() == 2) {
00196                                 UInt_t cellIndex = cell->index();
00197                                 UInt_t cellRawIndex = cell->rawCellIndex();
00198                                 //UInt_t   cellRunNumber   = cell->runNumber();
00199                                 UInt_t cellEvtNumber = cell->evtNumber();
00200                                 UInt_t cellFUProcId = cell->fuProcessId();
00201                                 UInt_t cellFUGuid = cell->fuGuid();
00202                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00203                                 UInt_t cellEventSize = cell->eventSize();
00204                                 //shmBuffer_->finishReadingRecoCell(cell);
00205 
00206                                 lock();
00207                                 nbPendingSMDiscards_++;
00208                                 resources_[cellRawIndex]->incNbSent();
00209                                 if (resources_[cellRawIndex]->nbSent() == 1) {
00210                                         nbSent_++;
00211                                         nbSentError_++;
00212                                 }
00213                                 unlock();
00214 
00215                                 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00216                                                 cellFUProcId, cellFUGuid, cellPayloadAddr,
00217                                                 cellEventSize);
00218                         } else {
00219                                 string errmsg =
00220                                                 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00221                                 XCEPT_RAISE(evf::Exception, errmsg);
00222                         }
00223                 } catch (xcept::Exception& e) {
00224                         LOG4CPLUS_FATAL(
00225                                         log_,
00226                                         "Failed to send EVENT DATA to StorageManager: "
00227                                                         << xcept::stdformat_exception_history(e));
00228                         reschedule = false;
00229                 }
00230         }
00231 
00232         return reschedule;
00233 }
00234 
00235 //______________________________________________________________________________
00236 bool FUResourceQueue::sendDataWhileHalting(/*toolbox::task::WorkLoop*  wl */) {
00237         bool reschedule = true;
00238 
00239         //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
00240         RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);
00241 
00242         bool rcvSuccess = msq_.rcvQuiet(recoMsg);
00243         if (!rcvSuccess) {
00244                 cout << "RCV failed!" << endl;
00245 		::sleep(5);
00246                 return reschedule;
00247         }
00248 
00249         FUShmRecoCell* cell = recoMsg.recoCell();
00250 
00251         // event size 0 -> stop
00252 
00253         if (0 == cell->eventSize()) {
00254                 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00255                 //UInt_t cellIndex = cell->index();
00256                 /*
00257                  shmBuffer_->finishReadingRecoCell(cell);
00258                  shmBuffer_->discardRecoCell(cellIndex);
00259                  */
00260                 reschedule = false;
00261 
00262                 // halting
00263         } else if (/*isHalting_*/true) {
00264                 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00265                 //UInt_t cellIndex = cell->index();
00266                 /*
00267                  shmBuffer_->finishReadingRecoCell(cell);
00268                  shmBuffer_->discardRecoCell(cellIndex);
00269                  */
00270 
00271         } else {
00272                 try {
00273                         //init message
00274                         if (cell->type() == 0) {
00275                                 UInt_t cellIndex = cell->index();
00276                                 UInt_t cellOutModId = cell->outModId();
00277                                 UInt_t cellFUProcId = cell->fuProcessId();
00278                                 UInt_t cellFUGuid = cell->fuGuid();
00279                                 UChar_t* cellPayloadAddr = cell->payloadAddr();
00280                                 UInt_t cellEventSize = cell->eventSize();
00281                                 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00282                                 //shmBuffer_->finishReadingRecoCell(cell);
00283 
00284                                 lock();
00285                                 nbPendingSMDiscards_++;
00286                                 unlock();
00287 
00288                                 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00289                                                 cellFUGuid, cellPayloadAddr, cellEventSize,
00290                                                 cellExpectedEPs);
00291 
00292                                 //
00293                                 // DATA event message
00294                                 //
00295                         } else if (cell->type() == 1) {
00296                                 UInt_t cellIndex = cell->index();
00297                                 UInt_t cellRawIndex = cell->rawCellIndex();
00298                                 UInt_t cellRunNumber = cell->runNumber();
00299                                 UInt_t cellEvtNumber = cell->evtNumber();
00300                                 UInt_t cellOutModId = cell->outModId();
00301                                 UInt_t cellFUProcId = cell->fuProcessId();
00302                                 UInt_t cellFUGuid = cell->fuGuid();
00303                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00304                                 UInt_t cellEventSize = cell->eventSize();
00305 
00306                                 //shmBuffer_->finishReadingRecoCell(cell);
00307                                 lock();
00308                                 nbPendingSMDiscards_++;
00309                                 resources_[cellRawIndex]->incNbSent();
00310                                 if (resources_[cellRawIndex]->nbSent() == 1)
00311                                         nbSent_++;
00312                                 unlock();
00313 
00314                                 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00315                                                 cellOutModId, cellFUProcId, cellFUGuid,
00316                                                 cellPayloadAddr, cellEventSize);
00317                                 //
00318                                 // ERROR event message
00319                                 //
00320                         } else if (cell->type() == 2) {
00321                                 UInt_t cellIndex = cell->index();
00322                                 UInt_t cellRawIndex = cell->rawCellIndex();
00323                                 //UInt_t   cellRunNumber   = cell->runNumber();
00324                                 UInt_t cellEvtNumber = cell->evtNumber();
00325                                 UInt_t cellFUProcId = cell->fuProcessId();
00326                                 UInt_t cellFUGuid = cell->fuGuid();
00327                                 UChar_t *cellPayloadAddr = cell->payloadAddr();
00328                                 UInt_t cellEventSize = cell->eventSize();
00329                                 //shmBuffer_->finishReadingRecoCell(cell);
00330 
00331                                 lock();
00332                                 nbPendingSMDiscards_++;
00333                                 resources_[cellRawIndex]->incNbSent();
00334                                 if (resources_[cellRawIndex]->nbSent() == 1) {
00335                                         nbSent_++;
00336                                         nbSentError_++;
00337                                 }
00338                                 unlock();
00339 
00340                                 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00341                                                 cellFUProcId, cellFUGuid, cellPayloadAddr,
00342                                                 cellEventSize);
00343                         } else {
00344                                 string errmsg =
00345                                                 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00346                                 XCEPT_RAISE(evf::Exception, errmsg);
00347                         }
00348                 } catch (xcept::Exception& e) {
00349                         LOG4CPLUS_FATAL(
00350                                         log_,
00351                                         "Failed to send EVENT DATA to StorageManager: "
00352                                                         << xcept::stdformat_exception_history(e));
00353                         reschedule = false;
00354                 }
00355         }
00356 
00357         return reschedule;
00358 }
00359 
00360 // work loop to send dqm events to storage manager
00361 //______________________________________________________________________________
00362 bool FUResourceQueue::sendDqm() {
00363         bool reschedule = true;
00364 
00365         //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
00366         //dqm::State_t state = shmBuffer_->dqmState(cell->index());
00367 
00368         DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);
00369 
00370         bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
00371         if (!rcvSuccess) {
00372                 cout << "RCV failed!" << endl;
00373 		::sleep(5);
00374                 return reschedule;
00375         }
00376         FUShmDqmCell* cell = dqmMsg.dqmCell();
00377 
00378         // concept add stop messages (there is no more "state")
00379         //if (state == dqm::EMPTY) {
00380         if (false) {
00381                 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00382                 cout << "shut down dqm workloop " << endl;
00383                 //UInt_t cellIndex = cell->index();
00384                 /*
00385                  shmBuffer_->finishReadingDqmCell(cell);
00386                  shmBuffer_->discardDqmCell(cellIndex);
00387                  */
00388                 reschedule = false;
00389         } else if (/*isHalting_*/false) {
00390                 //UInt_t cellIndex = cell->index();
00391                 /*
00392                  shmBuffer_->finishReadingDqmCell(cell);
00393                  shmBuffer_->discardDqmCell(cellIndex);
00394                  */
00395         } else {
00396                 try {
00397                         UInt_t cellIndex = cell->index();
00398                         UInt_t cellRunNumber = cell->runNumber();
00399                         UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00400                         UInt_t cellFolderId = cell->folderId();
00401                         UInt_t cellFUProcId = cell->fuProcessId();
00402                         UInt_t cellFUGuid = cell->fuGuid();
00403                         UChar_t *cellPayloadAddr = cell->payloadAddr();
00404                         UInt_t cellEventSize = cell->eventSize();
00405                         sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00406                                         cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00407                                         cellEventSize);
00408                         //shmBuffer_->finishReadingDqmCell(cell);
00409                 } catch (xcept::Exception& e) {
00410                         LOG4CPLUS_FATAL(
00411                                         log_,
00412                                         "Failed to send DQM DATA to StorageManager: "
00413                                                         << xcept::stdformat_exception_history(e));
00414                         reschedule = false;
00415                 }
00416         }
00417 
00418         return reschedule;
00419 }
00420 
00421 //______________________________________________________________________________
00422 bool FUResourceQueue::sendDqmWhileHalting(/*toolbox::task::WorkLoop* wl*/) {
00423         bool reschedule = true;
00424 
00425         //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
00426         //dqm::State_t state = shmBuffer_->dqmState(cell->index());
00427 
00428         DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);
00429 
00430         bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
00431         if (!rcvSuccess) {
00432                 cout << "RCV failed!" << endl;
00433 		::sleep(5);
00434                 return reschedule;
00435         }
00436         FUShmDqmCell* cell = dqmMsg.dqmCell();
00437 
00438         // concept add stop messages (there is no more "state")
00439         //if (state == dqm::EMPTY) {
00440         if (false) {
00441                 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00442                 cout << "shut down dqm workloop " << endl;
00443                 //UInt_t cellIndex = cell->index();
00444                 /*
00445                  shmBuffer_->finishReadingDqmCell(cell);
00446                  shmBuffer_->discardDqmCell(cellIndex);
00447                  */
00448                 reschedule = false;
00449         } else if (/*isHalting_*/true) {
00450                 //UInt_t cellIndex = cell->index();
00451                 /*
00452                  shmBuffer_->finishReadingDqmCell(cell);
00453                  shmBuffer_->discardDqmCell(cellIndex);
00454                  */
00455         } else {
00456                 try {
00457                         UInt_t cellIndex = cell->index();
00458                         UInt_t cellRunNumber = cell->runNumber();
00459                         UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00460                         UInt_t cellFolderId = cell->folderId();
00461                         UInt_t cellFUProcId = cell->fuProcessId();
00462                         UInt_t cellFUGuid = cell->fuGuid();
00463                         UChar_t *cellPayloadAddr = cell->payloadAddr();
00464                         UInt_t cellEventSize = cell->eventSize();
00465                         sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00466                                         cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00467                                         cellEventSize);
00468                         //shmBuffer_->finishReadingDqmCell(cell);
00469                 } catch (xcept::Exception& e) {
00470                         LOG4CPLUS_FATAL(
00471                                         log_,
00472                                         "Failed to send DQM DATA to StorageManager: "
00473                                                         << xcept::stdformat_exception_history(e));
00474                         reschedule = false;
00475                 }
00476         }
00477 
00478         return reschedule;
00479 }
00480 
00481 // work loop to discard events to builder unit
00482 //______________________________________________________________________________
00483 bool FUResourceQueue::discard(/*toolbox::task::WorkLoop* wl*/) {
00484 
00485         bool reschedule = true;
00486 
00487         /*
00488          * DISCARDING raw msg buffers
00489          */
00490         MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
00491         bool rcvSuccess = msq_.rcvQuiet(discardRaw);
00492 
00493         if (!rcvSuccess) {
00494                 cout << "RCV failed!" << endl;
00495 		::sleep(5);
00496                 return reschedule;
00497         }
00498 
00499         unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
00500         unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
00501                         + sizeof(unsigned int));
00502 
00503         unsigned int buResourceId = *pBuID;
00504         unsigned int fuResourceId = *pFuID;
00505 
00506         cout << "Discard received for buResourceID: " << buResourceId
00507                         << " fuResourceID " << fuResourceId << endl << endl;
00508 
00509         //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
00510         //evt::State_t state = shmBuffer_->evtState(cell->index());
00511 
00512         /*
00513          bool shutDown = (state == evt::STOP);
00514          bool isLumi = (state == evt::LUMISECTION);
00515          */
00516         //UInt_t fuResourceId = cell->fuResourceId();
00517         //UInt_t buResourceId = cell->buResourceId();
00518 
00519         //  cout << "discard loop, state, shutDown, isLumi " << state << " "
00520         //          << shutDown << " " << isLumi << endl;
00521         //  cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
00522 
00523         /*
00524          if (shutDown) {
00525          LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00526          if (nbClientsToShutDown_ > 0)
00527          --nbClientsToShutDown_;
00528          if (nbClientsToShutDown_ == 0) {
00529          LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00530          isActive_ = false;
00531          reschedule = false;
00532          }
00533          }
00534          */
00535 
00536         //shmBuffer_->discardRawCell(cell);
00537 
00538         //if (!shutDown && !isLumi) {
00539         if (true) {
00540                 // (false = no shmdt)
00541                 resources_[fuResourceId]->release(false);
00542                 // also release space in RawCache
00543                 RawCache::getInstance()->releaseMsg(fuResourceId);
00544 
00545                 lock();
00546                 freeResourceIds_.push(fuResourceId);
00547                 assert(freeResourceIds_.size() <= resources_.size());
00548                 unlock();
00549 
00550                 if (true) {
00551                         sendDiscard(buResourceId);
00552                         if (true)
00553                                 sendAllocate();
00554                 }
00555         }
00556 
00557         // concept shutdown cycle
00558         /*
00559          if (!reschedule) {
00560          cout << " entered shutdown cycle " << endl;
00561          shmBuffer_->writeRecoEmptyEvent();
00562          UInt_t count = 0;
00563          while (count < 100) {
00564          cout << " shutdown cycle " << shmBuffer_->nClients() << " "
00565          << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00566          << endl;
00567          if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
00568          shmBuffer_->shmid()) == 1) {
00569          //     isReadyToShutDown_ = true;
00570          break;
00571          } else {
00572          count++;
00573          cout << " shutdown cycle attempt " << count << endl;
00574          LOG4CPLUS_DEBUG(
00575          log_,
00576          "FUResourceTable: Wait for all clients to detach,"
00577          << " nClients=" << shmBuffer_->nClients()
00578          << " nattch=" << FUShmBuffer::shm_nattch(
00579          shmBuffer_->shmid()) << " (" << count << ")");
00580          ::usleep(shutdownTimeout_);
00581          if (count * shutdownTimeout_ > 10000000)
00582          LOG4CPLUS_WARN(
00583          log_,
00584          "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00585          << " nClients=" << shmBuffer_->nClients()
00586          << " nattch=" << FUShmBuffer::shm_nattch(
00587          shmBuffer_->shmid()) << " (" << count
00588          << ")");
00589 
00590          }
00591          }
00592          bool allEmpty = false;
00593          cout << "Checking if all dqm cells are empty " << endl;
00594          while (!allEmpty) {
00595          UInt_t n = nbDqmCells_;
00596          allEmpty = true;
00597          shmBuffer_->lock();
00598          for (UInt_t i = 0; i < n; i++) {
00599          dqm::State_t state = shmBuffer_->dqmState(i);
00600          if (state != dqm::EMPTY)
00601          allEmpty = false;
00602          }
00603          shmBuffer_->unlock();
00604          }
00605          cout << "Making sure there are no dqm pending discards "
00606          << endl;
00607          if (nbPendingSMDqmDiscards_ != 0) {
00608          LOG4CPLUS_WARN(
00609          log_,
00610          "FUResourceTable: pending DQM discards not zero: ="
00611          << nbPendingSMDqmDiscards_
00612          << " while cells are all empty. This may cause problems at next start ");
00613 
00614          }
00615          shmBuffer_->writeDqmEmptyEvent();
00616          isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
00617          // sendDqm loop has been shut down as well
00618          }
00619          */
00620 
00621         return reschedule;
00622 }
00623 
00624 //______________________________________________________________________________
00625 bool FUResourceQueue::discardWhileHalting(bool sendDiscards) {
00626 
00627         bool reschedule = true;
00628 
00629         /*
00630          * DISCARDING raw msg buffers
00631          */
00632         MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
00633         bool rcvSuccess = msq_.rcvQuiet(discardRaw);
00634 
00635         if (!rcvSuccess) {
00636                 cout << "RCV failed!" << endl;
00637 		::sleep(5);
00638                 return reschedule;
00639         }
00640 
00641         unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
00642         unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
00643                         + sizeof(unsigned int));
00644 
00645         unsigned int buResourceId = *pBuID;
00646         unsigned int fuResourceId = *pFuID;
00647 
00648         cout << "Discard received for buResourceID: " << buResourceId
00649                         << " fuResourceID " << fuResourceId << endl << endl;
00650 
00651         //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
00652         //evt::State_t state = shmBuffer_->evtState(cell->index());
00653 
00654         /*
00655          bool shutDown = (state == evt::STOP);
00656          bool isLumi = (state == evt::LUMISECTION);
00657          */
00658         //UInt_t fuResourceId = cell->fuResourceId();
00659         //UInt_t buResourceId = cell->buResourceId();
00660 
00661         //  cout << "discard loop, state, shutDown, isLumi " << state << " "
00662         //          << shutDown << " " << isLumi << endl;
00663         //  cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
00664 
00665         /*
00666          if (shutDown) {
00667          LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00668          if (nbClientsToShutDown_ > 0)
00669          --nbClientsToShutDown_;
00670          if (nbClientsToShutDown_ == 0) {
00671          LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00672          isActive_ = false;
00673          reschedule = false;
00674          }
00675          }
00676          */
00677 
00678         //shmBuffer_->discardRawCell(cell);
00679 
00680         //if (!shutDown && !isLumi) {
00681         if (true) {
00682                 // (false = no shmdt)
00683                 resources_[fuResourceId]->release(false);
00684                 // also release space in RawCache
00685                 RawCache::getInstance()->releaseMsg(fuResourceId);
00686 
00687                 lock();
00688                 freeResourceIds_.push(fuResourceId);
00689                 assert(freeResourceIds_.size() <= resources_.size());
00690                 unlock();
00691 
00692                 if (false) {
00693                         sendDiscard(buResourceId);
00694                         if (false)
00695                                 sendAllocate();
00696                 }
00697         }
00698 
00699         // concept shutdown cycle
00700         /*
00701          if (!reschedule) {
00702          cout << " entered shutdown cycle " << endl;
00703          shmBuffer_->writeRecoEmptyEvent();
00704          UInt_t count = 0;
00705          while (count < 100) {
00706          cout << " shutdown cycle " << shmBuffer_->nClients() << " "
00707          << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
00708          << endl;
00709          if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
00710          shmBuffer_->shmid()) == 1) {
00711          //     isReadyToShutDown_ = true;
00712          break;
00713          } else {
00714          count++;
00715          cout << " shutdown cycle attempt " << count << endl;
00716          LOG4CPLUS_DEBUG(
00717          log_,
00718          "FUResourceTable: Wait for all clients to detach,"
00719          << " nClients=" << shmBuffer_->nClients()
00720          << " nattch=" << FUShmBuffer::shm_nattch(
00721          shmBuffer_->shmid()) << " (" << count << ")");
00722          ::usleep(shutdownTimeout_);
00723          if (count * shutdownTimeout_ > 10000000)
00724          LOG4CPLUS_WARN(
00725          log_,
00726          "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00727          << " nClients=" << shmBuffer_->nClients()
00728          << " nattch=" << FUShmBuffer::shm_nattch(
00729          shmBuffer_->shmid()) << " (" << count
00730          << ")");
00731 
00732          }
00733          }
00734          bool allEmpty = false;
00735          cout << "Checking if all dqm cells are empty " << endl;
00736          while (!allEmpty) {
00737          UInt_t n = nbDqmCells_;
00738          allEmpty = true;
00739          shmBuffer_->lock();
00740          for (UInt_t i = 0; i < n; i++) {
00741          dqm::State_t state = shmBuffer_->dqmState(i);
00742          if (state != dqm::EMPTY)
00743          allEmpty = false;
00744          }
00745          shmBuffer_->unlock();
00746          }
00747          cout << "Making sure there are no dqm pending discards "
00748          << endl;
00749          if (nbPendingSMDqmDiscards_ != 0) {
00750          LOG4CPLUS_WARN(
00751          log_,
00752          "FUResourceTable: pending DQM discards not zero: ="
00753          << nbPendingSMDqmDiscards_
00754          << " while cells are all empty. This may cause problems at next start ");
00755 
00756          }
00757          shmBuffer_->writeDqmEmptyEvent();
00758          isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
00759          // sendDqm loop has been shut down as well
00760          }
00761          */
00762 
00763         return reschedule;
00764 }
00765 
00766 // process buffer received via I2O_FU_TAKE message
00767 //______________________________________________________________________________
00768 
00769 // BUILD RESOURCE (RAW CELL WRITING)
00770 
00771 bool FUResourceQueue::buildResource(MemRef_t* bufRef) {
00772 
00773         bool eventComplete = false;
00774 
00775         I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00776                         (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00777 
00778         UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
00779         UInt_t buResourceId = (UInt_t) block->buResourceId;
00780 
00781         FUResource* resource = resources_[fuResourceId];
00782 
00783         RawMsgBuf* currentMessageToWrite = cache_->getMsgToWrite();
00784 
00785         if (!resource->fatalError() && !resource->isAllocated()) {
00786                 FUShmRawCell* rawCell = currentMessageToWrite->rawCell();
00787                 rawCell->initialize(fuResourceId);
00788                 resource->allocate(rawCell);
00789 
00790                 timeval now;
00791                 gettimeofday(&now, 0);
00792 
00793                 frb_->setRBTimeStamp(
00794                                 ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
00795 
00796                 frb_->setRBEventCount(nbCompleted_);
00797 
00798                 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
00799                         resource->doCrcCheck(true);
00800                 else
00801                         resource->doCrcCheck(false);
00802         }
00803 
00804         // keep building this resource if it is healthy
00805         if (!resource->fatalError()) {
00806                 resource->process(bufRef);
00807 
00808                 lock();
00809                 nbErrors_ += resource->nbErrors();
00810                 nbCrcErrors_ += resource->nbCrcErrors();
00811                 unlock();
00812 
00813                 // make resource available for pick-up
00814                 if (resource->isComplete()) {
00815                         lock();
00816                         nbCompleted_++;
00817                         nbPending_--;
00818                         unlock();
00819                         /*
00820                          cout << "POSTING to q: msqid = " << msq_.id()
00821                          << ", message buf is allocated for:  "
00822                          << currentMessageToWrite->msize()
00823                          << "... Actually copied: "
00824                          << currentMessageToWrite->usedSize() << " fuResourceID = "
00825                          << currentMessageToWrite->rawCell()->fuResourceId()
00826                          << " buResourceID = "
00827                          << currentMessageToWrite->rawCell()->buResourceId() << endl;
00828                          */
00829 
00830                         //msq_.post(*currentMessageToWrite);
00831 
00832                         try {
00833                                 msq_.postLength(*currentMessageToWrite,
00834                                                 currentMessageToWrite->usedSize());
00835                         } catch (...) {
00836                                 string errmsg = "Failed to post message to Queue!";
00837                                 LOG4CPLUS_FATAL(log_, errmsg);
00838                                 XCEPT_RAISE(evf::Exception, errmsg);
00839 
00840                         }
00841 
00842                         // CURRENT RECEIVERS
00843                         /*
00844                          vector<int> receivers = msq_.getReceivers();
00845                          cout << "--Receiving processes: ";
00846                          for (unsigned int i = 0; i < receivers.size(); ++i)
00847                          cout << i << " " << receivers[i];
00848                          cout << endl;
00849                          */
00850 
00851                         eventComplete = true;
00852                 }
00853 
00854         }
00855         // bad event, release msg, and the whole resource if this was the last one
00856         if (resource->fatalError()) {
00857                 bool lastMsg = isLastMessageOfEvent(bufRef);
00858                 if (lastMsg) {
00859                         resource->release(false);
00860                         lock();
00861                         freeResourceIds_.push(fuResourceId);
00862                         nbDiscarded_++;
00863                         nbLost_++;
00864                         nbPending_--;
00865                         unlock();
00866                         bu_->sendDiscard(buResourceId);
00867                         sendAllocate();
00868                 }
00869                 bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
00870         }
00871 
00872         return eventComplete;
00873 }
00874 
00875 //concept discardDataEvent still required
00876 // process buffer received via I2O_SM_DATA_DISCARD message
00877 //______________________________________________________________________________
00878 bool FUResourceQueue::discardDataEvent(MemRef_t* bufRef) {
00879 
00880         /*
00881          I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00882          msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00883          UInt_t recoIndex = msg->rbBufferID;
00884 
00885          if (acceptSMDataDiscard_[recoIndex]) {
00886          lock();
00887          nbPendingSMDiscards_--;
00888          unlock();
00889          acceptSMDataDiscard_[recoIndex] = false;
00890 
00891          if (!isHalting_) {
00892          shmBuffer_->discardRecoCell(recoIndex);
00893          bufRef->release();
00894          }
00895          } else {
00896          LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00897          }
00898 
00899          if (isHalting_) {
00900          bufRef->release();
00901          return false;
00902          }
00903          */
00904         return true;
00905 }
00906 
00907 // process buffer received via I2O_SM_DATA_DISCARD message
00908 //______________________________________________________________________________
00909 bool FUResourceQueue::discardDataEventWhileHalting(MemRef_t* bufRef) {
00910 
00911         /*
00912          I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00913          msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00914          UInt_t recoIndex = msg->rbBufferID;
00915 
00916          if (acceptSMDataDiscard_[recoIndex]) {
00917          lock();
00918          nbPendingSMDiscards_--;
00919          unlock();
00920          acceptSMDataDiscard_[recoIndex] = false;
00921 
00922          if (!isHalting_) {
00923          shmBuffer_->discardRecoCell(recoIndex);
00924          bufRef->release();
00925          }
00926          } else {
00927          LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00928          }
00929 
00930          if (isHalting_) {
00931          bufRef->release();
00932          return false;
00933          }
00934          */
00935         return true;
00936 }
00937 
00938 //concept discardDqmEvent still required?
00939 // process buffer received via I2O_SM_DQM_DISCARD message
00940 //______________________________________________________________________________
00941 bool FUResourceQueue::discardDqmEvent(MemRef_t* bufRef) {
00942         /*
00943          I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00944          msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00945          UInt_t dqmIndex = msg->rbBufferID;
00946          unsigned int ntries = 0;
00947          while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00948          LOG4CPLUS_WARN(
00949          log_,
00950          "DQM discard for cell " << dqmIndex
00951          << " which is not yer in SENT state - waiting");
00952          ::usleep(10000);
00953          if (ntries++ > 10) {
00954          LOG4CPLUS_ERROR(
00955          log_,
00956          "DQM cell " << dqmIndex
00957          << " discard timed out while cell still in state "
00958          << shmBuffer_->dqmState(dqmIndex));
00959          bufRef->release();
00960          return true;
00961          }
00962          }
00963          if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00964          acceptSMDqmDiscard_[dqmIndex]--;
00965          if (nbPendingSMDqmDiscards_ > 0) {
00966          nbPendingSMDqmDiscards_--;
00967          } else {
00968          LOG4CPLUS_WARN         (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
00969          << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
00970          }
00971 
00972          if (!isHalting_) {
00973          shmBuffer_->discardDqmCell(dqmIndex);
00974          bufRef->release();
00975          }
00976 
00977          }
00978          else {
00979          LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
00980          << " from StorageManager while cell is not accepting discards");
00981          }
00982 
00983          if (isHalting_) {
00984          bufRef->release();
00985          return false;
00986          }
00987          */
00988         return true;
00989 }
00990 
00991 // process buffer received via I2O_SM_DQM_DISCARD message
00992 //______________________________________________________________________________
00993 bool FUResourceQueue::discardDqmEventWhileHalting(MemRef_t* bufRef) {
00994         /*
00995          I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00996          msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00997          UInt_t dqmIndex = msg->rbBufferID;
00998          unsigned int ntries = 0;
00999          while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
01000          LOG4CPLUS_WARN(
01001          log_,
01002          "DQM discard for cell " << dqmIndex
01003          << " which is not yer in SENT state - waiting");
01004          ::usleep(10000);
01005          if (ntries++ > 10) {
01006          LOG4CPLUS_ERROR(
01007          log_,
01008          "DQM cell " << dqmIndex
01009          << " discard timed out while cell still in state "
01010          << shmBuffer_->dqmState(dqmIndex));
01011          bufRef->release();
01012          return true;
01013          }
01014          }
01015          if (acceptSMDqmDiscard_[dqmIndex] > 0) {
01016          acceptSMDqmDiscard_[dqmIndex]--;
01017          if (nbPendingSMDqmDiscards_ > 0) {
01018          nbPendingSMDqmDiscards_--;
01019          } else {
01020          LOG4CPLUS_WARN         (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
01021          << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
01022          }
01023 
01024          if (!isHalting_) {
01025          shmBuffer_->discardDqmCell(dqmIndex);
01026          bufRef->release();
01027          }
01028 
01029          }
01030          else {
01031          LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
01032          << " from StorageManager while cell is not accepting discards");
01033          }
01034 
01035          if (isHalting_) {
01036          bufRef->release();
01037          return false;
01038          }
01039          */
01040         return true;
01041 }
01042 
01043 //concept add message type, post end-of-ls event to msq
01044 //______________________________________________________________________________
01045 void FUResourceQueue::postEndOfLumiSection(MemRef_t* bufRef) {
01046         /*
01047          I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
01048          *msg =
01049          (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
01050          //make sure to fill up the shmem so no process will miss it
01051          // but processes will have to handle duplicates
01052 
01053          for (unsigned int i = 0; i < nbRawCells_; i++)
01054          shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
01055          */
01056 }
01057 
01058 //concept dropEvent required?
01059 //______________________________________________________________________________
01060 void FUResourceQueue::dropEvent() {
01061         /*
01062          FUShmRawCell* cell = shmBuffer_->rawCellToRead();
01063          UInt_t fuResourceId = cell->fuResourceId();
01064          shmBuffer_->finishReadingRawCell(cell);
01065          shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
01066          */
01067 }
01068 
01069 //concept switch to error message send
01070 //______________________________________________________________________________
01071 bool FUResourceQueue::handleCrashedEP(UInt_t runNumber, pid_t pid) {
01072         bool retval = false;
01073         /*
01074          vector<pid_t> pids = cellPrcIds();
01075          UInt_t iRawCell = pids.size();
01076          for (UInt_t i = 0; i < pids.size(); i++) {
01077          if (pid == pids[i]) {
01078          iRawCell = i;
01079          break;
01080          }
01081          }
01082 
01083          if (iRawCell < pids.size()) {
01084          shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell);
01085          retval = true;
01086          } else
01087          LOG4CPLUS_WARN(log_,
01088          "No raw data to send to error stream for process " << pid);
01089          shmBuffer_->removeClientPrcId(pid);
01090          */
01091         return retval;
01092 }
01093 
01094 //concept RAW with nothing in it
01095 //______________________________________________________________________________
01096 void FUResourceQueue::shutDownClients() {
01097         isReadyToShutDown_ = true;
01098         /*
01099          nbClientsToShutDown_ = nbClients();
01100          isReadyToShutDown_ = false;
01101 
01102          if (nbClientsToShutDown_ == 0) {
01103          LOG4CPLUS_INFO(
01104          log_,
01105          "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
01106          UInt_t n = nbResources();
01107          for (UInt_t i = 0; i < n; i++) {
01108          evt::State_t state = shmBuffer_->evtState(i);
01109          if (state != evt::EMPTY) {
01110          LOG4CPLUS_WARN(
01111          log_,
01112          "Schedule discard at STOP for orphaned event in state "
01113          << state);
01114          shmBuffer_->scheduleRawCellForDiscardServerSide(i);
01115          }
01116          }
01117          shmBuffer_->scheduleRawEmptyCellForDiscard();
01118          } else {
01119          UInt_t n = nbClientsToShutDown_;
01120          for (UInt_t i = 0; i < n; ++i)
01121          shmBuffer_->writeRawEmptyEvent();
01122          }
01123          */
01124 }
01125 
01126 //______________________________________________________________________________
01127 void FUResourceQueue::clear() {
01128         for (UInt_t i = 0; i < resources_.size(); i++) {
01129                 resources_[i]->release(false);
01130                 delete resources_[i];
01131         }
01132         resources_.clear();
01133 
01134         while (!freeResourceIds_.empty())
01135                 freeResourceIds_.pop();
01136 }
01137 
01139 // implementation of private member functions
01141 
01142 //concept adapt resetCounters
01143 //______________________________________________________________________________
01144 void FUResourceQueue::resetCounters() {
01145         /*
01146          if (0 != shmBuffer_) {
01147          for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
01148          acceptSMDataDiscard_[i] = false;
01149          for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
01150          acceptSMDqmDiscard_[i] = 0;
01151          }
01152          */
01153         nbAllocated_ = nbPending_;
01154         nbCompleted_ = 0;
01155         nbSent_ = 0;
01156         nbSentError_ = 0;
01157         nbSentDqm_ = 0;
01158         nbPendingSMDiscards_ = 0;
01159         nbPendingSMDqmDiscards_ = 0;
01160         nbDiscarded_ = 0;
01161         nbLost_ = 0;
01162 
01163         nbErrors_ = 0;
01164         nbCrcErrors_ = 0;
01165         nbAllocSent_ = 0;
01166 
01167         sumOfSquares_ = 0;
01168         sumOfSizes_ = 0;
01169         //isStopping_ = false;
01170 
01171 }
01172 
01173 //concept adapt nbClients
01174 //______________________________________________________________________________
01175 UInt_t FUResourceQueue::nbClients() const {
01176         UInt_t result(0);
01177 
01178         /*
01179          if (0 != shmBuffer_)
01180          result = shmBuffer_->nClients();
01181          */
01182         return result;
01183 }
01184 
01185 //concept adapt clientPrcIds
01186 //______________________________________________________________________________
01187 vector<pid_t> FUResourceQueue::clientPrcIds() const {
01188         vector<pid_t> result;
01189 
01190         /*
01191          if (0 != shmBuffer_) {
01192          UInt_t n = nbClients();
01193          for (UInt_t i = 0; i < n; i++)
01194          result.push_back(shmBuffer_->clientPrcId(i));
01195          }
01196          */
01197         return result;
01198 }
01199 
01200 //concept adapt clientPrcIdsAsString
01201 //______________________________________________________________________________
01202 string FUResourceQueue::clientPrcIdsAsString() const {
01203         stringstream ss;
01204 
01205         /*
01206          if (0 != shmBuffer_) {
01207          UInt_t n = nbClients();
01208          for (UInt_t i = 0; i < n; i++) {
01209          if (i > 0)
01210          ss << ",";
01211          ss << shmBuffer_->clientPrcId(i);
01212          }
01213          }
01214          */
01215         return ss.str();
01216 }
01217 
01218 //concept adapt cellStates
01219 //______________________________________________________________________________
01220 vector<string> FUResourceQueue::cellStates() const {
01221         vector<string> result;
01222         /*
01223          if (0 != shmBuffer_) {
01224          UInt_t n = nbResources();
01225          shmBuffer_->lock();
01226          for (UInt_t i = 0; i < n; i++) {
01227          evt::State_t state = shmBuffer_->evtState(i);
01228          if (state == evt::EMPTY)
01229          result.push_back("EMPTY");
01230          else if (state == evt::STOP)
01231          result.push_back("STOP");
01232          else if (state == evt::LUMISECTION)
01233          result.push_back("LUMISECTION");
01234          else if (state == evt::RAWWRITING)
01235          result.push_back("RAWWRITING");
01236          else if (state == evt::RAWWRITTEN)
01237          result.push_back("RAWWRITTEN");
01238          else if (state == evt::RAWREADING)
01239          result.push_back("RAWREADING");
01240          else if (state == evt::RAWREAD)
01241          result.push_back("RAWREAD");
01242          else if (state == evt::PROCESSING)
01243          result.push_back("PROCESSING");
01244          else if (state == evt::PROCESSED)
01245          result.push_back("PROCESSED");
01246          else if (state == evt::RECOWRITING)
01247          result.push_back("RECOWRITING");
01248          else if (state == evt::RECOWRITTEN)
01249          result.push_back("RECOWRITTEN");
01250          else if (state == evt::SENDING)
01251          result.push_back("SENDING");
01252          else if (state == evt::SENT)
01253          result.push_back("SENT");
01254          else if (state == evt::DISCARDING)
01255          result.push_back("DISCARDING");
01256          }
01257          shmBuffer_->unlock();
01258          }
01259          */
01260         return result;
01261 }
01262 
01263 //concept adapt dqmCellStates
01264 vector<string> FUResourceQueue::dqmCellStates() const {
01265         vector<string> result;
01266 
01267         /*
01268          if (0 != shmBuffer_) {
01269          UInt_t n = nbDqmCells_;
01270          shmBuffer_->lock();
01271          for (UInt_t i = 0; i < n; i++) {
01272          dqm::State_t state = shmBuffer_->dqmState(i);
01273          if (state == dqm::EMPTY)
01274          result.push_back("EMPTY");
01275          else if (state == dqm::WRITING)
01276          result.push_back("WRITING");
01277          else if (state == dqm::WRITTEN)
01278          result.push_back("WRITTEN");
01279          else if (state == dqm::SENDING)
01280          result.push_back("SENDING");
01281          else if (state == dqm::SENT)
01282          result.push_back("SENT");
01283          else if (state == dqm::DISCARDING)
01284          result.push_back("DISCARDING");
01285          }
01286          shmBuffer_->unlock();
01287          }
01288          */
01289         return result;
01290 }
01291 
01292 //concept adapt cellEvtNumbers
01293 //______________________________________________________________________________
01294 vector<UInt_t> FUResourceQueue::cellEvtNumbers() const {
01295         vector<UInt_t> result;
01296 
01297         /*
01298          if (0 != shmBuffer_) {
01299          UInt_t n = nbResources();
01300          shmBuffer_->lock();
01301          for (UInt_t i = 0; i < n; i++)
01302          result.push_back(shmBuffer_->evtNumber(i));
01303          shmBuffer_->unlock();
01304          }
01305          */
01306         return result;
01307 }
01308 
01309 //concept adapt cellPrcIds
01310 //______________________________________________________________________________
01311 vector<pid_t> FUResourceQueue::cellPrcIds() const {
01312         vector<pid_t> result;
01313 
01314         /*
01315          if (0 != shmBuffer_) {
01316          UInt_t n = nbResources();
01317          shmBuffer_->lock();
01318          for (UInt_t i = 0; i < n; i++)
01319          result.push_back(shmBuffer_->evtPrcId(i));
01320          shmBuffer_->unlock();
01321          }
01322          */
01323         return result;
01324 }
01325 
01326 //concept adapt cellTimeStamps
01327 //______________________________________________________________________________
01328 vector<time_t> FUResourceQueue::cellTimeStamps() const {
01329         vector<time_t> result;
01330 
01331         /*
01332          if (0 != shmBuffer_) {
01333          UInt_t n = nbResources();
01334          shmBuffer_->lock();
01335          for (UInt_t i = 0; i < n; i++)
01336          result.push_back(shmBuffer_->evtTimeStamp(i));
01337          shmBuffer_->unlock();
01338          }
01339          */
01340         return result;
01341 }
01342 
01344 // implementation of private member functions
01346 
01347 //concept adapt lastResort
01348 void FUResourceQueue::lastResort() {
01349 
01350         /*
01351          cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01352          << " more rawcells to read " << endl;
01353          while (shmBuffer_->nbRawCellsToRead() != 0) {
01354          FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
01355          cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01356          << endl;
01357          shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
01358          cout << "lastResort: schedule raw cell for discard" << endl;
01359          }
01360          */
01361 }
01362