00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include <unistd.h>
00016 #include <iostream>
00017 #include <string>
00018 #include <cassert>
00019
00020 #include <sstream>
00021 #include <fstream>
00022
00023 #include <cstdlib>
00024 #include <cstring>
00025 #include <stdint.h>
00026
00027
00028
00029
00030
00031 #define SHM_DESCRIPTOR_KEYID 1
00032 #define SHM_KEYID 2
00033 #define SEM_KEYID 1
00034
00035 #define NSKIP_MAX 100
00036
00037 using namespace std;
00038 using namespace evf;
00039
00040
00041 const char* FUShmBuffer::shmKeyPath_ =
00042 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null"
00043 : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null"
00046 : getenv("FUSEM_KEYFILE"));
00047
00049
00051
00052
00053 FUShmBuffer::FUShmBuffer(bool segmentationMode, unsigned int nRawCells,
00054 unsigned int nRecoCells, unsigned int nDqmCells,
00055 unsigned int rawCellSize, unsigned int recoCellSize,
00056 unsigned int dqmCellSize) :
00057 segmentationMode_(segmentationMode), nClientsMax_(128),
00058 nRawCells_(nRawCells), rawCellPayloadSize_(rawCellSize),
00059 nRecoCells_(nRecoCells), recoCellPayloadSize_(recoCellSize),
00060 nDqmCells_(nDqmCells), dqmCellPayloadSize_(dqmCellSize) {
00061 rawCellTotalSize_ = FUShmRawCell::size(rawCellPayloadSize_);
00062 recoCellTotalSize_ = FUShmRecoCell::size(recoCellPayloadSize_);
00063 dqmCellTotalSize_ = FUShmDqmCell::size(dqmCellPayloadSize_);
00064
00065 void* addr;
00066
00067 rawWriteOffset_ = sizeof(FUShmBuffer);
00068 addr = (void*) ((unsigned long) this + rawWriteOffset_);
00069 new (addr) unsigned int[nRawCells_];
00070
00071 rawReadOffset_ = rawWriteOffset_ + nRawCells_ * sizeof(unsigned int);
00072 addr = (void*) ((unsigned long) this + rawReadOffset_);
00073 new (addr) unsigned int[nRawCells_];
00074
00075 recoWriteOffset_ = rawReadOffset_ + nRawCells_ * sizeof(unsigned int);
00076 addr = (void*) ((unsigned long) this + recoWriteOffset_);
00077 new (addr) unsigned int[nRecoCells_];
00078
00079 recoReadOffset_ = recoWriteOffset_ + nRecoCells_ * sizeof(unsigned int);
00080 addr = (void*) ((unsigned long) this + recoReadOffset_);
00081 new (addr) unsigned int[nRecoCells_];
00082
00083 dqmWriteOffset_ = recoReadOffset_ + nRecoCells_ * sizeof(unsigned int);
00084 addr = (void*) ((unsigned long) this + dqmWriteOffset_);
00085 new (addr) unsigned int[nDqmCells_];
00086
00087 dqmReadOffset_ = dqmWriteOffset_ + nDqmCells_ * sizeof(unsigned int);
00088 addr = (void*) ((unsigned long) this + dqmReadOffset_);
00089 new (addr) unsigned int[nDqmCells_];
00090
00091 evtStateOffset_ = dqmReadOffset_ + nDqmCells_ * sizeof(unsigned int);
00092 addr = (void*) ((unsigned long) this + evtStateOffset_);
00093 new (addr) evt::State_t[nRawCells_];
00094
00095 evtDiscardOffset_ = evtStateOffset_ + nRawCells_ * sizeof(evt::State_t);
00096 addr = (void*) ((unsigned long) this + evtDiscardOffset_);
00097 new (addr) unsigned int[nRawCells_];
00098
00099 evtNumberOffset_ = evtDiscardOffset_ + nRawCells_ * sizeof(unsigned int);
00100 addr = (void*) ((unsigned long) this + evtNumberOffset_);
00101 new (addr) unsigned int[nRawCells_];
00102
00103 evtPrcIdOffset_ = evtNumberOffset_ + nRawCells_ * sizeof(unsigned int);
00104 addr = (void*) ((unsigned long) this + evtPrcIdOffset_);
00105 new (addr) pid_t[nRawCells_];
00106
00107 evtTimeStampOffset_ = evtPrcIdOffset_ + nRawCells_ * sizeof(pid_t);
00108 addr = (void*) ((unsigned long) this + evtTimeStampOffset_);
00109 new (addr) time_t[nRawCells_];
00110
00111 dqmStateOffset_ = evtTimeStampOffset_ + nRawCells_ * sizeof(time_t);
00112 addr = (void*) ((unsigned long) this + dqmStateOffset_);
00113 new (addr) dqm::State_t[nDqmCells_];
00114
00115 clientPrcIdOffset_ = dqmStateOffset_ + nDqmCells_ * sizeof(dqm::State_t);
00116 addr = (void*) ((unsigned long) this + clientPrcIdOffset_);
00117 new (addr) pid_t[nClientsMax_];
00118
00119 rawCellOffset_ = clientPrcIdOffset_ + nClientsMax_ * sizeof(pid_t);
00120
00121 if (segmentationMode_) {
00122 recoCellOffset_ = rawCellOffset_ + nRawCells_ * sizeof(key_t);
00123 dqmCellOffset_ = recoCellOffset_ + nRecoCells_ * sizeof(key_t);
00124 addr = (void*) ((unsigned long) this + rawCellOffset_);
00125 new (addr) key_t[nRawCells_];
00126 addr = (void*) ((unsigned long) this + recoCellOffset_);
00127 new (addr) key_t[nRecoCells_];
00128 addr = (void*) ((unsigned long) this + dqmCellOffset_);
00129 new (addr) key_t[nDqmCells_];
00130 } else {
00131 recoCellOffset_ = rawCellOffset_ + nRawCells_ * rawCellTotalSize_;
00132 dqmCellOffset_ = recoCellOffset_ + nRecoCells_ * recoCellTotalSize_;
00133 for (unsigned int i = 0; i < nRawCells_; i++) {
00134 addr = (void*) ((unsigned long) this + rawCellOffset_ + i
00135 * rawCellTotalSize_);
00136 new (addr) FUShmRawCell(rawCellSize);
00137 }
00138 for (unsigned int i = 0; i < nRecoCells_; i++) {
00139 addr = (void*) ((unsigned long) this + recoCellOffset_ + i
00140 * recoCellTotalSize_);
00141 new (addr) FUShmRecoCell(recoCellSize);
00142 }
00143 for (unsigned int i = 0; i < nDqmCells_; i++) {
00144 addr = (void*) ((unsigned long) this + dqmCellOffset_ + i
00145 * dqmCellTotalSize_);
00146 new (addr) FUShmDqmCell(dqmCellSize);
00147 }
00148 }
00149 }
00150
00151
00152 FUShmBuffer::~FUShmBuffer() {
00153
00154 }
00155
00157
00159
00160
00161
00162 void FUShmBuffer::initialize(unsigned int shmid, unsigned int semid) {
00163 shmid_ = shmid;
00164 semid_ = semid;
00165
00166 if (segmentationMode_) {
00167 int shmKeyId = 666;
00168 key_t* keyAddr = (key_t*) ((unsigned long) this + rawCellOffset_);
00169 for (unsigned int i = 0; i < nRawCells_; i++) {
00170 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00171 int shmid = shm_create(*keyAddr, rawCellTotalSize_);
00172 void* shmAddr = shm_attach(shmid);
00173 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00174 shmdt(shmAddr);
00175 ++keyAddr;
00176 }
00177 keyAddr = (key_t*) ((unsigned long) this + recoCellOffset_);
00178 for (unsigned int i = 0; i < nRecoCells_; i++) {
00179 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00180 int shmid = shm_create(*keyAddr, recoCellTotalSize_);
00181 void* shmAddr = shm_attach(shmid);
00182 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr = (key_t*) ((unsigned long) this + dqmCellOffset_);
00187 for (unsigned int i = 0; i < nDqmCells_; i++) {
00188 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00189 int shmid = shm_create(*keyAddr, dqmCellTotalSize_);
00190 void* shmAddr = shm_attach(shmid);
00191 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 }
00196
00197 reset(true);
00198 }
00199
00200
00201 void FUShmBuffer::reset(bool shm_detach) {
00202 nClients_ = 0;
00203
00204
00205 for (unsigned int i = 0; i < nRawCells_; i++) {
00206 FUShmRawCell* cell = rawCell(i);
00207 cell->initialize(i);
00208 if (segmentationMode_ && shm_detach)
00209 shmdt(cell);
00210 }
00211
00212 for (unsigned int i = 0; i < nRecoCells_; i++) {
00213 FUShmRecoCell* cell = recoCell(i);
00214 cell->initialize(i);
00215 if (segmentationMode_ && shm_detach)
00216 shmdt(cell);
00217 }
00218
00219 for (unsigned int i = 0; i < nDqmCells_; i++) {
00220 FUShmDqmCell* cell = dqmCell(i);
00221 cell->initialize(i);
00222 if (segmentationMode_ && shm_detach)
00223 shmdt(cell);
00224 }
00225
00226
00227
00228 sem_init(0, 1);
00229 sem_init(1, nRawCells_);
00230 sem_init(2, 0);
00231 sem_init(3, 1);
00232 sem_init(4, 0);
00233 sem_init(5, nRecoCells_);
00234 sem_init(6, 0);
00235 sem_init(7, nDqmCells_);
00236 sem_init(8, 0);
00237
00238 sem_print();
00239
00240 unsigned int *iWrite, *iRead;
00241
00242 rawWriteNext_ = 0;
00243 rawWriteLast_ = 0;
00244 rawReadNext_ = 0;
00245 rawReadLast_ = 0;
00246 iWrite = (unsigned int*) ((unsigned long) this + rawWriteOffset_);
00247 iRead = (unsigned int*) ((unsigned long) this + rawReadOffset_);
00248 for (unsigned int i = 0; i < nRawCells_; i++) {
00249 *iWrite++ = i;
00250 *iRead++ = 0xffffffff;
00251 }
00252
00253 recoWriteNext_ = 0;
00254 recoWriteLast_ = 0;
00255 recoReadNext_ = 0;
00256 recoReadLast_ = 0;
00257 iWrite = (unsigned int*) ((unsigned long) this + recoWriteOffset_);
00258 iRead = (unsigned int*) ((unsigned long) this + recoReadOffset_);
00259 for (unsigned int i = 0; i < nRecoCells_; i++) {
00260 *iWrite++ = i;
00261 *iRead++ = 0xffffffff;
00262 }
00263
00264 dqmWriteNext_ = 0;
00265 dqmWriteLast_ = 0;
00266 dqmReadNext_ = 0;
00267 dqmReadLast_ = 0;
00268 iWrite = (unsigned int*) ((unsigned long) this + dqmWriteOffset_);
00269 iRead = (unsigned int*) ((unsigned long) this + dqmReadOffset_);
00270 for (unsigned int i = 0; i < nDqmCells_; i++) {
00271 *iWrite++ = i;
00272 *iRead++ = 0xffffffff;
00273 }
00274
00275 for (unsigned int i = 0; i < nRawCells_; i++) {
00276 setEvtState(i, evt::EMPTY);
00277 setEvtDiscard(i, 0);
00278 setEvtNumber(i, 0xffffffff);
00279 setEvtPrcId(i, 0);
00280 setEvtTimeStamp(i, 0);
00281 }
00282
00283 for (unsigned int i = 0; i < nDqmCells_; i++)
00284 setDqmState(i, dqm::EMPTY);
00285 }
00286
00287
00288 unsigned int FUShmBuffer::nbRawCellsToWrite() const {
00289 return semctl(semid(), 1, GETVAL);
00290 }
00291
00292
00293 int FUShmBuffer::nbRawCellsToRead() const {
00294 return semctl(semid(), 2, GETVAL);
00295 }
00296
00297
00298 FUShmRawCell* FUShmBuffer::rawCellToWrite() {
00299 if (waitRawWrite() != 0)
00300 return 0;
00301 unsigned int iCell = nextRawWriteIndex();
00302 FUShmRawCell* cell = rawCell(iCell);
00303 evt::State_t state = evtState(iCell);
00304 if(!(state == evt::EMPTY)) {
00305 stringstream details;
00306 details << "state==evt::EMPTY assertion failed! Actual state is " << state << ", iCell = " << iCell;
00307 XCEPT_ASSERT(false, evf::Exception, details.str());
00308 }
00309 lock();
00310 setEvtState(iCell, evt::RAWWRITING,false);
00311 setEvtDiscard(iCell, 1,false,false);
00312 unlock();
00313 return cell;
00314 }
00315
00316
00317 FUShmRawCell* FUShmBuffer::rawCellToRead() {
00318 waitRawRead();
00319 unsigned int iCell = nextRawReadIndex();
00320 FUShmRawCell* cell = rawCell(iCell);
00321 evt::State_t state = evtState(iCell);
00322 if(!(state == evt::RAWWRITTEN || state == evt::EMPTY ||
00323 state == evt::STOP || state == evt::LUMISECTION))
00324 {
00325 stringstream details;
00326 details
00327 << "state==evt::RAWWRITTEN ||state==evt::EMPTY ||state==evt::STOP"
00328 << "||state==evt::LUMISECTION assertion failed! Actual state is "
00329 << state << ", iCell = " << iCell;
00330 XCEPT_ASSERT(false,evf::Exception, details.str());
00331 }
00332 if (state == evt::RAWWRITTEN) {
00333 setEvtPrcId(iCell, getpid());
00334 setEvtState(iCell, evt::RAWREADING);
00335 }
00336 return cell;
00337 }
00338
00339
00340 FUShmRecoCell* FUShmBuffer::recoCellToRead() {
00341 waitRecoRead();
00342 unsigned int iCell = nextRecoReadIndex();
00343 FUShmRecoCell* cell = recoCell(iCell);
00344 unsigned int iRawCell = cell->rawCellIndex();
00345 if (iRawCell < nRawCells_) {
00346
00347
00348 setEvtState(iRawCell, evt::SENDING);
00349 }
00350 return cell;
00351 }
00352
00353
00354 FUShmDqmCell* FUShmBuffer::dqmCellToRead() {
00355 waitDqmRead();
00356 unsigned int iCell = nextDqmReadIndex();
00357 FUShmDqmCell* cell = dqmCell(iCell);
00358 dqm::State_t state = dqmState(iCell);
00359
00360 if(!(state == dqm::WRITTEN || state == dqm::EMPTY)) {
00361 stringstream details;
00362 details << "state==dqm::WRITTEN || state==dqm::EMPTY assertion failed! Actual state is "
00363 << state << ", iCell = " << iCell;
00364 XCEPT_ASSERT(false, evf::Exception, details.str());
00365 }
00366
00367 if (state == dqm::WRITTEN)
00368 setDqmState(iCell, dqm::SENDING);
00369 return cell;
00370 }
00371
00372
00373 FUShmRawCell* FUShmBuffer::rawCellToDiscard() {
00374 waitRawDiscarded();
00375 FUShmRawCell* cell = rawCell(rawDiscardIndex_);
00376 evt::State_t state = evtState(cell->index());
00377 if(!(state == evt::PROCESSED || state == evt::SENT ||
00378 state == evt::EMPTY || state == evt::STOP ||
00379 state == evt::USEDLS)) {
00380 stringstream details;
00381 details << "state==evt::PROCESSED || state==evt::SENT || "
00382 << "state==evt::EMPTY || state==evt::STOP || "
00383 << "state==evt::USEDLS assertion failed! Actual state is "
00384 << state << ", index = " << cell->index();
00385 XCEPT_ASSERT(false,evf::Exception,details.str());
00386 }
00387 if (state != evt::EMPTY && state != evt::USEDLS && state != evt::STOP)
00388 setEvtState(cell->index(), evt::DISCARDING);
00389 return cell;
00390 }
00391
00392
00393 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell) {
00394 evt::State_t state = evtState(cell->index());
00395 if(!(state == evt::RAWWRITING)) {
00396 stringstream details;
00397 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00398 << state << ", index = " << cell->index();
00399 XCEPT_ASSERT(false, evf::Exception, details.str());
00400 }
00401 setEvtState(cell->index(), evt::RAWWRITTEN);
00402 setEvtNumber(cell->index(), cell->evtNumber());
00403 postRawIndexToRead(cell->index());
00404 if (segmentationMode_)
00405 shmdt(cell);
00406 postRawRead();
00407 }
00408
00409
00410 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell) {
00411 evt::State_t state = evtState(cell->index());
00412 if(!(state == evt::RAWREADING)) {
00413 stringstream details;
00414 details << "state==evt::RAWREADING assertion failed! Actual state is "
00415 << state << ", index = " << cell->index();
00416 XCEPT_ASSERT(false, evf::Exception, details.str());
00417 }
00418 setEvtState(cell->index(), evt::RAWREAD);
00419 setEvtState(cell->index(), evt::PROCESSING);
00420 setEvtTimeStamp(cell->index(), time(0));
00421 if (segmentationMode_)
00422 shmdt(cell);
00423 }
00424
00425
00426 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell) {
00427 unsigned int iRawCell = cell->rawCellIndex();
00428 if (iRawCell < nRawCells_) {
00429
00430
00431 setEvtState(cell->rawCellIndex(), evt::SENT);
00432 }
00433 if (segmentationMode_)
00434 shmdt(cell);
00435 }
00436
00437
00438 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell) {
00439 dqm::State_t state = dqmState(cell->index());
00440 if(!(state == dqm::SENDING || state == dqm::EMPTY)) {
00441 stringstream details;
00442 details << "state==dqm::SENDING||state==dqm::EMPTY assertion failed! Actual state is "
00443 << state << ", index = " << cell->index();
00444 XCEPT_ASSERT(false, evf::Exception, details.str());
00445
00446 }
00447 if (state == dqm::SENDING)
00448 setDqmState(cell->index(), dqm::SENT);
00449 if (segmentationMode_)
00450 shmdt(cell);
00451 }
00452
00453
00454 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell) {
00455 waitRawDiscard();
00456 if (rawCellReadyForDiscard(iCell)) {
00457 rawDiscardIndex_ = iCell;
00458 evt::State_t state = evtState(iCell);
00459 if(!(state == evt::PROCESSING || state == evt::SENT ||
00460 state == evt::EMPTY || state == evt::STOP ||
00461 state == evt::LUMISECTION || state == evt::RECOWRITTEN)) {
00462 stringstream details;
00463 details << "state==evt::PROCESSING||state==evt::SENT||state==evt::EMPTY||"
00464 <<"state==evt::STOP||state==evt::LUMISECTION||state==evt::RECOWRITTEN assertion failed! Actual state is "
00465 << state << ", iCell = " << iCell;
00466 XCEPT_ASSERT( false,evf::Exception,details.str());
00467 }
00468 if (state == evt::PROCESSING)
00469 setEvtState(iCell, evt::PROCESSED);
00470 if (state == evt::LUMISECTION)
00471 setEvtState(iCell, evt::USEDLS);
00472 postRawDiscarded();
00473 } else
00474 postRawDiscard();
00475 }
00476
00477
00478 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell) {
00479 waitRawDiscard();
00480 if (rawCellReadyForDiscard(iCell)) {
00481 rawDiscardIndex_ = iCell;
00482 evt::State_t state = evtState(iCell);
00483
00484 if (state != evt::LUMISECTION && state != evt::EMPTY
00485 && state != evt::USEDLS && state != evt::STOP)
00486 setEvtState(iCell, evt::PROCESSED);
00487 if (state == evt::LUMISECTION)
00488 setEvtState(iCell, evt::USEDLS);
00489 postRawDiscarded();
00490 } else
00491 postRawDiscard();
00492 }
00493
00494
00495 void FUShmBuffer::discardRawCell(FUShmRawCell* cell) {
00496 releaseRawCell(cell);
00497 postRawDiscard();
00498 }
00499
00500
00501 void FUShmBuffer::discardRecoCell(unsigned int iCell) {
00502 FUShmRecoCell* cell = recoCell(iCell);
00503 unsigned int iRawCell = cell->rawCellIndex();
00504 if (iRawCell < nRawCells_) {
00505
00506
00507 scheduleRawCellForDiscard(iRawCell);
00508 }
00509 cell->clear();
00510 if (segmentationMode_)
00511 shmdt(cell);
00512 postRecoIndexToWrite(iCell);
00513 postRecoWrite();
00514 }
00515
00516
00517 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell) {
00518 FUShmRecoCell* cell = recoCell(iCell);
00519 cell->clear();
00520 if (segmentationMode_)
00521 shmdt(cell);
00522 postRecoIndexToWrite(iCell);
00523 postRecoWrite();
00524 }
00525
00526
00527 void FUShmBuffer::discardDqmCell(unsigned int iCell) {
00528 dqm::State_t state = dqmState(iCell);
00529 if(!(state == dqm::EMPTY || state == dqm::SENT)) {
00530 stringstream details;
00531 details << "state==dqm::EMPTY||state==dqm::SENT assertion failed! Actual state is "
00532 << state << ", iCell = " << iCell;
00533 XCEPT_ASSERT(false, evf::Exception,details.str());
00534 }
00535 setDqmState(iCell, dqm::DISCARDING);
00536 FUShmDqmCell* cell = dqmCell(iCell);
00537 cell->clear();
00538 if (segmentationMode_)
00539 shmdt(cell);
00540 setDqmState(iCell, dqm::EMPTY);
00541 postDqmIndexToWrite(iCell);
00542 postDqmWrite();
00543 }
00544
00545
00546 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell) {
00547 evt::State_t state = evtState(cell->index());
00548 if(!( state == evt::DISCARDING || state == evt::RAWWRITING ||
00549 state== evt::EMPTY || state == evt::STOP ||
00550
00551 state == evt::USEDLS)) {
00552 std::cout << "=================releaseRawCell state " << state << std::endl;
00553 stringstream details;
00554 details << "state==evt::DISCARDING||state==evt::RAWWRITING||"
00555 << "state==evt::EMPTY||state==evt::STOP||state==evt::USEDLS"
00556 << " assertion failed! Actual state is " << state << ", index = " << cell->index();
00557 XCEPT_ASSERT( false, evf::Exception, details.str());
00558 }
00559 setEvtState(cell->index(), evt::EMPTY);
00560 setEvtDiscard(cell->index(), 0);
00561 setEvtNumber(cell->index(), 0xffffffff);
00562 setEvtPrcId(cell->index(), 0);
00563 setEvtTimeStamp(cell->index(), 0);
00564 cell->clear();
00565 postRawIndexToWrite(cell->index());
00566 if (segmentationMode_)
00567 shmdt(cell);
00568 postRawWrite();
00569 }
00570
00571
00572 void FUShmBuffer::writeRawEmptyEvent() {
00573 FUShmRawCell* cell = rawCellToWrite();
00574 if (cell == 0)
00575 return;
00576 evt::State_t state = evtState(cell->index());
00577 if(!(state == evt::RAWWRITING)) {
00578 stringstream details;
00579 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00580 << state << ", index = " << cell->index();
00581 XCEPT_ASSERT(false, evf::Exception, details.str());
00582 }
00583 setEvtState(cell->index(), evt::STOP);
00584 cell->setEventTypeStopper();
00585 postRawIndexToRead(cell->index());
00586 if (segmentationMode_)
00587 shmdt(cell);
00588 postRawRead();
00589 }
00590
00591
00592 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls) {
00593 FUShmRawCell* cell = rawCellToWrite();
00594 if (cell == 0)
00595 return;
00596 cell->setLumiSection(ls);
00597 evt::State_t state = evtState(cell->index());
00598 if (!(state == evt::RAWWRITING)) {
00599 stringstream details;
00600 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00601 << state << ", index = " << cell->index();
00602 XCEPT_ASSERT(false, evf::Exception, details.str());
00603 }
00604 setEvtNumber(cell->index(),0xfffffffe);
00605 setEvtState(cell->index(), evt::LUMISECTION);
00606 cell->setEventTypeEol();
00607 postRawIndexToRead(cell->index());
00608 if (segmentationMode_)
00609 shmdt(cell);
00610 postRawRead();
00611 }
00612
00613
00614 void FUShmBuffer::writeRecoEmptyEvent() {
00615 waitRecoWrite();
00616 unsigned int iCell = nextRecoWriteIndex();
00617 FUShmRecoCell* cell = recoCell(iCell);
00618 cell->clear();
00619 postRecoIndexToRead(iCell);
00620 if (segmentationMode_)
00621 shmdt(cell);
00622 postRecoRead();
00623 }
00624
00625
00626 void FUShmBuffer::writeDqmEmptyEvent() {
00627 waitDqmWrite();
00628 unsigned int iCell = nextDqmWriteIndex();
00629 FUShmDqmCell* cell = dqmCell(iCell);
00630 cell->clear();
00631 postDqmIndexToRead(iCell);
00632 if (segmentationMode_)
00633 shmdt(cell);
00634 postDqmRead();
00635 }
00636
00637
00638 void FUShmBuffer::scheduleRawEmptyCellForDiscard() {
00639 FUShmRawCell* cell = rawCellToWrite();
00640 if (cell == 0)
00641 return;
00642 rawDiscardIndex_ = cell->index();
00643 setEvtState(cell->index(), evt::STOP);
00644 cell->setEventTypeStopper();
00645 setEvtNumber(cell->index(), 0xffffffff);
00646 setEvtPrcId(cell->index(), 0);
00647 setEvtTimeStamp(cell->index(), 0);
00648 postRawDiscarded();
00649 }
00650
00651
00652 bool FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell, bool & pidstatus) {
00653 waitRawDiscard();
00654 if (rawCellReadyForDiscard(cell->index())) {
00655 rawDiscardIndex_ = cell->index();
00656
00657
00658
00659
00660
00661
00662
00663 pidstatus = removeClientPrcId(getpid());
00664 if (segmentationMode_)
00665 shmdt(cell);
00666 postRawDiscarded();
00667 return true;
00668 } else {
00669 postRawDiscard();
00670 return false;
00671 }
00672 }
00673
00674
00675 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell) {
00676
00677 if (rawCellReadyForDiscard(cell->index())) {
00678 rawDiscardIndex_ = cell->index();
00679
00680
00681
00682
00683
00684
00685 if (segmentationMode_)
00686 shmdt(cell);
00687 postRawDiscarded();
00688 } else
00689 postRawDiscard();
00690 }
00691
00692
00693 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00694 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00695 unsigned int dataSize, unsigned int nExpectedEPs) {
00696 if (dataSize > recoCellPayloadSize_) {
00697 cout << "FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."
00698 << endl;
00699 return false;
00700 }
00701
00702 waitRecoWrite();
00703 unsigned int iCell = nextRecoWriteIndex();
00704 FUShmRecoCell* cell = recoCell(iCell);
00705 cell->writeInitMsg(outModId, fuProcessId, fuGuid, data, dataSize,nExpectedEPs);
00706 postRecoIndexToRead(iCell);
00707 if (segmentationMode_)
00708 shmdt(cell);
00709 postRecoRead();
00710 return true;
00711 }
00712
00713
00714 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00715 unsigned int evtNumber, unsigned int outModId,
00716 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00717 unsigned int dataSize) {
00718 if (dataSize > recoCellPayloadSize_) {
00719 cout << "FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."
00720 << endl;
00721 return false;
00722 }
00723
00724 waitRecoWrite();
00725 unsigned int rawCellIndex = indexForEvtNumber(evtNumber);
00726 unsigned int iCell = nextRecoWriteIndex();
00727 FUShmRecoCell* cell = recoCell(iCell);
00728
00729
00730
00731 lock();
00732 setEvtState(rawCellIndex, evt::RECOWRITING,false);
00733 incEvtDiscard(rawCellIndex,false);
00734 unlock();
00735 cell->writeEventData(rawCellIndex, runNumber, evtNumber, outModId,
00736 fuProcessId, fuGuid, data, dataSize);
00737 setEvtState(rawCellIndex, evt::RECOWRITTEN);
00738 postRecoIndexToRead(iCell);
00739 if (segmentationMode_)
00740 shmdt(cell);
00741 postRecoRead();
00742 return true;
00743 }
00744
00745
00746 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00747 unsigned int fuProcessId, unsigned int iRawCell,bool checkValue) {
00748 FUShmRawCell *raw = rawCell(iRawCell);
00749
00750 unsigned int dataSize = sizeof(uint32_t) * (4 + 1024) + raw->eventSize();
00751 unsigned char *data = new unsigned char[dataSize];
00752 uint32_t *pos = (uint32_t*) data;
00753
00754
00755
00756
00757
00758
00759
00760 *pos++ = (uint32_t) 2;
00761 *pos++ = (uint32_t) runNumber;
00762 *pos++ = (uint32_t) evf::evtn::getlbn(
00763 raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00764 *pos++ = (uint32_t) raw->evtNumber();
00765 for (unsigned int i = 0; i < 1024; i++)
00766 *pos++ = (uint32_t) raw->fedSize(i);
00767 memcpy(pos, raw->payloadAddr(), raw->eventSize());
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799 waitRecoWrite();
00800 unsigned int iRecoCell = nextRecoWriteIndex();
00801 FUShmRecoCell* reco = recoCell(iRecoCell);
00802 lock();
00803 setEvtState(iRawCell, evt::RECOWRITING,false);
00804 setEvtDiscard(iRawCell, 1, true,false);
00805 unlock();
00806 reco->writeErrorEvent(iRawCell, runNumber, raw->evtNumber(), fuProcessId,
00807 data, dataSize);
00808 delete[] data;
00809 setEvtState(iRawCell, evt::RECOWRITTEN);
00810 postRecoIndexToRead(iRecoCell);
00811 if (segmentationMode_) {
00812 shmdt(raw);
00813 shmdt(reco);
00814 }
00815 postRecoRead();
00816 return true;
00817 }
00818
00819
00820 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00821 unsigned int evtAtUpdate, unsigned int folderId,
00822 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00823 unsigned int dataSize) {
00824 if (dataSize > dqmCellPayloadSize_) {
00825 cout << "FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."
00826 << endl;
00827 return false;
00828 }
00829
00830 waitDqmWrite();
00831 unsigned int iCell = nextDqmWriteIndex();
00832 FUShmDqmCell* cell = dqmCell(iCell);
00833 dqm::State_t state = dqmState(iCell);
00834 if (!(state == dqm::EMPTY)) {
00835 stringstream details;
00836 details << "state==dqm::EMPTY assertion failed! Actual state is " << state
00837 << ", iCell = " << iCell;
00838 XCEPT_ASSERT(false, evf::Exception, details.str());
00839 }
00840 setDqmState(iCell, dqm::WRITING);
00841 cell->writeData(runNumber, evtAtUpdate, folderId, fuProcessId, fuGuid,
00842 data, dataSize);
00843 setDqmState(iCell, dqm::WRITTEN);
00844 postDqmIndexToRead(iCell);
00845 if (segmentationMode_)
00846 shmdt(cell);
00847 postDqmRead();
00848 return true;
00849 }
00850
00851
00852 void FUShmBuffer::sem_print() {
00853 cout << "--> current sem values:" << endl << " lock=" << semctl(semid(), 0,
00854 GETVAL) << endl << " wraw=" << semctl(semid(), 1, GETVAL)
00855 << " rraw=" << semctl(semid(), 2, GETVAL) << endl << " wdsc="
00856 << semctl(semid(), 3, GETVAL) << " rdsc=" << semctl(semid(), 4,
00857 GETVAL) << endl << " wrec=" << semctl(semid(), 5, GETVAL)
00858 << " rrec=" << semctl(semid(), 6, GETVAL) << endl << " wdqm="
00859 << semctl(semid(), 7, GETVAL) << " rdqm=" << semctl(semid(), 8,
00860 GETVAL) << endl;
00861 }
00862
00863 std::string FUShmBuffer::sem_print_s() {
00864 ostringstream ostr;
00865 ostr << "--> current sem values:" << endl << " lock=" << semctl(semid(), 0,
00866 GETVAL) << endl << " wraw=" << semctl(semid(), 1, GETVAL)
00867 << " rraw=" << semctl(semid(), 2, GETVAL) << endl << " wdsc="
00868 << semctl(semid(), 3, GETVAL) << " rdsc=" << semctl(semid(), 4,
00869 GETVAL) << endl << " wrec=" << semctl(semid(), 5, GETVAL)
00870 << " rrec=" << semctl(semid(), 6, GETVAL) << endl << " wdqm="
00871 << semctl(semid(), 7, GETVAL) << " rdqm=" << semctl(semid(), 8,
00872 GETVAL) << endl;
00873 return ostr.str();
00874
00875 }
00876
00877
00878 void FUShmBuffer::printEvtState(unsigned int index) {
00879 evt::State_t state = evtState(index);
00880 std::string stateName;
00881 if (state == evt::EMPTY)
00882 stateName = "EMPTY";
00883 else if (state == evt::STOP)
00884 stateName = "STOP";
00885 else if (state == evt::RAWWRITING)
00886 stateName = "RAWWRITING";
00887 else if (state == evt::RAWWRITTEN)
00888 stateName = "RAWRITTEN";
00889 else if (state == evt::RAWREADING)
00890 stateName = "RAWREADING";
00891 else if (state == evt::RAWREAD)
00892 stateName = "RAWREAD";
00893 else if (state == evt::PROCESSING)
00894 stateName = "PROCESSING";
00895 else if (state == evt::PROCESSED)
00896 stateName = "PROCESSED";
00897 else if (state == evt::RECOWRITING)
00898 stateName = "RECOWRITING";
00899 else if (state == evt::RECOWRITTEN)
00900 stateName = "RECOWRITTEN";
00901 else if (state == evt::SENDING)
00902 stateName = "SENDING";
00903 else if (state == evt::SENT)
00904 stateName = "SENT";
00905 else if (state == evt::DISCARDING)
00906 stateName = "DISCARDING";
00907 cout << "evt " << index << " in state '" << stateName << "'." << endl;
00908 }
00909
00910
00911 void FUShmBuffer::printDqmState(unsigned int index) {
00912 dqm::State_t state = dqmState(index);
00913 cout << "dqm evt " << index << " in state '" << state << "'." << endl;
00914 }
00915
00916
00917 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00918 unsigned int nRawCells, unsigned int nRecoCells,
00919 unsigned int nDqmCells, unsigned int rawCellSize,
00920 unsigned int recoCellSize, unsigned int dqmCellSize) {
00921
00922 if (FUShmBuffer::releaseSharedMemory())
00923 cout << "FUShmBuffer::createShmBuffer: "
00924 << "REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL." << endl;
00925
00926
00927 int size = sizeof(unsigned int) * 7;
00928 int shmid = shm_create(FUShmBuffer::getShmDescriptorKey(), size);
00929 if (shmid < 0)
00930 return 0;
00931 void*shmAddr = shm_attach(shmid);
00932 if (0 == shmAddr)
00933 return 0;
00934
00935 if (1 != shm_nattch(shmid)) {
00936 cout << "FUShmBuffer::createShmBuffer() FAILED: nattch=" << shm_nattch(
00937 shmid) << endl;
00938 shmdt(shmAddr);
00939 return 0;
00940 }
00941
00942 unsigned int* p = (unsigned int*) shmAddr;
00943 *p++ = segmentationMode;
00944 *p++ = nRawCells;
00945 *p++ = nRecoCells;
00946 *p++ = nDqmCells;
00947 *p++ = rawCellSize;
00948 *p++ = recoCellSize;
00949 *p++ = dqmCellSize;
00950 shmdt(shmAddr);
00951
00952
00953 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
00954 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00955 shmid = shm_create(FUShmBuffer::getShmKey(), size);
00956 if (shmid < 0)
00957 return 0;
00958 int semid = sem_create(FUShmBuffer::getSemKey(), 9);
00959 if (semid < 0)
00960 return 0;
00961 shmAddr = shm_attach(shmid);
00962 if (0 == shmAddr)
00963 return 0;
00964
00965 if (1 != shm_nattch(shmid)) {
00966 cout << "FUShmBuffer::createShmBuffer FAILED: nattch=" << shm_nattch(
00967 shmid) << endl;
00968 shmdt(shmAddr);
00969 return 0;
00970 }
00971 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
00972 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
00973 dqmCellSize);
00974
00975 cout << "FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."
00976 << endl;
00977 cout << " segmentationMode="
00978 << segmentationMode << endl;
00979
00980 buffer->initialize(shmid, semid);
00981
00982 return buffer;
00983 }
00984
00985
00986 FUShmBuffer* FUShmBuffer::getShmBuffer() {
00987
00988 int size = sizeof(unsigned int) * 7;
00989 int shmid = shm_get(FUShmBuffer::getShmDescriptorKey(), size);
00990 if (shmid < 0)
00991 return 0;
00992 void* shmAddr = shm_attach(shmid);
00993 if (0 == shmAddr)
00994 return 0;
00995
00996 unsigned int *p = (unsigned int*) shmAddr;
00997 bool segmentationMode = *p++;
00998 unsigned int nRawCells = *p++;
00999 unsigned int nRecoCells = *p++;
01000 unsigned int nDqmCells = *p++;
01001 unsigned int rawCellSize = *p++;
01002 unsigned int recoCellSize = *p++;
01003 unsigned int dqmCellSize = *p++;
01004 shmdt(shmAddr);
01005
01006 cout << "FUShmBuffer::getShmBuffer():" << " segmentationMode="
01007 << segmentationMode << " nRawCells=" << nRawCells << " nRecoCells="
01008 << nRecoCells << " nDqmCells=" << nDqmCells << " rawCellSize="
01009 << rawCellSize << " recoCellSize=" << recoCellSize
01010 << " dqmCellSize=" << dqmCellSize << endl;
01011
01012
01013 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
01014 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
01015 shmid = shm_get(FUShmBuffer::getShmKey(), size);
01016 if (shmid < 0)
01017 return 0;
01018 int semid = sem_get(FUShmBuffer::getSemKey(), 9);
01019 if (semid < 0)
01020 return 0;
01021 shmAddr = shm_attach(shmid);
01022 if (0 == shmAddr)
01023 return 0;
01024
01025 if (0 == shm_nattch(shmid)) {
01026 cout << "FUShmBuffer::getShmBuffer() FAILED: nattch=" << shm_nattch(
01027 shmid) << endl;
01028 return 0;
01029 }
01030 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
01031 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
01032 dqmCellSize);
01033
01034 cout << "FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED. PID:" << getpid()
01035 << endl;
01036 cout << " segmentationMode="
01037 << segmentationMode << endl;
01038
01039 buffer->setClientPrcId(getpid());
01040
01041 return buffer;
01042 }
01043
01044
01045 bool FUShmBuffer::releaseSharedMemory() {
01046
01047 int size = sizeof(unsigned int) * 7;
01048 int shmidd = shm_get(FUShmBuffer::getShmDescriptorKey(), size);
01049 if (shmidd < 0)
01050 return false;
01051 void* shmAddr = shm_attach(shmidd);
01052 if (0 == shmAddr)
01053 return false;
01054
01055 unsigned int*p = (unsigned int*) shmAddr;
01056 bool segmentationMode = *p++;
01057 unsigned int nRawCells = *p++;
01058 unsigned int nRecoCells = *p++;
01059 unsigned int nDqmCells = *p++;
01060 unsigned int rawCellSize = *p++;
01061 unsigned int recoCellSize = *p++;
01062 unsigned int dqmCellSize = *p++;
01063 shmdt(shmAddr);
01064
01065
01066 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
01067 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
01068 int shmid = shm_get(FUShmBuffer::getShmKey(), size);
01069 if (shmid < 0)
01070 return false;
01071 int semid = sem_get(FUShmBuffer::getSemKey(), 9);
01072 if (semid < 0)
01073 return false;
01074 shmAddr = shm_attach(shmid);
01075 if (0 == shmAddr)
01076 return false;
01077
01078 int att = 0;
01079 for (; att < 10; att++) {
01080 if (shm_nattch(shmid) > 1) {
01081 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="
01082 << shm_nattch(shmid)
01083 << ", failed attempt to release shared memory." << endl;
01084 ::sleep(1);
01085 } else
01086 break;
01087 }
01088
01089 if (att >= 10)
01090 return false;
01091
01092 if (segmentationMode) {
01093 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
01094 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
01095 dqmCellSize);
01096 int cellid;
01097 for (unsigned int i = 0; i < nRawCells; i++) {
01098 cellid = shm_get(buffer->rawCellShmKey(i),
01099 FUShmRawCell::size(rawCellSize));
01100 if ((shm_destroy(cellid) == -1))
01101 return false;
01102 }
01103 for (unsigned int i = 0; i < nRecoCells; i++) {
01104 cellid = shm_get(buffer->recoCellShmKey(i),
01105 FUShmRecoCell::size(recoCellSize));
01106 if ((shm_destroy(cellid) == -1))
01107 return false;
01108 }
01109 for (unsigned int i = 0; i < nDqmCells; i++) {
01110 cellid = shm_get(buffer->dqmCellShmKey(i),
01111 FUShmDqmCell::size(dqmCellSize));
01112 if ((shm_destroy(cellid) == -1))
01113 return false;
01114 }
01115 }
01116 shmdt(shmAddr);
01117 if (sem_destroy(semid) == -1)
01118 return false;
01119 if (shm_destroy(shmid) == -1)
01120 return false;
01121 if (shm_destroy(shmidd) == -1)
01122 return false;
01123
01124 return true;
01125 }
01126
01127
01128 unsigned int FUShmBuffer::size(bool segmentationMode, unsigned int nRawCells,
01129 unsigned int nRecoCells, unsigned int nDqmCells,
01130 unsigned int rawCellSize, unsigned int recoCellSize,
01131 unsigned int dqmCellSize) {
01132 unsigned int offset = sizeof(FUShmBuffer) + sizeof(unsigned int) * 4
01133 * nRawCells + sizeof(evt::State_t) * nRawCells
01134 + sizeof(dqm::State_t) * nDqmCells;
01135
01136 unsigned int rawCellTotalSize = FUShmRawCell::size(rawCellSize);
01137 unsigned int recoCellTotalSize = FUShmRecoCell::size(recoCellSize);
01138 unsigned int dqmCellTotalSize = FUShmDqmCell::size(dqmCellSize);
01139
01140 unsigned int realSize = (segmentationMode) ? offset + sizeof(key_t)
01141 * (nRawCells + nRecoCells + nDqmCells) : offset + rawCellTotalSize
01142 * nRawCells + recoCellTotalSize * nRecoCells + dqmCellTotalSize
01143 * nDqmCells;
01144
01145 unsigned int result = realSize / 0x10 * 0x10 + (realSize % 0x10 > 0) * 0x10;
01146
01147 return result;
01148 }
01149
01150
01151 key_t FUShmBuffer::getShmDescriptorKey() {
01152 key_t result = getuid() * 1000 + SHM_DESCRIPTOR_KEYID;
01153 if (result == (key_t) -1)
01154 cout << "FUShmBuffer::getShmDescriptorKey: failed " << "for file "
01155 << shmKeyPath_ << "!" << endl;
01156 return result;
01157 }
01158
01159
01160 key_t FUShmBuffer::getShmKey() {
01161 key_t result = getuid() * 1000 + SHM_KEYID;
01162 if (result == (key_t) -1)
01163 cout << "FUShmBuffer::getShmKey: ftok() failed " << "for file "
01164 << shmKeyPath_ << "!" << endl;
01165 return result;
01166 }
01167
01168
01169 key_t FUShmBuffer::getSemKey() {
01170 key_t result = getuid() * 1000 + SEM_KEYID;
01171 if (result == (key_t) -1)
01172 cout << "FUShmBuffer::getSemKey: ftok() failed " << "for file "
01173 << semKeyPath_ << "!" << endl;
01174 return result;
01175 }
01176
01177
01178 int FUShmBuffer::shm_create(key_t key, int size) {
01179
01180 int shmid = shmget(key, 1, 0644);
01181 if (shmid != -1) {
01182
01183 shmid_ds shmstat;
01184 shmctl(shmid, IPC_STAT, &shmstat);
01185 cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01186 << " created by process " << shmstat.shm_cpid << " owned by "
01187 << shmstat.shm_perm.uid << " permissions " << hex
01188 << shmstat.shm_perm.mode << dec << endl;
01189 shmctl(shmid, IPC_RMID, &shmstat);
01190 }
01191 shmid = shmget(key, size, IPC_CREAT | 0644);
01192 if (shmid == -1) {
01193 int err = errno;
01194 cout << "FUShmBuffer::shm_create(" << key << "," << size
01195 << ") failed: " << strerror(err) << endl;
01196 }
01197 return shmid;
01198 }
01199
01200
01201 int FUShmBuffer::shm_get(key_t key, int size) {
01202 int shmid = shmget(key, size, 0644);
01203 if (shmid == -1) {
01204 int err = errno;
01205 cout << "FUShmBuffer::shm_get(" << key << "," << size << ") failed: "
01206 << strerror(err) << endl;
01207 }
01208 return shmid;
01209 }
01210
01211
01212 void* FUShmBuffer::shm_attach(int shmid) {
01213 void* result = shmat(shmid, NULL, 0);
01214 if (0 == result) {
01215 int err = errno;
01216 cout << "FUShmBuffer::shm_attach(" << shmid << ") failed: "
01217 << strerror(err) << endl;
01218 }
01219 return result;
01220 }
01221
01222
01223 int FUShmBuffer::shm_nattch(int shmid) {
01224 shmid_ds shmstat;
01225 shmctl(shmid, IPC_STAT, &shmstat);
01226 return shmstat.shm_nattch;
01227 }
01228
01229
01230 int FUShmBuffer::shm_destroy(int shmid) {
01231 shmid_ds shmstat;
01232 int result = shmctl(shmid, IPC_RMID, &shmstat);
01233 if (result == -1)
01234 cout << "FUShmBuffer::shm_destroy(shmid=" << shmid << ") failed."
01235 << endl;
01236 return result;
01237 }
01238
01239
01240 int FUShmBuffer::sem_create(key_t key, int nsem) {
01241 int semid = semget(key, nsem, IPC_CREAT | 0666);
01242 if (semid < 0) {
01243 int err = errno;
01244 cout << "FUShmBuffer::sem_create(key=" << key << ",nsem=" << nsem
01245 << ") failed: " << strerror(err) << endl;
01246 }
01247 return semid;
01248 }
01249
01250
01251 int FUShmBuffer::sem_get(key_t key, int nsem) {
01252 int semid = semget(key, nsem, 0666);
01253 if (semid < 0) {
01254 int err = errno;
01255 cout << "FUShmBuffer::sem_get(key=" << key << ",nsem=" << nsem
01256 << ") failed: " << strerror(err) << endl;
01257 }
01258 return semid;
01259 }
01260
01261
01262 int FUShmBuffer::sem_destroy(int semid) {
01263 int result = semctl(semid, 0, IPC_RMID);
01264 if (result == -1)
01265 cout << "FUShmBuffer::sem_destroy(semid=" << semid << ") failed."
01266 << endl;
01267 return result;
01268 }
01269
01271
01273
01274
01275 unsigned int FUShmBuffer::nextIndex(unsigned int offset, unsigned int nCells,
01276 unsigned int& iNext) {
01277 lock();
01278 unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
01279 pindex += iNext;
01280 iNext = (iNext + 1) % nCells;
01281 unsigned int result = *pindex;
01282 unlock();
01283 return result;
01284 }
01285
01286
01287 void FUShmBuffer::postIndex(unsigned int index, unsigned int offset,
01288 unsigned int nCells, unsigned int& iLast) {
01289 lock();
01290 unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
01291 pindex += iLast;
01292 *pindex = index;
01293 iLast = (iLast + 1) % nCells;
01294 unlock();
01295 }
01296
01297
01298 unsigned int FUShmBuffer::nextRawWriteIndex() {
01299 return nextIndex(rawWriteOffset_, nRawCells_, rawWriteNext_);
01300 }
01301
01302
01303 unsigned int FUShmBuffer::nextRawReadIndex() {
01304 return nextIndex(rawReadOffset_, nRawCells_, rawReadNext_);
01305 }
01306
01307
01308 void FUShmBuffer::postRawIndexToWrite(unsigned int index) {
01309 postIndex(index, rawWriteOffset_, nRawCells_, rawWriteLast_);
01310 }
01311
01312
01313 void FUShmBuffer::postRawIndexToRead(unsigned int index) {
01314 postIndex(index, rawReadOffset_, nRawCells_, rawReadLast_);
01315 }
01316
01317
01318 unsigned int FUShmBuffer::nextRecoWriteIndex() {
01319 return nextIndex(recoWriteOffset_, nRecoCells_, recoWriteNext_);
01320 }
01321
01322
01323 unsigned int FUShmBuffer::nextRecoReadIndex() {
01324 return nextIndex(recoReadOffset_, nRecoCells_, recoReadNext_);
01325 }
01326
01327
01328 void FUShmBuffer::postRecoIndexToWrite(unsigned int index) {
01329 postIndex(index, recoWriteOffset_, nRecoCells_, recoWriteLast_);
01330 }
01331
01332
01333 void FUShmBuffer::postRecoIndexToRead(unsigned int index) {
01334 postIndex(index, recoReadOffset_, nRecoCells_, recoReadLast_);
01335 }
01336
01337
01338 unsigned int FUShmBuffer::nextDqmWriteIndex() {
01339 return nextIndex(dqmWriteOffset_, nDqmCells_, dqmWriteNext_);
01340 }
01341
01342
01343 unsigned int FUShmBuffer::nextDqmReadIndex() {
01344 return nextIndex(dqmReadOffset_, nDqmCells_, dqmReadNext_);
01345 }
01346
01347
01348 void FUShmBuffer::postDqmIndexToWrite(unsigned int index) {
01349 postIndex(index, dqmWriteOffset_, nDqmCells_, dqmWriteLast_);
01350 }
01351
01352
01353 void FUShmBuffer::postDqmIndexToRead(unsigned int index) {
01354 postIndex(index, dqmReadOffset_, nDqmCells_, dqmReadLast_);
01355 }
01356
01357
01358 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber) {
01359 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01360 + evtNumberOffset_);
01361 for (unsigned int i = 0; i < nRawCells_; i++) {
01362 if ((*pevt++) == evtNumber)
01363 return i;
01364 }
01365 XCEPT_ASSERT(false, evf::Exception, "This point should not be reached!");
01366 return 0xffffffff;
01367 }
01368
01369
01370 unsigned int FUShmBuffer::indexForEvtPrcId(pid_t prcid) {
01371 pid_t *pevt = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
01372 for (unsigned int i = 0; i < nRawCells_; i++) {
01373 if ((*pevt++) == prcid)
01374 return i;
01375 }
01376 XCEPT_ASSERT(false, evf::Exception, "This point should not be reached!");
01377 return 0xffffffff;
01378 }
01379
01380
01381 evt::State_t FUShmBuffer::evtState(unsigned int index) {
01382 if (!(index < nRawCells_)) {
01383 stringstream details;
01384 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01385 XCEPT_ASSERT(false, evf::Exception, details.str());
01386 }
01387 evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
01388 + evtStateOffset_);
01389 pstate += index;
01390 return *pstate;
01391 }
01392
01393
01394 dqm::State_t FUShmBuffer::dqmState(unsigned int index) {
01395 if (!(index < nDqmCells_)) {
01396 stringstream details;
01397 details << "index<nDqmCells_ assertion failed! Actual index is " << index;
01398 XCEPT_ASSERT(false, evf::Exception, details.str());
01399 }
01400 dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
01401 + dqmStateOffset_);
01402 pstate += index;
01403 return *pstate;
01404 }
01405
01406
01407 unsigned int FUShmBuffer::evtNumber(unsigned int index) {
01408 if (!(index < nRawCells_)) {
01409 stringstream details;
01410 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01411 XCEPT_ASSERT(false, evf::Exception, details.str());
01412 }
01413 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01414 + evtNumberOffset_);
01415 pevt += index;
01416 return *pevt;
01417 }
01418
01419
01420 pid_t FUShmBuffer::evtPrcId(unsigned int index) {
01421 if (!(index < nRawCells_)) {
01422 stringstream details;
01423 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01424 XCEPT_ASSERT(false, evf::Exception, details.str());
01425 }
01426 pid_t *prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
01427 prcid += index;
01428 return *prcid;
01429 }
01430
01431
01432 time_t FUShmBuffer::evtTimeStamp(unsigned int index) {
01433 if (!(index < nRawCells_)) {
01434 stringstream details;
01435 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01436 XCEPT_ASSERT(false, evf::Exception, details.str());
01437 }
01438 time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
01439 ptstmp += index;
01440 return *ptstmp;
01441 }
01442
01443
01444 pid_t FUShmBuffer::clientPrcId(unsigned int index) {
01445 if (!(index < nClientsMax_)) {
01446 stringstream details;
01447 details << "index<nClientsMax_ assertion failed! Actual index is " << index;
01448 XCEPT_ASSERT(false, evf::Exception, details.str());
01449 }
01450 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01451 prcid += index;
01452 return *prcid;
01453 }
01454
01455
01456 bool FUShmBuffer::setEvtState(unsigned int index, evt::State_t state, bool lockShm) {
01457 if(!(index < nRawCells_)) {
01458 stringstream details;
01459 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01460
01461 if (!lockShm) unlock();
01462 XCEPT_ASSERT(false, evf::Exception, details.str());
01463 }
01464 evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
01465 + evtStateOffset_);
01466 pstate += index;
01467 if (lockShm) lock();
01468 *pstate = state;
01469 if (lockShm) unlock();
01470 return true;
01471 }
01472
01473
01474 bool FUShmBuffer::setDqmState(unsigned int index, dqm::State_t state) {
01475 if(!(index < nDqmCells_)) {
01476 stringstream details;
01477 details << "index<nDqmCells_ assertion failed! Actual index is " << index;
01478 XCEPT_ASSERT(false, evf::Exception, details.str());
01479 }
01480 dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
01481 + dqmStateOffset_);
01482 pstate += index;
01483 lock();
01484 *pstate = state;
01485 unlock();
01486 return true;
01487 }
01488
01489
01490 bool FUShmBuffer::setEvtDiscard(unsigned int index, unsigned int discard, bool checkValue, bool lockShm) {
01491 if(!(index < nRawCells_)) {
01492 stringstream details;
01493 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01494
01495 if (!lockShm) unlock();
01496 XCEPT_ASSERT(false, evf::Exception, details.str());
01497 }
01498 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01499 + evtDiscardOffset_);
01500 pcount += index;
01501 if (lockShm) lock();
01502 if (checkValue) {
01503 if (*pcount < discard)
01504 *pcount = discard;
01505 } else
01506 *pcount = discard;
01507 if (lockShm) unlock();
01508 return true;
01509 }
01510
01511
01512 int FUShmBuffer::incEvtDiscard(unsigned int index, bool lockShm) {
01513 int result = 0;
01514 if(!(index < nRawCells_)) {
01515 stringstream details;
01516 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01517
01518 if (!lockShm) unlock();
01519 XCEPT_ASSERT(false,evf::Exception,details.str());
01520 }
01521 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01522 + evtDiscardOffset_);
01523 pcount += index;
01524 if (lockShm) lock();
01525 (*pcount)++;
01526 result = *pcount;
01527 if (lockShm) unlock();
01528 return result;
01529 }
01530
01531
01532 bool FUShmBuffer::setEvtNumber(unsigned int index, unsigned int evtNumber) {
01533 if(!(index < nRawCells_)) {
01534 stringstream details;
01535 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01536 XCEPT_ASSERT(false, evf::Exception, details.str());
01537 }
01538 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01539 + evtNumberOffset_);
01540 pevt += index;
01541 lock();
01542 *pevt = evtNumber;
01543 unlock();
01544 return true;
01545 }
01546
01547
01548 bool FUShmBuffer::setEvtPrcId(unsigned int index, pid_t prcId) {
01549 if(!(index < nRawCells_)) {
01550 stringstream details;
01551 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01552 XCEPT_ASSERT(false, evf::Exception, details.str());
01553 }
01554 pid_t* prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
01555 prcid += index;
01556 lock();
01557 *prcid = prcId;
01558 unlock();
01559 return true;
01560 }
01561
01562
01563 bool FUShmBuffer::setEvtTimeStamp(unsigned int index, time_t timeStamp) {
01564 if(!(index < nRawCells_)) {
01565 stringstream details;
01566 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01567 XCEPT_ASSERT(false, evf::Exception, details.str());
01568 }
01569 time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
01570 ptstmp += index;
01571 lock();
01572 *ptstmp = timeStamp;
01573 unlock();
01574 return true;
01575 }
01576
01577
01578 bool FUShmBuffer::setClientPrcId(pid_t prcId) {
01579 lock();
01580 if(!(nClients_ < nClientsMax_)) {
01581 stringstream details;
01582 details << "nClients_<nClientsMax_ assertion failed! Actual nClients is "
01583 << nClients_ << " and nClientsMax is " << nClientsMax_;
01584 unlock();
01585 XCEPT_ASSERT(false, evf::Exception, details.str());
01586 }
01587 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01588 for (unsigned int i = 0; i < nClients_; i++) {
01589 if ((*prcid) == prcId) {
01590 unlock();
01591 return false;
01592 }
01593 prcid++;
01594 }
01595 nClients_++;
01596 *prcid = prcId;
01597 unlock();
01598 return true;
01599 }
01600
01601
01602 bool FUShmBuffer::removeClientPrcId(pid_t prcId) {
01603 lock();
01604 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01605 unsigned int iClient(0);
01606 while (iClient < nClients_ && (*prcid) != prcId) {
01607 prcid++;
01608 iClient++;
01609 }
01610 if (iClient==nClients_) {
01611 unlock();
01612 return false;
01613 }
01614
01615
01616
01617
01618 pid_t* next = prcid;
01619 next++;
01620 while (iClient < nClients_ - 1) {
01621 *prcid = *next;
01622 prcid++;
01623 next++;
01624 iClient++;
01625 }
01626 nClients_--;
01627 unlock();
01628 return true;
01629 }
01630
01631
01632 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell) {
01633 FUShmRawCell* result(0);
01634
01635 if (iCell >= nRawCells_) {
01636 cout << "FUShmBuffer::rawCell(" << iCell << ") ERROR: " << "iCell="
01637 << iCell << " >= nRawCells()=" << nRawCells_ << endl;
01638 return result;
01639 }
01640
01641 if (segmentationMode_) {
01642 key_t shmkey = rawCellShmKey(iCell);
01643 int shmid = shm_get(shmkey, rawCellTotalSize_);
01644 void* cellAddr = shm_attach(shmid);
01645 result = new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01646 } else {
01647 result = (FUShmRawCell*) ((unsigned long) this + rawCellOffset_ + iCell
01648 * rawCellTotalSize_);
01649 }
01650
01651 return result;
01652 }
01653
01654
01655 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell) {
01656 FUShmRecoCell* result(0);
01657
01658 if (iCell >= nRecoCells_) {
01659 cout << "FUShmBuffer::recoCell(" << iCell << ") ERROR: " << "iCell="
01660 << iCell << " >= nRecoCells=" << nRecoCells_ << endl;
01661 return result;
01662 }
01663
01664 if (segmentationMode_) {
01665 key_t shmkey = recoCellShmKey(iCell);
01666 int shmid = shm_get(shmkey, recoCellTotalSize_);
01667 void* cellAddr = shm_attach(shmid);
01668 result = new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01669 } else {
01670 result = (FUShmRecoCell*) ((unsigned long) this + recoCellOffset_
01671 + iCell * recoCellTotalSize_);
01672 }
01673
01674 return result;
01675 }
01676
01677
01678 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell) {
01679 FUShmDqmCell* result(0);
01680
01681 if (iCell >= nDqmCells_) {
01682 cout << "FUShmBuffer::dqmCell(" << iCell << ") ERROR: " << "iCell="
01683 << iCell << " >= nDqmCells=" << nDqmCells_ << endl;
01684 return result;
01685 }
01686
01687 if (segmentationMode_) {
01688 key_t shmkey = dqmCellShmKey(iCell);
01689 int shmid = shm_get(shmkey, dqmCellTotalSize_);
01690 void* cellAddr = shm_attach(shmid);
01691 result = new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01692 } else {
01693 result = (FUShmDqmCell*) ((unsigned long) this + dqmCellOffset_ + iCell
01694 * dqmCellTotalSize_);
01695 }
01696
01697 return result;
01698 }
01699
01700
01701 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index) {
01702 if(!(index < nRawCells_)) {
01703 stringstream details;
01704 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01705 XCEPT_ASSERT(false, evf::Exception, details.str());
01706 }
01707 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01708 + evtDiscardOffset_);
01709 pcount += index;
01710 lock();
01711 if(!(*pcount > 0)) {
01712 stringstream details;
01713 details << "*pcount>0 assertion failed! Value at pcount is " << *pcount << " for cell index " << index;
01714 unlock();
01715 XCEPT_ASSERT(false, evf::Exception, details.str());
01716 }
01717 --(*pcount);
01718 bool result = (*pcount == 0);
01719 unlock();
01720 return result;
01721 }
01722
01723
01724 key_t FUShmBuffer::shmKey(unsigned int iCell, unsigned int offset) {
01725 if (!segmentationMode_) {
01726 cout << "FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"
01727 << endl;
01728 return -1;
01729 }
01730 key_t* addr = (key_t*) ((unsigned long) this + offset);
01731 for (unsigned int i = 0; i < iCell; i++)
01732 ++addr;
01733 return *addr;
01734 }
01735
01736
01737 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell) {
01738 if (iCell >= nRawCells_) {
01739 cout << "FUShmBuffer::rawCellShmKey() ERROR: " << "iCell>=nRawCells: "
01740 << iCell << ">=" << nRawCells_ << endl;
01741 return -1;
01742 }
01743 return shmKey(iCell, rawCellOffset_);
01744 }
01745
01746
01747 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell) {
01748 if (iCell >= nRecoCells_) {
01749 cout << "FUShmBuffer::recoCellShmKey() ERROR: "
01750 << "iCell>=nRecoCells: " << iCell << ">=" << nRecoCells_
01751 << endl;
01752 return -1;
01753 }
01754 return shmKey(iCell, recoCellOffset_);
01755 }
01756
01757
01758 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell) {
01759 if (iCell >= nDqmCells_) {
01760 cout << "FUShmBuffer::dqmCellShmKey() ERROR: " << "iCell>=nDqmCells: "
01761 << iCell << ">=" << nDqmCells_ << endl;
01762 return -1;
01763 }
01764 return shmKey(iCell, dqmCellOffset_);
01765 }
01766
01767
01768 void FUShmBuffer::sem_init(int isem, int value) {
01769 if (semctl(semid(), isem, SETVAL, value) < 0) {
01770 cout << "FUShmBuffer: FATAL ERROR in semaphore initialization." << endl;
01771 }
01772 }
01773
01774
01775 int FUShmBuffer::sem_wait(int isem) {
01776 struct sembuf sops[1];
01777 sops[0].sem_num = isem;
01778 sops[0].sem_op = -1;
01779 sops[0].sem_flg = 0;
01780 if (semop(semid(), sops, 1) == -1) {
01781 cout << "FUShmBuffer: ERROR in semaphore operation sem_wait." << endl;
01782 return -1;
01783 }
01784 return 0;
01785 }
01786
01787
01788 void FUShmBuffer::sem_post(int isem) {
01789 struct sembuf sops[1];
01790 sops[0].sem_num = isem;
01791 sops[0].sem_op = 1;
01792 sops[0].sem_flg = 0;
01793 if (semop(semid(), sops, 1) == -1) {
01794 cout << "FUShmBuffer: ERROR in semaphore operation sem_post." << endl;
01795 }
01796 }