00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00011
00012 #include <unistd.h>
00013 #include <iostream>
00014 #include <string>
00015 #include <cassert>
00016
00017 #include <sstream>
00018 #include <fstream>
00019
00020 #include <cstdlib>
00021 #include <cstring>
00022
00023 #define SHM_DESCRIPTOR_KEYID 1
00024 #define SHM_KEYID 2
00025 #define SEM_KEYID 1
00026
00027 #define NSKIP_MAX 100
00028
00029
00030 using namespace std;
00031 using namespace evf;
00032
00033 const char* FUShmBuffer::shmKeyPath_ =
00034 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00035 const char* FUShmBuffer::semKeyPath_ =
00036 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00037
00038
00040
00042
00043
00044 FUShmBuffer::FUShmBuffer(bool segmentationMode,
00045 unsigned int nRawCells,
00046 unsigned int nRecoCells,
00047 unsigned int nDqmCells,
00048 unsigned int rawCellSize,
00049 unsigned int recoCellSize,
00050 unsigned int dqmCellSize)
00051 : segmentationMode_(segmentationMode)
00052 , nClientsMax_(16)
00053 , nRawCells_(nRawCells)
00054 , rawCellPayloadSize_(rawCellSize)
00055 , nRecoCells_(nRecoCells)
00056 , recoCellPayloadSize_(recoCellSize)
00057 , nDqmCells_(nDqmCells)
00058 , dqmCellPayloadSize_(dqmCellSize)
00059 {
00060 rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00061 recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00062 dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00063
00064 void* addr;
00065
00066 rawWriteOffset_=sizeof(FUShmBuffer);
00067 addr=(void*)((unsigned int)this+rawWriteOffset_);
00068 new (addr) unsigned int[nRawCells_];
00069
00070 rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00071 addr=(void*)((unsigned int)this+rawReadOffset_);
00072 new (addr) unsigned int[nRawCells_];
00073
00074 recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00075 addr=(void*)((unsigned int)this+recoWriteOffset_);
00076 new (addr) unsigned int[nRecoCells_];
00077
00078 recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00079 addr=(void*)((unsigned int)this+recoReadOffset_);
00080 new (addr) unsigned int[nRecoCells_];
00081
00082 dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00083 addr=(void*)((unsigned int)this+dqmWriteOffset_);
00084 new (addr) unsigned int[nDqmCells_];
00085
00086 dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00087 addr=(void*)((unsigned int)this+dqmReadOffset_);
00088 new (addr) unsigned int[nDqmCells_];
00089
00090 evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00091 addr=(void*)((unsigned int)this+evtStateOffset_);
00092 new (addr) evt::State_t[nRawCells_];
00093
00094 evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00095 addr=(void*)((unsigned int)this+evtDiscardOffset_);
00096 new (addr) unsigned int[nRawCells_];
00097
00098 evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00099 addr=(void*)((unsigned int)this+evtNumberOffset_);
00100 new (addr) unsigned int[nRawCells_];
00101
00102 evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00103 addr=(void*)((unsigned int)this+evtPrcIdOffset_);
00104 new (addr) pid_t[nRawCells_];
00105
00106 evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00107 addr=(void*)((unsigned int)this+evtTimeStampOffset_);
00108 new (addr) time_t[nRawCells_];
00109
00110 dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00111 addr=(void*)((unsigned int)this+dqmStateOffset_);
00112 new (addr) dqm::State_t[nDqmCells_];
00113
00114 clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00115 addr=(void*)((unsigned int)this+clientPrcIdOffset_);
00116 new (addr) pid_t[nClientsMax_];
00117
00118 rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00119
00120 if (segmentationMode_) {
00121 recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00122 dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00123 addr=(void*)((unsigned int)this+rawCellOffset_);
00124 new (addr) key_t[nRawCells_];
00125 addr=(void*)((unsigned int)this+recoCellOffset_);
00126 new (addr) key_t[nRecoCells_];
00127 addr=(void*)((unsigned int)this+dqmCellOffset_);
00128 new (addr) key_t[nDqmCells_];
00129 }
00130 else {
00131 recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00132 dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00133 for (unsigned int i=0;i<nRawCells_;i++) {
00134 addr=(void*)((unsigned int)this+rawCellOffset_+i*rawCellTotalSize_);
00135 new (addr) FUShmRawCell(rawCellSize);
00136 }
00137 for (unsigned int i=0;i<nRecoCells_;i++) {
00138 addr=(void*)((unsigned int)this+recoCellOffset_+i*recoCellTotalSize_);
00139 new (addr) FUShmRecoCell(recoCellSize);
00140 }
00141 for (unsigned int i=0;i<nDqmCells_;i++) {
00142 addr=(void*)((unsigned int)this+dqmCellOffset_+i*dqmCellTotalSize_);
00143 new (addr) FUShmDqmCell(dqmCellSize);
00144 }
00145 }
00146 }
00147
00148
00149
00150 FUShmBuffer::~FUShmBuffer()
00151 {
00152
00153 }
00154
00155
00157
00159
00160
00161 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00162 {
00163 shmid_=shmid;
00164 semid_=semid;
00165
00166 if (segmentationMode_) {
00167 int shmKeyId=666;
00168 key_t* keyAddr =(key_t*)((unsigned int)this+rawCellOffset_);
00169 for (unsigned int i=0;i<nRawCells_;i++) {
00170 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00171 int shmid =shm_create(*keyAddr,rawCellTotalSize_);
00172 void* shmAddr=shm_attach(shmid);
00173 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00174 shmdt(shmAddr);
00175 ++keyAddr;
00176 }
00177 keyAddr =(key_t*)((unsigned int)this+recoCellOffset_);
00178 for (unsigned int i=0;i<nRecoCells_;i++) {
00179 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00180 int shmid =shm_create(*keyAddr,recoCellTotalSize_);
00181 void* shmAddr=shm_attach(shmid);
00182 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr =(key_t*)((unsigned int)this+dqmCellOffset_);
00187 for (unsigned int i=0;i<nDqmCells_;i++) {
00188 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00189 int shmid =shm_create(*keyAddr,dqmCellTotalSize_);
00190 void* shmAddr=shm_attach(shmid);
00191 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 }
00196
00197 for (unsigned int i=0;i<nRawCells_;i++) {
00198 FUShmRawCell* cell=rawCell(i);
00199 cell->initialize(i);
00200 if (segmentationMode_) shmdt(cell);
00201 }
00202
00203 for (unsigned int i=0;i<nRecoCells_;i++) {
00204 FUShmRecoCell* cell=recoCell(i);
00205 cell->initialize(i);
00206 if (segmentationMode_) shmdt(cell);
00207 }
00208
00209 for (unsigned int i=0;i<nDqmCells_;i++) {
00210 FUShmDqmCell* cell=dqmCell(i);
00211 cell->initialize(i);
00212 if (segmentationMode_) shmdt(cell);
00213 }
00214
00215 reset();
00216 }
00217
00218
00219
00220 void FUShmBuffer::reset()
00221 {
00222 nClients_=0;
00223
00224
00225 sem_init(0,1);
00226 sem_init(1,nRawCells_);
00227 sem_init(2,0);
00228 sem_init(3,1);
00229 sem_init(4,0);
00230 sem_init(5,nRecoCells_);
00231 sem_init(6,0);
00232 sem_init(7,nDqmCells_);
00233 sem_init(8,0);
00234
00235 sem_print();
00236
00237 unsigned int *iWrite,*iRead;
00238
00239 rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00240 iWrite=(unsigned int*)((unsigned int)this+rawWriteOffset_);
00241 iRead =(unsigned int*)((unsigned int)this+rawReadOffset_);
00242 for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00243
00244 recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00245 iWrite=(unsigned int*)((unsigned int)this+recoWriteOffset_);
00246 iRead =(unsigned int*)((unsigned int)this+recoReadOffset_);
00247 for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00248
00249 dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00250 iWrite=(unsigned int*)((unsigned int)this+dqmWriteOffset_);
00251 iRead =(unsigned int*)((unsigned int)this+dqmReadOffset_);
00252 for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00253
00254 for (unsigned int i=0;i<nRawCells_;i++) {
00255 setEvtState(i,evt::EMPTY);
00256 setEvtDiscard(i,0);
00257 setEvtNumber(i,0xffffffff);
00258 setEvtPrcId(i,0);
00259 setEvtTimeStamp(i,0);
00260 }
00261
00262 for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00263 }
00264
00265
00266
00267 int FUShmBuffer::nbRawCellsToWrite() const
00268 {
00269 return semctl(semid(),1,GETVAL);
00270 }
00271
00272
00273
00274 int FUShmBuffer::nbRawCellsToRead() const
00275 {
00276 return semctl(semid(),2,GETVAL);
00277 }
00278
00279
00280
00281 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00282 {
00283 waitRawWrite();
00284 unsigned int iCell=nextRawWriteIndex();
00285 FUShmRawCell* cell =rawCell(iCell);
00286 evt::State_t state=evtState(iCell);
00287 assert(state==evt::EMPTY);
00288 setEvtState(iCell,evt::RAWWRITING);
00289 setEvtDiscard(iCell,1);
00290 return cell;
00291 }
00292
00293
00294
00295 FUShmRawCell* FUShmBuffer::rawCellToRead()
00296 {
00297 waitRawRead();
00298 unsigned int iCell=nextRawReadIndex();
00299 FUShmRawCell* cell=rawCell(iCell);
00300 evt::State_t state=evtState(iCell);
00301 assert(state==evt::RAWWRITTEN||state==evt::EMPTY);
00302 if (state==evt::RAWWRITTEN) {
00303 setEvtPrcId(iCell,getpid());
00304 setEvtState(iCell,evt::RAWREADING);
00305 }
00306 return cell;
00307 }
00308
00309
00310
00311 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00312 {
00313 waitRecoRead();
00314 unsigned int iCell =nextRecoReadIndex();
00315 FUShmRecoCell* cell =recoCell(iCell);
00316 unsigned int iRawCell=cell->rawCellIndex();
00317 if (iRawCell<nRawCells_) {
00318
00319
00320 setEvtState(iRawCell,evt::SENDING);
00321 }
00322 return cell;
00323 }
00324
00325
00326
00327 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00328 {
00329 waitDqmRead();
00330 unsigned int iCell=nextDqmReadIndex();
00331 FUShmDqmCell* cell=dqmCell(iCell);
00332 dqm::State_t state=dqmState(iCell);
00333 assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00334 if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00335 return cell;
00336 }
00337
00338
00339
00340 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00341 {
00342 waitRawDiscarded();
00343 FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00344 evt::State_t state=evtState(cell->index());
00345 assert(state==evt::PROCESSED||state==evt::SENT||state==evt::EMPTY);
00346 if (state!=evt::EMPTY) setEvtState(cell->index(),evt::DISCARDING);
00347 return cell;
00348 }
00349
00350
00351
00352 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00353 {
00354 evt::State_t state=evtState(cell->index());
00355 assert(state==evt::RAWWRITING);
00356 setEvtState(cell->index(),evt::RAWWRITTEN);
00357 setEvtNumber(cell->index(),cell->evtNumber());
00358 postRawIndexToRead(cell->index());
00359 if (segmentationMode_) shmdt(cell);
00360 postRawRead();
00361 }
00362
00363
00364
00365 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00366 {
00367 evt::State_t state=evtState(cell->index());
00368 assert(state==evt::RAWREADING);
00369 setEvtState(cell->index(),evt::RAWREAD);
00370 setEvtState(cell->index(),evt::PROCESSING);
00371 setEvtTimeStamp(cell->index(),time(0));
00372 if (segmentationMode_) shmdt(cell);
00373 }
00374
00375
00376
00377 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00378 {
00379 unsigned int iRawCell=cell->rawCellIndex();
00380 if (iRawCell<nRawCells_) {
00381
00382
00383 setEvtState(cell->rawCellIndex(),evt::SENT);
00384 }
00385 if (segmentationMode_) shmdt(cell);
00386 }
00387
00388
00389
00390 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00391 {
00392 dqm::State_t state=dqmState(cell->index());
00393 assert(state==dqm::SENDING||state==dqm::EMPTY);
00394 if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00395 if (segmentationMode_) shmdt(cell);
00396 }
00397
00398
00399
00400 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00401 {
00402 waitRawDiscard();
00403 if (rawCellReadyForDiscard(iCell)) {
00404 rawDiscardIndex_=iCell;
00405 evt::State_t state=evtState(iCell);
00406 assert(state==evt::PROCESSING||state==evt::SENT||state==evt::EMPTY);
00407 if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00408 postRawDiscarded();
00409 }
00410 else postRawDiscard();
00411 }
00412
00413
00414
00415 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00416 {
00417 releaseRawCell(cell);
00418 postRawDiscard();
00419 }
00420
00421
00422
00423 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00424 {
00425 FUShmRecoCell* cell=recoCell(iCell);
00426 unsigned int iRawCell=cell->rawCellIndex();
00427 if (iRawCell<nRawCells_) {
00428
00429
00430 scheduleRawCellForDiscard(iRawCell);
00431 }
00432 cell->clear();
00433 if (segmentationMode_) shmdt(cell);
00434 postRecoIndexToWrite(iCell);
00435 postRecoWrite();
00436 }
00437
00438
00439
00440 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00441 {
00442 dqm::State_t state=dqmState(iCell);
00443 assert(state==dqm::EMPTY||state==dqm::SENT);
00444 setDqmState(iCell,dqm::DISCARDING);
00445 FUShmDqmCell* cell=dqmCell(iCell);
00446 cell->clear();
00447 if (segmentationMode_) shmdt(cell);
00448 setDqmState(iCell,dqm::EMPTY);
00449 postDqmIndexToWrite(iCell);
00450 postDqmWrite();
00451 }
00452
00453
00454
00455 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00456 {
00457 evt::State_t state=evtState(cell->index());
00458 assert(state==evt::DISCARDING||state==evt::RAWWRITING||state==evt::EMPTY);
00459 setEvtState(cell->index(),evt::EMPTY);
00460 setEvtDiscard(cell->index(),0);
00461 setEvtNumber(cell->index(),0xffffffff);
00462 setEvtPrcId(cell->index(),0);
00463 setEvtTimeStamp(cell->index(),0);
00464 cell->clear();
00465 postRawIndexToWrite(cell->index());
00466 if (segmentationMode_) shmdt(cell);
00467 postRawWrite();
00468 }
00469
00470
00471
00472 void FUShmBuffer::writeRawEmptyEvent()
00473 {
00474 FUShmRawCell* cell=rawCellToWrite();
00475 evt::State_t state=evtState(cell->index());
00476 assert(state==evt::RAWWRITING);
00477 setEvtState(cell->index(),evt::EMPTY);
00478 postRawIndexToRead(cell->index());
00479 if (segmentationMode_) shmdt(cell);
00480 postRawRead();
00481 }
00482
00483
00484
00485 void FUShmBuffer::writeRecoEmptyEvent()
00486 {
00487 waitRecoWrite();
00488 unsigned int iCell=nextRecoWriteIndex();
00489 FUShmRecoCell* cell =recoCell(iCell);
00490 cell->clear();
00491 postRecoIndexToRead(iCell);
00492 if (segmentationMode_) shmdt(cell);
00493 postRecoRead();
00494 }
00495
00496
00497
00498 void FUShmBuffer::writeDqmEmptyEvent()
00499 {
00500 waitDqmWrite();
00501 unsigned int iCell=nextDqmWriteIndex();
00502 FUShmDqmCell* cell=dqmCell(iCell);
00503 cell->clear();
00504 postDqmIndexToRead(iCell);
00505 if (segmentationMode_) shmdt(cell);
00506 postDqmRead();
00507 }
00508
00509
00510
00511 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00512 {
00513 FUShmRawCell* cell=rawCellToWrite();
00514 scheduleRawEmptyCellForDiscard(cell);
00515 }
00516
00517
00518
00519 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00520 {
00521 waitRawDiscard();
00522 if (rawCellReadyForDiscard(cell->index())) {
00523 rawDiscardIndex_=cell->index();
00524 setEvtState(cell->index(),evt::EMPTY);
00525 setEvtNumber(cell->index(),0xffffffff);
00526 setEvtPrcId(cell->index(),0);
00527 setEvtTimeStamp(cell->index(),0);
00528 removeClientPrcId(getpid());
00529 if (segmentationMode_) shmdt(cell);
00530 postRawDiscarded();
00531 }
00532 else postRawDiscard();
00533 }
00534
00535
00536
00537 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00538 unsigned int fuProcessId,
00539 unsigned int fuGuid,
00540 unsigned char *data,
00541 unsigned int dataSize)
00542 {
00543 if (dataSize>recoCellPayloadSize_) {
00544 cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00545 return false;
00546 }
00547
00548 waitRecoWrite();
00549 unsigned int iCell=nextRecoWriteIndex();
00550 FUShmRecoCell* cell =recoCell(iCell);
00551 cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00552 postRecoIndexToRead(iCell);
00553 if (segmentationMode_) shmdt(cell);
00554 postRecoRead();
00555 return true;
00556 }
00557
00558
00559
00560 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00561 unsigned int evtNumber,
00562 unsigned int outModId,
00563 unsigned int fuProcessId,
00564 unsigned int fuGuid,
00565 unsigned char *data,
00566 unsigned int dataSize)
00567 {
00568 if (dataSize>recoCellPayloadSize_) {
00569 cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00570 return false;
00571 }
00572
00573 waitRecoWrite();
00574 unsigned int iCell=nextRecoWriteIndex();
00575 FUShmRecoCell* cell =recoCell(iCell);
00576 unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00577
00578
00579 setEvtState(rawCellIndex,evt::RECOWRITING);
00580 incEvtDiscard(rawCellIndex);
00581 cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00582 fuProcessId,fuGuid,data,dataSize);
00583 setEvtState(rawCellIndex,evt::RECOWRITTEN);
00584 postRecoIndexToRead(iCell);
00585 if (segmentationMode_) shmdt(cell);
00586 postRecoRead();
00587 return true;
00588 }
00589
00590
00591
00592 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00593 unsigned int fuProcessId,
00594 unsigned int iRawCell)
00595 {
00596 FUShmRawCell *raw=rawCell(iRawCell);
00597
00598 unsigned int dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00599 unsigned char *data =new unsigned char[dataSize];
00600 uint32_t *pos =(uint32_t*)data;
00601
00602
00603
00604
00605
00606
00607
00608 *pos++=(uint32_t)2;
00609 *pos++=(uint32_t)runNumber;
00610
00611
00612 *pos++=(uint32_t)1;
00613 *pos++=(uint32_t)raw->evtNumber();
00614 for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00615 memcpy(pos,raw->payloadAddr(),raw->eventSize());
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 waitRecoWrite();
00648 unsigned int iRecoCell=nextRecoWriteIndex();
00649 FUShmRecoCell* reco =recoCell(iRecoCell);
00650 setEvtState(iRawCell,evt::RECOWRITING);
00651 setEvtDiscard(iRawCell,1);
00652 reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00653 data,dataSize);
00654 delete [] data;
00655 setEvtState(iRawCell,evt::RECOWRITTEN);
00656 postRecoIndexToRead(iRecoCell);
00657 if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00658 postRecoRead();
00659 return true;
00660 }
00661
00662
00663
00664 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00665 unsigned int evtAtUpdate,
00666 unsigned int folderId,
00667 unsigned int fuProcessId,
00668 unsigned int fuGuid,
00669 unsigned char *data,
00670 unsigned int dataSize)
00671 {
00672 if (dataSize>dqmCellPayloadSize_) {
00673 cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00674 return false;
00675 }
00676
00677 waitDqmWrite();
00678 unsigned int iCell=nextDqmWriteIndex();
00679 FUShmDqmCell* cell=dqmCell(iCell);
00680 dqm::State_t state=dqmState(iCell);
00681 assert(state==dqm::EMPTY);
00682 setDqmState(iCell,dqm::WRITING);
00683 cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00684 setDqmState(iCell,dqm::WRITTEN);
00685 postDqmIndexToRead(iCell);
00686 if (segmentationMode_) shmdt(cell);
00687 postDqmRead();
00688 return true;
00689 }
00690
00691
00692
00693 void FUShmBuffer::sem_print()
00694 {
00695 cout<<"--> current sem values:"
00696 <<endl
00697 <<" lock="<<semctl(semid(),0,GETVAL)
00698 <<endl
00699 <<" wraw="<<semctl(semid(),1,GETVAL)
00700 <<" rraw="<<semctl(semid(),2,GETVAL)
00701 <<endl
00702 <<" wdsc="<<semctl(semid(),3,GETVAL)
00703 <<" rdsc="<<semctl(semid(),4,GETVAL)
00704 <<endl
00705 <<" wrec="<<semctl(semid(),5,GETVAL)
00706 <<" rrec="<<semctl(semid(),6,GETVAL)
00707 <<endl
00708 <<" wdqm="<<semctl(semid(),7,GETVAL)
00709 <<" rdqm="<<semctl(semid(),8,GETVAL)
00710 <<endl;
00711 }
00712
00713
00714
00715 void FUShmBuffer::printEvtState(unsigned int index)
00716 {
00717 evt::State_t state=evtState(index);
00718 std::string stateName;
00719 if (state==evt::EMPTY) stateName="EMPTY";
00720 else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00721 else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00722 else if (state==evt::RAWREADING) stateName="RAWREADING";
00723 else if (state==evt::RAWREAD) stateName="RAWREAD";
00724 else if (state==evt::PROCESSING) stateName="PROCESSING";
00725 else if (state==evt::PROCESSED) stateName="PROCESSED";
00726 else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00727 else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00728 else if (state==evt::SENDING) stateName="SENDING";
00729 else if (state==evt::SENT) stateName="SENT";
00730 else if (state==evt::DISCARDING) stateName="DISCARDING";
00731 cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00732 }
00733
00734
00735
00736 void FUShmBuffer::printDqmState(unsigned int index)
00737 {
00738 dqm::State_t state=dqmState(index);
00739 cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00740 }
00741
00742
00743
00744 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00745 unsigned int nRawCells,
00746 unsigned int nRecoCells,
00747 unsigned int nDqmCells,
00748 unsigned int rawCellSize,
00749 unsigned int recoCellSize,
00750 unsigned int dqmCellSize)
00751 {
00752
00753 if (FUShmBuffer::releaseSharedMemory())
00754 cout<<"FUShmBuffer::createShmBuffer: "
00755 <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00756 <<endl;
00757
00758
00759 int size =sizeof(unsigned int)*7;
00760 int shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00761 void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00762
00763 if(1!=shm_nattch(shmid)) {
00764 cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00765 shmdt(shmAddr);
00766 return 0;
00767 }
00768
00769 unsigned int* p=(unsigned int*)shmAddr;
00770 *p++=segmentationMode;
00771 *p++=nRawCells;
00772 *p++=nRecoCells;
00773 *p++=nDqmCells;
00774 *p++=rawCellSize;
00775 *p++=recoCellSize;
00776 *p++=dqmCellSize;
00777 shmdt(shmAddr);
00778
00779
00780 size =FUShmBuffer::size(segmentationMode,
00781 nRawCells,nRecoCells,nDqmCells,
00782 rawCellSize,recoCellSize,dqmCellSize);
00783 shmid =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00784 int semid=sem_create(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00785 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00786
00787 if (1!=shm_nattch(shmid)) {
00788 cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00789 shmdt(shmAddr);
00790 return 0;
00791 }
00792 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00793 nRawCells,nRecoCells,nDqmCells,
00794 rawCellSize,recoCellSize,dqmCellSize);
00795
00796 cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00797 cout<<" segmentationMode="<<segmentationMode<<endl;
00798
00799 buffer->initialize(shmid,semid);
00800
00801 return buffer;
00802 }
00803
00804
00805
00806 FUShmBuffer* FUShmBuffer::getShmBuffer()
00807 {
00808
00809 int size =sizeof(unsigned int)*7;
00810 int shmid =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00811 void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00812
00813 unsigned int *p=(unsigned int*)shmAddr;
00814 bool segmentationMode=*p++;
00815 unsigned int nRawCells =*p++;
00816 unsigned int nRecoCells =*p++;
00817 unsigned int nDqmCells =*p++;
00818 unsigned int rawCellSize =*p++;
00819 unsigned int recoCellSize =*p++;
00820 unsigned int dqmCellSize =*p++;
00821 shmdt(shmAddr);
00822
00823 cout<<"FUShmBuffer::getShmBuffer():"
00824 <<" segmentationMode="<<segmentationMode
00825 <<" nRawCells="<<nRawCells
00826 <<" nRecoCells="<<nRecoCells
00827 <<" nDqmCells="<<nDqmCells
00828 <<" rawCellSize="<<rawCellSize
00829 <<" recoCellSize="<<recoCellSize
00830 <<" dqmCellSize="<<dqmCellSize
00831 <<endl;
00832
00833
00834 size =FUShmBuffer::size(segmentationMode,
00835 nRawCells,nRecoCells,nDqmCells,
00836 rawCellSize,recoCellSize,dqmCellSize);
00837 shmid =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00838 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00839 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00840
00841 if (0==shm_nattch(shmid)) {
00842 cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00843 return 0;
00844 }
00845
00846 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00847 nRawCells,nRecoCells,nDqmCells,
00848 rawCellSize,recoCellSize,dqmCellSize);
00849
00850 cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00851 cout<<" segmentationMode="<<segmentationMode<<endl;
00852
00853 buffer->setClientPrcId(getpid());
00854
00855 return buffer;
00856 }
00857
00858
00859
00860 bool FUShmBuffer::releaseSharedMemory()
00861 {
00862
00863 int size =sizeof(unsigned int)*7;
00864 int shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00865 void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00866
00867 unsigned int*p=(unsigned int*)shmAddr;
00868 bool segmentationMode=*p++;
00869 unsigned int nRawCells =*p++;
00870 unsigned int nRecoCells =*p++;
00871 unsigned int nDqmCells =*p++;
00872 unsigned int rawCellSize =*p++;
00873 unsigned int recoCellSize =*p++;
00874 unsigned int dqmCellSize =*p++;
00875 shmdt(shmAddr);
00876
00877
00878
00879 size =FUShmBuffer::size(segmentationMode,
00880 nRawCells,nRecoCells,nDqmCells,
00881 rawCellSize,recoCellSize,dqmCellSize);
00882 int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0) return false;
00883 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return false;
00884 shmAddr =shm_attach(shmid); if (0==shmAddr) return false;
00885
00886 int att = 0;
00887 for(; att <10; att++)
00888 {
00889 if(shm_nattch(shmid)>1) {
00890 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
00891 <<", failed attempt to release shared memory."<<endl;
00892 ::sleep(1);
00893 }
00894 else
00895 break;
00896 }
00897
00898 if(att>=10) return false;
00899
00900 if (segmentationMode) {
00901 FUShmBuffer* buffer=
00902 new (shmAddr) FUShmBuffer(segmentationMode,
00903 nRawCells,nRecoCells,nDqmCells,
00904 rawCellSize,recoCellSize,dqmCellSize);
00905 int cellid;
00906 for (unsigned int i=0;i<nRawCells;i++) {
00907 cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
00908 if ((shm_destroy(cellid)==-1)) return false;
00909 }
00910 for (unsigned int i=0;i<nRecoCells;i++) {
00911 cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
00912 if ((shm_destroy(cellid)==-1)) return false;
00913 }
00914 for (unsigned int i=0;i<nDqmCells;i++) {
00915 cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
00916 if ((shm_destroy(cellid)==-1)) return false;
00917 }
00918 }
00919 shmdt(shmAddr);
00920 if (sem_destroy(semid)==-1) return false;
00921 if (shm_destroy(shmid)==-1) return false;
00922 if (shm_destroy(shmidd)==-1) return false;
00923
00924 return true;
00925 }
00926
00927
00928
00929 unsigned int FUShmBuffer::size(bool segmentationMode,
00930 unsigned int nRawCells,
00931 unsigned int nRecoCells,
00932 unsigned int nDqmCells,
00933 unsigned int rawCellSize,
00934 unsigned int recoCellSize,
00935 unsigned int dqmCellSize)
00936 {
00937 unsigned int offset=
00938 sizeof(FUShmBuffer)+
00939 sizeof(unsigned int)*4*nRawCells+
00940 sizeof(evt::State_t)*nRawCells+
00941 sizeof(dqm::State_t)*nDqmCells;
00942
00943 unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
00944 unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
00945 unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
00946
00947 unsigned int realSize =
00948 (segmentationMode) ?
00949 offset
00950 +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
00951 :
00952 offset
00953 +rawCellTotalSize*nRawCells
00954 +recoCellTotalSize*nRecoCells
00955 +dqmCellTotalSize*nDqmCells;
00956
00957 unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
00958
00959 return result;
00960 }
00961
00962
00963
00964 key_t FUShmBuffer::getShmDescriptorKey()
00965 {
00966 key_t result=ftok(shmKeyPath_,SHM_DESCRIPTOR_KEYID);
00967 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: ftok() failed "
00968 <<"for file "<<shmKeyPath_<<"!"<<endl;
00969 return result;
00970 }
00971
00972
00973
00974 key_t FUShmBuffer::getShmKey()
00975 {
00976 key_t result=ftok(shmKeyPath_,SHM_KEYID);
00977 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
00978 <<"for file "<<shmKeyPath_<<"!"<<endl;
00979 return result;
00980 }
00981
00982
00983
00984 key_t FUShmBuffer::getSemKey()
00985 {
00986 key_t result=ftok(semKeyPath_,SEM_KEYID);
00987 if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
00988 <<"for file "<<semKeyPath_<<"!"<<endl;
00989 return result;
00990 }
00991
00992
00993
00994 int FUShmBuffer::shm_create(key_t key,int size)
00995 {
00996 int shmid=shmget(key,size,IPC_CREAT|0644);
00997 if (shmid==-1) {
00998 int err=errno;
00999 cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01000 <<strerror(err)<<endl;
01001 }
01002 return shmid;
01003 }
01004
01005
01006
01007 int FUShmBuffer::shm_get(key_t key,int size)
01008 {
01009 int shmid=shmget(key,size,0644);
01010 if (shmid==-1) {
01011 int err=errno;
01012 cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01013 <<strerror(err)<<endl;
01014 }
01015 return shmid;
01016 }
01017
01018
01019
01020 void* FUShmBuffer::shm_attach(int shmid)
01021 {
01022 void* result=shmat(shmid,NULL,0);
01023 if (0==result) {
01024 int err=errno;
01025 cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01026 <<strerror(err)<<endl;
01027 }
01028 return result;
01029 }
01030
01031
01032
01033 int FUShmBuffer::shm_nattch(int shmid)
01034 {
01035 shmid_ds shmstat;
01036 shmctl(shmid,IPC_STAT,&shmstat);
01037 return shmstat.shm_nattch;
01038 }
01039
01040
01041
01042 int FUShmBuffer::shm_destroy(int shmid)
01043 {
01044 shmid_ds shmstat;
01045 int result=shmctl(shmid,IPC_RMID,&shmstat);
01046 if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01047 return result;
01048 }
01049
01050
01051
01052 int FUShmBuffer::sem_create(key_t key,int nsem)
01053 {
01054 int semid=semget(key,nsem,IPC_CREAT|0666);
01055 if (semid<0) {
01056 int err=errno;
01057 cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01058 <<strerror(err)<<endl;
01059 }
01060 return semid;
01061 }
01062
01063
01064
01065 int FUShmBuffer::sem_get(key_t key,int nsem)
01066 {
01067 int semid=semget(key,nsem,0666);
01068 if (semid<0) {
01069 int err=errno;
01070 cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01071 <<strerror(err)<<endl;
01072 }
01073 return semid;
01074 }
01075
01076
01077
01078 int FUShmBuffer::sem_destroy(int semid)
01079 {
01080 int result=semctl(semid,0,IPC_RMID);
01081 if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01082 return result;
01083 }
01084
01085
01086
01088
01090
01091
01092 unsigned int FUShmBuffer::nextIndex(unsigned int offset,
01093 unsigned int nCells,
01094 unsigned int& iNext)
01095 {
01096 lock();
01097 unsigned int* pindex=(unsigned int*)((unsigned int)this+offset);
01098 pindex+=iNext;
01099 iNext=(iNext+1)%nCells;
01100 unsigned int result=*pindex;
01101 unlock();
01102 return result;
01103 }
01104
01105
01106
01107 void FUShmBuffer::postIndex(unsigned int index,
01108 unsigned int offset,
01109 unsigned int nCells,
01110 unsigned int& iLast)
01111 {
01112 lock();
01113 unsigned int* pindex=(unsigned int*)((unsigned int)this+offset);
01114 pindex+=iLast;
01115 *pindex=index;
01116 iLast=(iLast+1)%nCells;
01117 unlock();
01118 }
01119
01120
01121
01122 unsigned int FUShmBuffer::nextRawWriteIndex()
01123 {
01124 return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01125 }
01126
01127
01128
01129 unsigned int FUShmBuffer::nextRawReadIndex()
01130 {
01131 return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01132 }
01133
01134
01135
01136 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01137 {
01138 postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01139 }
01140
01141
01142
01143 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01144 {
01145 postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01146 }
01147
01148
01149
01150 unsigned int FUShmBuffer::nextRecoWriteIndex()
01151 {
01152 return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01153 }
01154
01155
01156
01157 unsigned int FUShmBuffer::nextRecoReadIndex()
01158 {
01159 return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01160 }
01161
01162
01163
01164 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01165 {
01166 postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01167 }
01168
01169
01170
01171 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01172 {
01173 postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01174 }
01175
01176
01177
01178 unsigned int FUShmBuffer::nextDqmWriteIndex()
01179 {
01180 return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01181 }
01182
01183
01184
01185 unsigned int FUShmBuffer::nextDqmReadIndex()
01186 {
01187 return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01188 }
01189
01190
01191
01192 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01193 {
01194 postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01195 }
01196
01197
01198
01199 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01200 {
01201 postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01202 }
01203
01204
01205
01206 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01207 {
01208 unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01209 for (unsigned int i=0;i<nRawCells_;i++) {
01210 if ((*pevt++)==evtNumber) return i;
01211 }
01212 assert(false);
01213 return 0xffffffff;
01214 }
01215
01216
01217
01218 evt::State_t FUShmBuffer::evtState(unsigned int index)
01219 {
01220 assert(index<nRawCells_);
01221 evt::State_t *pstate=(evt::State_t*)((unsigned int)this+evtStateOffset_);
01222 pstate+=index;
01223 return *pstate;
01224 }
01225
01226
01227
01228 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01229 {
01230 assert(index<nDqmCells_);
01231 dqm::State_t *pstate=(dqm::State_t*)((unsigned int)this+dqmStateOffset_);
01232 pstate+=index;
01233 return *pstate;
01234 }
01235
01236
01237
01238 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01239 {
01240 assert(index<nRawCells_);
01241 unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01242 pevt+=index;
01243 return *pevt;
01244 }
01245
01246
01247
01248 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01249 {
01250 assert(index<nRawCells_);
01251 pid_t *prcid=(pid_t*)((unsigned int)this+evtPrcIdOffset_);
01252 prcid+=index;
01253 return *prcid;
01254 }
01255
01256
01257
01258 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01259 {
01260 assert(index<nRawCells_);
01261 time_t *ptstmp=(time_t*)((unsigned int)this+evtTimeStampOffset_);
01262 ptstmp+=index;
01263 return *ptstmp;
01264 }
01265
01266
01267
01268 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01269 {
01270 assert(index<nClientsMax_);
01271 pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01272 prcid+=index;
01273 return *prcid;
01274 }
01275
01276
01277
01278 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01279 {
01280 assert(index<nRawCells_);
01281 evt::State_t *pstate=(evt::State_t*)((unsigned int)this+evtStateOffset_);
01282 pstate+=index;
01283 lock();
01284 *pstate=state;
01285 unlock();
01286 return true;
01287 }
01288
01289
01290
01291 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01292 {
01293 assert(index<nDqmCells_);
01294 dqm::State_t *pstate=(dqm::State_t*)((unsigned int)this+dqmStateOffset_);
01295 pstate+=index;
01296 lock();
01297 *pstate=state;
01298 unlock();
01299 return true;
01300 }
01301
01302
01303
01304 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01305 {
01306 assert(index<nRawCells_);
01307 unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01308 pcount+=index;
01309 lock();
01310 *pcount=discard;
01311 unlock();
01312 return true;
01313 }
01314
01315
01316
01317 int FUShmBuffer::incEvtDiscard(unsigned int index)
01318 {
01319 int result = 0;
01320 assert(index<nRawCells_);
01321 unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01322 pcount+=index;
01323 lock();
01324 (*pcount)++;
01325 result = *pcount;
01326 unlock();
01327 return result;
01328 }
01329
01330
01331
01332 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01333 {
01334 assert(index<nRawCells_);
01335 unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01336 pevt+=index;
01337 lock();
01338 *pevt=evtNumber;
01339 unlock();
01340 return true;
01341 }
01342
01343
01344
01345 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01346 {
01347 assert(index<nRawCells_);
01348 pid_t* prcid=(pid_t*)((unsigned int)this+evtPrcIdOffset_);
01349 prcid+=index;
01350 lock();
01351 *prcid=prcId;
01352 unlock();
01353 return true;
01354 }
01355
01356
01357
01358 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01359 {
01360 assert(index<nRawCells_);
01361 time_t *ptstmp=(time_t*)((unsigned int)this+evtTimeStampOffset_);
01362 ptstmp+=index;
01363 lock();
01364 *ptstmp=timeStamp;
01365 unlock();
01366 return true;
01367 }
01368
01369
01370
01371 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01372 {
01373 lock();
01374 assert(nClients_<nClientsMax_);
01375 pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01376 for (unsigned int i=0;i<nClients_;i++) {
01377 if ((*prcid)==prcId) { unlock(); return false; }
01378 prcid++;
01379 }
01380 nClients_++;
01381 *prcid=prcId;
01382 unlock();
01383 return true;
01384 }
01385
01386
01387
01388 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01389 {
01390 lock();
01391 pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01392 unsigned int iClient(0);
01393 while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01394 assert(iClient!=nClients_);
01395 pid_t* next=prcid; next++;
01396 while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01397 nClients_--;
01398 unlock();
01399 return true;
01400 }
01401
01402
01403
01404 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01405 {
01406 FUShmRawCell* result(0);
01407
01408 if (iCell>=nRawCells_) {
01409 cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01410 <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01411 return result;
01412 }
01413
01414 if (segmentationMode_) {
01415 key_t shmkey =rawCellShmKey(iCell);
01416 int shmid =shm_get(shmkey,rawCellTotalSize_);
01417 void* cellAddr=shm_attach(shmid);
01418 result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01419 }
01420 else {
01421 result=
01422 (FUShmRawCell*)((unsigned int)this+rawCellOffset_+iCell*rawCellTotalSize_);
01423 }
01424
01425 return result;
01426 }
01427
01428
01429
01430 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01431 {
01432 FUShmRecoCell* result(0);
01433
01434 if (iCell>=nRecoCells_) {
01435 cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01436 <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01437 return result;
01438 }
01439
01440 if (segmentationMode_) {
01441 key_t shmkey =recoCellShmKey(iCell);
01442 int shmid =shm_get(shmkey,recoCellTotalSize_);
01443 void* cellAddr=shm_attach(shmid);
01444 result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01445 }
01446 else {
01447 result=
01448 (FUShmRecoCell*)((unsigned int)this+recoCellOffset_+iCell*recoCellTotalSize_);
01449 }
01450
01451 return result;
01452 }
01453
01454
01455
01456 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01457 {
01458 FUShmDqmCell* result(0);
01459
01460 if (iCell>=nDqmCells_) {
01461 cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01462 <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01463 return result;
01464 }
01465
01466 if (segmentationMode_) {
01467 key_t shmkey =dqmCellShmKey(iCell);
01468 int shmid =shm_get(shmkey,dqmCellTotalSize_);
01469 void* cellAddr=shm_attach(shmid);
01470 result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01471 }
01472 else {
01473 result=
01474 (FUShmDqmCell*)((unsigned int)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01475 }
01476
01477 return result;
01478 }
01479
01480
01481
01482 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01483 {
01484 assert(index<nRawCells_);
01485 unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01486 pcount+=index;
01487 lock();
01488 assert(*pcount>0);
01489 --(*pcount);
01490 bool result=(*pcount==0);
01491 unlock();
01492 return result;
01493 }
01494
01495
01496
01497 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01498 {
01499 if (!segmentationMode_) {
01500 cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01501 return -1;
01502 }
01503 key_t* addr=(key_t*)((unsigned int)this+offset);
01504 for (unsigned int i=0;i<iCell;i++) ++addr;
01505 return *addr;
01506 }
01507
01508
01509
01510 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01511 {
01512 if (iCell>=nRawCells_) {
01513 cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01514 <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01515 return -1;
01516 }
01517 return shmKey(iCell,rawCellOffset_);
01518 }
01519
01520
01521
01522 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01523 {
01524 if (iCell>=nRecoCells_) {
01525 cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01526 <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01527 return -1;
01528 }
01529 return shmKey(iCell,recoCellOffset_);
01530 }
01531
01532
01533
01534 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01535 {
01536 if (iCell>=nDqmCells_) {
01537 cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01538 <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01539 return -1;
01540 }
01541 return shmKey(iCell,dqmCellOffset_);
01542 }
01543
01544
01545
01546 void FUShmBuffer::sem_init(int isem,int value)
01547 {
01548 if (semctl(semid(),isem,SETVAL,value)<0) {
01549 cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01550 }
01551 }
01552
01553
01554
01555 void FUShmBuffer::sem_wait(int isem)
01556 {
01557 struct sembuf sops[1];
01558 sops[0].sem_num=isem;
01559 sops[0].sem_op = -1;
01560 sops[0].sem_flg= 0;
01561 if (semop(semid(),sops,1)==-1) {
01562 cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01563 }
01564 }
01565
01566
01567
01568 void FUShmBuffer::sem_post(int isem)
01569 {
01570 struct sembuf sops[1];
01571 sops[0].sem_num=isem;
01572 sops[0].sem_op = 1;
01573 sops[0].sem_flg= 0;
01574 if (semop(semid(),sops,1)==-1) {
01575 cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01576 }
01577 }