00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013 #include "EventFilter/Utilities/interface/Exception.h"
00014
00015 #include <unistd.h>
00016 #include <iostream>
00017 #include <string>
00018 #include <cassert>
00019
00020 #include <sstream>
00021 #include <fstream>
00022
00023 #include <cstdlib>
00024 #include <cstring>
00025 #include <stdint.h>
00026
00027
00028
00029
00030
00031 #define SHM_DESCRIPTOR_KEYID 1
00032 #define SHM_KEYID 2
00033 #define SEM_KEYID 1
00034
00035 #define NSKIP_MAX 100
00036
00037 using namespace std;
00038 using namespace evf;
00039
00040
00041 const char* FUShmBuffer::shmKeyPath_ =
00042 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null"
00043 : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null"
00046 : getenv("FUSEM_KEYFILE"));
00047
00049
00051
00052
00053 FUShmBuffer::FUShmBuffer(bool segmentationMode, unsigned int nRawCells,
00054 unsigned int nRecoCells, unsigned int nDqmCells,
00055 unsigned int rawCellSize, unsigned int recoCellSize,
00056 unsigned int dqmCellSize) :
00057 segmentationMode_(segmentationMode), nClientsMax_(128),
00058 nRawCells_(nRawCells), rawCellPayloadSize_(rawCellSize),
00059 nRecoCells_(nRecoCells), recoCellPayloadSize_(recoCellSize),
00060 nDqmCells_(nDqmCells), dqmCellPayloadSize_(dqmCellSize) {
00061 rawCellTotalSize_ = FUShmRawCell::size(rawCellPayloadSize_);
00062 recoCellTotalSize_ = FUShmRecoCell::size(recoCellPayloadSize_);
00063 dqmCellTotalSize_ = FUShmDqmCell::size(dqmCellPayloadSize_);
00064
00065 void* addr;
00066
00067 rawWriteOffset_ = sizeof(FUShmBuffer);
00068 addr = (void*) ((unsigned long) this + rawWriteOffset_);
00069 new (addr) unsigned int[nRawCells_];
00070
00071 rawReadOffset_ = rawWriteOffset_ + nRawCells_ * sizeof(unsigned int);
00072 addr = (void*) ((unsigned long) this + rawReadOffset_);
00073 new (addr) unsigned int[nRawCells_];
00074
00075 recoWriteOffset_ = rawReadOffset_ + nRawCells_ * sizeof(unsigned int);
00076 addr = (void*) ((unsigned long) this + recoWriteOffset_);
00077 new (addr) unsigned int[nRecoCells_];
00078
00079 recoReadOffset_ = recoWriteOffset_ + nRecoCells_ * sizeof(unsigned int);
00080 addr = (void*) ((unsigned long) this + recoReadOffset_);
00081 new (addr) unsigned int[nRecoCells_];
00082
00083 dqmWriteOffset_ = recoReadOffset_ + nRecoCells_ * sizeof(unsigned int);
00084 addr = (void*) ((unsigned long) this + dqmWriteOffset_);
00085 new (addr) unsigned int[nDqmCells_];
00086
00087 dqmReadOffset_ = dqmWriteOffset_ + nDqmCells_ * sizeof(unsigned int);
00088 addr = (void*) ((unsigned long) this + dqmReadOffset_);
00089 new (addr) unsigned int[nDqmCells_];
00090
00091 evtStateOffset_ = dqmReadOffset_ + nDqmCells_ * sizeof(unsigned int);
00092 addr = (void*) ((unsigned long) this + evtStateOffset_);
00093 new (addr) evt::State_t[nRawCells_];
00094
00095 evtDiscardOffset_ = evtStateOffset_ + nRawCells_ * sizeof(evt::State_t);
00096 addr = (void*) ((unsigned long) this + evtDiscardOffset_);
00097 new (addr) unsigned int[nRawCells_];
00098
00099 evtNumberOffset_ = evtDiscardOffset_ + nRawCells_ * sizeof(unsigned int);
00100 addr = (void*) ((unsigned long) this + evtNumberOffset_);
00101 new (addr) unsigned int[nRawCells_];
00102
00103 evtPrcIdOffset_ = evtNumberOffset_ + nRawCells_ * sizeof(unsigned int);
00104 addr = (void*) ((unsigned long) this + evtPrcIdOffset_);
00105 new (addr) pid_t[nRawCells_];
00106
00107 evtTimeStampOffset_ = evtPrcIdOffset_ + nRawCells_ * sizeof(pid_t);
00108 addr = (void*) ((unsigned long) this + evtTimeStampOffset_);
00109 new (addr) time_t[nRawCells_];
00110
00111 dqmStateOffset_ = evtTimeStampOffset_ + nRawCells_ * sizeof(time_t);
00112 addr = (void*) ((unsigned long) this + dqmStateOffset_);
00113 new (addr) dqm::State_t[nDqmCells_];
00114
00115 clientPrcIdOffset_ = dqmStateOffset_ + nDqmCells_ * sizeof(dqm::State_t);
00116 addr = (void*) ((unsigned long) this + clientPrcIdOffset_);
00117 new (addr) pid_t[nClientsMax_];
00118
00119 rawCellOffset_ = dqmStateOffset_ + nClientsMax_ * sizeof(pid_t);
00120
00121 if (segmentationMode_) {
00122 recoCellOffset_ = rawCellOffset_ + nRawCells_ * sizeof(key_t);
00123 dqmCellOffset_ = recoCellOffset_ + nRecoCells_ * sizeof(key_t);
00124 addr = (void*) ((unsigned long) this + rawCellOffset_);
00125 new (addr) key_t[nRawCells_];
00126 addr = (void*) ((unsigned long) this + recoCellOffset_);
00127 new (addr) key_t[nRecoCells_];
00128 addr = (void*) ((unsigned long) this + dqmCellOffset_);
00129 new (addr) key_t[nDqmCells_];
00130 } else {
00131 recoCellOffset_ = rawCellOffset_ + nRawCells_ * rawCellTotalSize_;
00132 dqmCellOffset_ = recoCellOffset_ + nRecoCells_ * recoCellTotalSize_;
00133 for (unsigned int i = 0; i < nRawCells_; i++) {
00134 addr = (void*) ((unsigned long) this + rawCellOffset_ + i
00135 * rawCellTotalSize_);
00136 new (addr) FUShmRawCell(rawCellSize);
00137 }
00138 for (unsigned int i = 0; i < nRecoCells_; i++) {
00139 addr = (void*) ((unsigned long) this + recoCellOffset_ + i
00140 * recoCellTotalSize_);
00141 new (addr) FUShmRecoCell(recoCellSize);
00142 }
00143 for (unsigned int i = 0; i < nDqmCells_; i++) {
00144 addr = (void*) ((unsigned long) this + dqmCellOffset_ + i
00145 * dqmCellTotalSize_);
00146 new (addr) FUShmDqmCell(dqmCellSize);
00147 }
00148 }
00149 }
00150
00151
00152 FUShmBuffer::~FUShmBuffer() {
00153
00154 }
00155
00157
00159
00160
00161
00162 void FUShmBuffer::initialize(unsigned int shmid, unsigned int semid) {
00163 shmid_ = shmid;
00164 semid_ = semid;
00165
00166 if (segmentationMode_) {
00167 int shmKeyId = 666;
00168 key_t* keyAddr = (key_t*) ((unsigned long) this + rawCellOffset_);
00169 for (unsigned int i = 0; i < nRawCells_; i++) {
00170 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00171 int shmid = shm_create(*keyAddr, rawCellTotalSize_);
00172 void* shmAddr = shm_attach(shmid);
00173 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00174 shmdt(shmAddr);
00175 ++keyAddr;
00176 }
00177 keyAddr = (key_t*) ((unsigned long) this + recoCellOffset_);
00178 for (unsigned int i = 0; i < nRecoCells_; i++) {
00179 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00180 int shmid = shm_create(*keyAddr, recoCellTotalSize_);
00181 void* shmAddr = shm_attach(shmid);
00182 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr = (key_t*) ((unsigned long) this + dqmCellOffset_);
00187 for (unsigned int i = 0; i < nDqmCells_; i++) {
00188 *keyAddr = ftok(shmKeyPath_, shmKeyId++);
00189 int shmid = shm_create(*keyAddr, dqmCellTotalSize_);
00190 void* shmAddr = shm_attach(shmid);
00191 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 }
00196
00197 for (unsigned int i = 0; i < nRawCells_; i++) {
00198 FUShmRawCell* cell = rawCell(i);
00199 cell->initialize(i);
00200 if (segmentationMode_)
00201 shmdt(cell);
00202 }
00203
00204 for (unsigned int i = 0; i < nRecoCells_; i++) {
00205 FUShmRecoCell* cell = recoCell(i);
00206 cell->initialize(i);
00207 if (segmentationMode_)
00208 shmdt(cell);
00209 }
00210
00211 for (unsigned int i = 0; i < nDqmCells_; i++) {
00212 FUShmDqmCell* cell = dqmCell(i);
00213 cell->initialize(i);
00214 if (segmentationMode_)
00215 shmdt(cell);
00216 }
00217
00218 reset();
00219 }
00220
00221
00222 void FUShmBuffer::reset() {
00223 nClients_ = 0;
00224
00225
00226 sem_init(0, 1);
00227 sem_init(1, nRawCells_);
00228 sem_init(2, 0);
00229 sem_init(3, 1);
00230 sem_init(4, 0);
00231 sem_init(5, nRecoCells_);
00232 sem_init(6, 0);
00233 sem_init(7, nDqmCells_);
00234 sem_init(8, 0);
00235
00236 sem_print();
00237
00238 unsigned int *iWrite, *iRead;
00239
00240 rawWriteNext_ = 0;
00241 rawWriteLast_ = 0;
00242 rawReadNext_ = 0;
00243 rawReadLast_ = 0;
00244 iWrite = (unsigned int*) ((unsigned long) this + rawWriteOffset_);
00245 iRead = (unsigned int*) ((unsigned long) this + rawReadOffset_);
00246 for (unsigned int i = 0; i < nRawCells_; i++) {
00247 *iWrite++ = i;
00248 *iRead++ = 0xffffffff;
00249 }
00250
00251 recoWriteNext_ = 0;
00252 recoWriteLast_ = 0;
00253 recoReadNext_ = 0;
00254 recoReadLast_ = 0;
00255 iWrite = (unsigned int*) ((unsigned long) this + recoWriteOffset_);
00256 iRead = (unsigned int*) ((unsigned long) this + recoReadOffset_);
00257 for (unsigned int i = 0; i < nRecoCells_; i++) {
00258 *iWrite++ = i;
00259 *iRead++ = 0xffffffff;
00260 }
00261
00262 dqmWriteNext_ = 0;
00263 dqmWriteLast_ = 0;
00264 dqmReadNext_ = 0;
00265 dqmReadLast_ = 0;
00266 iWrite = (unsigned int*) ((unsigned long) this + dqmWriteOffset_);
00267 iRead = (unsigned int*) ((unsigned long) this + dqmReadOffset_);
00268 for (unsigned int i = 0; i < nDqmCells_; i++) {
00269 *iWrite++ = i;
00270 *iRead++ = 0xffffffff;
00271 }
00272
00273 for (unsigned int i = 0; i < nRawCells_; i++) {
00274 setEvtState(i, evt::EMPTY);
00275 setEvtDiscard(i, 0);
00276 setEvtNumber(i, 0xffffffff);
00277 setEvtPrcId(i, 0);
00278 setEvtTimeStamp(i, 0);
00279 }
00280
00281 for (unsigned int i = 0; i < nDqmCells_; i++)
00282 setDqmState(i, dqm::EMPTY);
00283 }
00284
00285
00286 unsigned int FUShmBuffer::nbRawCellsToWrite() const {
00287 return semctl(semid(), 1, GETVAL);
00288 }
00289
00290
00291 int FUShmBuffer::nbRawCellsToRead() const {
00292 return semctl(semid(), 2, GETVAL);
00293 }
00294
00295
00296 FUShmRawCell* FUShmBuffer::rawCellToWrite() {
00297 if (waitRawWrite() != 0)
00298 return 0;
00299 unsigned int iCell = nextRawWriteIndex();
00300 FUShmRawCell* cell = rawCell(iCell);
00301 evt::State_t state = evtState(iCell);
00302 stringstream details;
00303 details << "state==evt::EMPTY assertion failed! Actual state is " << state
00304 << ", iCell = " << iCell;
00305 XCEPT_ASSERT(state == evt::EMPTY, evf::Exception, details.str());
00306 setEvtState(iCell, evt::RAWWRITING);
00307 setEvtDiscard(iCell, 1);
00308 return cell;
00309 }
00310
00311
00312 FUShmRawCell* FUShmBuffer::rawCellToRead() {
00313 waitRawRead();
00314 unsigned int iCell = nextRawReadIndex();
00315 FUShmRawCell* cell = rawCell(iCell);
00316 evt::State_t state = evtState(iCell);
00317 stringstream details;
00318 details
00319 << "state==evt::RAWWRITTEN ||state==evt::EMPTY ||state==evt::STOP ||state==evt::LUMISECTION assertion failed! Actual state is "
00320 << state << ", iCell = " << iCell;
00321 XCEPT_ASSERT(
00322 state == evt::RAWWRITTEN || state == evt::EMPTY || state
00323 == evt::STOP || state == evt::LUMISECTION, evf::Exception,
00324 details.str());
00325 if (state == evt::RAWWRITTEN) {
00326 setEvtPrcId(iCell, getpid());
00327 setEvtState(iCell, evt::RAWREADING);
00328 }
00329 return cell;
00330 }
00331
00332
00333 FUShmRecoCell* FUShmBuffer::recoCellToRead() {
00334 waitRecoRead();
00335 unsigned int iCell = nextRecoReadIndex();
00336 FUShmRecoCell* cell = recoCell(iCell);
00337 unsigned int iRawCell = cell->rawCellIndex();
00338 if (iRawCell < nRawCells_) {
00339
00340
00341 setEvtState(iRawCell, evt::SENDING);
00342 }
00343 return cell;
00344 }
00345
00346
00347 FUShmDqmCell* FUShmBuffer::dqmCellToRead() {
00348 waitDqmRead();
00349 unsigned int iCell = nextDqmReadIndex();
00350 FUShmDqmCell* cell = dqmCell(iCell);
00351 dqm::State_t state = dqmState(iCell);
00352 stringstream details;
00353 details
00354 << "state==dqm::WRITTEN || state==dqm::EMPTY assertion failed! Actual state is "
00355 << state << ", iCell = " << iCell;
00356 XCEPT_ASSERT(state == dqm::WRITTEN || state == dqm::EMPTY, evf::Exception,
00357 details.str());
00358 if (state == dqm::WRITTEN)
00359 setDqmState(iCell, dqm::SENDING);
00360 return cell;
00361 }
00362
00363
00364 FUShmRawCell* FUShmBuffer::rawCellToDiscard() {
00365 waitRawDiscarded();
00366 FUShmRawCell* cell = rawCell(rawDiscardIndex_);
00367 evt::State_t state = evtState(cell->index());
00368 stringstream details;
00369 details
00370 << "state==evt::PROCESSED || state==evt::SENT || state==evt::EMPTY || state==evt::STOP || state==evt::USEDLS assertion failed! Actual state is "
00371 << state << ", index = " << cell->index();
00372 XCEPT_ASSERT(
00373 state == evt::PROCESSED || state == evt::SENT || state
00374 == evt::EMPTY || state == evt::STOP || state == evt::USEDLS,
00375 evf::Exception, details.str());
00376 if (state != evt::EMPTY && state != evt::USEDLS && state != evt::STOP)
00377 setEvtState(cell->index(), evt::DISCARDING);
00378 return cell;
00379 }
00380
00381
00382 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell) {
00383 evt::State_t state = evtState(cell->index());
00384 stringstream details;
00385 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00386 << state << ", index = " << cell->index();
00387 XCEPT_ASSERT(state == evt::RAWWRITING, evf::Exception, details.str());
00388 setEvtState(cell->index(), evt::RAWWRITTEN);
00389 setEvtNumber(cell->index(), cell->evtNumber());
00390 postRawIndexToRead(cell->index());
00391 if (segmentationMode_)
00392 shmdt(cell);
00393 postRawRead();
00394 }
00395
00396
00397 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell) {
00398 evt::State_t state = evtState(cell->index());
00399 stringstream details;
00400 details << "state==evt::RAWREADING assertion failed! Actual state is "
00401 << state << ", index = " << cell->index();
00402 XCEPT_ASSERT(state == evt::RAWREADING, evf::Exception, details.str());
00403 setEvtState(cell->index(), evt::RAWREAD);
00404 setEvtState(cell->index(), evt::PROCESSING);
00405 setEvtTimeStamp(cell->index(), time(0));
00406 if (segmentationMode_)
00407 shmdt(cell);
00408 }
00409
00410
00411 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell) {
00412 unsigned int iRawCell = cell->rawCellIndex();
00413 if (iRawCell < nRawCells_) {
00414
00415
00416 setEvtState(cell->rawCellIndex(), evt::SENT);
00417 }
00418 if (segmentationMode_)
00419 shmdt(cell);
00420 }
00421
00422
00423 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell) {
00424 dqm::State_t state = dqmState(cell->index());
00425 stringstream details;
00426 details
00427 << "state==dqm::SENDING||state==dqm::EMPTY assertion failed! Actual state is "
00428 << state << ", index = " << cell->index();
00429 XCEPT_ASSERT(state == dqm::SENDING || state == dqm::EMPTY, evf::Exception,
00430 details.str());
00431 if (state == dqm::SENDING)
00432 setDqmState(cell->index(), dqm::SENT);
00433 if (segmentationMode_)
00434 shmdt(cell);
00435 }
00436
00437
00438 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell) {
00439 waitRawDiscard();
00440 if (rawCellReadyForDiscard(iCell)) {
00441 rawDiscardIndex_ = iCell;
00442 evt::State_t state = evtState(iCell);
00443 stringstream details;
00444 details
00445 << "state==evt::PROCESSING||state==evt::SENT||state==evt::EMPTY||state==evt::STOP||state==evt::LUMISECTION assertion failed! Actual state is "
00446 << state << ", iCell = " << iCell;
00447 XCEPT_ASSERT(
00448 state == evt::PROCESSING || state == evt::SENT || state
00449 == evt::EMPTY || state == evt::STOP || state
00450 == evt::LUMISECTION, evf::Exception, details.str());
00451 if (state == evt::PROCESSING)
00452 setEvtState(iCell, evt::PROCESSED);
00453 if (state == evt::LUMISECTION)
00454 setEvtState(iCell, evt::USEDLS);
00455 postRawDiscarded();
00456 } else
00457 postRawDiscard();
00458 }
00459
00460
00461 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell) {
00462 waitRawDiscard();
00463 if (rawCellReadyForDiscard(iCell)) {
00464 rawDiscardIndex_ = iCell;
00465 evt::State_t state = evtState(iCell);
00466
00467 if (state != evt::LUMISECTION && state != evt::EMPTY && state
00468 != evt::USEDLS && state != evt::STOP)
00469 setEvtState(iCell, evt::PROCESSED);
00470 if (state == evt::LUMISECTION)
00471 setEvtState(iCell, evt::USEDLS);
00472 postRawDiscarded();
00473 } else
00474 postRawDiscard();
00475 }
00476
00477
00478 void FUShmBuffer::discardRawCell(FUShmRawCell* cell) {
00479 releaseRawCell(cell);
00480 postRawDiscard();
00481 }
00482
00483
00484 void FUShmBuffer::discardRecoCell(unsigned int iCell) {
00485 FUShmRecoCell* cell = recoCell(iCell);
00486 unsigned int iRawCell = cell->rawCellIndex();
00487 if (iRawCell < nRawCells_) {
00488
00489
00490 scheduleRawCellForDiscard(iRawCell);
00491 }
00492 cell->clear();
00493 if (segmentationMode_)
00494 shmdt(cell);
00495 postRecoIndexToWrite(iCell);
00496 postRecoWrite();
00497 }
00498
00499
00500 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell) {
00501 FUShmRecoCell* cell = recoCell(iCell);
00502 cell->clear();
00503 if (segmentationMode_)
00504 shmdt(cell);
00505 postRecoIndexToWrite(iCell);
00506 postRecoWrite();
00507 }
00508
00509
00510 void FUShmBuffer::discardDqmCell(unsigned int iCell) {
00511 dqm::State_t state = dqmState(iCell);
00512 stringstream details;
00513 details
00514 << "state==dqm::EMPTY||state==dqm::SENT assertion failed! Actual state is "
00515 << state << ", iCell = " << iCell;
00516 XCEPT_ASSERT(state == dqm::EMPTY || state == dqm::SENT, evf::Exception,
00517 details.str());
00518 setDqmState(iCell, dqm::DISCARDING);
00519 FUShmDqmCell* cell = dqmCell(iCell);
00520 cell->clear();
00521 if (segmentationMode_)
00522 shmdt(cell);
00523 setDqmState(iCell, dqm::EMPTY);
00524 postDqmIndexToWrite(iCell);
00525 postDqmWrite();
00526 }
00527
00528
00529 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell) {
00530 evt::State_t state = evtState(cell->index());
00531 if (!(state == evt::DISCARDING || state == evt::RAWWRITING || state
00532 == evt::EMPTY || state == evt::STOP
00533
00534 || state == evt::USEDLS))
00535 std::cout << "=================releaseRawCell state " << state
00536 << std::endl;
00537 stringstream details;
00538 details
00539 << "state==evt::DISCARDING||state==evt::RAWWRITING||state==evt::EMPTY||state==evt::STOP||state==evt::USEDLS assertion failed! Actual state is "
00540 << state << ", index = " << cell->index();
00541 XCEPT_ASSERT(
00542 state == evt::DISCARDING || state == evt::RAWWRITING || state
00543 == evt::EMPTY || state == evt::STOP
00544
00545 || state == evt::USEDLS, evf::Exception, details.str());
00546 setEvtState(cell->index(), evt::EMPTY);
00547 setEvtDiscard(cell->index(), 0);
00548 setEvtNumber(cell->index(), 0xffffffff);
00549 setEvtPrcId(cell->index(), 0);
00550 setEvtTimeStamp(cell->index(), 0);
00551 cell->clear();
00552 postRawIndexToWrite(cell->index());
00553 if (segmentationMode_)
00554 shmdt(cell);
00555 postRawWrite();
00556 }
00557
00558
00559 void FUShmBuffer::writeRawEmptyEvent() {
00560 FUShmRawCell* cell = rawCellToWrite();
00561 if (cell == 0)
00562 return;
00563 evt::State_t state = evtState(cell->index());
00564 stringstream details;
00565 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00566 << state << ", index = " << cell->index();
00567 XCEPT_ASSERT(state == evt::RAWWRITING, evf::Exception, details.str());
00568 setEvtState(cell->index(), evt::STOP);
00569 cell->setEventTypeStopper();
00570 postRawIndexToRead(cell->index());
00571 if (segmentationMode_)
00572 shmdt(cell);
00573 postRawRead();
00574 }
00575
00576
00577 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls) {
00578 FUShmRawCell* cell = rawCellToWrite();
00579 if (cell == 0)
00580 return;
00581 cell->setLumiSection(ls);
00582 evt::State_t state = evtState(cell->index());
00583 stringstream details;
00584 details << "state==evt::RAWWRITING assertion failed! Actual state is "
00585 << state << ", index = " << cell->index();
00586 XCEPT_ASSERT(state == evt::RAWWRITING, evf::Exception, details.str());
00587 setEvtState(cell->index(), evt::LUMISECTION);
00588 cell->setEventTypeEol();
00589 postRawIndexToRead(cell->index());
00590 if (segmentationMode_)
00591 shmdt(cell);
00592 postRawRead();
00593 }
00594
00595
00596 void FUShmBuffer::writeRecoEmptyEvent() {
00597 waitRecoWrite();
00598 unsigned int iCell = nextRecoWriteIndex();
00599 FUShmRecoCell* cell = recoCell(iCell);
00600 cell->clear();
00601 postRecoIndexToRead(iCell);
00602 if (segmentationMode_)
00603 shmdt(cell);
00604 postRecoRead();
00605 }
00606
00607
00608 void FUShmBuffer::writeDqmEmptyEvent() {
00609 waitDqmWrite();
00610 unsigned int iCell = nextDqmWriteIndex();
00611 FUShmDqmCell* cell = dqmCell(iCell);
00612 cell->clear();
00613 postDqmIndexToRead(iCell);
00614 if (segmentationMode_)
00615 shmdt(cell);
00616 postDqmRead();
00617 }
00618
00619
00620 void FUShmBuffer::scheduleRawEmptyCellForDiscard() {
00621 FUShmRawCell* cell = rawCellToWrite();
00622 if (cell == 0)
00623 return;
00624 rawDiscardIndex_ = cell->index();
00625 setEvtState(cell->index(), evt::STOP);
00626 cell->setEventTypeStopper();
00627 setEvtNumber(cell->index(), 0xffffffff);
00628 setEvtPrcId(cell->index(), 0);
00629 setEvtTimeStamp(cell->index(), 0);
00630 postRawDiscarded();
00631 }
00632
00633
00634 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell) {
00635 waitRawDiscard();
00636 if (rawCellReadyForDiscard(cell->index())) {
00637 rawDiscardIndex_ = cell->index();
00638
00639
00640
00641
00642
00643
00644
00645 removeClientPrcId(getpid());
00646 if (segmentationMode_)
00647 shmdt(cell);
00648 postRawDiscarded();
00649 } else
00650 postRawDiscard();
00651 }
00652
00653
00654 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell) {
00655
00656 if (rawCellReadyForDiscard(cell->index())) {
00657 rawDiscardIndex_ = cell->index();
00658
00659
00660
00661
00662
00663
00664 if (segmentationMode_)
00665 shmdt(cell);
00666 postRawDiscarded();
00667 } else
00668 postRawDiscard();
00669 }
00670
00671
00672 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00673 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00674 unsigned int dataSize, unsigned int nExpectedEPs) {
00675 if (dataSize > recoCellPayloadSize_) {
00676 cout << "FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."
00677 << endl;
00678 return false;
00679 }
00680
00681 waitRecoWrite();
00682 unsigned int iCell = nextRecoWriteIndex();
00683 FUShmRecoCell* cell = recoCell(iCell);
00684 cell->writeInitMsg(outModId, fuProcessId, fuGuid, data, dataSize,nExpectedEPs);
00685 postRecoIndexToRead(iCell);
00686 if (segmentationMode_)
00687 shmdt(cell);
00688 postRecoRead();
00689 return true;
00690 }
00691
00692
00693 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00694 unsigned int evtNumber, unsigned int outModId,
00695 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00696 unsigned int dataSize) {
00697 if (dataSize > recoCellPayloadSize_) {
00698 cout << "FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."
00699 << endl;
00700 return false;
00701 }
00702
00703 waitRecoWrite();
00704 unsigned int iCell = nextRecoWriteIndex();
00705 FUShmRecoCell* cell = recoCell(iCell);
00706 unsigned int rawCellIndex = indexForEvtNumber(evtNumber);
00707
00708
00709 setEvtState(rawCellIndex, evt::RECOWRITING);
00710 incEvtDiscard(rawCellIndex);
00711 cell->writeEventData(rawCellIndex, runNumber, evtNumber, outModId,
00712 fuProcessId, fuGuid, data, dataSize);
00713 setEvtState(rawCellIndex, evt::RECOWRITTEN);
00714 postRecoIndexToRead(iCell);
00715 if (segmentationMode_)
00716 shmdt(cell);
00717 postRecoRead();
00718 return true;
00719 }
00720
00721
00722 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00723 unsigned int fuProcessId, unsigned int iRawCell, bool checkValue) {
00724 FUShmRawCell *raw = rawCell(iRawCell);
00725
00726 unsigned int dataSize = sizeof(uint32_t) * (4 + 1024) + raw->eventSize();
00727 unsigned char *data = new unsigned char[dataSize];
00728 uint32_t *pos = (uint32_t*) data;
00729
00730
00731
00732
00733
00734
00735
00736 *pos++ = (uint32_t) 2;
00737 *pos++ = (uint32_t) runNumber;
00738 *pos++ = (uint32_t) evf::evtn::getlbn(
00739 raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00740 *pos++ = (uint32_t) raw->evtNumber();
00741 for (unsigned int i = 0; i < 1024; i++)
00742 *pos++ = (uint32_t) raw->fedSize(i);
00743 memcpy(pos, raw->payloadAddr(), raw->eventSize());
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775 waitRecoWrite();
00776 unsigned int iRecoCell = nextRecoWriteIndex();
00777 FUShmRecoCell* reco = recoCell(iRecoCell);
00778 setEvtState(iRawCell, evt::RECOWRITING);
00779 setEvtDiscard(iRawCell, 1, checkValue);
00780 reco->writeErrorEvent(iRawCell, runNumber, raw->evtNumber(), fuProcessId,
00781 data, dataSize);
00782 delete[] data;
00783 setEvtState(iRawCell, evt::RECOWRITTEN);
00784 postRecoIndexToRead(iRecoCell);
00785 if (segmentationMode_) {
00786 shmdt(raw);
00787 shmdt(reco);
00788 }
00789 postRecoRead();
00790 return true;
00791 }
00792
00793
00794 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00795 unsigned int evtAtUpdate, unsigned int folderId,
00796 unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
00797 unsigned int dataSize) {
00798 if (dataSize > dqmCellPayloadSize_) {
00799 cout << "FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."
00800 << endl;
00801 return false;
00802 }
00803
00804 waitDqmWrite();
00805 unsigned int iCell = nextDqmWriteIndex();
00806 FUShmDqmCell* cell = dqmCell(iCell);
00807 dqm::State_t state = dqmState(iCell);
00808 stringstream details;
00809 details << "state==dqm::EMPTY assertion failed! Actual state is " << state
00810 << ", iCell = " << iCell;
00811 XCEPT_ASSERT(state == dqm::EMPTY, evf::Exception, details.str());
00812 setDqmState(iCell, dqm::WRITING);
00813 cell->writeData(runNumber, evtAtUpdate, folderId, fuProcessId, fuGuid,
00814 data, dataSize);
00815 setDqmState(iCell, dqm::WRITTEN);
00816 postDqmIndexToRead(iCell);
00817 if (segmentationMode_)
00818 shmdt(cell);
00819 postDqmRead();
00820 return true;
00821 }
00822
00823
00824 void FUShmBuffer::sem_print() {
00825 cout << "--> current sem values:" << endl << " lock=" << semctl(semid(), 0,
00826 GETVAL) << endl << " wraw=" << semctl(semid(), 1, GETVAL)
00827 << " rraw=" << semctl(semid(), 2, GETVAL) << endl << " wdsc="
00828 << semctl(semid(), 3, GETVAL) << " rdsc=" << semctl(semid(), 4,
00829 GETVAL) << endl << " wrec=" << semctl(semid(), 5, GETVAL)
00830 << " rrec=" << semctl(semid(), 6, GETVAL) << endl << " wdqm="
00831 << semctl(semid(), 7, GETVAL) << " rdqm=" << semctl(semid(), 8,
00832 GETVAL) << endl;
00833 }
00834
00835
00836 void FUShmBuffer::printEvtState(unsigned int index) {
00837 evt::State_t state = evtState(index);
00838 std::string stateName;
00839 if (state == evt::EMPTY)
00840 stateName = "EMPTY";
00841 else if (state == evt::STOP)
00842 stateName = "STOP";
00843 else if (state == evt::RAWWRITING)
00844 stateName = "RAWWRITING";
00845 else if (state == evt::RAWWRITTEN)
00846 stateName = "RAWRITTEN";
00847 else if (state == evt::RAWREADING)
00848 stateName = "RAWREADING";
00849 else if (state == evt::RAWREAD)
00850 stateName = "RAWREAD";
00851 else if (state == evt::PROCESSING)
00852 stateName = "PROCESSING";
00853 else if (state == evt::PROCESSED)
00854 stateName = "PROCESSED";
00855 else if (state == evt::RECOWRITING)
00856 stateName = "RECOWRITING";
00857 else if (state == evt::RECOWRITTEN)
00858 stateName = "RECOWRITTEN";
00859 else if (state == evt::SENDING)
00860 stateName = "SENDING";
00861 else if (state == evt::SENT)
00862 stateName = "SENT";
00863 else if (state == evt::DISCARDING)
00864 stateName = "DISCARDING";
00865 cout << "evt " << index << " in state '" << stateName << "'." << endl;
00866 }
00867
00868
00869 void FUShmBuffer::printDqmState(unsigned int index) {
00870 dqm::State_t state = dqmState(index);
00871 cout << "dqm evt " << index << " in state '" << state << "'." << endl;
00872 }
00873
00874
00875 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00876 unsigned int nRawCells, unsigned int nRecoCells,
00877 unsigned int nDqmCells, unsigned int rawCellSize,
00878 unsigned int recoCellSize, unsigned int dqmCellSize) {
00879
00880 if (FUShmBuffer::releaseSharedMemory())
00881 cout << "FUShmBuffer::createShmBuffer: "
00882 << "REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL." << endl;
00883
00884
00885 int size = sizeof(unsigned int) * 7;
00886 int shmid = shm_create(FUShmBuffer::getShmDescriptorKey(), size);
00887 if (shmid < 0)
00888 return 0;
00889 void*shmAddr = shm_attach(shmid);
00890 if (0 == shmAddr)
00891 return 0;
00892
00893 if (1 != shm_nattch(shmid)) {
00894 cout << "FUShmBuffer::createShmBuffer() FAILED: nattch=" << shm_nattch(
00895 shmid) << endl;
00896 shmdt(shmAddr);
00897 return 0;
00898 }
00899
00900 unsigned int* p = (unsigned int*) shmAddr;
00901 *p++ = segmentationMode;
00902 *p++ = nRawCells;
00903 *p++ = nRecoCells;
00904 *p++ = nDqmCells;
00905 *p++ = rawCellSize;
00906 *p++ = recoCellSize;
00907 *p++ = dqmCellSize;
00908 shmdt(shmAddr);
00909
00910
00911 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
00912 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00913 shmid = shm_create(FUShmBuffer::getShmKey(), size);
00914 if (shmid < 0)
00915 return 0;
00916 int semid = sem_create(FUShmBuffer::getSemKey(), 9);
00917 if (semid < 0)
00918 return 0;
00919 shmAddr = shm_attach(shmid);
00920 if (0 == shmAddr)
00921 return 0;
00922
00923 if (1 != shm_nattch(shmid)) {
00924 cout << "FUShmBuffer::createShmBuffer FAILED: nattch=" << shm_nattch(
00925 shmid) << endl;
00926 shmdt(shmAddr);
00927 return 0;
00928 }
00929 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
00930 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
00931 dqmCellSize);
00932
00933 cout << "FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."
00934 << endl;
00935 cout << " segmentationMode="
00936 << segmentationMode << endl;
00937
00938 buffer->initialize(shmid, semid);
00939
00940 return buffer;
00941 }
00942
00943
00944 FUShmBuffer* FUShmBuffer::getShmBuffer() {
00945
00946 int size = sizeof(unsigned int) * 7;
00947 int shmid = shm_get(FUShmBuffer::getShmDescriptorKey(), size);
00948 if (shmid < 0)
00949 return 0;
00950 void* shmAddr = shm_attach(shmid);
00951 if (0 == shmAddr)
00952 return 0;
00953
00954 unsigned int *p = (unsigned int*) shmAddr;
00955 bool segmentationMode = *p++;
00956 unsigned int nRawCells = *p++;
00957 unsigned int nRecoCells = *p++;
00958 unsigned int nDqmCells = *p++;
00959 unsigned int rawCellSize = *p++;
00960 unsigned int recoCellSize = *p++;
00961 unsigned int dqmCellSize = *p++;
00962 shmdt(shmAddr);
00963
00964 cout << "FUShmBuffer::getShmBuffer():" << " segmentationMode="
00965 << segmentationMode << " nRawCells=" << nRawCells << " nRecoCells="
00966 << nRecoCells << " nDqmCells=" << nDqmCells << " rawCellSize="
00967 << rawCellSize << " recoCellSize=" << recoCellSize
00968 << " dqmCellSize=" << dqmCellSize << endl;
00969
00970
00971 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
00972 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
00973 shmid = shm_get(FUShmBuffer::getShmKey(), size);
00974 if (shmid < 0)
00975 return 0;
00976 int semid = sem_get(FUShmBuffer::getSemKey(), 9);
00977 if (semid < 0)
00978 return 0;
00979 shmAddr = shm_attach(shmid);
00980 if (0 == shmAddr)
00981 return 0;
00982
00983 if (0 == shm_nattch(shmid)) {
00984 cout << "FUShmBuffer::getShmBuffer() FAILED: nattch=" << shm_nattch(
00985 shmid) << endl;
00986 return 0;
00987 }
00988
00989 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
00990 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
00991 dqmCellSize);
00992
00993 cout << "FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."
00994 << endl;
00995 cout << " segmentationMode="
00996 << segmentationMode << endl;
00997
00998 buffer->setClientPrcId(getpid());
00999
01000 return buffer;
01001 }
01002
01003
01004 bool FUShmBuffer::releaseSharedMemory() {
01005
01006 int size = sizeof(unsigned int) * 7;
01007 int shmidd = shm_get(FUShmBuffer::getShmDescriptorKey(), size);
01008 if (shmidd < 0)
01009 return false;
01010 void* shmAddr = shm_attach(shmidd);
01011 if (0 == shmAddr)
01012 return false;
01013
01014 unsigned int*p = (unsigned int*) shmAddr;
01015 bool segmentationMode = *p++;
01016 unsigned int nRawCells = *p++;
01017 unsigned int nRecoCells = *p++;
01018 unsigned int nDqmCells = *p++;
01019 unsigned int rawCellSize = *p++;
01020 unsigned int recoCellSize = *p++;
01021 unsigned int dqmCellSize = *p++;
01022 shmdt(shmAddr);
01023
01024
01025 size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
01026 nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
01027 int shmid = shm_get(FUShmBuffer::getShmKey(), size);
01028 if (shmid < 0)
01029 return false;
01030 int semid = sem_get(FUShmBuffer::getSemKey(), 9);
01031 if (semid < 0)
01032 return false;
01033 shmAddr = shm_attach(shmid);
01034 if (0 == shmAddr)
01035 return false;
01036
01037 int att = 0;
01038 for (; att < 10; att++) {
01039 if (shm_nattch(shmid) > 1) {
01040 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="
01041 << shm_nattch(shmid)
01042 << ", failed attempt to release shared memory." << endl;
01043 ::sleep(1);
01044 } else
01045 break;
01046 }
01047
01048 if (att >= 10)
01049 return false;
01050
01051 if (segmentationMode) {
01052 FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
01053 nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
01054 dqmCellSize);
01055 int cellid;
01056 for (unsigned int i = 0; i < nRawCells; i++) {
01057 cellid = shm_get(buffer->rawCellShmKey(i),
01058 FUShmRawCell::size(rawCellSize));
01059 if ((shm_destroy(cellid) == -1))
01060 return false;
01061 }
01062 for (unsigned int i = 0; i < nRecoCells; i++) {
01063 cellid = shm_get(buffer->recoCellShmKey(i),
01064 FUShmRecoCell::size(recoCellSize));
01065 if ((shm_destroy(cellid) == -1))
01066 return false;
01067 }
01068 for (unsigned int i = 0; i < nDqmCells; i++) {
01069 cellid = shm_get(buffer->dqmCellShmKey(i),
01070 FUShmDqmCell::size(dqmCellSize));
01071 if ((shm_destroy(cellid) == -1))
01072 return false;
01073 }
01074 }
01075 shmdt(shmAddr);
01076 if (sem_destroy(semid) == -1)
01077 return false;
01078 if (shm_destroy(shmid) == -1)
01079 return false;
01080 if (shm_destroy(shmidd) == -1)
01081 return false;
01082
01083 return true;
01084 }
01085
01086
01087 unsigned int FUShmBuffer::size(bool segmentationMode, unsigned int nRawCells,
01088 unsigned int nRecoCells, unsigned int nDqmCells,
01089 unsigned int rawCellSize, unsigned int recoCellSize,
01090 unsigned int dqmCellSize) {
01091 unsigned int offset = sizeof(FUShmBuffer) + sizeof(unsigned int) * 4
01092 * nRawCells + sizeof(evt::State_t) * nRawCells
01093 + sizeof(dqm::State_t) * nDqmCells;
01094
01095 unsigned int rawCellTotalSize = FUShmRawCell::size(rawCellSize);
01096 unsigned int recoCellTotalSize = FUShmRecoCell::size(recoCellSize);
01097 unsigned int dqmCellTotalSize = FUShmDqmCell::size(dqmCellSize);
01098
01099 unsigned int realSize = (segmentationMode) ? offset + sizeof(key_t)
01100 * (nRawCells + nRecoCells + nDqmCells) : offset + rawCellTotalSize
01101 * nRawCells + recoCellTotalSize * nRecoCells + dqmCellTotalSize
01102 * nDqmCells;
01103
01104 unsigned int result = realSize / 0x10 * 0x10 + (realSize % 0x10 > 0) * 0x10;
01105
01106 return result;
01107 }
01108
01109
01110 key_t FUShmBuffer::getShmDescriptorKey() {
01111 key_t result = getuid() * 1000 + SHM_DESCRIPTOR_KEYID;
01112 if (result == (key_t) -1)
01113 cout << "FUShmBuffer::getShmDescriptorKey: failed " << "for file "
01114 << shmKeyPath_ << "!" << endl;
01115 return result;
01116 }
01117
01118
01119 key_t FUShmBuffer::getShmKey() {
01120 key_t result = getuid() * 1000 + SHM_KEYID;
01121 if (result == (key_t) -1)
01122 cout << "FUShmBuffer::getShmKey: ftok() failed " << "for file "
01123 << shmKeyPath_ << "!" << endl;
01124 return result;
01125 }
01126
01127
01128 key_t FUShmBuffer::getSemKey() {
01129 key_t result = getuid() * 1000 + SEM_KEYID;
01130 if (result == (key_t) -1)
01131 cout << "FUShmBuffer::getSemKey: ftok() failed " << "for file "
01132 << semKeyPath_ << "!" << endl;
01133 return result;
01134 }
01135
01136
01137 int FUShmBuffer::shm_create(key_t key, int size) {
01138
01139 int shmid = shmget(key, 1, 0644);
01140 if (shmid != -1) {
01141
01142 shmid_ds shmstat;
01143 shmctl(shmid, IPC_STAT, &shmstat);
01144 cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01145 << " created by process " << shmstat.shm_cpid << " owned by "
01146 << shmstat.shm_perm.uid << " permissions " << hex
01147 << shmstat.shm_perm.mode << dec << endl;
01148 shmctl(shmid, IPC_RMID, &shmstat);
01149 }
01150 shmid = shmget(key, size, IPC_CREAT | 0644);
01151 if (shmid == -1) {
01152 int err = errno;
01153 cout << "FUShmBuffer::shm_create(" << key << "," << size
01154 << ") failed: " << strerror(err) << endl;
01155 }
01156 return shmid;
01157 }
01158
01159
01160 int FUShmBuffer::shm_get(key_t key, int size) {
01161 int shmid = shmget(key, size, 0644);
01162 if (shmid == -1) {
01163 int err = errno;
01164 cout << "FUShmBuffer::shm_get(" << key << "," << size << ") failed: "
01165 << strerror(err) << endl;
01166 }
01167 return shmid;
01168 }
01169
01170
01171 void* FUShmBuffer::shm_attach(int shmid) {
01172 void* result = shmat(shmid, NULL, 0);
01173 if (0 == result) {
01174 int err = errno;
01175 cout << "FUShmBuffer::shm_attach(" << shmid << ") failed: "
01176 << strerror(err) << endl;
01177 }
01178 return result;
01179 }
01180
01181
01182 int FUShmBuffer::shm_nattch(int shmid) {
01183 shmid_ds shmstat;
01184 shmctl(shmid, IPC_STAT, &shmstat);
01185 return shmstat.shm_nattch;
01186 }
01187
01188
01189 int FUShmBuffer::shm_destroy(int shmid) {
01190 shmid_ds shmstat;
01191 int result = shmctl(shmid, IPC_RMID, &shmstat);
01192 if (result == -1)
01193 cout << "FUShmBuffer::shm_destroy(shmid=" << shmid << ") failed."
01194 << endl;
01195 return result;
01196 }
01197
01198
01199 int FUShmBuffer::sem_create(key_t key, int nsem) {
01200 int semid = semget(key, nsem, IPC_CREAT | 0666);
01201 if (semid < 0) {
01202 int err = errno;
01203 cout << "FUShmBuffer::sem_create(key=" << key << ",nsem=" << nsem
01204 << ") failed: " << strerror(err) << endl;
01205 }
01206 return semid;
01207 }
01208
01209
01210 int FUShmBuffer::sem_get(key_t key, int nsem) {
01211 int semid = semget(key, nsem, 0666);
01212 if (semid < 0) {
01213 int err = errno;
01214 cout << "FUShmBuffer::sem_get(key=" << key << ",nsem=" << nsem
01215 << ") failed: " << strerror(err) << endl;
01216 }
01217 return semid;
01218 }
01219
01220
01221 int FUShmBuffer::sem_destroy(int semid) {
01222 int result = semctl(semid, 0, IPC_RMID);
01223 if (result == -1)
01224 cout << "FUShmBuffer::sem_destroy(semid=" << semid << ") failed."
01225 << endl;
01226 return result;
01227 }
01228
01230
01232
01233
01234 unsigned int FUShmBuffer::nextIndex(unsigned int offset, unsigned int nCells,
01235 unsigned int& iNext) {
01236 lock();
01237 unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
01238 pindex += iNext;
01239 iNext = (iNext + 1) % nCells;
01240 unsigned int result = *pindex;
01241 unlock();
01242 return result;
01243 }
01244
01245
01246 void FUShmBuffer::postIndex(unsigned int index, unsigned int offset,
01247 unsigned int nCells, unsigned int& iLast) {
01248 lock();
01249 unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
01250 pindex += iLast;
01251 *pindex = index;
01252 iLast = (iLast + 1) % nCells;
01253 unlock();
01254 }
01255
01256
01257 unsigned int FUShmBuffer::nextRawWriteIndex() {
01258 return nextIndex(rawWriteOffset_, nRawCells_, rawWriteNext_);
01259 }
01260
01261
01262 unsigned int FUShmBuffer::nextRawReadIndex() {
01263 return nextIndex(rawReadOffset_, nRawCells_, rawReadNext_);
01264 }
01265
01266
01267 void FUShmBuffer::postRawIndexToWrite(unsigned int index) {
01268 postIndex(index, rawWriteOffset_, nRawCells_, rawWriteLast_);
01269 }
01270
01271
01272 void FUShmBuffer::postRawIndexToRead(unsigned int index) {
01273 postIndex(index, rawReadOffset_, nRawCells_, rawReadLast_);
01274 }
01275
01276
01277 unsigned int FUShmBuffer::nextRecoWriteIndex() {
01278 return nextIndex(recoWriteOffset_, nRecoCells_, recoWriteNext_);
01279 }
01280
01281
01282 unsigned int FUShmBuffer::nextRecoReadIndex() {
01283 return nextIndex(recoReadOffset_, nRecoCells_, recoReadNext_);
01284 }
01285
01286
01287 void FUShmBuffer::postRecoIndexToWrite(unsigned int index) {
01288 postIndex(index, recoWriteOffset_, nRecoCells_, recoWriteLast_);
01289 }
01290
01291
01292 void FUShmBuffer::postRecoIndexToRead(unsigned int index) {
01293 postIndex(index, recoReadOffset_, nRecoCells_, recoReadLast_);
01294 }
01295
01296
01297 unsigned int FUShmBuffer::nextDqmWriteIndex() {
01298 return nextIndex(dqmWriteOffset_, nDqmCells_, dqmWriteNext_);
01299 }
01300
01301
01302 unsigned int FUShmBuffer::nextDqmReadIndex() {
01303 return nextIndex(dqmReadOffset_, nDqmCells_, dqmReadNext_);
01304 }
01305
01306
01307 void FUShmBuffer::postDqmIndexToWrite(unsigned int index) {
01308 postIndex(index, dqmWriteOffset_, nDqmCells_, dqmWriteLast_);
01309 }
01310
01311
01312 void FUShmBuffer::postDqmIndexToRead(unsigned int index) {
01313 postIndex(index, dqmReadOffset_, nDqmCells_, dqmReadLast_);
01314 }
01315
01316
01317 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber) {
01318 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01319 + evtNumberOffset_);
01320 for (unsigned int i = 0; i < nRawCells_; i++) {
01321 if ((*pevt++) == evtNumber)
01322 return i;
01323 }
01324 XCEPT_ASSERT(false, evf::Exception, "This point should not be reached!");
01325 return 0xffffffff;
01326 }
01327
01328
01329 evt::State_t FUShmBuffer::evtState(unsigned int index) {
01330 stringstream details;
01331 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01332 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01333 evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
01334 + evtStateOffset_);
01335 pstate += index;
01336 return *pstate;
01337 }
01338
01339
01340 dqm::State_t FUShmBuffer::dqmState(unsigned int index) {
01341 stringstream details;
01342 details << "index<nDqmCells_ assertion failed! Actual index is " << index;
01343 XCEPT_ASSERT(index < nDqmCells_, evf::Exception, details.str());
01344 dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
01345 + dqmStateOffset_);
01346 pstate += index;
01347 return *pstate;
01348 }
01349
01350
01351 unsigned int FUShmBuffer::evtNumber(unsigned int index) {
01352 stringstream details;
01353 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01354 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01355 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01356 + evtNumberOffset_);
01357 pevt += index;
01358 return *pevt;
01359 }
01360
01361
01362 pid_t FUShmBuffer::evtPrcId(unsigned int index) {
01363 stringstream details;
01364 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01365 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01366 pid_t *prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
01367 prcid += index;
01368 return *prcid;
01369 }
01370
01371
01372 time_t FUShmBuffer::evtTimeStamp(unsigned int index) {
01373 stringstream details;
01374 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01375 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01376 time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
01377 ptstmp += index;
01378 return *ptstmp;
01379 }
01380
01381
01382 pid_t FUShmBuffer::clientPrcId(unsigned int index) {
01383 stringstream details;
01384 details << "index<nClientsMax_ assertion failed! Actual index is " << index;
01385 XCEPT_ASSERT(index < nClientsMax_, evf::Exception, details.str());
01386 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01387 prcid += index;
01388 return *prcid;
01389 }
01390
01391
01392 bool FUShmBuffer::setEvtState(unsigned int index, evt::State_t state) {
01393 stringstream details;
01394 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01395 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01396 evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
01397 + evtStateOffset_);
01398 pstate += index;
01399 lock();
01400 *pstate = state;
01401 unlock();
01402 return true;
01403 }
01404
01405
01406 bool FUShmBuffer::setDqmState(unsigned int index, dqm::State_t state) {
01407 stringstream details;
01408 details << "index<nDqmCells_ assertion failed! Actual index is " << index;
01409 XCEPT_ASSERT(index < nDqmCells_, evf::Exception, details.str());
01410 dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
01411 + dqmStateOffset_);
01412 pstate += index;
01413 lock();
01414 *pstate = state;
01415 unlock();
01416 return true;
01417 }
01418
01419
01420 bool FUShmBuffer::setEvtDiscard(unsigned int index, unsigned int discard,
01421 bool checkValue) {
01422 stringstream details;
01423 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01424 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01425 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01426 + evtDiscardOffset_);
01427 pcount += index;
01428 lock();
01429 if (checkValue) {
01430 if (*pcount < discard)
01431 *pcount = discard;
01432 } else
01433 *pcount = discard;
01434 unlock();
01435 return true;
01436 }
01437
01438
01439 int FUShmBuffer::incEvtDiscard(unsigned int index) {
01440 int result = 0;
01441 stringstream details;
01442 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01443 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01444 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01445 + evtDiscardOffset_);
01446 pcount += index;
01447 lock();
01448 (*pcount)++;
01449 result = *pcount;
01450 unlock();
01451 return result;
01452 }
01453
01454
01455 bool FUShmBuffer::setEvtNumber(unsigned int index, unsigned int evtNumber) {
01456 stringstream details;
01457 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01458 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01459 unsigned int *pevt = (unsigned int*) ((unsigned long) this
01460 + evtNumberOffset_);
01461 pevt += index;
01462 lock();
01463 *pevt = evtNumber;
01464 unlock();
01465 return true;
01466 }
01467
01468
01469 bool FUShmBuffer::setEvtPrcId(unsigned int index, pid_t prcId) {
01470 stringstream details;
01471 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01472 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01473 pid_t* prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
01474 prcid += index;
01475 lock();
01476 *prcid = prcId;
01477 unlock();
01478 return true;
01479 }
01480
01481
01482 bool FUShmBuffer::setEvtTimeStamp(unsigned int index, time_t timeStamp) {
01483 stringstream details;
01484 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01485 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01486 time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
01487 ptstmp += index;
01488 lock();
01489 *ptstmp = timeStamp;
01490 unlock();
01491 return true;
01492 }
01493
01494
01495 bool FUShmBuffer::setClientPrcId(pid_t prcId) {
01496 lock();
01497 stringstream details;
01498 details << "nClients_<nClientsMax_ assertion failed! Actual nClients is "
01499 << nClients_ << " and nClientsMax is " << nClientsMax_;
01500 XCEPT_ASSERT(nClients_ < nClientsMax_, evf::Exception, details.str());
01501 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01502 for (unsigned int i = 0; i < nClients_; i++) {
01503 if ((*prcid) == prcId) {
01504 unlock();
01505 return false;
01506 }
01507 prcid++;
01508 }
01509 nClients_++;
01510 *prcid = prcId;
01511 unlock();
01512 return true;
01513 }
01514
01515
01516 bool FUShmBuffer::removeClientPrcId(pid_t prcId) {
01517 lock();
01518 pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
01519 unsigned int iClient(0);
01520 while (iClient <= nClients_ && (*prcid) != prcId) {
01521 prcid++;
01522 iClient++;
01523 }
01524 stringstream details;
01525 details << "iClient!=nClients_ assertion failed! Actual iClient is "
01526 << iClient;
01527 XCEPT_ASSERT(iClient != nClients_, evf::Exception, details.str());
01528 pid_t* next = prcid;
01529 next++;
01530 while (iClient < nClients_ - 1) {
01531 *prcid = *next;
01532 prcid++;
01533 next++;
01534 iClient++;
01535 }
01536 nClients_--;
01537 unlock();
01538 return true;
01539 }
01540
01541
01542 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell) {
01543 FUShmRawCell* result(0);
01544
01545 if (iCell >= nRawCells_) {
01546 cout << "FUShmBuffer::rawCell(" << iCell << ") ERROR: " << "iCell="
01547 << iCell << " >= nRawCells()=" << nRawCells_ << endl;
01548 return result;
01549 }
01550
01551 if (segmentationMode_) {
01552 key_t shmkey = rawCellShmKey(iCell);
01553 int shmid = shm_get(shmkey, rawCellTotalSize_);
01554 void* cellAddr = shm_attach(shmid);
01555 result = new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01556 } else {
01557 result = (FUShmRawCell*) ((unsigned long) this + rawCellOffset_ + iCell
01558 * rawCellTotalSize_);
01559 }
01560
01561 return result;
01562 }
01563
01564
01565 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell) {
01566 FUShmRecoCell* result(0);
01567
01568 if (iCell >= nRecoCells_) {
01569 cout << "FUShmBuffer::recoCell(" << iCell << ") ERROR: " << "iCell="
01570 << iCell << " >= nRecoCells=" << nRecoCells_ << endl;
01571 return result;
01572 }
01573
01574 if (segmentationMode_) {
01575 key_t shmkey = recoCellShmKey(iCell);
01576 int shmid = shm_get(shmkey, recoCellTotalSize_);
01577 void* cellAddr = shm_attach(shmid);
01578 result = new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01579 } else {
01580 result = (FUShmRecoCell*) ((unsigned long) this + recoCellOffset_
01581 + iCell * recoCellTotalSize_);
01582 }
01583
01584 return result;
01585 }
01586
01587
01588 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell) {
01589 FUShmDqmCell* result(0);
01590
01591 if (iCell >= nDqmCells_) {
01592 cout << "FUShmBuffer::dqmCell(" << iCell << ") ERROR: " << "iCell="
01593 << iCell << " >= nDqmCells=" << nDqmCells_ << endl;
01594 return result;
01595 }
01596
01597 if (segmentationMode_) {
01598 key_t shmkey = dqmCellShmKey(iCell);
01599 int shmid = shm_get(shmkey, dqmCellTotalSize_);
01600 void* cellAddr = shm_attach(shmid);
01601 result = new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01602 } else {
01603 result = (FUShmDqmCell*) ((unsigned long) this + dqmCellOffset_ + iCell
01604 * dqmCellTotalSize_);
01605 }
01606
01607 return result;
01608 }
01609
01610
01611 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index) {
01612 stringstream details;
01613 details << "index<nRawCells_ assertion failed! Actual index is " << index;
01614 XCEPT_ASSERT(index < nRawCells_, evf::Exception, details.str());
01615 unsigned int *pcount = (unsigned int*) ((unsigned long) this
01616 + evtDiscardOffset_);
01617 pcount += index;
01618 lock();
01619 stringstream details2;
01620 details2 << "*pcount>0 assertion failed! Value at pcount is " << *pcount;
01621 XCEPT_ASSERT(*pcount > 0, evf::Exception, details2.str());
01622 --(*pcount);
01623 bool result = (*pcount == 0);
01624 unlock();
01625 return result;
01626 }
01627
01628
01629 key_t FUShmBuffer::shmKey(unsigned int iCell, unsigned int offset) {
01630 if (!segmentationMode_) {
01631 cout << "FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"
01632 << endl;
01633 return -1;
01634 }
01635 key_t* addr = (key_t*) ((unsigned long) this + offset);
01636 for (unsigned int i = 0; i < iCell; i++)
01637 ++addr;
01638 return *addr;
01639 }
01640
01641
01642 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell) {
01643 if (iCell >= nRawCells_) {
01644 cout << "FUShmBuffer::rawCellShmKey() ERROR: " << "iCell>=nRawCells: "
01645 << iCell << ">=" << nRawCells_ << endl;
01646 return -1;
01647 }
01648 return shmKey(iCell, rawCellOffset_);
01649 }
01650
01651
01652 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell) {
01653 if (iCell >= nRecoCells_) {
01654 cout << "FUShmBuffer::recoCellShmKey() ERROR: "
01655 << "iCell>=nRecoCells: " << iCell << ">=" << nRecoCells_
01656 << endl;
01657 return -1;
01658 }
01659 return shmKey(iCell, recoCellOffset_);
01660 }
01661
01662
01663 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell) {
01664 if (iCell >= nDqmCells_) {
01665 cout << "FUShmBuffer::dqmCellShmKey() ERROR: " << "iCell>=nDqmCells: "
01666 << iCell << ">=" << nDqmCells_ << endl;
01667 return -1;
01668 }
01669 return shmKey(iCell, dqmCellOffset_);
01670 }
01671
01672
01673 void FUShmBuffer::sem_init(int isem, int value) {
01674 if (semctl(semid(), isem, SETVAL, value) < 0) {
01675 cout << "FUShmBuffer: FATAL ERROR in semaphore initialization." << endl;
01676 }
01677 }
01678
01679
01680 int FUShmBuffer::sem_wait(int isem) {
01681 struct sembuf sops[1];
01682 sops[0].sem_num = isem;
01683 sops[0].sem_op = -1;
01684 sops[0].sem_flg = 0;
01685 if (semop(semid(), sops, 1) == -1) {
01686 cout << "FUShmBuffer: ERROR in semaphore operation sem_wait." << endl;
01687 return -1;
01688 }
01689 return 0;
01690 }
01691
01692
01693 void FUShmBuffer::sem_post(int isem) {
01694 struct sembuf sops[1];
01695 sops[0].sem_num = isem;
01696 sops[0].sem_op = 1;
01697 sops[0].sem_flg = 0;
01698 if (semop(semid(), sops, 1) == -1) {
01699 cout << "FUShmBuffer: ERROR in semaphore operation sem_post." << endl;
01700 }
01701 }