00001
00002
00003
00004
00005
00006
00007
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "EvffedFillerRB.h"
00012
00013 #include "interface/evb/i2oEVBMsgs.h"
00014 #include "xcept/tools.h"
00015
00016 #include <sys/types.h>
00017 #include <signal.h>
00018
00019 #ifdef linux
00020 #include <thread>
00021 #endif
00022
00023
00024 using namespace evf;
00025 using namespace std;
00027
00029
00030
00031 FUResourceTable::FUResourceTable(bool segmentationMode, UInt_t nbRawCells,
00032 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00033 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00034 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00035 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00036
00037
00038 IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00039 rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
00040 logger, timeout, frb, app), shmBuffer_(0)
00041
00042 {
00043 initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00044 rawCellSize, recoCellSize, dqmCellSize);
00045 }
00046
00047
00048 FUResourceTable::~FUResourceTable() {
00049 clear();
00050
00051 shmdt( shmBuffer_);
00052 if (FUShmBuffer::releaseSharedMemory())
00053 LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
00054 if (0 != acceptSMDataDiscard_)
00055 delete[] acceptSMDataDiscard_;
00056 if (0 != acceptSMDqmDiscard_)
00057 delete[] acceptSMDqmDiscard_;
00058 }
00059
00061
00063
00064
00065 void FUResourceTable::initialize(bool segmentationMode, UInt_t nbRawCells,
00066 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00067 UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
00068 clear();
00069
00070 shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
00071 nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00072 if (0 == shmBuffer_) {
00073 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00074 LOG4CPLUS_FATAL(log_, msg);
00075 XCEPT_RAISE(evf::Exception, msg);
00076 }
00077
00078 for (UInt_t i = 0; i < nbRawCells_; i++) {
00079 FUResource* newResource = new FUResource(i, log_, frb_, app_);
00080 newResource->release(true);
00081 resources_.push_back(newResource);
00082 freeResourceIds_.push(i);
00083 }
00084
00085 acceptSMDataDiscard_ = new bool[nbRecoCells];
00086 acceptSMDqmDiscard_ = new int[nbDqmCells];
00087
00088 resetCounters();
00089 stopFlag_=false;
00090 }
00091
00092
00093 bool FUResourceTable::sendData() {
00094 bool reschedule = true;
00095 FUShmRecoCell* cell = 0;
00096 try {
00097 cell = shmBuffer_->recoCellToRead();
00098 } catch (evf::Exception& e) {
00099 rethrowShmBufferException(e, "FUResourceTable:sendData:recoCellToRead");
00100 }
00101
00102 if (0 == cell->eventSize()) {
00103 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00104 UInt_t cellIndex = cell->index();
00105 try {
00106 shmBuffer_->finishReadingRecoCell(cell);
00107 shmBuffer_->discardRecoCell(cellIndex);
00108 } catch (evf::Exception& e) {
00109 rethrowShmBufferException(e,
00110 "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
00111 }
00112 shutdownStatus_|=1<<7;
00113 reschedule = false;
00114 } else {
00115 try {
00116 if (cell->type() == 0) {
00117 UInt_t cellIndex = cell->index();
00118 UInt_t cellOutModId = cell->outModId();
00119 UInt_t cellFUProcId = cell->fuProcessId();
00120 UInt_t cellFUGuid = cell->fuGuid();
00121 UChar_t* cellPayloadAddr = cell->payloadAddr();
00122 UInt_t cellEventSize = cell->eventSize();
00123 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00124 try {
00125 shmBuffer_->finishReadingRecoCell(cell);
00126 } catch (evf::Exception& e) {
00127 rethrowShmBufferException(e,
00128 "FUResourceTable:sendData:finishReadingRecoCell");
00129 }
00130
00131 lock();
00132 nbPendingSMDiscards_++;
00133 unlock();
00134
00135 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00136 cellFUGuid, cellPayloadAddr, cellEventSize,
00137 cellExpectedEPs);
00138 } else if (cell->type() == 1) {
00139 UInt_t cellIndex = cell->index();
00140 UInt_t cellRawIndex = cell->rawCellIndex();
00141 UInt_t cellRunNumber = cell->runNumber();
00142 UInt_t cellEvtNumber = cell->evtNumber();
00143 UInt_t cellOutModId = cell->outModId();
00144 UInt_t cellFUProcId = cell->fuProcessId();
00145 UInt_t cellFUGuid = cell->fuGuid();
00146 UChar_t *cellPayloadAddr = cell->payloadAddr();
00147 UInt_t cellEventSize = cell->eventSize();
00148 try {
00149 shmBuffer_->finishReadingRecoCell(cell);
00150 } catch (evf::Exception& e) {
00151 rethrowShmBufferException(e,
00152 "FUResourceTable:sendData:finishReadingRecoCell");
00153 }
00154
00155 lock();
00156 nbPendingSMDiscards_++;
00157 resources_[cellRawIndex]->incNbSent();
00158 if (resources_[cellRawIndex]->nbSent() == 1)
00159 nbSent_++;
00160 unlock();
00161
00162 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00163 cellOutModId, cellFUProcId, cellFUGuid,
00164 cellPayloadAddr, cellEventSize);
00165 } else if (cell->type() == 2) {
00166 UInt_t cellIndex = cell->index();
00167 UInt_t cellRawIndex = cell->rawCellIndex();
00168
00169 UInt_t cellEvtNumber = cell->evtNumber();
00170 UInt_t cellFUProcId = cell->fuProcessId();
00171 UInt_t cellFUGuid = cell->fuGuid();
00172 UChar_t *cellPayloadAddr = cell->payloadAddr();
00173 UInt_t cellEventSize = cell->eventSize();
00174 try {
00175 shmBuffer_->finishReadingRecoCell(cell);
00176 } catch (evf::Exception& e) {
00177 rethrowShmBufferException(e,
00178 "FUResourceTable:sendData:recoCellToRead");
00179 }
00180
00181 lock();
00182 nbPendingSMDiscards_++;
00183 resources_[cellRawIndex]->incNbSent();
00184 if (resources_[cellRawIndex]->nbSent() == 1) {
00185 nbSent_++;
00186 nbSentError_++;
00187 }
00188 unlock();
00189
00190 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00191 cellFUProcId, cellFUGuid, cellPayloadAddr,
00192 cellEventSize);
00193 } else {
00194 string errmsg =
00195 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00196 XCEPT_RAISE(evf::Exception, errmsg);
00197 }
00198 } catch (xcept::Exception& e) {
00199 LOG4CPLUS_FATAL(
00200 log_,
00201 "Failed to send EVENT DATA to StorageManager: "
00202 << xcept::stdformat_exception_history(e));
00203 reschedule = false;
00204 }
00205 }
00206
00207 sDataActive_=reschedule;
00208 return reschedule;
00209 }
00210
00211
00212 bool FUResourceTable::sendDataWhileHalting() {
00213 bool reschedule = true;
00214 FUShmRecoCell* cell = 0;
00215 try {
00216 cell = shmBuffer_->recoCellToRead();
00217 } catch (evf::Exception& e) {
00218 rethrowShmBufferException(e,
00219 "FUResourceTable:sendDataWhileHalting:recoCellToRead");
00220 }
00221
00222 if (0 == cell->eventSize()) {
00223 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00224 UInt_t cellIndex = cell->index();
00225 try {
00226 shmBuffer_->finishReadingRecoCell(cell);
00227 shmBuffer_->discardRecoCell(cellIndex);
00228 } catch (evf::Exception& e) {
00229 rethrowShmBufferException(e,
00230 "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
00231 }
00232 shutdownStatus_|=1<<8;
00233 reschedule = false;
00234 } else {
00235 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00236 UInt_t cellIndex = cell->index();
00237 try {
00238 shmBuffer_->finishReadingRecoCell(cell);
00239 shmBuffer_->discardRecoCell(cellIndex);
00240 } catch (evf::Exception& e) {
00241 rethrowShmBufferException(e,
00242 "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
00243 }
00244 }
00245
00246 sDataActive_=reschedule;
00247 return reschedule;
00248 }
00249
00250
00251 bool FUResourceTable::sendDqm() {
00252 bool reschedule = true;
00253 FUShmDqmCell* cell = 0;
00254
00255 dqm::State_t state = dqm::EMPTY;
00256 try {
00257 cell = shmBuffer_->dqmCellToRead();
00258 state = shmBuffer_->dqmState(cell->index());
00259 } catch (evf::Exception& e) {
00260 rethrowShmBufferException(e,
00261 "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
00262 }
00263
00264 if (state == dqm::EMPTY) {
00265 LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
00266 std::cout << "shut down dqm workloop " << std::endl;
00267 UInt_t cellIndex = cell->index();
00268 try {
00269 shmBuffer_->finishReadingDqmCell(cell);
00270 shmBuffer_->discardDqmCell(cellIndex);
00271 } catch (evf::Exception& e) {
00272 rethrowShmBufferException(e,
00273 "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
00274 }
00275 shutdownStatus_|=1<<9;
00276 reschedule = false;
00277 } else {
00278 try {
00279 UInt_t cellIndex = cell->index();
00280 UInt_t cellRunNumber = cell->runNumber();
00281 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00282 UInt_t cellFolderId = cell->folderId();
00283 UInt_t cellFUProcId = cell->fuProcessId();
00284 UInt_t cellFUGuid = cell->fuGuid();
00285 UChar_t *cellPayloadAddr = cell->payloadAddr();
00286 UInt_t cellEventSize = cell->eventSize();
00287 sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00288 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00289 cellEventSize);
00290 try {
00291 shmBuffer_->finishReadingDqmCell(cell);
00292 } catch (evf::Exception& e) {
00293 rethrowShmBufferException(e,
00294 "FUResourceTable:sendDqm:finishReadingDqmCell");
00295 }
00296 } catch (xcept::Exception& e) {
00297 LOG4CPLUS_FATAL(
00298 log_,
00299 "Failed to send DQM DATA to StorageManager: "
00300 << xcept::stdformat_exception_history(e));
00301 reschedule = false;
00302 }
00303 }
00304
00305 sDqmActive_=reschedule;
00306 return reschedule;
00307 }
00308
00309
00310 bool FUResourceTable::sendDqmWhileHalting() {
00311 bool reschedule = true;
00312 FUShmDqmCell* cell = 0;
00313
00314 dqm::State_t state = dqm::EMPTY;
00315 try {
00316 cell = shmBuffer_->dqmCellToRead();
00317 state = shmBuffer_->dqmState(cell->index());
00318 } catch (evf::Exception& e) {
00319 rethrowShmBufferException(e,
00320 "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
00321 }
00322
00323 if (state == dqm::EMPTY) {
00324 LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
00325 std::cout << "shut down dqm workloop " << std::endl;
00326 UInt_t cellIndex = cell->index();
00327 try {
00328 shmBuffer_->finishReadingDqmCell(cell);
00329 shmBuffer_->discardDqmCell(cellIndex);
00330 } catch (evf::Exception& e) {
00331 rethrowShmBufferException(e,
00332 "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
00333 }
00334 shutdownStatus_|=1<<10;
00335 reschedule = false;
00336 } else {
00337 UInt_t cellIndex = cell->index();
00338 try {
00339 shmBuffer_->finishReadingDqmCell(cell);
00340 shmBuffer_->discardDqmCell(cellIndex);
00341 } catch (evf::Exception& e) {
00342 rethrowShmBufferException(e,
00343 "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
00344 }
00345 }
00346
00347 sDqmActive_=reschedule;
00348 return reschedule;
00349 }
00350
00351
00352
00353
00354 void FUResourceTable::discardNoReschedule() {
00355 std::cout << " entered shutdown cycle " << std::endl;
00356 shutdownStatus_|=1<<11;
00357 try {
00358 shmBuffer_->writeRecoEmptyEvent();
00359 } catch (evf::Exception& e) {
00360 rethrowShmBufferException(e,
00361 "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
00362 }
00363
00364 UInt_t count = 0;
00365 while (count < 100) {
00366 std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
00367 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00368 if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
00369 shmBuffer_->shmid()) == 1) {
00370 shutdownStatus_|=1<<12;
00371
00372 break;
00373 } else {
00374 count++;
00375 std::cout << " shutdown cycle attempt " << count << std::endl;
00376 LOG4CPLUS_DEBUG(
00377 log_,
00378 "FUResourceTable: Wait for all clients to detach,"
00379 << " nClients=" << shmBuffer_->nClients()
00380 << " nattch=" << FUShmBuffer::shm_nattch(
00381 shmBuffer_->shmid()) << " (" << count << ")");
00382 ::usleep( shutdownTimeout_);
00383 if (count * shutdownTimeout_ > 10000000)
00384 LOG4CPLUS_WARN(
00385 log_,
00386 "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00387 << " nClients=" << shmBuffer_->nClients()
00388 << " nattch=" << FUShmBuffer::shm_nattch(
00389 shmBuffer_->shmid()) << " (" << count << ")");
00390
00391 }
00392 }
00393
00394 bool allEmpty = false;
00395 std::cout << "Checking if all dqm cells are empty " << std::endl;
00396 while (!allEmpty) {
00397 UInt_t n = nbDqmCells_;
00398 allEmpty = true;
00399 shmBuffer_->lock();
00400 for (UInt_t i = 0; i < n; i++) {
00401
00402 dqm::State_t state = dqm::EMPTY;
00403 try {
00404 state = shmBuffer_->dqmState(i);
00405 } catch (evf::Exception& e) {
00406 rethrowShmBufferException(e,
00407 "FUResourceTable:discardNoReschedule:dqmState");
00408 }
00409 if (state != dqm::EMPTY)
00410 allEmpty = false;
00411 }
00412 shmBuffer_->unlock();
00413 }
00414 shutdownStatus_|=1<<13;
00415
00416 std::cout << "Number of pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
00417 if (nbPendingSMDqmDiscards_ != 0) {
00418 LOG4CPLUS_WARN(
00419 log_,
00420 "FUResourceTable: pending DQM discards not zero: ="
00421 << nbPendingSMDqmDiscards_
00422 << " while cells are all empty. This may cause problems at next start ");
00423
00424 }
00425
00426 try {
00427 shmBuffer_->writeDqmEmptyEvent();
00428 } catch (evf::Exception& e) {
00429 rethrowShmBufferException(e,
00430 "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
00431 }
00432
00433 isReadyToShutDown_ = true;
00434
00435 }
00436
00437
00438 bool FUResourceTable::discard() {
00439 FUShmRawCell* cell = 0;
00440
00441 evt::State_t state = evt::EMPTY;
00442 try {
00443 cell = shmBuffer_->rawCellToDiscard();
00444 state = shmBuffer_->evtState(cell->index());
00445 } catch (evf::Exception& e) {
00446 rethrowShmBufferException(e,
00447 "FUResourceTable:discard:rawCellToRead/evtState");
00448 }
00449
00450 bool reschedule = true;
00451 bool shutDown = (state == evt::STOP);
00452 bool isLumi = (state == evt::USEDLS);
00453 UInt_t fuResourceId = cell->fuResourceId();
00454 UInt_t buResourceId = cell->buResourceId();
00455
00456 if (state == evt::EMPTY) {
00457 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00458 return true;
00459 }
00460
00461 if (shutDown) {
00462 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00463 if (nbClientsToShutDown_ > 0)
00464 --nbClientsToShutDown_;
00465 if (nbClientsToShutDown_ == 0) {
00466 LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00467 isActive_ = false;
00468 reschedule = false;
00469 }
00470 }
00471
00472 try {
00473 shmBuffer_->discardRawCell(cell);
00474 } catch (evf::Exception& e) {
00475 rethrowShmBufferException(e, "FUResourceTable:discard:discardRawCell");
00476 }
00477
00478 if (isLumi)
00479 nbEolDiscarded_++;
00480
00481 if (!shutDown && !isLumi) {
00482 if (fuResourceId >= nbResources()) {
00483 LOG4CPLUS_WARN(
00484 log_,
00485 "cell " << cell->index() << " in state " << state
00486 << " scheduled for discard has no associated FU resource ");
00487 } else {
00488 resources_[fuResourceId]->release(true);
00489 lock();
00490 freeResourceIds_.push(fuResourceId);
00491 assert(freeResourceIds_.size() <= resources_.size());
00492 unlock();
00493
00494 sendDiscard(buResourceId);
00495 sendAllocate();
00496 }
00497 }
00498
00499 if (!reschedule) {
00500 discardNoReschedule();
00501 }
00502
00503 return reschedule;
00504 }
00505
00506
00507 bool FUResourceTable::discardWhileHalting(bool sendDiscards) {
00508 FUShmRawCell* cell = 0;
00509
00510 evt::State_t state = evt::EMPTY;
00511 try {
00512 cell = shmBuffer_->rawCellToDiscard();
00513 state = shmBuffer_->evtState(cell->index());
00514 } catch (evf::Exception& e) {
00515 rethrowShmBufferException(e,
00516 "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
00517 }
00518
00519 bool reschedule = true;
00520 bool shutDown = (state == evt::STOP);
00521 bool isLumi = (state == evt::USEDLS);
00522 UInt_t fuResourceId = cell->fuResourceId();
00523 UInt_t buResourceId = cell->buResourceId();
00524
00525 if (state == evt::EMPTY) {
00526 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00527 return true;
00528 }
00529
00530 if (shutDown) {
00531 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00532 if (nbClientsToShutDown_ > 0)
00533 --nbClientsToShutDown_;
00534 if (nbClientsToShutDown_ == 0) {
00535 LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00536 isActive_ = false;
00537 reschedule = false;
00538 }
00539 }
00540
00541 try {
00542 shmBuffer_->discardRawCell(cell);
00543 } catch (evf::Exception& e) {
00544 rethrowShmBufferException(e,
00545 "FUResourceTable:discardWhileHalting:discardRawCell");
00546 }
00547
00548 if (isLumi)
00549 nbEolDiscarded_++;
00550
00551 if (!shutDown && !isLumi) {
00552 if (fuResourceId >= nbResources()) {
00553 LOG4CPLUS_WARN(
00554 log_,
00555 "cell " << cell->index() << " in state " << state
00556 << " scheduled for discard has no associated FU resource ");
00557 } else {
00558 resources_[fuResourceId]->release(true);
00559 lock();
00560 freeResourceIds_.push(fuResourceId);
00561 assert(freeResourceIds_.size() <= resources_.size());
00562 unlock();
00563
00564
00565
00566
00567
00568 if (sendDiscards)
00569 sendDiscard(buResourceId);
00570 }
00571 }
00572
00573 if (!reschedule) {
00574 discardNoReschedule();
00575 }
00576
00577 return reschedule;
00578 }
00579
00580
00581 bool FUResourceTable::buildResource(MemRef_t* bufRef) {
00582 bool eventComplete = false;
00583
00584 bool lastMsg = isLastMessageOfEvent(bufRef);
00585 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00586 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00587
00588 UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
00589 UInt_t buResourceId = (UInt_t) block->buResourceId;
00590
00591 if ((int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
00592 || (int) block->buResourceId < 0) {
00593 stringstream failureStr;
00594 failureStr << "Received TAKE message with invalid bu/fu resource id:"
00595 << " fuResourceId: " << fuResourceId << " buResourceId: "
00596 << buResourceId;
00597 LOG4CPLUS_ERROR(log_, failureStr.str());
00598 XCEPT_RAISE(evf::Exception, failureStr.str());
00599 }
00600 FUResource* resource = resources_[fuResourceId];
00601
00602
00603 if (!resource->fatalError() && !resource->isAllocated()) {
00604 FUShmRawCell* cell = 0;
00605 try {
00606 cell = shmBuffer_->rawCellToWrite();
00607 } catch (evf::Exception& e) {
00608 rethrowShmBufferException(e,
00609 "FUResourceTable:buildResource:rawCellToWrite");
00610 }
00611 if (cell == 0) {
00612 bufRef->release();
00613 return eventComplete;
00614 }
00615 resource->allocate(cell);
00616 timeval now;
00617 gettimeofday(&now, 0);
00618
00619 frb_->setRBTimeStamp(
00620 ((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
00621
00622 frb_->setRBEventCount(nbCompleted_);
00623
00624 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
00625 resource->doCrcCheck(true);
00626 else
00627 resource->doCrcCheck(false);
00628 }
00629
00630 #ifdef DEBUG_RES_TAB
00631 std::cout << "Received frame for resource " << buResourceId << std::endl;
00632 #endif
00633
00634 if (!resource->fatalError()) {
00635 #ifdef DEBUG_RES_TAB
00636 std::cout << "No fatal error for " << buResourceId << ", keep building..."<< std::endl;
00637 #endif
00638 resource->process(bufRef);
00639 lock();
00640 nbErrors_ += resource->nbErrors();
00641 nbCrcErrors_ += resource->nbCrcErrors();
00642 unlock();
00643 #ifdef DEBUG_RES_TAB
00644 std::cout << "Checking if resource is complete " << buResourceId << std::endl;
00645 #endif
00646
00647 if (resource->isComplete()) {
00648 #ifdef DEBUG_RES_TAB
00649 std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
00650 #endif
00651 lock();
00652 nbCompleted_++;
00653 nbPending_--;
00654 unlock();
00655 if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
00656 dumpEvent(resource->shmCell());
00657 try {
00658 shmBuffer_->finishWritingRawCell(resource->shmCell());
00659 } catch (evf::Exception& e) {
00660 rethrowShmBufferException(e,
00661 "FUResourceTable:buildResource:finishWritingRawCell");
00662 }
00663 eventComplete = true;
00664 }
00665
00666 }
00667
00668 if (resource->fatalError()) {
00669 if (lastMsg) {
00670 try {
00671 shmBuffer_->releaseRawCell(resource->shmCell());
00672 } catch (evf::Exception& e) {
00673 rethrowShmBufferException(e,
00674 "FUResourceTable:buildResource:releaseRawCell");
00675 }
00676 resource->release(true);
00677 lock();
00678 freeResourceIds_.push(fuResourceId);
00679 nbDiscarded_++;
00680 nbLost_++;
00681 nbPending_--;
00682 unlock();
00683 bu_->sendDiscard(buResourceId);
00684 sendAllocate();
00685 }
00686
00687 }
00688
00689 return eventComplete;
00690 }
00691
00692
00693 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef) {
00694 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00695 msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00696 UInt_t recoIndex = msg->rbBufferID;
00697
00698
00699 if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
00700 LOG4CPLUS_ERROR(
00701 log_,
00702 "Received DISCARD DATA message with invalid recoIndex:"
00703 << recoIndex);
00704
00705 if (acceptSMDataDiscard_[recoIndex]) {
00706 lock();
00707 nbPendingSMDiscards_--;
00708 unlock();
00709 acceptSMDataDiscard_[recoIndex] = false;
00710
00711 try {
00712 shmBuffer_->discardRecoCell(recoIndex);
00713 } catch (evf::Exception& e) {
00714 rethrowShmBufferException(e,
00715 "FUResourceTable:discardDataEvent:discardRecoCell");
00716 }
00717
00718 } else {
00719 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00720 }
00721
00722 bufRef->release();
00723 return true;
00724 }
00725
00726
00727 bool FUResourceTable::discardDataEventWhileHalting(MemRef_t* bufRef) {
00728 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00729 msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00730 UInt_t recoIndex = msg->rbBufferID;
00731
00732
00733 if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
00734 LOG4CPLUS_ERROR(
00735 log_,
00736 "Received DISCARD DATA message with invalid recoIndex:"
00737 << recoIndex);
00738
00739 if (acceptSMDataDiscard_[recoIndex]) {
00740 lock();
00741 nbPendingSMDiscards_--;
00742 unlock();
00743 acceptSMDataDiscard_[recoIndex] = false;
00744
00745 } else {
00746 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00747 }
00748
00749 bufRef->release();
00750 return false;
00751 }
00752
00753
00754 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef) {
00755 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00756 msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00757 UInt_t dqmIndex = msg->rbBufferID;
00758
00759
00760 if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
00761 LOG4CPLUS_ERROR(
00762 log_,
00763 "Received DISCARD DQM message with invalid dqmIndex:"
00764 << dqmIndex);
00765
00766 unsigned int ntries = 0;
00767 try {
00768 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00769 if (ntries)
00770 LOG4CPLUS_WARN(
00771 log_,
00772 "DQM discard for cell " << dqmIndex
00773 << " which is not yet in SENT state - waiting");
00774 ::usleep(10000);
00775 if (ntries++ > 10) {
00776 LOG4CPLUS_ERROR(
00777 log_,
00778 "DQM cell " << dqmIndex
00779 << " discard timed out while cell still in state "
00780 << shmBuffer_->dqmState(dqmIndex));
00781 bufRef->release();
00782 return true;
00783 }
00784 }
00785 } catch (evf::Exception& e) {
00786 rethrowShmBufferException(e, "FUResourceTable:discardDqmEvent:dqmState");
00787 }
00788 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00789 acceptSMDqmDiscard_[dqmIndex]--;
00790 if (--nbPendingSMDqmDiscards_ < 0) {
00791 LOG4CPLUS_WARN(
00792 log_,
00793 "Spurious??? DQM discard by StorageManager, index "
00794 << dqmIndex << " cell state "
00795 << shmBuffer_->dqmState(dqmIndex)
00796 << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
00797 }
00798 try {
00799 shmBuffer_->discardDqmCell(dqmIndex);
00800 } catch (evf::Exception& e) {
00801 rethrowShmBufferException(e,
00802 "FUResourceTable:discardDqmEvent:discardDqmCell");
00803 }
00804
00805 } else {
00806 LOG4CPLUS_ERROR(
00807 log_,
00808 "Spurious DQM discard for cell " << dqmIndex
00809 << " from StorageManager while cell is not accepting discards");
00810 }
00811
00812 bufRef->release();
00813 return true;
00814 }
00815
00816
00817 bool FUResourceTable::discardDqmEventWhileHalting(MemRef_t* bufRef) {
00818 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00819 msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00820 UInt_t dqmIndex = msg->rbBufferID;
00821
00822
00823 if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
00824 LOG4CPLUS_ERROR(
00825 log_,
00826 "Received DISCARD DQM message with invalid dqmIndex:"
00827 << dqmIndex);
00828
00829 unsigned int ntries = 0;
00830 try {
00831 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00832 if (ntries)
00833 LOG4CPLUS_WARN(
00834 log_,
00835 "DQM discard for cell " << dqmIndex
00836 << " which is not yet in SENT state - waiting");
00837 ::usleep(10000);
00838 if (ntries++ > 10) {
00839 LOG4CPLUS_ERROR(
00840 log_,
00841 "DQM cell " << dqmIndex
00842 << " discard timed out while cell still in state "
00843 << shmBuffer_->dqmState(dqmIndex));
00844 bufRef->release();
00845 return true;
00846 }
00847 }
00848 } catch (evf::Exception& e) {
00849 rethrowShmBufferException(e,
00850 "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
00851 }
00852 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00853 acceptSMDqmDiscard_[dqmIndex]--;
00854 if (--nbPendingSMDqmDiscards_ < 0) {
00855 try {
00856 LOG4CPLUS_WARN(
00857 log_,
00858 "Spurious??? DQM discard by StorageManager, index "
00859 << dqmIndex << " cell state "
00860 << shmBuffer_->dqmState(dqmIndex)
00861 << " accept flag "
00862 << acceptSMDqmDiscard_[dqmIndex]);
00863 } catch (evf::Exception& e) {
00864 rethrowShmBufferException(e,
00865 "FUResourceTable:discardDqmEventWhileHalting:dqmState");
00866 }
00867 }
00868
00869 } else {
00870 LOG4CPLUS_ERROR(
00871 log_,
00872 "Spurious DQM discard for cell " << dqmIndex
00873 << " from StorageManager while cell is not accepting discards");
00874 }
00875
00876 bufRef->release();
00877 return false;
00878 }
00879
00880
00881 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef) {
00882 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
00883 *msg =
00884 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
00885
00886
00887
00888
00889 int lumiCheck = (int) msg->lumiSection;
00890 if (lumiCheck < 0)
00891 LOG4CPLUS_ERROR(log_,
00892 "Received EOL message with invalid index:" << lumiCheck);
00893
00894 for (unsigned int i = 0; i < nbRawCells_; i++) {
00895
00896 if (stopFlag_) break;
00897 nbEolPosted_++;
00898 try {
00899 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00900 } catch (evf::Exception& e) {
00901 rethrowShmBufferException(e,
00902 "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
00903 }
00904 }
00905 }
00906
00907
00908 void FUResourceTable::dropEvent() {
00909 FUShmRawCell* cell = 0;
00910 try {
00911 cell = shmBuffer_->rawCellToRead();
00912 } catch (evf::Exception& e) {
00913 rethrowShmBufferException(e, "FUResourceTable:dropEvent:rawCellToRead");
00914 }
00915 UInt_t fuResourceId = cell->fuResourceId();
00916 try {
00917 shmBuffer_->finishReadingRawCell(cell);
00918 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00919 } catch (evf::Exception& e) {
00920 rethrowShmBufferException(e,
00921 "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
00922 }
00923 }
00924
00925
00926 bool FUResourceTable::handleCrashedEP(UInt_t runNumber, pid_t pid) {
00927 bool retval = false;
00928 vector < pid_t > pids = cellPrcIds();
00929 UInt_t iRawCell = pids.size();
00930 for (UInt_t i = 0; i < pids.size(); i++) {
00931 if (pid == pids[i]) {
00932 iRawCell = i;
00933 break;
00934 }
00935 }
00936
00937 if (iRawCell < pids.size()) {
00938 try {
00939 bool shmret = shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell, true);
00940 if (!shmret)
00941 LOG4CPLUS_WARN(log_,"Problem writing to the error stream.");
00942 } catch (evf::Exception& e) {
00943 rethrowShmBufferException(e,
00944 "FUResourceTable:handleCrashedEP:writeErrorEventData");
00945 }
00946 retval = true;
00947 } else
00948 LOG4CPLUS_WARN(log_,
00949 "No raw data to send to error stream for process " << pid);
00950 try {
00951 bool success = shmBuffer_->removeClientPrcId(pid);
00952 if (!success)
00953 LOG4CPLUS_WARN(log_,
00954 "removeClientPrcId: " << pid << " not in shared memory index, was in raw cell " << iRawCell);
00955 } catch (evf::Exception& e) {
00956 rethrowShmBufferException(e,
00957 "FUResourceTable:handleCrashedEP:removeClientPrcId");
00958 }
00959 return retval;
00960 }
00961
00962
00963 void FUResourceTable::shutdownWatchdog(unsigned int timeout)
00964 {
00965 unsigned int timeoutUs=timeout*1000000+1;
00966 bool warned=false;
00967 while (!watchDogEnd_) {
00968
00969 usleep(50000);
00970 timeoutUs-=50000;
00971 if (timeoutUs<=50000) {
00972 LOG4CPLUS_ERROR(log_,"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
00973 watchDogSetFailed_=true;
00974 break;
00975 }
00976 if (timeoutUs<=1000000*timeout/2 && !warned) {
00977 warned=true;
00978 LOG4CPLUS_WARN(log_,"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
00979 }
00980 }
00981 }
00982
00983
00984 void FUResourceTable::shutDownClients() {
00985 nbClientsToShutDown_ = nbClients();
00986 isReadyToShutDown_ = false;
00987
00988 shutdownStatus_=1;
00989
00990
00991 watchDogEnd_=false;
00992 watchDogSetFailed_=false;
00993 #ifdef linux
00994 std::thread watch(&FUResourceTable::shutdownWatchdog,this,20);
00995 #endif
00996 if (nbClientsToShutDown_ == 0) {
00997 shutdownStatus_|=1<<1;
00998 LOG4CPLUS_INFO(
00999 log_,
01000 "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
01001 UInt_t n = nbResources();
01002 try {
01003 for (UInt_t i = 0; i < n; i++) {
01004 evt::State_t state = shmBuffer_->evtState(i);
01005 if (state != evt::EMPTY) {
01006 LOG4CPLUS_WARN(
01007 log_,
01008 "Schedule discard at STOP for orphaned event in state "
01009 << state);
01010 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
01011 }
01012 }
01013 shmBuffer_->scheduleRawEmptyCellForDiscard();
01014 } catch (evf::Exception& e) {
01015 rethrowShmBufferException(e,
01016 "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
01017 }
01018 } else {
01019
01020 int checks = 0;
01021 try {
01022 while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
01023 != 0) {
01024 shutdownStatus_|=1<<2;
01025 checks++;
01026 {
01027 #ifdef linux
01028 auto lk = lockCrashHandlerTimed(10);
01029 #else
01030 bool lk=true;
01031 #endif
01032 if (lk) {
01033 vector < pid_t > prcids = clientPrcIds();
01034 for (UInt_t i = 0; i < prcids.size(); i++) {
01035 pid_t pid = prcids[i];
01036 int status = kill(pid, 0);
01037 if (status != 0) {
01038 LOG4CPLUS_ERROR(log_,
01039 "EP prc " << pid << " completed with error.");
01040 handleCrashedEP(runNumber_, pid);
01041 }
01042 }
01043 }
01044 else {
01045 XCEPT_RAISE(evf::Exception,
01046 "Timed out access to the Crash Handler in stop. SM discards not arriving?");
01047
01048 }
01049 }
01050
01051 LOG4CPLUS_WARN(
01052 log_,
01053 "no cell to write stop "
01054 << shmBuffer_->nbRawCellsToWrite()
01055 << " nClients " << nbClients());
01056 if (checks > 15) {
01057 string msg = "No Raw Cell to Write STOP messages";
01058 XCEPT_RAISE(evf::Exception, msg);
01059 }
01060 ::usleep(500000);
01061 }
01062 shutdownStatus_|=1<<3;
01063
01064 } catch (evf::Exception& e) {
01065 watchDogEnd_=true;
01066 #ifdef linux
01067 watch.join();
01068 #endif
01069 rethrowShmBufferException(e,
01070 "FUResourceTable:shutDownClients:nbRawCellsToWrite");
01071 }
01072 nbClientsToShutDown_ = nbClients();
01073 if (nbClientsToShutDown_ == 0) {
01074 shutdownStatus_|=1<<4;
01075 UInt_t n = nbResources();
01076 for (UInt_t i = 0; i < n; i++) {
01077
01078 evt::State_t state = evt::EMPTY;
01079 try {
01080 state = shmBuffer_->evtState(i);
01081 } catch (evf::Exception& e) {
01082 watchDogEnd_=true;
01083 #ifdef linux
01084 watch.join();
01085 #endif
01086 rethrowShmBufferException(e,
01087 "FUResourceTable:shutDownClients:evtState");
01088 }
01089 if (state != evt::EMPTY) {
01090 LOG4CPLUS_WARN(
01091 log_,
01092 "Schedule discard at STOP for orphaned event in state "
01093 << state);
01094 try {
01095 shmBuffer_->setEvtDiscard(i, 1, true);
01096 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
01097 } catch (evf::Exception& e) {
01098 watchDogEnd_=true;
01099 #ifdef linux
01100 watch.join();
01101 #endif
01102 rethrowShmBufferException(e,
01103 "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
01104 }
01105 }
01106 }
01107 try {
01108 shmBuffer_->scheduleRawEmptyCellForDiscard();
01109 } catch (evf::Exception& e) {
01110 watchDogEnd_=true;
01111 #ifdef linux
01112 watch.join();
01113 #endif
01114 rethrowShmBufferException(e,
01115 "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
01116 }
01117 }
01118 UInt_t n = nbClientsToShutDown_;
01119 shutdownStatus_|=1<<5;
01120 try {
01121 for (UInt_t i = 0; i < n; ++i)
01122 shmBuffer_->writeRawEmptyEvent();
01123 } catch (evf::Exception& e) {
01124 watchDogEnd_=true;
01125 #ifdef linux
01126 watch.join();
01127 #endif
01128 rethrowShmBufferException(e,
01129 "FUResourceTable:shutDownClients:writeRawEmptyEvent");
01130 }
01131 shutdownStatus_|=1<<6;
01132 }
01133 watchDogEnd_=true;
01134 #ifdef linux
01135 watch.join();
01136 if (watchDogSetFailed_)
01137 XCEPT_RAISE(evf::Exception, "Failed (timed out) shutdown of clients");
01138 #endif
01139 }
01140
01141
01142 void FUResourceTable::clear() {
01143 for (UInt_t i = 0; i < resources_.size(); i++) {
01144 resources_[i]->release(true);
01145 delete resources_[i];
01146 }
01147 resources_.clear();
01148 while (!freeResourceIds_.empty())
01149 freeResourceIds_.pop();
01150 }
01151
01153
01155
01156
01157 void FUResourceTable::resetCounters() {
01158 if (0 != shmBuffer_) {
01159 try {
01160 for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
01161 acceptSMDataDiscard_[i] = false;
01162 for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
01163 acceptSMDqmDiscard_[i] = 0;
01164 } catch (evf::Exception& e) {
01165 rethrowShmBufferException(e,
01166 "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
01167 }
01168 }
01169
01170
01171 nbAllocated_ = 0;
01172 nbPending_ = 0;
01173 nbCompleted_ = 0;
01174 nbSent_ = 0;
01175 nbSentError_ = 0;
01176 nbSentDqm_ = 0;
01177 nbPendingSMDiscards_ = 0;
01178 nbPendingSMDqmDiscards_ = 0;
01179 nbDiscarded_ = 0;
01180 nbLost_ = 0;
01181
01182 nbEolPosted_ = 0;
01183 nbEolDiscarded_ = 0;
01184
01185 nbErrors_ = 0;
01186 nbCrcErrors_ = 0;
01187 nbAllocSent_ = 0;
01188
01189 sumOfSquares_ = 0;
01190 sumOfSizes_ = 0;
01191
01192
01193 sDqmActive_=true;
01194 sDataActive_=true;
01195
01196 }
01197
01198
01199 UInt_t FUResourceTable::nbClients() const {
01200 UInt_t result(0);
01201 try {
01202 if (0 != shmBuffer_)
01203 result = shmBuffer_->nClients();
01204 } catch (evf::Exception& e) {
01205 rethrowShmBufferException(e, "FUResourceTable:nbClients:nClients");
01206 }
01207 return result;
01208 }
01209
01210
01211 vector<pid_t> FUResourceTable::clientPrcIds() const {
01212 vector < pid_t > result;
01213 try {
01214 if (0 != shmBuffer_) {
01215 UInt_t n = nbClients();
01216 for (UInt_t i = 0; i < n; i++)
01217 result.push_back(shmBuffer_->clientPrcId(i));
01218 }
01219 } catch (evf::Exception& e) {
01220 rethrowShmBufferException(e,
01221 "FUResourceTable:clientPrcIds:clientPrcIds");
01222 }
01223 return result;
01224 }
01225
01226
01227 string FUResourceTable::clientPrcIdsAsString() const {
01228 stringstream ss;
01229 try {
01230 if (0 != shmBuffer_) {
01231 UInt_t n = nbClients();
01232 for (UInt_t i = 0; i < n; i++) {
01233 if (i > 0)
01234 ss << ",";
01235 ss << shmBuffer_->clientPrcId(i);
01236 }
01237 }
01238 } catch (evf::Exception& e) {
01239 rethrowShmBufferException(e,
01240 "FUResourceTable:clientPrcIdsAsString:clientPrcId");
01241 }
01242 return ss.str();
01243 }
01244
01245
01246 vector<string> FUResourceTable::cellStates() const {
01247 vector < string > result;
01248 if (0 != shmBuffer_) {
01249 UInt_t n = nbResources();
01250 shmBuffer_->lock();
01251 try {
01252 for (UInt_t i = 0; i < n; i++) {
01253 evt::State_t state = shmBuffer_->evtState(i);
01254 if (state == evt::EMPTY)
01255 result.push_back("EMPTY");
01256 else if (state == evt::STOP)
01257 result.push_back("STOP");
01258 else if (state == evt::LUMISECTION)
01259 result.push_back("LUMISECTION");
01260
01261 else if (state == evt::USEDLS)
01262 result.push_back("USEDLS");
01263 else if (state == evt::RAWWRITING)
01264 result.push_back("RAWWRITING");
01265 else if (state == evt::RAWWRITTEN)
01266 result.push_back("RAWWRITTEN");
01267 else if (state == evt::RAWREADING)
01268 result.push_back("RAWREADING");
01269 else if (state == evt::RAWREAD)
01270 result.push_back("RAWREAD");
01271 else if (state == evt::PROCESSING)
01272 result.push_back("PROCESSING");
01273 else if (state == evt::PROCESSED)
01274 result.push_back("PROCESSED");
01275 else if (state == evt::RECOWRITING)
01276 result.push_back("RECOWRITING");
01277 else if (state == evt::RECOWRITTEN)
01278 result.push_back("RECOWRITTEN");
01279 else if (state == evt::SENDING)
01280 result.push_back("SENDING");
01281 else if (state == evt::SENT)
01282 result.push_back("SENT");
01283 else if (state == evt::DISCARDING)
01284 result.push_back("DISCARDING");
01285 }
01286 } catch (evf::Exception& e) {
01287 rethrowShmBufferException(e, "FUResourceTable:cellStates:evtState");
01288 }
01289 shmBuffer_->unlock();
01290 }
01291 return result;
01292 }
01293
01294 vector<string> FUResourceTable::dqmCellStates() const {
01295 vector < string > result;
01296 if (0 != shmBuffer_) {
01297 UInt_t n = nbDqmCells_;
01298 shmBuffer_->lock();
01299 try {
01300 for (UInt_t i = 0; i < n; i++) {
01301 dqm::State_t state = shmBuffer_->dqmState(i);
01302 if (state == dqm::EMPTY)
01303 result.push_back("EMPTY");
01304 else if (state == dqm::WRITING)
01305 result.push_back("WRITING");
01306 else if (state == dqm::WRITTEN)
01307 result.push_back("WRITTEN");
01308 else if (state == dqm::SENDING)
01309 result.push_back("SENDING");
01310 else if (state == dqm::SENT)
01311 result.push_back("SENT");
01312 else if (state == dqm::DISCARDING)
01313 result.push_back("DISCARDING");
01314 }
01315 } catch (evf::Exception& e) {
01316 rethrowShmBufferException(e,
01317 "FUResourceTable:dqmCellStates:dqmState");
01318 }
01319 shmBuffer_->unlock();
01320 }
01321 return result;
01322 }
01323
01324
01325 vector<UInt_t> FUResourceTable::cellEvtNumbers() const {
01326 vector < UInt_t > result;
01327 if (0 != shmBuffer_) {
01328 UInt_t n = nbResources();
01329 shmBuffer_->lock();
01330 try {
01331 for (UInt_t i = 0; i < n; i++)
01332 result.push_back(shmBuffer_->evtNumber(i));
01333 } catch (evf::Exception& e) {
01334 rethrowShmBufferException(e,
01335 "FUResourceTable:cellEvtNumbers:evtNumber");
01336 }
01337 shmBuffer_->unlock();
01338 }
01339 return result;
01340 }
01341
01342
01343 vector<pid_t> FUResourceTable::cellPrcIds() const {
01344 vector < pid_t > result;
01345 if (0 != shmBuffer_) {
01346 UInt_t n = nbResources();
01347 shmBuffer_->lock();
01348 try {
01349 for (UInt_t i = 0; i < n; i++)
01350 result.push_back(shmBuffer_->evtPrcId(i));
01351 } catch (evf::Exception& e) {
01352 rethrowShmBufferException(e, "FUResourceTable:cellPrcIds:evtPrcId");
01353 }
01354 shmBuffer_->unlock();
01355 }
01356 return result;
01357 }
01358
01359
01360 vector<time_t> FUResourceTable::cellTimeStamps() const {
01361 vector < time_t > result;
01362 try {
01363 if (0 != shmBuffer_) {
01364 UInt_t n = nbResources();
01365 shmBuffer_->lock();
01366 for (UInt_t i = 0; i < n; i++)
01367 result.push_back(shmBuffer_->evtTimeStamp(i));
01368 shmBuffer_->unlock();
01369 }
01370 } catch (evf::Exception& e) {
01371 rethrowShmBufferException(e,
01372 "FUResourceTable:cellTimeStamps:evtTimeStamp");
01373 }
01374 return result;
01375 }
01376
01378
01380
01381 void FUResourceTable::lastResort() {
01382 try {
01383 ostringstream ost;
01384 ost << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01385 << " more rawcells to read ";
01386 LOG4CPLUS_WARN(log_,ost.str());
01387 std::cout << ost.str() << std::endl;
01388
01389 while (shmBuffer_->nbRawCellsToRead() != 0) {
01390 FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
01391 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01392 << std::endl;
01393
01394 LOG4CPLUS_WARN(log_,"lastResort: Scheduling raw cell (server side) "<< newCell->index());
01395 shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01396
01397 std::cout << "lastResort: schedule raw cell for discard "
01398 << newCell->index() << std::endl;
01399 }
01400
01401 LOG4CPLUS_WARN(log_,"lastResort: scheduling empty raw cell (server side) ");
01402 shmBuffer_->scheduleRawEmptyCellForDiscard();
01403 LOG4CPLUS_WARN(log_,"lastResort: Finished. cells remaining: " << shmBuffer_->nbRawCellsToRead());
01404 } catch (evf::Exception& e) {
01405 rethrowShmBufferException(
01406 e,
01407 "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
01408 }
01409 LOG4CPLUS_WARN(log_,"Last resort finished ");
01410 }
01411
01412 void FUResourceTable::resetIPC() {
01413 if (shmBuffer_ != 0) {
01414
01415 int countdown_=60;
01416 while (countdown_-- && (sDataActive_ || sDqmActive_)) ::usleep(50000);
01417 if (countdown_<=0) {
01418 std::ostringstream ostr;
01419 ostr << "Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
01420 << " sendDqm:"<<sDqmActive_ << " sendData:" << sDataActive_;
01421 LOG4CPLUS_ERROR(log_,ostr.str());
01422 std::cout << ostr.str() << std::endl;
01423 }
01424
01425 shmBuffer_->reset(false);
01426 LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
01427 }
01428 }
01429
01430 std::string FUResourceTable::printStatus() {
01431 if (shmBuffer_) return shmBuffer_->sem_print_s();
01432 else return std::string("ShmBuffer not initialized");
01433 }
01434
01435 void FUResourceTable::rethrowShmBufferException(evf::Exception& e, string where) const
01436 throw (evf::Exception) {
01437 stringstream details;
01438 vector < string > dataStates = cellStates();
01439 vector < string > dqmStates = dqmCellStates();
01440 details << "Exception raised: " << e.what() << " (in module: "
01441 << e.module() << " in function: " << e.function() << " at line: "
01442 << e.line() << ")";
01443 details << " Dumping cell state... ";
01444 details << "data cells --> ";
01445 for (unsigned int i = 0; i < dataStates.size(); i++)
01446 details << dataStates[i] << " ";
01447 details << "dqm cells --> ";
01448 for (unsigned int i = 0; i < dqmStates.size(); i++)
01449 details << dqmStates[i] << " ";
01450 details << " ... originated in: " << where;
01451 XCEPT_RETHROW(evf::Exception, details.str(), e);
01452 }