00001
00002
00003
00004
00005
00006
00007
00009
00010 #include "EventFilter/ResourceBroker/interface/FUResourceTable.h"
00011 #include "EvffedFillerRB.h"
00012
00013 #include "interface/evb/i2oEVBMsgs.h"
00014 #include "xcept/tools.h"
00015
00016 #include <sys/types.h>
00017 #include <signal.h>
00018
00019
00020
00021 using namespace evf;
00022 using namespace std;
00023
00025
00027
00028
00029 FUResourceTable::FUResourceTable(bool segmentationMode, UInt_t nbRawCells,
00030 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00031 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00032 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00033 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00034
00035
00036 IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00037 rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
00038 logger, timeout, frb, app), shmBuffer_(0)
00039
00040 {
00041 initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00042 rawCellSize, recoCellSize, dqmCellSize);
00043 }
00044
00045
00046 FUResourceTable::~FUResourceTable() {
00047 clear();
00048
00049 shmdt(shmBuffer_);
00050 if (FUShmBuffer::releaseSharedMemory())
00051 LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
00052 if (0 != acceptSMDataDiscard_)
00053 delete[] acceptSMDataDiscard_;
00054 if (0 != acceptSMDqmDiscard_)
00055 delete[] acceptSMDqmDiscard_;
00056 }
00057
00059
00061
00062
00063 void FUResourceTable::initialize(bool segmentationMode, UInt_t nbRawCells,
00064 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00065 UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
00066 clear();
00067
00068 shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
00069 nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00070 if (0 == shmBuffer_) {
00071 string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
00072 LOG4CPLUS_FATAL(log_, msg);
00073 XCEPT_RAISE(evf::Exception, msg);
00074 }
00075
00076 for (UInt_t i = 0; i < nbRawCells_; i++) {
00077 FUResource* newResource = new FUResource(i, log_, frb_, app_);
00078 newResource->release(true);
00079 resources_.push_back(newResource);
00080 freeResourceIds_.push(i);
00081 }
00082
00083 acceptSMDataDiscard_ = new bool[nbRecoCells];
00084 acceptSMDqmDiscard_ = new int[nbDqmCells];
00085
00086 resetCounters();
00087 }
00088
00089
00090 bool FUResourceTable::sendData() {
00091 bool reschedule = true;
00092 FUShmRecoCell* cell = 0;
00093 try {
00094 cell = shmBuffer_->recoCellToRead();
00095 } catch (evf::Exception& e) {
00096 rethrowShmBufferException(e);
00097 }
00098
00099 if (0 == cell->eventSize()) {
00100 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00101 UInt_t cellIndex = cell->index();
00102 try {
00103 shmBuffer_->finishReadingRecoCell(cell);
00104 shmBuffer_->discardRecoCell(cellIndex);
00105 } catch (evf::Exception& e) {
00106 rethrowShmBufferException(e);
00107 }
00108 reschedule = false;
00109 } else {
00110 try {
00111 if (cell->type() == 0) {
00112 UInt_t cellIndex = cell->index();
00113 UInt_t cellOutModId = cell->outModId();
00114 UInt_t cellFUProcId = cell->fuProcessId();
00115 UInt_t cellFUGuid = cell->fuGuid();
00116 UChar_t* cellPayloadAddr = cell->payloadAddr();
00117 UInt_t cellEventSize = cell->eventSize();
00118 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00119 try {
00120 shmBuffer_->finishReadingRecoCell(cell);
00121 } catch (evf::Exception& e) {
00122 rethrowShmBufferException(e);
00123 }
00124
00125 lock();
00126 nbPendingSMDiscards_++;
00127 unlock();
00128
00129 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00130 cellFUGuid, cellPayloadAddr, cellEventSize,
00131 cellExpectedEPs);
00132 } else if (cell->type() == 1) {
00133 UInt_t cellIndex = cell->index();
00134 UInt_t cellRawIndex = cell->rawCellIndex();
00135 UInt_t cellRunNumber = cell->runNumber();
00136 UInt_t cellEvtNumber = cell->evtNumber();
00137 UInt_t cellOutModId = cell->outModId();
00138 UInt_t cellFUProcId = cell->fuProcessId();
00139 UInt_t cellFUGuid = cell->fuGuid();
00140 UChar_t *cellPayloadAddr = cell->payloadAddr();
00141 UInt_t cellEventSize = cell->eventSize();
00142 try {
00143 shmBuffer_->finishReadingRecoCell(cell);
00144 } catch (evf::Exception& e) {
00145 rethrowShmBufferException(e);
00146 }
00147
00148 lock();
00149 nbPendingSMDiscards_++;
00150 resources_[cellRawIndex]->incNbSent();
00151 if (resources_[cellRawIndex]->nbSent() == 1)
00152 nbSent_++;
00153 unlock();
00154
00155 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00156 cellOutModId, cellFUProcId, cellFUGuid,
00157 cellPayloadAddr, cellEventSize);
00158 } else if (cell->type() == 2) {
00159 UInt_t cellIndex = cell->index();
00160 UInt_t cellRawIndex = cell->rawCellIndex();
00161
00162 UInt_t cellEvtNumber = cell->evtNumber();
00163 UInt_t cellFUProcId = cell->fuProcessId();
00164 UInt_t cellFUGuid = cell->fuGuid();
00165 UChar_t *cellPayloadAddr = cell->payloadAddr();
00166 UInt_t cellEventSize = cell->eventSize();
00167 try {
00168 shmBuffer_->finishReadingRecoCell(cell);
00169 } catch (evf::Exception& e) {
00170 rethrowShmBufferException(e);
00171 }
00172
00173 lock();
00174 nbPendingSMDiscards_++;
00175 resources_[cellRawIndex]->incNbSent();
00176 if (resources_[cellRawIndex]->nbSent() == 1) {
00177 nbSent_++;
00178 nbSentError_++;
00179 }
00180 unlock();
00181
00182 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00183 cellFUProcId, cellFUGuid, cellPayloadAddr,
00184 cellEventSize);
00185 } else {
00186 string errmsg =
00187 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00188 XCEPT_RAISE(evf::Exception, errmsg);
00189 }
00190 } catch (xcept::Exception& e) {
00191 LOG4CPLUS_FATAL(
00192 log_,
00193 "Failed to send EVENT DATA to StorageManager: "
00194 << xcept::stdformat_exception_history(e));
00195 reschedule = false;
00196 }
00197 }
00198
00199 return reschedule;
00200 }
00201
00202
00203 bool FUResourceTable::sendDataWhileHalting() {
00204 bool reschedule = true;
00205 FUShmRecoCell* cell = 0;
00206 try {
00207 cell = shmBuffer_->recoCellToRead();
00208 } catch (evf::Exception& e) {
00209 rethrowShmBufferException(e);
00210 }
00211
00212 if (0 == cell->eventSize()) {
00213 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00214 UInt_t cellIndex = cell->index();
00215 try {
00216 shmBuffer_->finishReadingRecoCell(cell);
00217 shmBuffer_->discardRecoCell(cellIndex);
00218 } catch (evf::Exception& e) {
00219 rethrowShmBufferException(e);
00220 }
00221 reschedule = false;
00222 } else {
00223 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00224 UInt_t cellIndex = cell->index();
00225 try {
00226 shmBuffer_->finishReadingRecoCell(cell);
00227 shmBuffer_->discardRecoCell(cellIndex);
00228 } catch (evf::Exception& e) {
00229 rethrowShmBufferException(e);
00230 }
00231 }
00232
00233 return reschedule;
00234 }
00235
00236
00237 bool FUResourceTable::sendDqm() {
00238 bool reschedule = true;
00239 FUShmDqmCell* cell = 0;
00240
00241 dqm::State_t state = dqm::EMPTY;
00242 try {
00243 cell = shmBuffer_->dqmCellToRead();
00244 state = shmBuffer_->dqmState(cell->index());
00245 } catch (evf::Exception& e) {
00246 rethrowShmBufferException(e);
00247 }
00248
00249 if (state == dqm::EMPTY) {
00250 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00251 std::cout << "shut down dqm workloop " << std::endl;
00252 UInt_t cellIndex = cell->index();
00253 try {
00254 shmBuffer_->finishReadingDqmCell(cell);
00255 shmBuffer_->discardDqmCell(cellIndex);
00256 } catch (evf::Exception& e) {
00257 rethrowShmBufferException(e);
00258 }
00259 reschedule = false;
00260 } else {
00261 try {
00262 UInt_t cellIndex = cell->index();
00263 UInt_t cellRunNumber = cell->runNumber();
00264 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00265 UInt_t cellFolderId = cell->folderId();
00266 UInt_t cellFUProcId = cell->fuProcessId();
00267 UInt_t cellFUGuid = cell->fuGuid();
00268 UChar_t *cellPayloadAddr = cell->payloadAddr();
00269 UInt_t cellEventSize = cell->eventSize();
00270 sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00271 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00272 cellEventSize);
00273 try {
00274 shmBuffer_->finishReadingDqmCell(cell);
00275 } catch (evf::Exception& e) {
00276 rethrowShmBufferException(e);
00277 }
00278 } catch (xcept::Exception& e) {
00279 LOG4CPLUS_FATAL(
00280 log_,
00281 "Failed to send DQM DATA to StorageManager: "
00282 << xcept::stdformat_exception_history(e));
00283 reschedule = false;
00284 }
00285 }
00286
00287 return reschedule;
00288 }
00289
00290
00291 bool FUResourceTable::sendDqmWhileHalting() {
00292 bool reschedule = true;
00293 FUShmDqmCell* cell = 0;
00294
00295 dqm::State_t state = dqm::EMPTY;
00296 try {
00297 cell = shmBuffer_->dqmCellToRead();
00298 state = shmBuffer_->dqmState(cell->index());
00299 } catch (evf::Exception& e) {
00300 rethrowShmBufferException(e);
00301 }
00302
00303 if (state == dqm::EMPTY) {
00304 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00305 std::cout << "shut down dqm workloop " << std::endl;
00306 UInt_t cellIndex = cell->index();
00307 try {
00308 shmBuffer_->finishReadingDqmCell(cell);
00309 shmBuffer_->discardDqmCell(cellIndex);
00310 } catch (evf::Exception& e) {
00311 rethrowShmBufferException(e);
00312 }
00313 reschedule = false;
00314 } else {
00315 UInt_t cellIndex = cell->index();
00316 try {
00317 shmBuffer_->finishReadingDqmCell(cell);
00318 shmBuffer_->discardDqmCell(cellIndex);
00319 } catch (evf::Exception& e) {
00320 rethrowShmBufferException(e);
00321 }
00322 }
00323
00324 return reschedule;
00325 }
00326
00327
00328
00329
00330 void FUResourceTable::discardNoReschedule() {
00331 std::cout << " entered shutdown cycle " << std::endl;
00332 try {
00333 shmBuffer_->writeRecoEmptyEvent();
00334 } catch (evf::Exception& e) {
00335 rethrowShmBufferException(e);
00336 }
00337 UInt_t count = 0;
00338 while (count < 100) {
00339 std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
00340 << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
00341 if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
00342 shmBuffer_->shmid()) == 1) {
00343
00344 break;
00345 } else {
00346 count++;
00347 std::cout << " shutdown cycle attempt " << count << std::endl;
00348 LOG4CPLUS_DEBUG(
00349 log_,
00350 "FUResourceTable: Wait for all clients to detach,"
00351 << " nClients=" << shmBuffer_->nClients()
00352 << " nattch=" << FUShmBuffer::shm_nattch(
00353 shmBuffer_->shmid()) << " (" << count << ")");
00354 ::usleep(shutdownTimeout_);
00355 if (count * shutdownTimeout_ > 10000000)
00356 LOG4CPLUS_WARN(
00357 log_,
00358 "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
00359 << " nClients=" << shmBuffer_->nClients()
00360 << " nattch=" << FUShmBuffer::shm_nattch(
00361 shmBuffer_->shmid()) << " (" << count << ")");
00362
00363 }
00364 }
00365 bool allEmpty = false;
00366 std::cout << "Checking if all dqm cells are empty " << std::endl;
00367 while (!allEmpty) {
00368 UInt_t n = nbDqmCells_;
00369 allEmpty = true;
00370 shmBuffer_->lock();
00371 for (UInt_t i = 0; i < n; i++) {
00372
00373 dqm::State_t state = dqm::EMPTY;
00374 try {
00375 state = shmBuffer_->dqmState(i);
00376 } catch (evf::Exception& e) {
00377 rethrowShmBufferException(e);
00378 }
00379 if (state != dqm::EMPTY)
00380 allEmpty = false;
00381 }
00382 shmBuffer_->unlock();
00383 }
00384 std::cout << "Making sure there are no dqm pending discards " << std::endl;
00385 if (nbPendingSMDqmDiscards_ != 0) {
00386 LOG4CPLUS_WARN(
00387 log_,
00388 "FUResourceTable: pending DQM discards not zero: ="
00389 << nbPendingSMDqmDiscards_
00390 << " while cells are all empty. This may cause problems at next start ");
00391
00392 }
00393 try {
00394 shmBuffer_->writeDqmEmptyEvent();
00395 } catch (evf::Exception& e) {
00396 rethrowShmBufferException(e);
00397 }
00398 isReadyToShutDown_ = true;
00399
00400 }
00401
00402
00403 bool FUResourceTable::discard() {
00404 FUShmRawCell* cell = 0;
00405
00406 evt::State_t state = evt::EMPTY;
00407 try {
00408 cell = shmBuffer_->rawCellToDiscard();
00409 state = shmBuffer_->evtState(cell->index());
00410 } catch (evf::Exception& e) {
00411 rethrowShmBufferException(e);
00412 }
00413
00414 bool reschedule = true;
00415 bool shutDown = (state == evt::STOP);
00416 bool isLumi = (state == evt::USEDLS);
00417 UInt_t fuResourceId = cell->fuResourceId();
00418 UInt_t buResourceId = cell->buResourceId();
00419
00420 if (state == evt::EMPTY) {
00421 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00422 return true;
00423 }
00424
00425 if (shutDown) {
00426 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00427 if (nbClientsToShutDown_ > 0)
00428 --nbClientsToShutDown_;
00429 if (nbClientsToShutDown_ == 0) {
00430 LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00431 isActive_ = false;
00432 reschedule = false;
00433 }
00434 }
00435
00436 try {
00437 shmBuffer_->discardRawCell(cell);
00438 } catch (evf::Exception& e) {
00439 rethrowShmBufferException(e);
00440 }
00441
00442 if (isLumi)
00443 nbEolDiscarded_++;
00444
00445 if (!shutDown && !isLumi) {
00446 if (fuResourceId >= nbResources()) {
00447 LOG4CPLUS_WARN(
00448 log_,
00449 "cell " << cell->index() << " in state " << state
00450 << " scheduled for discard has no associated FU resource ");
00451 } else {
00452 resources_[fuResourceId]->release(true);
00453 lock();
00454 freeResourceIds_.push(fuResourceId);
00455 assert(freeResourceIds_.size() <= resources_.size());
00456 unlock();
00457
00458 sendDiscard(buResourceId);
00459 sendAllocate();
00460 }
00461 }
00462
00463 if (!reschedule) {
00464 discardNoReschedule();
00465 }
00466
00467 return reschedule;
00468 }
00469
00470
00471 bool FUResourceTable::discardWhileHalting(bool sendDiscards) {
00472 FUShmRawCell* cell = 0;
00473
00474 evt::State_t state = evt::EMPTY;
00475 try {
00476 cell = shmBuffer_->rawCellToDiscard();
00477 state = shmBuffer_->evtState(cell->index());
00478 } catch (evf::Exception& e) {
00479 rethrowShmBufferException(e);
00480 }
00481
00482 bool reschedule = true;
00483 bool shutDown = (state == evt::STOP);
00484 bool isLumi = (state == evt::USEDLS);
00485 UInt_t fuResourceId = cell->fuResourceId();
00486 UInt_t buResourceId = cell->buResourceId();
00487
00488 if (state == evt::EMPTY) {
00489 LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
00490 return true;
00491 }
00492
00493 if (shutDown) {
00494 LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
00495 if (nbClientsToShutDown_ > 0)
00496 --nbClientsToShutDown_;
00497 if (nbClientsToShutDown_ == 0) {
00498 LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
00499 isActive_ = false;
00500 reschedule = false;
00501 }
00502 }
00503
00504 try {
00505 shmBuffer_->discardRawCell(cell);
00506 } catch (evf::Exception& e) {
00507 rethrowShmBufferException(e);
00508 }
00509
00510 if (isLumi)
00511 nbEolDiscarded_++;
00512
00513 if (!shutDown && !isLumi) {
00514 if (fuResourceId >= nbResources()) {
00515 LOG4CPLUS_WARN(
00516 log_,
00517 "cell " << cell->index() << " in state " << state
00518 << " scheduled for discard has no associated FU resource ");
00519 } else {
00520 resources_[fuResourceId]->release(true);
00521 lock();
00522 freeResourceIds_.push(fuResourceId);
00523 assert(freeResourceIds_.size() <= resources_.size());
00524 unlock();
00525
00526
00527
00528
00529
00530 if (sendDiscards)
00531 sendDiscard(buResourceId);
00532 }
00533 }
00534
00535 if (!reschedule) {
00536 discardNoReschedule();
00537 }
00538
00539 return reschedule;
00540 }
00541
00542
00543 bool FUResourceTable::buildResource(MemRef_t* bufRef) {
00544 bool eventComplete = false;
00545
00546 bool lastMsg = isLastMessageOfEvent(bufRef);
00547 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00548 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00549
00550 UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
00551 UInt_t buResourceId = (UInt_t) block->buResourceId;
00552 FUResource* resource = resources_[fuResourceId];
00553
00554
00555 if (!resource->fatalError() && !resource->isAllocated()) {
00556 FUShmRawCell* cell = 0;
00557 try {
00558 cell = shmBuffer_->rawCellToWrite();
00559 } catch (evf::Exception& e) {
00560 rethrowShmBufferException(e);
00561 }
00562 if (cell == 0) {
00563 bufRef->release();
00564 return eventComplete;
00565 }
00566 resource->allocate(cell);
00567 timeval now;
00568 gettimeofday(&now, 0);
00569
00570 frb_->setRBTimeStamp(
00571 ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
00572
00573 frb_->setRBEventCount(nbCompleted_);
00574
00575 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
00576 resource->doCrcCheck(true);
00577 else
00578 resource->doCrcCheck(false);
00579 }
00580
00581 #ifdef DEBUG_RES_TAB
00582 std::cout << "Received frame for resource " << buResourceId << std::endl;
00583 #endif
00584
00585 if (!resource->fatalError()) {
00586 #ifdef DEBUG_RES_TAB
00587 std::cout << "No fatal error for " << buResourceId << ", keep building..."<< std::endl;
00588 #endif
00589 resource->process(bufRef);
00590 lock();
00591 nbErrors_ += resource->nbErrors();
00592 nbCrcErrors_ += resource->nbCrcErrors();
00593 unlock();
00594 #ifdef DEBUG_RES_TAB
00595 std::cout << "Checking if resource is complete " << buResourceId << std::endl;
00596 #endif
00597
00598 if (resource->isComplete()) {
00599 #ifdef DEBUG_RES_TAB
00600 std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
00601 #endif
00602 lock();
00603 nbCompleted_++;
00604 nbPending_--;
00605 unlock();
00606 if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
00607 dumpEvent(resource->shmCell());
00608 try {
00609 shmBuffer_->finishWritingRawCell(resource->shmCell());
00610 } catch (evf::Exception& e) {
00611 rethrowShmBufferException(e);
00612 }
00613 eventComplete = true;
00614 }
00615
00616 }
00617
00618 if (resource->fatalError()) {
00619 if (lastMsg) {
00620 try {
00621 shmBuffer_->releaseRawCell(resource->shmCell());
00622 } catch (evf::Exception& e) {
00623 rethrowShmBufferException(e);
00624 }
00625 resource->release(true);
00626 lock();
00627 freeResourceIds_.push(fuResourceId);
00628 nbDiscarded_++;
00629 nbLost_++;
00630 nbPending_--;
00631 unlock();
00632 bu_->sendDiscard(buResourceId);
00633 sendAllocate();
00634 }
00635
00636 }
00637
00638 return eventComplete;
00639 }
00640
00641
00642 bool FUResourceTable::discardDataEvent(MemRef_t* bufRef) {
00643 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00644 msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00645 UInt_t recoIndex = msg->rbBufferID;
00646
00647 if (acceptSMDataDiscard_[recoIndex]) {
00648 lock();
00649 nbPendingSMDiscards_--;
00650 unlock();
00651 acceptSMDataDiscard_[recoIndex] = false;
00652
00653 try {
00654 shmBuffer_->discardRecoCell(recoIndex);
00655 } catch (evf::Exception& e) {
00656 rethrowShmBufferException(e);
00657 }
00658 bufRef->release();
00659
00660 } else {
00661 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00662 }
00663
00664 return true;
00665 }
00666
00667
00668 bool FUResourceTable::discardDataEventWhileHalting(MemRef_t* bufRef) {
00669 I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
00670 msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00671 UInt_t recoIndex = msg->rbBufferID;
00672
00673 if (acceptSMDataDiscard_[recoIndex]) {
00674 lock();
00675 nbPendingSMDiscards_--;
00676 unlock();
00677 acceptSMDataDiscard_[recoIndex] = false;
00678
00679 } else {
00680 LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
00681 }
00682
00683 bufRef->release();
00684 return false;
00685 }
00686
00687
00688 bool FUResourceTable::discardDqmEvent(MemRef_t* bufRef) {
00689 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00690 msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00691 UInt_t dqmIndex = msg->rbBufferID;
00692 unsigned int ntries = 0;
00693 try {
00694 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00695 LOG4CPLUS_WARN(
00696 log_,
00697 "DQM discard for cell " << dqmIndex
00698 << " which is not yer in SENT state - waiting");
00699 ::usleep(10000);
00700 if (ntries++ > 10) {
00701 LOG4CPLUS_ERROR(
00702 log_,
00703 "DQM cell " << dqmIndex
00704 << " discard timed out while cell still in state "
00705 << shmBuffer_->dqmState(dqmIndex));
00706 bufRef->release();
00707 return true;
00708 }
00709 }
00710 } catch (evf::Exception& e) {
00711 rethrowShmBufferException(e);
00712 }
00713 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00714 acceptSMDqmDiscard_[dqmIndex]--;
00715 if (nbPendingSMDqmDiscards_ > 0) {
00716 nbPendingSMDqmDiscards_--;
00717 } else {
00718 LOG4CPLUS_WARN(
00719 log_,
00720 "Spurious??? DQM discard by StorageManager, index "
00721 << dqmIndex << " cell state "
00722 << shmBuffer_->dqmState(dqmIndex)
00723 << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
00724 }
00725 try {
00726 shmBuffer_->discardDqmCell(dqmIndex);
00727 } catch (evf::Exception& e) {
00728 rethrowShmBufferException(e);
00729 }
00730 bufRef->release();
00731
00732 } else {
00733 LOG4CPLUS_ERROR(
00734 log_,
00735 "Spurious DQM discard for cell " << dqmIndex
00736 << " from StorageManager while cell is not accepting discards");
00737 }
00738
00739 return true;
00740 }
00741
00742
00743
00744 bool FUResourceTable::discardDqmEventWhileHalting(MemRef_t* bufRef) {
00745 I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
00746 msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
00747 UInt_t dqmIndex = msg->rbBufferID;
00748 unsigned int ntries = 0;
00749 try {
00750 while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
00751 LOG4CPLUS_WARN(
00752 log_,
00753 "DQM discard for cell " << dqmIndex
00754 << " which is not yer in SENT state - waiting");
00755 ::usleep(10000);
00756 if (ntries++ > 10) {
00757 LOG4CPLUS_ERROR(
00758 log_,
00759 "DQM cell " << dqmIndex
00760 << " discard timed out while cell still in state "
00761 << shmBuffer_->dqmState(dqmIndex));
00762 bufRef->release();
00763 return true;
00764 }
00765 }
00766 } catch (evf::Exception& e) {
00767 rethrowShmBufferException(e);
00768 }
00769 if (acceptSMDqmDiscard_[dqmIndex] > 0) {
00770 acceptSMDqmDiscard_[dqmIndex]--;
00771 if (nbPendingSMDqmDiscards_ > 0) {
00772 nbPendingSMDqmDiscards_--;
00773 } else {
00774 try {
00775 LOG4CPLUS_WARN(
00776 log_,
00777 "Spurious??? DQM discard by StorageManager, index "
00778 << dqmIndex << " cell state "
00779 << shmBuffer_->dqmState(dqmIndex)
00780 << " accept flag "
00781 << acceptSMDqmDiscard_[dqmIndex]);
00782 } catch (evf::Exception& e) {
00783 rethrowShmBufferException(e);
00784 }
00785 }
00786
00787 } else {
00788 LOG4CPLUS_ERROR(
00789 log_,
00790 "Spurious DQM discard for cell " << dqmIndex
00791 << " from StorageManager while cell is not accepting discards");
00792 }
00793
00794 bufRef->release();
00795 return false;
00796 }
00797
00798
00799 void FUResourceTable::postEndOfLumiSection(MemRef_t* bufRef) {
00800 I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
00801 *msg =
00802 (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
00803
00804
00805
00806 for (unsigned int i = 0; i < nbRawCells_; i++) {
00807
00808 nbEolPosted_++;
00809 try {
00810 shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
00811 } catch (evf::Exception& e) {
00812 rethrowShmBufferException(e);
00813 }
00814 }
00815 }
00816
00817
00818 void FUResourceTable::dropEvent() {
00819 FUShmRawCell* cell = 0;
00820 try {
00821 cell = shmBuffer_->rawCellToRead();
00822 } catch (evf::Exception& e) {
00823 rethrowShmBufferException(e);
00824 }
00825 UInt_t fuResourceId = cell->fuResourceId();
00826 try {
00827 shmBuffer_->finishReadingRawCell(cell);
00828 shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
00829 } catch (evf::Exception& e) {
00830 rethrowShmBufferException(e);
00831 }
00832 }
00833
00834
00835 bool FUResourceTable::handleCrashedEP(UInt_t runNumber, pid_t pid) {
00836 bool retval = false;
00837 vector<pid_t> pids = cellPrcIds();
00838 UInt_t iRawCell = pids.size();
00839 for (UInt_t i = 0; i < pids.size(); i++) {
00840 if (pid == pids[i]) {
00841 iRawCell = i;
00842 break;
00843 }
00844 }
00845
00846 if (iRawCell < pids.size()) {
00847 try {
00848 shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell, true);
00849 } catch (evf::Exception& e) {
00850 rethrowShmBufferException(e);
00851 }
00852 retval = true;
00853 } else
00854 LOG4CPLUS_WARN(log_,
00855 "No raw data to send to error stream for process " << pid);
00856 try {
00857 shmBuffer_->removeClientPrcId(pid);
00858 } catch (evf::Exception& e) {
00859 rethrowShmBufferException(e);
00860 }
00861 return retval;
00862 }
00863
00864
00865 void FUResourceTable::shutDownClients() {
00866 nbClientsToShutDown_ = nbClients();
00867 isReadyToShutDown_ = false;
00868
00869 if (nbClientsToShutDown_ == 0) {
00870 LOG4CPLUS_INFO(
00871 log_,
00872 "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
00873 UInt_t n = nbResources();
00874 try {
00875 for (UInt_t i = 0; i < n; i++) {
00876 evt::State_t state = shmBuffer_->evtState(i);
00877 if (state != evt::EMPTY) {
00878 LOG4CPLUS_WARN(
00879 log_,
00880 "Schedule discard at STOP for orphaned event in state "
00881 << state);
00882 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00883 }
00884 }
00885 shmBuffer_->scheduleRawEmptyCellForDiscard();
00886 } catch (evf::Exception& e) {
00887 rethrowShmBufferException(e);
00888 }
00889 } else {
00890
00891 int checks = 0;
00892 try {
00893 while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
00894 != 0) {
00895 checks++;
00896 vector<pid_t> prcids = clientPrcIds();
00897 for (UInt_t i = 0; i < prcids.size(); i++) {
00898 pid_t pid = prcids[i];
00899 int status = kill(pid, 0);
00900 if (status != 0) {
00901 LOG4CPLUS_ERROR(log_,
00902 "EP prc " << pid << " completed with error.");
00903 handleCrashedEP(runNumber_, pid);
00904 }
00905 }
00906
00907 LOG4CPLUS_WARN(
00908 log_,
00909 "no cell to write stop "
00910 << shmBuffer_->nbRawCellsToWrite()
00911 << " nClients " << nbClients());
00912 if (checks > 10) {
00913 string msg = "No Raw Cell to Write STOP messages";
00914 XCEPT_RAISE(evf::Exception, msg);
00915 }
00916 ::usleep(500000);
00917 }
00918 } catch (evf::Exception& e) {
00919 rethrowShmBufferException(e);
00920 }
00921 nbClientsToShutDown_ = nbClients();
00922 if (nbClientsToShutDown_ == 0) {
00923 UInt_t n = nbResources();
00924 for (UInt_t i = 0; i < n; i++) {
00925
00926 evt::State_t state = evt::EMPTY;
00927 try {
00928 state = shmBuffer_->evtState(i);
00929 } catch (evf::Exception& e) {
00930 rethrowShmBufferException(e);
00931 }
00932 if (state != evt::EMPTY) {
00933 LOG4CPLUS_WARN(
00934 log_,
00935 "Schedule discard at STOP for orphaned event in state "
00936 << state);
00937 try {
00938 shmBuffer_->setEvtDiscard(i, 1, true);
00939 shmBuffer_->scheduleRawCellForDiscardServerSide(i);
00940 } catch (evf::Exception& e) {
00941 rethrowShmBufferException(e);
00942 }
00943 }
00944 }
00945 try {
00946 shmBuffer_->scheduleRawEmptyCellForDiscard();
00947 } catch (evf::Exception& e) {
00948 rethrowShmBufferException(e);
00949 }
00950 }
00951 UInt_t n = nbClientsToShutDown_;
00952 try {
00953 for (UInt_t i = 0; i < n; ++i)
00954 shmBuffer_->writeRawEmptyEvent();
00955 } catch (evf::Exception& e) {
00956 rethrowShmBufferException(e);
00957 }
00958 }
00959 }
00960
00961
00962 void FUResourceTable::clear() {
00963 for (UInt_t i = 0; i < resources_.size(); i++) {
00964 resources_[i]->release(true);
00965 delete resources_[i];
00966 }
00967 resources_.clear();
00968 while (!freeResourceIds_.empty())
00969 freeResourceIds_.pop();
00970 }
00971
00973
00975
00976
00977 void FUResourceTable::resetCounters() {
00978 if (0 != shmBuffer_) {
00979 try {
00980 for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
00981 acceptSMDataDiscard_[i] = false;
00982 for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
00983 acceptSMDqmDiscard_[i] = 0;
00984 } catch (evf::Exception& e) {
00985 rethrowShmBufferException(e);
00986 }
00987 }
00988
00989
00990 nbAllocated_ = 0;
00991 nbPending_ = 0;
00992 nbCompleted_ = 0;
00993 nbSent_ = 0;
00994 nbSentError_ = 0;
00995 nbSentDqm_ = 0;
00996 nbPendingSMDiscards_ = 0;
00997 nbPendingSMDqmDiscards_ = 0;
00998 nbDiscarded_ = 0;
00999 nbLost_ = 0;
01000
01001 nbEolPosted_ = 0;
01002 nbEolDiscarded_ = 0;
01003
01004 nbErrors_ = 0;
01005 nbCrcErrors_ = 0;
01006 nbAllocSent_ = 0;
01007
01008 sumOfSquares_ = 0;
01009 sumOfSizes_ = 0;
01010 }
01011
01012
01013 UInt_t FUResourceTable::nbClients() const {
01014 UInt_t result(0);
01015 try {
01016 if (0 != shmBuffer_)
01017 result = shmBuffer_->nClients();
01018 } catch (evf::Exception& e) {
01019 rethrowShmBufferException(e);
01020 }
01021 return result;
01022 }
01023
01024
01025 vector<pid_t> FUResourceTable::clientPrcIds() const {
01026 vector<pid_t> result;
01027 try {
01028 if (0 != shmBuffer_) {
01029 UInt_t n = nbClients();
01030 for (UInt_t i = 0; i < n; i++)
01031 result.push_back(shmBuffer_->clientPrcId(i));
01032 }
01033 } catch (evf::Exception& e) {
01034 rethrowShmBufferException(e);
01035 }
01036 return result;
01037 }
01038
01039
01040 string FUResourceTable::clientPrcIdsAsString() const {
01041 stringstream ss;
01042 try {
01043 if (0 != shmBuffer_) {
01044 UInt_t n = nbClients();
01045 for (UInt_t i = 0; i < n; i++) {
01046 if (i > 0)
01047 ss << ",";
01048 ss << shmBuffer_->clientPrcId(i);
01049 }
01050 }
01051 } catch (evf::Exception& e) {
01052 rethrowShmBufferException(e);
01053 }
01054 return ss.str();
01055 }
01056
01057
01058 vector<string> FUResourceTable::cellStates() const {
01059 vector<string> result;
01060 if (0 != shmBuffer_) {
01061 UInt_t n = nbResources();
01062 shmBuffer_->lock();
01063 try {
01064 for (UInt_t i = 0; i < n; i++) {
01065 evt::State_t state = shmBuffer_->evtState(i);
01066 if (state == evt::EMPTY)
01067 result.push_back("EMPTY");
01068 else if (state == evt::STOP)
01069 result.push_back("STOP");
01070 else if (state == evt::LUMISECTION)
01071 result.push_back("LUMISECTION");
01072
01073 else if (state == evt::USEDLS)
01074 result.push_back("USEDLS");
01075 else if (state == evt::RAWWRITING)
01076 result.push_back("RAWWRITING");
01077 else if (state == evt::RAWWRITTEN)
01078 result.push_back("RAWWRITTEN");
01079 else if (state == evt::RAWREADING)
01080 result.push_back("RAWREADING");
01081 else if (state == evt::RAWREAD)
01082 result.push_back("RAWREAD");
01083 else if (state == evt::PROCESSING)
01084 result.push_back("PROCESSING");
01085 else if (state == evt::PROCESSED)
01086 result.push_back("PROCESSED");
01087 else if (state == evt::RECOWRITING)
01088 result.push_back("RECOWRITING");
01089 else if (state == evt::RECOWRITTEN)
01090 result.push_back("RECOWRITTEN");
01091 else if (state == evt::SENDING)
01092 result.push_back("SENDING");
01093 else if (state == evt::SENT)
01094 result.push_back("SENT");
01095 else if (state == evt::DISCARDING)
01096 result.push_back("DISCARDING");
01097 }
01098 } catch (evf::Exception& e) {
01099 rethrowShmBufferException(e);
01100 }
01101 shmBuffer_->unlock();
01102 }
01103 return result;
01104 }
01105
01106 vector<string> FUResourceTable::dqmCellStates() const {
01107 vector<string> result;
01108 if (0 != shmBuffer_) {
01109 UInt_t n = nbDqmCells_;
01110 shmBuffer_->lock();
01111 try {
01112 for (UInt_t i = 0; i < n; i++) {
01113 dqm::State_t state = shmBuffer_->dqmState(i);
01114 if (state == dqm::EMPTY)
01115 result.push_back("EMPTY");
01116 else if (state == dqm::WRITING)
01117 result.push_back("WRITING");
01118 else if (state == dqm::WRITTEN)
01119 result.push_back("WRITTEN");
01120 else if (state == dqm::SENDING)
01121 result.push_back("SENDING");
01122 else if (state == dqm::SENT)
01123 result.push_back("SENT");
01124 else if (state == dqm::DISCARDING)
01125 result.push_back("DISCARDING");
01126 }
01127 } catch (evf::Exception& e) {
01128 rethrowShmBufferException(e);
01129 }
01130 shmBuffer_->unlock();
01131 }
01132 return result;
01133 }
01134
01135
01136 vector<UInt_t> FUResourceTable::cellEvtNumbers() const {
01137 vector<UInt_t> result;
01138 if (0 != shmBuffer_) {
01139 UInt_t n = nbResources();
01140 shmBuffer_->lock();
01141 try {
01142 for (UInt_t i = 0; i < n; i++)
01143 result.push_back(shmBuffer_->evtNumber(i));
01144 } catch (evf::Exception& e) {
01145 rethrowShmBufferException(e);
01146 }
01147 shmBuffer_->unlock();
01148 }
01149 return result;
01150 }
01151
01152
01153 vector<pid_t> FUResourceTable::cellPrcIds() const {
01154 vector<pid_t> result;
01155 if (0 != shmBuffer_) {
01156 UInt_t n = nbResources();
01157 shmBuffer_->lock();
01158 try {
01159 for (UInt_t i = 0; i < n; i++)
01160 result.push_back(shmBuffer_->evtPrcId(i));
01161 } catch (evf::Exception& e) {
01162 rethrowShmBufferException(e);
01163 }
01164 shmBuffer_->unlock();
01165 }
01166 return result;
01167 }
01168
01169
01170 vector<time_t> FUResourceTable::cellTimeStamps() const {
01171 vector<time_t> result;
01172 try {
01173 if (0 != shmBuffer_) {
01174 UInt_t n = nbResources();
01175 shmBuffer_->lock();
01176 for (UInt_t i = 0; i < n; i++)
01177 result.push_back(shmBuffer_->evtTimeStamp(i));
01178 shmBuffer_->unlock();
01179 }
01180 } catch (evf::Exception& e) {
01181 rethrowShmBufferException(e);
01182 }
01183 return result;
01184 }
01185
01187
01189
01190 void FUResourceTable::lastResort() {
01191 try {
01192 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01193 << " more rawcells to read " << std::endl;
01194 while (shmBuffer_->nbRawCellsToRead() != 0) {
01195 FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
01196 std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
01197 << std::endl;
01198
01199 shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
01200
01201 std::cout << "lastResort: schedule raw cell for discard "
01202 << newCell->index() << std::endl;
01203 }
01204
01205 shmBuffer_->scheduleRawEmptyCellForDiscard();
01206 } catch (evf::Exception& e) {
01207 rethrowShmBufferException(e);
01208 }
01209 }
01210
01211 void FUResourceTable::resetIPC() {
01212 if (shmBuffer_ != 0) {
01213 shmBuffer_->reset();
01214 LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
01215 }
01216 }
01217
01218 void FUResourceTable::rethrowShmBufferException(evf::Exception& e) const
01219 throw (evf::Exception) {
01220 stringstream details;
01221 vector<string> dataStates = cellStates();
01222 vector<string> dqmStates = dqmCellStates();
01223 details << "Exception raised: " << e.what() << " (in module: "
01224 << e.module() << " in function: " << e.function() << " at line: "
01225 << e.line() << ")";
01226 details << " Dumping cell state... ";
01227 details << "data cells --> ";
01228 for (unsigned int i = 0; i < dataStates.size(); i++)
01229 details << dataStates[i] << " ";
01230 details << "dqm cells --> ";
01231 for (unsigned int i = 0; i < dqmStates.size(); i++)
01232 details << dqmStates[i] << " ";
01233 XCEPT_RETHROW(evf::Exception, details.str(), e);
01234 }