00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018
00019 #include <sstream>
00020 #include <fstream>
00021
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025
00026
00027
00028
00029
// Small ids added to getuid()*1000 to build the IPC keys used in this file.
#define SHM_DESCRIPTOR_KEYID   1   // key id of the descriptor segment
#define SHM_KEYID              2   // key id of the main buffer segment
#define SEM_KEYID              1   // key id of the semaphore set

#define NSKIP_MAX            100   // not referenced in the visible part of this file
00035
00036
00037 using namespace std;
00038 using namespace evf;
00039
00040
00041
00042 const char* FUShmBuffer::shmKeyPath_ =
00043 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046
00047
00049
00051
00052
00053 FUShmBuffer::FUShmBuffer(bool segmentationMode,
00054 unsigned int nRawCells,
00055 unsigned int nRecoCells,
00056 unsigned int nDqmCells,
00057 unsigned int rawCellSize,
00058 unsigned int recoCellSize,
00059 unsigned int dqmCellSize)
00060 : segmentationMode_(segmentationMode)
00061 , nClientsMax_(128)
00062 , nRawCells_(nRawCells)
00063 , rawCellPayloadSize_(rawCellSize)
00064 , nRecoCells_(nRecoCells)
00065 , recoCellPayloadSize_(recoCellSize)
00066 , nDqmCells_(nDqmCells)
00067 , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069 rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070 recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071 dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072
00073 void* addr;
00074
00075 rawWriteOffset_=sizeof(FUShmBuffer);
00076 addr=(void*)((unsigned long)this+rawWriteOffset_);
00077 new (addr) unsigned int[nRawCells_];
00078
00079 rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080 addr=(void*)((unsigned long)this+rawReadOffset_);
00081 new (addr) unsigned int[nRawCells_];
00082
00083 recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084 addr=(void*)((unsigned long)this+recoWriteOffset_);
00085 new (addr) unsigned int[nRecoCells_];
00086
00087 recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088 addr=(void*)((unsigned long)this+recoReadOffset_);
00089 new (addr) unsigned int[nRecoCells_];
00090
00091 dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092 addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093 new (addr) unsigned int[nDqmCells_];
00094
00095 dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096 addr=(void*)((unsigned long)this+dqmReadOffset_);
00097 new (addr) unsigned int[nDqmCells_];
00098
00099 evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100 addr=(void*)((unsigned long)this+evtStateOffset_);
00101 new (addr) evt::State_t[nRawCells_];
00102
00103 evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104 addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105 new (addr) unsigned int[nRawCells_];
00106
00107 evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108 addr=(void*)((unsigned long)this+evtNumberOffset_);
00109 new (addr) unsigned int[nRawCells_];
00110
00111 evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112 addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113 new (addr) pid_t[nRawCells_];
00114
00115 evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116 addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117 new (addr) time_t[nRawCells_];
00118
00119 dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120 addr=(void*)((unsigned long)this+dqmStateOffset_);
00121 new (addr) dqm::State_t[nDqmCells_];
00122
00123 clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124 addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125 new (addr) pid_t[nClientsMax_];
00126
00127 rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128
00129 if (segmentationMode_) {
00130 recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131 dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132 addr=(void*)((unsigned long)this+rawCellOffset_);
00133 new (addr) key_t[nRawCells_];
00134 addr=(void*)((unsigned long)this+recoCellOffset_);
00135 new (addr) key_t[nRecoCells_];
00136 addr=(void*)((unsigned long)this+dqmCellOffset_);
00137 new (addr) key_t[nDqmCells_];
00138 }
00139 else {
00140 recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141 dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142 for (unsigned int i=0;i<nRawCells_;i++) {
00143 addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144 new (addr) FUShmRawCell(rawCellSize);
00145 }
00146 for (unsigned int i=0;i<nRecoCells_;i++) {
00147 addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148 new (addr) FUShmRecoCell(recoCellSize);
00149 }
00150 for (unsigned int i=0;i<nDqmCells_;i++) {
00151 addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152 new (addr) FUShmDqmCell(dqmCellSize);
00153 }
00154 }
00155 }
00156
00157
00158
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161
00162 }
00163
00164
00166
00168
00169
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172 shmid_=shmid;
00173 semid_=semid;
00174
00175 if (segmentationMode_) {
00176 int shmKeyId=666;
00177 key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178 for (unsigned int i=0;i<nRawCells_;i++) {
00179 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00180 int shmid =shm_create(*keyAddr,rawCellTotalSize_);
00181 void* shmAddr=shm_attach(shmid);
00182 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187 for (unsigned int i=0;i<nRecoCells_;i++) {
00188 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00189 int shmid =shm_create(*keyAddr,recoCellTotalSize_);
00190 void* shmAddr=shm_attach(shmid);
00191 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196 for (unsigned int i=0;i<nDqmCells_;i++) {
00197 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00198 int shmid =shm_create(*keyAddr,dqmCellTotalSize_);
00199 void* shmAddr=shm_attach(shmid);
00200 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201 shmdt(shmAddr);
00202 ++keyAddr;
00203 }
00204 }
00205
00206 for (unsigned int i=0;i<nRawCells_;i++) {
00207 FUShmRawCell* cell=rawCell(i);
00208 cell->initialize(i);
00209 if (segmentationMode_) shmdt(cell);
00210 }
00211
00212 for (unsigned int i=0;i<nRecoCells_;i++) {
00213 FUShmRecoCell* cell=recoCell(i);
00214 cell->initialize(i);
00215 if (segmentationMode_) shmdt(cell);
00216 }
00217
00218 for (unsigned int i=0;i<nDqmCells_;i++) {
00219 FUShmDqmCell* cell=dqmCell(i);
00220 cell->initialize(i);
00221 if (segmentationMode_) shmdt(cell);
00222 }
00223
00224 reset();
00225 }
00226
00227
00228
00229 void FUShmBuffer::reset()
00230 {
00231 nClients_=0;
00232
00233
00234 sem_init(0,1);
00235 sem_init(1,nRawCells_);
00236 sem_init(2,0);
00237 sem_init(3,1);
00238 sem_init(4,0);
00239 sem_init(5,nRecoCells_);
00240 sem_init(6,0);
00241 sem_init(7,nDqmCells_);
00242 sem_init(8,0);
00243
00244 sem_print();
00245
00246 unsigned int *iWrite,*iRead;
00247
00248 rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249 iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250 iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251 for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252
00253 recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254 iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255 iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256 for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257
00258 dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259 iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260 iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261 for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262
00263 for (unsigned int i=0;i<nRawCells_;i++) {
00264 setEvtState(i,evt::EMPTY);
00265 setEvtDiscard(i,0);
00266 setEvtNumber(i,0xffffffff);
00267 setEvtPrcId(i,0);
00268 setEvtTimeStamp(i,0);
00269 }
00270
00271 for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273
00274
00275
00276 int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278 return semctl(semid(),1,GETVAL);
00279 }
00280
00281
00282
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285 return semctl(semid(),2,GETVAL);
00286 }
00287
00288
00289
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292 waitRawWrite();
00293 unsigned int iCell=nextRawWriteIndex();
00294 FUShmRawCell* cell =rawCell(iCell);
00295 evt::State_t state=evtState(iCell);
00296 assert(state==evt::EMPTY);
00297 setEvtState(iCell,evt::RAWWRITING);
00298 setEvtDiscard(iCell,1);
00299 return cell;
00300 }
00301
00302
00303
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306 waitRawRead();
00307 unsigned int iCell=nextRawReadIndex();
00308 FUShmRawCell* cell=rawCell(iCell);
00309 evt::State_t state=evtState(iCell);
00310 assert(state==evt::RAWWRITTEN
00311 ||state==evt::EMPTY
00312 ||state==evt::STOP
00313 ||state==evt::LUMISECTION);
00314 if (state==evt::RAWWRITTEN) {
00315 setEvtPrcId(iCell,getpid());
00316 setEvtState(iCell,evt::RAWREADING);
00317 }
00318 return cell;
00319 }
00320
00321
00322
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325 waitRecoRead();
00326 unsigned int iCell =nextRecoReadIndex();
00327 FUShmRecoCell* cell =recoCell(iCell);
00328 unsigned int iRawCell=cell->rawCellIndex();
00329 if (iRawCell<nRawCells_) {
00330
00331
00332 setEvtState(iRawCell,evt::SENDING);
00333 }
00334 return cell;
00335 }
00336
00337
00338
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341 waitDqmRead();
00342 unsigned int iCell=nextDqmReadIndex();
00343 FUShmDqmCell* cell=dqmCell(iCell);
00344 dqm::State_t state=dqmState(iCell);
00345 assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346 if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347 return cell;
00348 }
00349
00350
00351
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354 waitRawDiscarded();
00355 FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356 evt::State_t state=evtState(cell->index());
00357 assert(state==evt::PROCESSED
00358 ||state==evt::SENT
00359 ||state==evt::EMPTY
00360 ||state==evt::STOP
00361 ||state==evt::USEDLS);
00362 if (state!=evt::EMPTY
00363 && state!=evt::USEDLS
00364 && state!=evt::STOP
00365 )
00366 setEvtState(cell->index(),evt::DISCARDING);
00367 return cell;
00368 }
00369
00370
00371
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374 evt::State_t state=evtState(cell->index());
00375 assert(state==evt::RAWWRITING);
00376 setEvtState(cell->index(),evt::RAWWRITTEN);
00377 setEvtNumber(cell->index(),cell->evtNumber());
00378 postRawIndexToRead(cell->index());
00379 if (segmentationMode_) shmdt(cell);
00380 postRawRead();
00381 }
00382
00383
00384
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387 evt::State_t state=evtState(cell->index());
00388 assert(state==evt::RAWREADING);
00389 setEvtState(cell->index(),evt::RAWREAD);
00390 setEvtState(cell->index(),evt::PROCESSING);
00391 setEvtTimeStamp(cell->index(),time(0));
00392 if (segmentationMode_) shmdt(cell);
00393 }
00394
00395
00396
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399 unsigned int iRawCell=cell->rawCellIndex();
00400 if (iRawCell<nRawCells_) {
00401
00402
00403 setEvtState(cell->rawCellIndex(),evt::SENT);
00404 }
00405 if (segmentationMode_) shmdt(cell);
00406 }
00407
00408
00409
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412 dqm::State_t state=dqmState(cell->index());
00413 assert(state==dqm::SENDING||state==dqm::EMPTY);
00414 if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415 if (segmentationMode_) shmdt(cell);
00416 }
00417
00418
00419
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422 waitRawDiscard();
00423 if (rawCellReadyForDiscard(iCell)) {
00424 rawDiscardIndex_=iCell;
00425 evt::State_t state=evtState(iCell);
00426 assert(state==evt::PROCESSING
00427 ||state==evt::SENT
00428 ||state==evt::EMPTY
00429 ||state==evt::STOP
00430 ||state==evt::LUMISECTION
00431 );
00432 if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433 if (state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00434 postRawDiscarded();
00435 }
00436 else postRawDiscard();
00437 }
00438
00439
00440
00441 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00442 {
00443 waitRawDiscard();
00444 if (rawCellReadyForDiscard(iCell)) {
00445 rawDiscardIndex_=iCell;
00446 evt::State_t state=evtState(iCell);
00447 if(state!=evt::LUMISECTION && state!=evt::EMPTY && state!=evt::USEDLS) setEvtState(iCell,evt::PROCESSED);
00448 if(state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00449 postRawDiscarded();
00450 }
00451 else postRawDiscard();
00452 }
00453
00454
00455
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458 releaseRawCell(cell);
00459 postRawDiscard();
00460 }
00461
00462
00463
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466 FUShmRecoCell* cell=recoCell(iCell);
00467 unsigned int iRawCell=cell->rawCellIndex();
00468 if (iRawCell<nRawCells_) {
00469
00470
00471 scheduleRawCellForDiscard(iRawCell);
00472 }
00473 cell->clear();
00474 if (segmentationMode_) shmdt(cell);
00475 postRecoIndexToWrite(iCell);
00476 postRecoWrite();
00477 }
00478
00479
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482 FUShmRecoCell* cell=recoCell(iCell);
00483 cell->clear();
00484 if (segmentationMode_) shmdt(cell);
00485 postRecoIndexToWrite(iCell);
00486 postRecoWrite();
00487 }
00488
00489
00490
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493 dqm::State_t state=dqmState(iCell);
00494 assert(state==dqm::EMPTY||state==dqm::SENT);
00495 setDqmState(iCell,dqm::DISCARDING);
00496 FUShmDqmCell* cell=dqmCell(iCell);
00497 cell->clear();
00498 if (segmentationMode_) shmdt(cell);
00499 setDqmState(iCell,dqm::EMPTY);
00500 postDqmIndexToWrite(iCell);
00501 postDqmWrite();
00502 }
00503
00504
00505
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508 evt::State_t state=evtState(cell->index());
00509 if(!(
00510 state==evt::DISCARDING
00511 ||state==evt::RAWWRITING
00512 ||state==evt::EMPTY
00513 ||state==evt::STOP
00514
00515 ||state==evt::USEDLS
00516 )) std::cout << "=================releaseRawCell state " << state
00517 << std::endl;
00518
00519 assert(
00520 state==evt::DISCARDING
00521 ||state==evt::RAWWRITING
00522 ||state==evt::EMPTY
00523 ||state==evt::STOP
00524
00525 ||state==evt::USEDLS
00526 );
00527 setEvtState(cell->index(),evt::EMPTY);
00528 setEvtDiscard(cell->index(),0);
00529 setEvtNumber(cell->index(),0xffffffff);
00530 setEvtPrcId(cell->index(),0);
00531 setEvtTimeStamp(cell->index(),0);
00532 cell->clear();
00533 postRawIndexToWrite(cell->index());
00534 if (segmentationMode_) shmdt(cell);
00535 postRawWrite();
00536 }
00537
00538
00539
00540 void FUShmBuffer::writeRawEmptyEvent()
00541 {
00542 FUShmRawCell* cell=rawCellToWrite();
00543 evt::State_t state=evtState(cell->index());
00544 assert(state==evt::RAWWRITING);
00545 setEvtState(cell->index(),evt::STOP);
00546 cell->setEventTypeStopper();
00547 postRawIndexToRead(cell->index());
00548 if (segmentationMode_) shmdt(cell);
00549 postRawRead();
00550 }
00551
00552
00553 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00554 {
00555 FUShmRawCell* cell=rawCellToWrite();
00556 cell->setLumiSection(ls);
00557 evt::State_t state=evtState(cell->index());
00558 assert(state==evt::RAWWRITING);
00559 setEvtState(cell->index(),evt::LUMISECTION);
00560 cell->setEventTypeEol();
00561 postRawIndexToRead(cell->index());
00562 if (segmentationMode_) shmdt(cell);
00563 postRawRead();
00564 }
00565
00566
00567
00568 void FUShmBuffer::writeRecoEmptyEvent()
00569 {
00570 waitRecoWrite();
00571 unsigned int iCell=nextRecoWriteIndex();
00572 FUShmRecoCell* cell =recoCell(iCell);
00573 cell->clear();
00574 postRecoIndexToRead(iCell);
00575 if (segmentationMode_) shmdt(cell);
00576 postRecoRead();
00577 }
00578
00579
00580
00581 void FUShmBuffer::writeDqmEmptyEvent()
00582 {
00583 waitDqmWrite();
00584 unsigned int iCell=nextDqmWriteIndex();
00585 FUShmDqmCell* cell=dqmCell(iCell);
00586 cell->clear();
00587 postDqmIndexToRead(iCell);
00588 if (segmentationMode_) shmdt(cell);
00589 postDqmRead();
00590 }
00591
00592
00593
00594 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00595 {
00596 FUShmRawCell* cell=rawCellToWrite();
00597 rawDiscardIndex_=cell->index();
00598 setEvtState(cell->index(),evt::STOP);
00599 cell->setEventTypeStopper();
00600 setEvtNumber(cell->index(),0xffffffff);
00601 setEvtPrcId(cell->index(),0);
00602 setEvtTimeStamp(cell->index(),0);
00603 postRawDiscarded();
00604 }
00605
00606
00607
00608 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00609 {
00610 waitRawDiscard();
00611 if (rawCellReadyForDiscard(cell->index())) {
00612 rawDiscardIndex_=cell->index();
00613
00614
00615
00616
00617
00618
00619
00620 removeClientPrcId(getpid());
00621 if (segmentationMode_) shmdt(cell);
00622 postRawDiscarded();
00623 }
00624 else postRawDiscard();
00625 }
00626
00627
00628 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00629 {
00630
00631 if (rawCellReadyForDiscard(cell->index())) {
00632 rawDiscardIndex_=cell->index();
00633
00634
00635
00636
00637
00638
00639 if (segmentationMode_) shmdt(cell);
00640 postRawDiscarded();
00641 }
00642 else postRawDiscard();
00643 }
00644
00645
00646
00647 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00648 unsigned int fuProcessId,
00649 unsigned int fuGuid,
00650 unsigned char *data,
00651 unsigned int dataSize)
00652 {
00653 if (dataSize>recoCellPayloadSize_) {
00654 cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00655 return false;
00656 }
00657
00658 waitRecoWrite();
00659 unsigned int iCell=nextRecoWriteIndex();
00660 FUShmRecoCell* cell =recoCell(iCell);
00661 cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00662 postRecoIndexToRead(iCell);
00663 if (segmentationMode_) shmdt(cell);
00664 postRecoRead();
00665 return true;
00666 }
00667
00668
00669
00670 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00671 unsigned int evtNumber,
00672 unsigned int outModId,
00673 unsigned int fuProcessId,
00674 unsigned int fuGuid,
00675 unsigned char *data,
00676 unsigned int dataSize)
00677 {
00678 if (dataSize>recoCellPayloadSize_) {
00679 cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00680 return false;
00681 }
00682
00683 waitRecoWrite();
00684 unsigned int iCell=nextRecoWriteIndex();
00685 FUShmRecoCell* cell =recoCell(iCell);
00686 unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00687
00688
00689 setEvtState(rawCellIndex,evt::RECOWRITING);
00690 incEvtDiscard(rawCellIndex);
00691 cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00692 fuProcessId,fuGuid,data,dataSize);
00693 setEvtState(rawCellIndex,evt::RECOWRITTEN);
00694 postRecoIndexToRead(iCell);
00695 if (segmentationMode_) shmdt(cell);
00696 postRecoRead();
00697 return true;
00698 }
00699
00700
00701
00702 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00703 unsigned int fuProcessId,
00704 unsigned int iRawCell)
00705 {
00706 FUShmRawCell *raw=rawCell(iRawCell);
00707
00708 unsigned int dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00709 unsigned char *data =new unsigned char[dataSize];
00710 uint32_t *pos =(uint32_t*)data;
00711
00712
00713
00714
00715
00716
00717
00718 *pos++=(uint32_t)2;
00719 *pos++=(uint32_t)runNumber;
00720 *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00721 *pos++=(uint32_t)raw->evtNumber();
00722 for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00723 memcpy(pos,raw->payloadAddr(),raw->eventSize());
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755 waitRecoWrite();
00756 unsigned int iRecoCell=nextRecoWriteIndex();
00757 FUShmRecoCell* reco =recoCell(iRecoCell);
00758 setEvtState(iRawCell,evt::RECOWRITING);
00759 setEvtDiscard(iRawCell,1);
00760 reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00761 data,dataSize);
00762 delete [] data;
00763 setEvtState(iRawCell,evt::RECOWRITTEN);
00764 postRecoIndexToRead(iRecoCell);
00765 if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00766 postRecoRead();
00767 return true;
00768 }
00769
00770
00771
00772 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00773 unsigned int evtAtUpdate,
00774 unsigned int folderId,
00775 unsigned int fuProcessId,
00776 unsigned int fuGuid,
00777 unsigned char *data,
00778 unsigned int dataSize)
00779 {
00780 if (dataSize>dqmCellPayloadSize_) {
00781 cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00782 return false;
00783 }
00784
00785 waitDqmWrite();
00786 unsigned int iCell=nextDqmWriteIndex();
00787 FUShmDqmCell* cell=dqmCell(iCell);
00788 dqm::State_t state=dqmState(iCell);
00789 assert(state==dqm::EMPTY);
00790 setDqmState(iCell,dqm::WRITING);
00791 cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00792 setDqmState(iCell,dqm::WRITTEN);
00793 postDqmIndexToRead(iCell);
00794 if (segmentationMode_) shmdt(cell);
00795 postDqmRead();
00796 return true;
00797 }
00798
00799
00800
00801 void FUShmBuffer::sem_print()
00802 {
00803 cout<<"--> current sem values:"
00804 <<endl
00805 <<" lock="<<semctl(semid(),0,GETVAL)
00806 <<endl
00807 <<" wraw="<<semctl(semid(),1,GETVAL)
00808 <<" rraw="<<semctl(semid(),2,GETVAL)
00809 <<endl
00810 <<" wdsc="<<semctl(semid(),3,GETVAL)
00811 <<" rdsc="<<semctl(semid(),4,GETVAL)
00812 <<endl
00813 <<" wrec="<<semctl(semid(),5,GETVAL)
00814 <<" rrec="<<semctl(semid(),6,GETVAL)
00815 <<endl
00816 <<" wdqm="<<semctl(semid(),7,GETVAL)
00817 <<" rdqm="<<semctl(semid(),8,GETVAL)
00818 <<endl;
00819 }
00820
00821
00822
00823 void FUShmBuffer::printEvtState(unsigned int index)
00824 {
00825 evt::State_t state=evtState(index);
00826 std::string stateName;
00827 if (state==evt::EMPTY) stateName="EMPTY";
00828 else if (state==evt::STOP) stateName="STOP";
00829 else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00830 else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00831 else if (state==evt::RAWREADING) stateName="RAWREADING";
00832 else if (state==evt::RAWREAD) stateName="RAWREAD";
00833 else if (state==evt::PROCESSING) stateName="PROCESSING";
00834 else if (state==evt::PROCESSED) stateName="PROCESSED";
00835 else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00836 else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00837 else if (state==evt::SENDING) stateName="SENDING";
00838 else if (state==evt::SENT) stateName="SENT";
00839 else if (state==evt::DISCARDING) stateName="DISCARDING";
00840 cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00841 }
00842
00843
00844
00845 void FUShmBuffer::printDqmState(unsigned int index)
00846 {
00847 dqm::State_t state=dqmState(index);
00848 cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00849 }
00850
00851
00852
00853 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00854 unsigned int nRawCells,
00855 unsigned int nRecoCells,
00856 unsigned int nDqmCells,
00857 unsigned int rawCellSize,
00858 unsigned int recoCellSize,
00859 unsigned int dqmCellSize)
00860 {
00861
00862 if (FUShmBuffer::releaseSharedMemory())
00863 cout<<"FUShmBuffer::createShmBuffer: "
00864 <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00865 <<endl;
00866
00867
00868 int size =sizeof(unsigned int)*7;
00869 int shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00870 void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00871
00872 if(1!=shm_nattch(shmid)) {
00873 cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00874 shmdt(shmAddr);
00875 return 0;
00876 }
00877
00878 unsigned int* p=(unsigned int*)shmAddr;
00879 *p++=segmentationMode;
00880 *p++=nRawCells;
00881 *p++=nRecoCells;
00882 *p++=nDqmCells;
00883 *p++=rawCellSize;
00884 *p++=recoCellSize;
00885 *p++=dqmCellSize;
00886 shmdt(shmAddr);
00887
00888
00889 size =FUShmBuffer::size(segmentationMode,
00890 nRawCells,nRecoCells,nDqmCells,
00891 rawCellSize,recoCellSize,dqmCellSize);
00892 shmid =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00893 int semid=sem_create(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00894 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00895
00896 if (1!=shm_nattch(shmid)) {
00897 cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00898 shmdt(shmAddr);
00899 return 0;
00900 }
00901 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00902 nRawCells,nRecoCells,nDqmCells,
00903 rawCellSize,recoCellSize,dqmCellSize);
00904
00905 cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00906 cout<<" segmentationMode="<<segmentationMode<<endl;
00907
00908 buffer->initialize(shmid,semid);
00909
00910 return buffer;
00911 }
00912
00913
00914
00915 FUShmBuffer* FUShmBuffer::getShmBuffer()
00916 {
00917
00918 int size =sizeof(unsigned int)*7;
00919 int shmid =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00920 void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00921
00922 unsigned int *p=(unsigned int*)shmAddr;
00923 bool segmentationMode=*p++;
00924 unsigned int nRawCells =*p++;
00925 unsigned int nRecoCells =*p++;
00926 unsigned int nDqmCells =*p++;
00927 unsigned int rawCellSize =*p++;
00928 unsigned int recoCellSize =*p++;
00929 unsigned int dqmCellSize =*p++;
00930 shmdt(shmAddr);
00931
00932 cout<<"FUShmBuffer::getShmBuffer():"
00933 <<" segmentationMode="<<segmentationMode
00934 <<" nRawCells="<<nRawCells
00935 <<" nRecoCells="<<nRecoCells
00936 <<" nDqmCells="<<nDqmCells
00937 <<" rawCellSize="<<rawCellSize
00938 <<" recoCellSize="<<recoCellSize
00939 <<" dqmCellSize="<<dqmCellSize
00940 <<endl;
00941
00942
00943 size =FUShmBuffer::size(segmentationMode,
00944 nRawCells,nRecoCells,nDqmCells,
00945 rawCellSize,recoCellSize,dqmCellSize);
00946 shmid =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00947 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00948 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00949
00950 if (0==shm_nattch(shmid)) {
00951 cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00952 return 0;
00953 }
00954
00955 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00956 nRawCells,nRecoCells,nDqmCells,
00957 rawCellSize,recoCellSize,dqmCellSize);
00958
00959 cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00960 cout<<" segmentationMode="<<segmentationMode<<endl;
00961
00962 buffer->setClientPrcId(getpid());
00963
00964 return buffer;
00965 }
00966
00967
00968
00969 bool FUShmBuffer::releaseSharedMemory()
00970 {
00971
00972 int size =sizeof(unsigned int)*7;
00973 int shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00974 void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00975
00976 unsigned int*p=(unsigned int*)shmAddr;
00977 bool segmentationMode=*p++;
00978 unsigned int nRawCells =*p++;
00979 unsigned int nRecoCells =*p++;
00980 unsigned int nDqmCells =*p++;
00981 unsigned int rawCellSize =*p++;
00982 unsigned int recoCellSize =*p++;
00983 unsigned int dqmCellSize =*p++;
00984 shmdt(shmAddr);
00985
00986
00987
00988 size =FUShmBuffer::size(segmentationMode,
00989 nRawCells,nRecoCells,nDqmCells,
00990 rawCellSize,recoCellSize,dqmCellSize);
00991 int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0) return false;
00992 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return false;
00993 shmAddr =shm_attach(shmid); if (0==shmAddr) return false;
00994
00995 int att = 0;
00996 for(; att <10; att++)
00997 {
00998 if(shm_nattch(shmid)>1) {
00999 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
01000 <<", failed attempt to release shared memory."<<endl;
01001 ::sleep(1);
01002 }
01003 else
01004 break;
01005 }
01006
01007 if(att>=10) return false;
01008
01009 if (segmentationMode) {
01010 FUShmBuffer* buffer=
01011 new (shmAddr) FUShmBuffer(segmentationMode,
01012 nRawCells,nRecoCells,nDqmCells,
01013 rawCellSize,recoCellSize,dqmCellSize);
01014 int cellid;
01015 for (unsigned int i=0;i<nRawCells;i++) {
01016 cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01017 if ((shm_destroy(cellid)==-1)) return false;
01018 }
01019 for (unsigned int i=0;i<nRecoCells;i++) {
01020 cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01021 if ((shm_destroy(cellid)==-1)) return false;
01022 }
01023 for (unsigned int i=0;i<nDqmCells;i++) {
01024 cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01025 if ((shm_destroy(cellid)==-1)) return false;
01026 }
01027 }
01028 shmdt(shmAddr);
01029 if (sem_destroy(semid)==-1) return false;
01030 if (shm_destroy(shmid)==-1) return false;
01031 if (shm_destroy(shmidd)==-1) return false;
01032
01033 return true;
01034 }
01035
01036
01037
01038 unsigned int FUShmBuffer::size(bool segmentationMode,
01039 unsigned int nRawCells,
01040 unsigned int nRecoCells,
01041 unsigned int nDqmCells,
01042 unsigned int rawCellSize,
01043 unsigned int recoCellSize,
01044 unsigned int dqmCellSize)
01045 {
01046 unsigned int offset=
01047 sizeof(FUShmBuffer)+
01048 sizeof(unsigned int)*4*nRawCells+
01049 sizeof(evt::State_t)*nRawCells+
01050 sizeof(dqm::State_t)*nDqmCells;
01051
01052 unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01053 unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01054 unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01055
01056 unsigned int realSize =
01057 (segmentationMode) ?
01058 offset
01059 +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01060 :
01061 offset
01062 +rawCellTotalSize*nRawCells
01063 +recoCellTotalSize*nRecoCells
01064 +dqmCellTotalSize*nDqmCells;
01065
01066 unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01067
01068 return result;
01069 }
01070
01071
01072
01073 key_t FUShmBuffer::getShmDescriptorKey()
01074 {
01075 key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01076 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01077 <<"for file "<<shmKeyPath_<<"!"<<endl;
01078 return result;
01079 }
01080
01081
01082
01083 key_t FUShmBuffer::getShmKey()
01084 {
01085 key_t result=getuid()*1000+SHM_KEYID;
01086 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01087 <<"for file "<<shmKeyPath_<<"!"<<endl;
01088 return result;
01089 }
01090
01091
01092
01093 key_t FUShmBuffer::getSemKey()
01094 {
01095 key_t result=getuid()*1000+SEM_KEYID;
01096 if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01097 <<"for file "<<semKeyPath_<<"!"<<endl;
01098 return result;
01099 }
01100
01101
01102
01103 int FUShmBuffer::shm_create(key_t key,int size)
01104 {
01105
01106 int shmid=shmget(key,1,0644);
01107 if(shmid!=-1){
01108
01109 shmid_ds shmstat;
01110 int result=shmctl(shmid,IPC_STAT,&shmstat);
01111 cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01112 << " created by process " << shmstat.shm_cpid << " owned by "
01113 << shmstat.shm_perm.uid << " permissions "
01114 << hex << shmstat.shm_perm.mode << dec << endl;
01115 result=shmctl(shmid,IPC_RMID,&shmstat);
01116 }
01117 shmid=shmget(key,size,IPC_CREAT|0644);
01118 if (shmid==-1) {
01119 int err=errno;
01120 cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01121 <<strerror(err)<<endl;
01122 }
01123 return shmid;
01124 }
01125
01126
01127
01128 int FUShmBuffer::shm_get(key_t key,int size)
01129 {
01130 int shmid=shmget(key,size,0644);
01131 if (shmid==-1) {
01132 int err=errno;
01133 cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01134 <<strerror(err)<<endl;
01135 }
01136 return shmid;
01137 }
01138
01139
01140
01141 void* FUShmBuffer::shm_attach(int shmid)
01142 {
01143 void* result=shmat(shmid,NULL,0);
01144 if (0==result) {
01145 int err=errno;
01146 cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01147 <<strerror(err)<<endl;
01148 }
01149 return result;
01150 }
01151
01152
01153
01154 int FUShmBuffer::shm_nattch(int shmid)
01155 {
01156 shmid_ds shmstat;
01157 shmctl(shmid,IPC_STAT,&shmstat);
01158 return shmstat.shm_nattch;
01159 }
01160
01161
01162
01163 int FUShmBuffer::shm_destroy(int shmid)
01164 {
01165 shmid_ds shmstat;
01166 int result=shmctl(shmid,IPC_RMID,&shmstat);
01167 if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01168 return result;
01169 }
01170
01171
01172
01173 int FUShmBuffer::sem_create(key_t key,int nsem)
01174 {
01175 int semid=semget(key,nsem,IPC_CREAT|0666);
01176 if (semid<0) {
01177 int err=errno;
01178 cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01179 <<strerror(err)<<endl;
01180 }
01181 return semid;
01182 }
01183
01184
01185
01186 int FUShmBuffer::sem_get(key_t key,int nsem)
01187 {
01188 int semid=semget(key,nsem,0666);
01189 if (semid<0) {
01190 int err=errno;
01191 cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01192 <<strerror(err)<<endl;
01193 }
01194 return semid;
01195 }
01196
01197
01198
01199 int FUShmBuffer::sem_destroy(int semid)
01200 {
01201 int result=semctl(semid,0,IPC_RMID);
01202 if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01203 return result;
01204 }
01205
01206
01207
01209
01211
01212
01213 unsigned int FUShmBuffer::nextIndex(unsigned int offset,
01214 unsigned int nCells,
01215 unsigned int& iNext)
01216 {
01217 lock();
01218 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01219 pindex+=iNext;
01220 iNext=(iNext+1)%nCells;
01221 unsigned int result=*pindex;
01222 unlock();
01223 return result;
01224 }
01225
01226
01227
01228 void FUShmBuffer::postIndex(unsigned int index,
01229 unsigned int offset,
01230 unsigned int nCells,
01231 unsigned int& iLast)
01232 {
01233 lock();
01234 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01235 pindex+=iLast;
01236 *pindex=index;
01237 iLast=(iLast+1)%nCells;
01238 unlock();
01239 }
01240
01241
01242
01243 unsigned int FUShmBuffer::nextRawWriteIndex()
01244 {
01245 return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01246 }
01247
01248
01249
01250 unsigned int FUShmBuffer::nextRawReadIndex()
01251 {
01252 return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01253 }
01254
01255
01256
01257 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01258 {
01259 postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01260 }
01261
01262
01263
01264 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01265 {
01266 postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01267 }
01268
01269
01270
01271 unsigned int FUShmBuffer::nextRecoWriteIndex()
01272 {
01273 return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01274 }
01275
01276
01277
01278 unsigned int FUShmBuffer::nextRecoReadIndex()
01279 {
01280 return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01281 }
01282
01283
01284
01285 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01286 {
01287 postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01288 }
01289
01290
01291
01292 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01293 {
01294 postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01295 }
01296
01297
01298
01299 unsigned int FUShmBuffer::nextDqmWriteIndex()
01300 {
01301 return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01302 }
01303
01304
01305
01306 unsigned int FUShmBuffer::nextDqmReadIndex()
01307 {
01308 return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01309 }
01310
01311
01312
01313 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01314 {
01315 postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01316 }
01317
01318
01319
01320 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01321 {
01322 postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01323 }
01324
01325
01326
01327 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01328 {
01329 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01330 for (unsigned int i=0;i<nRawCells_;i++) {
01331 if ((*pevt++)==evtNumber) return i;
01332 }
01333 assert(false);
01334 return 0xffffffff;
01335 }
01336
01337
01338
01339 evt::State_t FUShmBuffer::evtState(unsigned int index)
01340 {
01341 assert(index<nRawCells_);
01342 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01343 pstate+=index;
01344 return *pstate;
01345 }
01346
01347
01348
01349 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01350 {
01351 assert(index<nDqmCells_);
01352 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01353 pstate+=index;
01354 return *pstate;
01355 }
01356
01357
01358
01359 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01360 {
01361 assert(index<nRawCells_);
01362 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01363 pevt+=index;
01364 return *pevt;
01365 }
01366
01367
01368
01369 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01370 {
01371 assert(index<nRawCells_);
01372 pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01373 prcid+=index;
01374 return *prcid;
01375 }
01376
01377
01378
01379 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01380 {
01381 assert(index<nRawCells_);
01382 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01383 ptstmp+=index;
01384 return *ptstmp;
01385 }
01386
01387
01388
01389 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01390 {
01391 assert(index<nClientsMax_);
01392 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01393 prcid+=index;
01394 return *prcid;
01395 }
01396
01397
01398
01399 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01400 {
01401 assert(index<nRawCells_);
01402 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01403 pstate+=index;
01404 lock();
01405 *pstate=state;
01406 unlock();
01407 return true;
01408 }
01409
01410
01411
01412 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01413 {
01414 assert(index<nDqmCells_);
01415 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01416 pstate+=index;
01417 lock();
01418 *pstate=state;
01419 unlock();
01420 return true;
01421 }
01422
01423
01424
01425 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01426 {
01427 assert(index<nRawCells_);
01428 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01429 pcount+=index;
01430 lock();
01431 *pcount=discard;
01432 unlock();
01433 return true;
01434 }
01435
01436
01437
01438 int FUShmBuffer::incEvtDiscard(unsigned int index)
01439 {
01440 int result = 0;
01441 assert(index<nRawCells_);
01442 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01443 pcount+=index;
01444 lock();
01445 (*pcount)++;
01446 result = *pcount;
01447 unlock();
01448 return result;
01449 }
01450
01451
01452
01453 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01454 {
01455 assert(index<nRawCells_);
01456 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01457 pevt+=index;
01458 lock();
01459 *pevt=evtNumber;
01460 unlock();
01461 return true;
01462 }
01463
01464
01465
01466 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01467 {
01468 assert(index<nRawCells_);
01469 pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01470 prcid+=index;
01471 lock();
01472 *prcid=prcId;
01473 unlock();
01474 return true;
01475 }
01476
01477
01478
01479 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01480 {
01481 assert(index<nRawCells_);
01482 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01483 ptstmp+=index;
01484 lock();
01485 *ptstmp=timeStamp;
01486 unlock();
01487 return true;
01488 }
01489
01490
01491
01492 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01493 {
01494 lock();
01495 assert(nClients_<nClientsMax_);
01496 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01497 for (unsigned int i=0;i<nClients_;i++) {
01498 if ((*prcid)==prcId) { unlock(); return false; }
01499 prcid++;
01500 }
01501 nClients_++;
01502 *prcid=prcId;
01503 unlock();
01504 return true;
01505 }
01506
01507
01508
01509 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01510 {
01511 lock();
01512 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01513 unsigned int iClient(0);
01514 while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01515 assert(iClient!=nClients_);
01516 pid_t* next=prcid; next++;
01517 while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01518 nClients_--;
01519 unlock();
01520 return true;
01521 }
01522
01523
01524
01525 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01526 {
01527 FUShmRawCell* result(0);
01528
01529 if (iCell>=nRawCells_) {
01530 cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01531 <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01532 return result;
01533 }
01534
01535 if (segmentationMode_) {
01536 key_t shmkey =rawCellShmKey(iCell);
01537 int shmid =shm_get(shmkey,rawCellTotalSize_);
01538 void* cellAddr=shm_attach(shmid);
01539 result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01540 }
01541 else {
01542 result=
01543 (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01544 }
01545
01546 return result;
01547 }
01548
01549
01550
01551 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01552 {
01553 FUShmRecoCell* result(0);
01554
01555 if (iCell>=nRecoCells_) {
01556 cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01557 <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01558 return result;
01559 }
01560
01561 if (segmentationMode_) {
01562 key_t shmkey =recoCellShmKey(iCell);
01563 int shmid =shm_get(shmkey,recoCellTotalSize_);
01564 void* cellAddr=shm_attach(shmid);
01565 result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01566 }
01567 else {
01568 result=
01569 (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01570 }
01571
01572 return result;
01573 }
01574
01575
01576
01577 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01578 {
01579 FUShmDqmCell* result(0);
01580
01581 if (iCell>=nDqmCells_) {
01582 cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01583 <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01584 return result;
01585 }
01586
01587 if (segmentationMode_) {
01588 key_t shmkey =dqmCellShmKey(iCell);
01589 int shmid =shm_get(shmkey,dqmCellTotalSize_);
01590 void* cellAddr=shm_attach(shmid);
01591 result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01592 }
01593 else {
01594 result=
01595 (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01596 }
01597
01598 return result;
01599 }
01600
01601
01602
01603 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01604 {
01605 assert(index<nRawCells_);
01606 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01607 pcount+=index;
01608 lock();
01609 assert(*pcount>0);
01610 --(*pcount);
01611 bool result=(*pcount==0);
01612 unlock();
01613 return result;
01614 }
01615
01616
01617
01618 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01619 {
01620 if (!segmentationMode_) {
01621 cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01622 return -1;
01623 }
01624 key_t* addr=(key_t*)((unsigned long)this+offset);
01625 for (unsigned int i=0;i<iCell;i++) ++addr;
01626 return *addr;
01627 }
01628
01629
01630
01631 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01632 {
01633 if (iCell>=nRawCells_) {
01634 cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01635 <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01636 return -1;
01637 }
01638 return shmKey(iCell,rawCellOffset_);
01639 }
01640
01641
01642
01643 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01644 {
01645 if (iCell>=nRecoCells_) {
01646 cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01647 <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01648 return -1;
01649 }
01650 return shmKey(iCell,recoCellOffset_);
01651 }
01652
01653
01654
01655 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01656 {
01657 if (iCell>=nDqmCells_) {
01658 cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01659 <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01660 return -1;
01661 }
01662 return shmKey(iCell,dqmCellOffset_);
01663 }
01664
01665
01666
01667 void FUShmBuffer::sem_init(int isem,int value)
01668 {
01669 if (semctl(semid(),isem,SETVAL,value)<0) {
01670 cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01671 }
01672 }
01673
01674
01675
01676 int FUShmBuffer::sem_wait(int isem)
01677 {
01678 struct sembuf sops[1];
01679 sops[0].sem_num=isem;
01680 sops[0].sem_op = -1;
01681 sops[0].sem_flg= 0;
01682 if (semop(semid(),sops,1)==-1) {
01683 cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01684 return -1;
01685 }
01686 return 0;
01687 }
01688
01689
01690
01691 void FUShmBuffer::sem_post(int isem)
01692 {
01693 struct sembuf sops[1];
01694 sops[0].sem_num=isem;
01695 sops[0].sem_op = 1;
01696 sops[0].sem_flg= 0;
01697 if (semop(semid(),sops,1)==-1) {
01698 cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01699 }
01700 }