00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018
00019 #include <sstream>
00020 #include <fstream>
00021
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025
00026
00027
00028
00029
00030 #define SHM_DESCRIPTOR_KEYID 1
00031 #define SHM_KEYID 2
00032 #define SEM_KEYID 1
00033
00034 #define NSKIP_MAX 100
00035
00036
00037 using namespace std;
00038 using namespace evf;
00039
00040
00041
00042 const char* FUShmBuffer::shmKeyPath_ =
00043 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046
00047
00049
00051
00052
00053 FUShmBuffer::FUShmBuffer(bool segmentationMode,
00054 unsigned int nRawCells,
00055 unsigned int nRecoCells,
00056 unsigned int nDqmCells,
00057 unsigned int rawCellSize,
00058 unsigned int recoCellSize,
00059 unsigned int dqmCellSize)
00060 : segmentationMode_(segmentationMode)
00061 , nClientsMax_(128)
00062 , nRawCells_(nRawCells)
00063 , rawCellPayloadSize_(rawCellSize)
00064 , nRecoCells_(nRecoCells)
00065 , recoCellPayloadSize_(recoCellSize)
00066 , nDqmCells_(nDqmCells)
00067 , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069 rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070 recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071 dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072
00073 void* addr;
00074
00075 rawWriteOffset_=sizeof(FUShmBuffer);
00076 addr=(void*)((unsigned long)this+rawWriteOffset_);
00077 new (addr) unsigned int[nRawCells_];
00078
00079 rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080 addr=(void*)((unsigned long)this+rawReadOffset_);
00081 new (addr) unsigned int[nRawCells_];
00082
00083 recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084 addr=(void*)((unsigned long)this+recoWriteOffset_);
00085 new (addr) unsigned int[nRecoCells_];
00086
00087 recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088 addr=(void*)((unsigned long)this+recoReadOffset_);
00089 new (addr) unsigned int[nRecoCells_];
00090
00091 dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092 addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093 new (addr) unsigned int[nDqmCells_];
00094
00095 dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096 addr=(void*)((unsigned long)this+dqmReadOffset_);
00097 new (addr) unsigned int[nDqmCells_];
00098
00099 evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100 addr=(void*)((unsigned long)this+evtStateOffset_);
00101 new (addr) evt::State_t[nRawCells_];
00102
00103 evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104 addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105 new (addr) unsigned int[nRawCells_];
00106
00107 evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108 addr=(void*)((unsigned long)this+evtNumberOffset_);
00109 new (addr) unsigned int[nRawCells_];
00110
00111 evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112 addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113 new (addr) pid_t[nRawCells_];
00114
00115 evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116 addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117 new (addr) time_t[nRawCells_];
00118
00119 dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120 addr=(void*)((unsigned long)this+dqmStateOffset_);
00121 new (addr) dqm::State_t[nDqmCells_];
00122
00123 clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124 addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125 new (addr) pid_t[nClientsMax_];
00126
00127 rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128
00129 if (segmentationMode_) {
00130 recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131 dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132 addr=(void*)((unsigned long)this+rawCellOffset_);
00133 new (addr) key_t[nRawCells_];
00134 addr=(void*)((unsigned long)this+recoCellOffset_);
00135 new (addr) key_t[nRecoCells_];
00136 addr=(void*)((unsigned long)this+dqmCellOffset_);
00137 new (addr) key_t[nDqmCells_];
00138 }
00139 else {
00140 recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141 dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142 for (unsigned int i=0;i<nRawCells_;i++) {
00143 addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144 new (addr) FUShmRawCell(rawCellSize);
00145 }
00146 for (unsigned int i=0;i<nRecoCells_;i++) {
00147 addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148 new (addr) FUShmRecoCell(recoCellSize);
00149 }
00150 for (unsigned int i=0;i<nDqmCells_;i++) {
00151 addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152 new (addr) FUShmDqmCell(dqmCellSize);
00153 }
00154 }
00155 }
00156
00157
00158
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161
00162 }
00163
00164
00166
00168
00169
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172 shmid_=shmid;
00173 semid_=semid;
00174
00175 if (segmentationMode_) {
00176 int shmKeyId=666;
00177 key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178 for (unsigned int i=0;i<nRawCells_;i++) {
00179 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00180 int shmid =shm_create(*keyAddr,rawCellTotalSize_);
00181 void* shmAddr=shm_attach(shmid);
00182 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187 for (unsigned int i=0;i<nRecoCells_;i++) {
00188 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00189 int shmid =shm_create(*keyAddr,recoCellTotalSize_);
00190 void* shmAddr=shm_attach(shmid);
00191 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196 for (unsigned int i=0;i<nDqmCells_;i++) {
00197 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00198 int shmid =shm_create(*keyAddr,dqmCellTotalSize_);
00199 void* shmAddr=shm_attach(shmid);
00200 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201 shmdt(shmAddr);
00202 ++keyAddr;
00203 }
00204 }
00205
00206 for (unsigned int i=0;i<nRawCells_;i++) {
00207 FUShmRawCell* cell=rawCell(i);
00208 cell->initialize(i);
00209 if (segmentationMode_) shmdt(cell);
00210 }
00211
00212 for (unsigned int i=0;i<nRecoCells_;i++) {
00213 FUShmRecoCell* cell=recoCell(i);
00214 cell->initialize(i);
00215 if (segmentationMode_) shmdt(cell);
00216 }
00217
00218 for (unsigned int i=0;i<nDqmCells_;i++) {
00219 FUShmDqmCell* cell=dqmCell(i);
00220 cell->initialize(i);
00221 if (segmentationMode_) shmdt(cell);
00222 }
00223
00224 reset();
00225 }
00226
00227
00228
00229 void FUShmBuffer::reset()
00230 {
00231 nClients_=0;
00232
00233
00234 sem_init(0,1);
00235 sem_init(1,nRawCells_);
00236 sem_init(2,0);
00237 sem_init(3,1);
00238 sem_init(4,0);
00239 sem_init(5,nRecoCells_);
00240 sem_init(6,0);
00241 sem_init(7,nDqmCells_);
00242 sem_init(8,0);
00243
00244 sem_print();
00245
00246 unsigned int *iWrite,*iRead;
00247
00248 rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249 iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250 iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251 for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252
00253 recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254 iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255 iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256 for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257
00258 dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259 iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260 iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261 for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262
00263 for (unsigned int i=0;i<nRawCells_;i++) {
00264 setEvtState(i,evt::EMPTY);
00265 setEvtDiscard(i,0);
00266 setEvtNumber(i,0xffffffff);
00267 setEvtPrcId(i,0);
00268 setEvtTimeStamp(i,0);
00269 }
00270
00271 for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273
00274
00275
00276 unsigned int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278 return semctl(semid(),1,GETVAL);
00279 }
00280
00281
00282
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285 return semctl(semid(),2,GETVAL);
00286 }
00287
00288
00289
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292 if(waitRawWrite()!=0) return 0;
00293 unsigned int iCell=nextRawWriteIndex();
00294 FUShmRawCell* cell =rawCell(iCell);
00295 evt::State_t state=evtState(iCell);
00296 assert(state==evt::EMPTY);
00297 setEvtState(iCell,evt::RAWWRITING);
00298 setEvtDiscard(iCell,1);
00299 return cell;
00300 }
00301
00302
00303
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306 waitRawRead();
00307 unsigned int iCell=nextRawReadIndex();
00308 FUShmRawCell* cell=rawCell(iCell);
00309 evt::State_t state=evtState(iCell);
00310 assert(state==evt::RAWWRITTEN
00311 ||state==evt::EMPTY
00312 ||state==evt::STOP
00313 ||state==evt::LUMISECTION);
00314 if (state==evt::RAWWRITTEN) {
00315 setEvtPrcId(iCell,getpid());
00316 setEvtState(iCell,evt::RAWREADING);
00317 }
00318 return cell;
00319 }
00320
00321
00322
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325 waitRecoRead();
00326 unsigned int iCell =nextRecoReadIndex();
00327 FUShmRecoCell* cell =recoCell(iCell);
00328 unsigned int iRawCell=cell->rawCellIndex();
00329 if (iRawCell<nRawCells_) {
00330
00331
00332 setEvtState(iRawCell,evt::SENDING);
00333 }
00334 return cell;
00335 }
00336
00337
00338
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341 waitDqmRead();
00342 unsigned int iCell=nextDqmReadIndex();
00343 FUShmDqmCell* cell=dqmCell(iCell);
00344 dqm::State_t state=dqmState(iCell);
00345 assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346 if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347 return cell;
00348 }
00349
00350
00351
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354 waitRawDiscarded();
00355 FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356 evt::State_t state=evtState(cell->index());
00357 assert(state==evt::PROCESSED
00358 ||state==evt::SENT
00359 ||state==evt::EMPTY
00360 ||state==evt::STOP
00361 ||state==evt::USEDLS);
00362 if (state!=evt::EMPTY
00363 && state!=evt::USEDLS
00364 && state!=evt::STOP
00365 )
00366 setEvtState(cell->index(),evt::DISCARDING);
00367 return cell;
00368 }
00369
00370
00371
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374 evt::State_t state=evtState(cell->index());
00375 assert(state==evt::RAWWRITING);
00376 setEvtState(cell->index(),evt::RAWWRITTEN);
00377 setEvtNumber(cell->index(),cell->evtNumber());
00378 postRawIndexToRead(cell->index());
00379 if (segmentationMode_) shmdt(cell);
00380 postRawRead();
00381 }
00382
00383
00384
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387 evt::State_t state=evtState(cell->index());
00388 assert(state==evt::RAWREADING);
00389 setEvtState(cell->index(),evt::RAWREAD);
00390 setEvtState(cell->index(),evt::PROCESSING);
00391 setEvtTimeStamp(cell->index(),time(0));
00392 if (segmentationMode_) shmdt(cell);
00393 }
00394
00395
00396
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399 unsigned int iRawCell=cell->rawCellIndex();
00400 if (iRawCell<nRawCells_) {
00401
00402
00403 setEvtState(cell->rawCellIndex(),evt::SENT);
00404 }
00405 if (segmentationMode_) shmdt(cell);
00406 }
00407
00408
00409
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412 dqm::State_t state=dqmState(cell->index());
00413 assert(state==dqm::SENDING||state==dqm::EMPTY);
00414 if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415 if (segmentationMode_) shmdt(cell);
00416 }
00417
00418
00419
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422 waitRawDiscard();
00423 if (rawCellReadyForDiscard(iCell)) {
00424 rawDiscardIndex_=iCell;
00425 evt::State_t state=evtState(iCell);
00426 assert(state==evt::PROCESSING
00427 ||state==evt::SENT
00428 ||state==evt::EMPTY
00429 ||state==evt::STOP
00430 ||state==evt::LUMISECTION
00431 );
00432 if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433 if (state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00434 postRawDiscarded();
00435 }
00436 else postRawDiscard();
00437 }
00438
00439
00440
00441 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00442 {
00443 waitRawDiscard();
00444 if (rawCellReadyForDiscard(iCell)) {
00445 rawDiscardIndex_=iCell;
00446 evt::State_t state=evtState(iCell);
00447 if(state!=evt::LUMISECTION && state!=evt::EMPTY && state!=evt::USEDLS) setEvtState(iCell,evt::PROCESSED);
00448 if(state==evt::LUMISECTION) setEvtState(iCell,evt::USEDLS);
00449 postRawDiscarded();
00450 }
00451 else postRawDiscard();
00452 }
00453
00454
00455
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458 releaseRawCell(cell);
00459 postRawDiscard();
00460 }
00461
00462
00463
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466 FUShmRecoCell* cell=recoCell(iCell);
00467 unsigned int iRawCell=cell->rawCellIndex();
00468 if (iRawCell<nRawCells_) {
00469
00470
00471 scheduleRawCellForDiscard(iRawCell);
00472 }
00473 cell->clear();
00474 if (segmentationMode_) shmdt(cell);
00475 postRecoIndexToWrite(iCell);
00476 postRecoWrite();
00477 }
00478
00479
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482 FUShmRecoCell* cell=recoCell(iCell);
00483 cell->clear();
00484 if (segmentationMode_) shmdt(cell);
00485 postRecoIndexToWrite(iCell);
00486 postRecoWrite();
00487 }
00488
00489
00490
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493 dqm::State_t state=dqmState(iCell);
00494 assert(state==dqm::EMPTY||state==dqm::SENT);
00495 setDqmState(iCell,dqm::DISCARDING);
00496 FUShmDqmCell* cell=dqmCell(iCell);
00497 cell->clear();
00498 if (segmentationMode_) shmdt(cell);
00499 setDqmState(iCell,dqm::EMPTY);
00500 postDqmIndexToWrite(iCell);
00501 postDqmWrite();
00502 }
00503
00504
00505
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508 evt::State_t state=evtState(cell->index());
00509 if(!(
00510 state==evt::DISCARDING
00511 ||state==evt::RAWWRITING
00512 ||state==evt::EMPTY
00513 ||state==evt::STOP
00514
00515 ||state==evt::USEDLS
00516 )) std::cout << "=================releaseRawCell state " << state
00517 << std::endl;
00518
00519 assert(
00520 state==evt::DISCARDING
00521 ||state==evt::RAWWRITING
00522 ||state==evt::EMPTY
00523 ||state==evt::STOP
00524
00525 ||state==evt::USEDLS
00526 );
00527 setEvtState(cell->index(),evt::EMPTY);
00528 setEvtDiscard(cell->index(),0);
00529 setEvtNumber(cell->index(),0xffffffff);
00530 setEvtPrcId(cell->index(),0);
00531 setEvtTimeStamp(cell->index(),0);
00532 cell->clear();
00533 postRawIndexToWrite(cell->index());
00534 if (segmentationMode_) shmdt(cell);
00535 postRawWrite();
00536 }
00537
00538
00539
00540 void FUShmBuffer::writeRawEmptyEvent()
00541 {
00542 FUShmRawCell* cell=rawCellToWrite();
00543 if(cell==0) return;
00544 evt::State_t state=evtState(cell->index());
00545 assert(state==evt::RAWWRITING);
00546 setEvtState(cell->index(),evt::STOP);
00547 cell->setEventTypeStopper();
00548 postRawIndexToRead(cell->index());
00549 if (segmentationMode_) shmdt(cell);
00550 postRawRead();
00551 }
00552
00553
00554 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00555 {
00556 FUShmRawCell* cell=rawCellToWrite();
00557 if(cell==0) return;
00558 cell->setLumiSection(ls);
00559 evt::State_t state=evtState(cell->index());
00560 assert(state==evt::RAWWRITING);
00561 setEvtState(cell->index(),evt::LUMISECTION);
00562 cell->setEventTypeEol();
00563 postRawIndexToRead(cell->index());
00564 if (segmentationMode_) shmdt(cell);
00565 postRawRead();
00566 }
00567
00568
00569
00570 void FUShmBuffer::writeRecoEmptyEvent()
00571 {
00572 waitRecoWrite();
00573 unsigned int iCell=nextRecoWriteIndex();
00574 FUShmRecoCell* cell =recoCell(iCell);
00575 cell->clear();
00576 postRecoIndexToRead(iCell);
00577 if (segmentationMode_) shmdt(cell);
00578 postRecoRead();
00579 }
00580
00581
00582
00583 void FUShmBuffer::writeDqmEmptyEvent()
00584 {
00585 waitDqmWrite();
00586 unsigned int iCell=nextDqmWriteIndex();
00587 FUShmDqmCell* cell=dqmCell(iCell);
00588 cell->clear();
00589 postDqmIndexToRead(iCell);
00590 if (segmentationMode_) shmdt(cell);
00591 postDqmRead();
00592 }
00593
00594
00595
00596 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00597 {
00598 FUShmRawCell* cell=rawCellToWrite();
00599 if(cell==0) return;
00600 rawDiscardIndex_=cell->index();
00601 setEvtState(cell->index(),evt::STOP);
00602 cell->setEventTypeStopper();
00603 setEvtNumber(cell->index(),0xffffffff);
00604 setEvtPrcId(cell->index(),0);
00605 setEvtTimeStamp(cell->index(),0);
00606 postRawDiscarded();
00607 }
00608
00609
00610
00611 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00612 {
00613 waitRawDiscard();
00614 if (rawCellReadyForDiscard(cell->index())) {
00615 rawDiscardIndex_=cell->index();
00616
00617
00618
00619
00620
00621
00622
00623 removeClientPrcId(getpid());
00624 if (segmentationMode_) shmdt(cell);
00625 postRawDiscarded();
00626 }
00627 else postRawDiscard();
00628 }
00629
00630
00631 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00632 {
00633
00634 if (rawCellReadyForDiscard(cell->index())) {
00635 rawDiscardIndex_=cell->index();
00636
00637
00638
00639
00640
00641
00642 if (segmentationMode_) shmdt(cell);
00643 postRawDiscarded();
00644 }
00645 else postRawDiscard();
00646 }
00647
00648
00649
00650 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00651 unsigned int fuProcessId,
00652 unsigned int fuGuid,
00653 unsigned char *data,
00654 unsigned int dataSize)
00655 {
00656 if (dataSize>recoCellPayloadSize_) {
00657 cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00658 return false;
00659 }
00660
00661 waitRecoWrite();
00662 unsigned int iCell=nextRecoWriteIndex();
00663 FUShmRecoCell* cell =recoCell(iCell);
00664 cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00665 postRecoIndexToRead(iCell);
00666 if (segmentationMode_) shmdt(cell);
00667 postRecoRead();
00668 return true;
00669 }
00670
00671
00672
00673 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00674 unsigned int evtNumber,
00675 unsigned int outModId,
00676 unsigned int fuProcessId,
00677 unsigned int fuGuid,
00678 unsigned char *data,
00679 unsigned int dataSize)
00680 {
00681 if (dataSize>recoCellPayloadSize_) {
00682 cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00683 return false;
00684 }
00685
00686 waitRecoWrite();
00687 unsigned int iCell=nextRecoWriteIndex();
00688 FUShmRecoCell* cell =recoCell(iCell);
00689 unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00690
00691
00692 setEvtState(rawCellIndex,evt::RECOWRITING);
00693 incEvtDiscard(rawCellIndex);
00694 cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00695 fuProcessId,fuGuid,data,dataSize);
00696 setEvtState(rawCellIndex,evt::RECOWRITTEN);
00697 postRecoIndexToRead(iCell);
00698 if (segmentationMode_) shmdt(cell);
00699 postRecoRead();
00700 return true;
00701 }
00702
00703
00704
00705 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00706 unsigned int fuProcessId,
00707 unsigned int iRawCell)
00708 {
00709 FUShmRawCell *raw=rawCell(iRawCell);
00710
00711 unsigned int dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00712 unsigned char *data =new unsigned char[dataSize];
00713 uint32_t *pos =(uint32_t*)data;
00714
00715
00716
00717
00718
00719
00720
00721 *pos++=(uint32_t)2;
00722 *pos++=(uint32_t)runNumber;
00723 *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00724 *pos++=(uint32_t)raw->evtNumber();
00725 for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00726 memcpy(pos,raw->payloadAddr(),raw->eventSize());
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758 waitRecoWrite();
00759 unsigned int iRecoCell=nextRecoWriteIndex();
00760 FUShmRecoCell* reco =recoCell(iRecoCell);
00761 setEvtState(iRawCell,evt::RECOWRITING);
00762 setEvtDiscard(iRawCell,1);
00763 reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00764 data,dataSize);
00765 delete [] data;
00766 setEvtState(iRawCell,evt::RECOWRITTEN);
00767 postRecoIndexToRead(iRecoCell);
00768 if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00769 postRecoRead();
00770 return true;
00771 }
00772
00773
00774
00775 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00776 unsigned int evtAtUpdate,
00777 unsigned int folderId,
00778 unsigned int fuProcessId,
00779 unsigned int fuGuid,
00780 unsigned char *data,
00781 unsigned int dataSize)
00782 {
00783 if (dataSize>dqmCellPayloadSize_) {
00784 cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00785 return false;
00786 }
00787
00788 waitDqmWrite();
00789 unsigned int iCell=nextDqmWriteIndex();
00790 FUShmDqmCell* cell=dqmCell(iCell);
00791 dqm::State_t state=dqmState(iCell);
00792 assert(state==dqm::EMPTY);
00793 setDqmState(iCell,dqm::WRITING);
00794 cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00795 setDqmState(iCell,dqm::WRITTEN);
00796 postDqmIndexToRead(iCell);
00797 if (segmentationMode_) shmdt(cell);
00798 postDqmRead();
00799 return true;
00800 }
00801
00802
00803
00804 void FUShmBuffer::sem_print()
00805 {
00806 cout<<"--> current sem values:"
00807 <<endl
00808 <<" lock="<<semctl(semid(),0,GETVAL)
00809 <<endl
00810 <<" wraw="<<semctl(semid(),1,GETVAL)
00811 <<" rraw="<<semctl(semid(),2,GETVAL)
00812 <<endl
00813 <<" wdsc="<<semctl(semid(),3,GETVAL)
00814 <<" rdsc="<<semctl(semid(),4,GETVAL)
00815 <<endl
00816 <<" wrec="<<semctl(semid(),5,GETVAL)
00817 <<" rrec="<<semctl(semid(),6,GETVAL)
00818 <<endl
00819 <<" wdqm="<<semctl(semid(),7,GETVAL)
00820 <<" rdqm="<<semctl(semid(),8,GETVAL)
00821 <<endl;
00822 }
00823
00824
00825
00826 void FUShmBuffer::printEvtState(unsigned int index)
00827 {
00828 evt::State_t state=evtState(index);
00829 std::string stateName;
00830 if (state==evt::EMPTY) stateName="EMPTY";
00831 else if (state==evt::STOP) stateName="STOP";
00832 else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00833 else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00834 else if (state==evt::RAWREADING) stateName="RAWREADING";
00835 else if (state==evt::RAWREAD) stateName="RAWREAD";
00836 else if (state==evt::PROCESSING) stateName="PROCESSING";
00837 else if (state==evt::PROCESSED) stateName="PROCESSED";
00838 else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00839 else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00840 else if (state==evt::SENDING) stateName="SENDING";
00841 else if (state==evt::SENT) stateName="SENT";
00842 else if (state==evt::DISCARDING) stateName="DISCARDING";
00843 cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00844 }
00845
00846
00847
00848 void FUShmBuffer::printDqmState(unsigned int index)
00849 {
00850 dqm::State_t state=dqmState(index);
00851 cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00852 }
00853
00854
00855
00856 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00857 unsigned int nRawCells,
00858 unsigned int nRecoCells,
00859 unsigned int nDqmCells,
00860 unsigned int rawCellSize,
00861 unsigned int recoCellSize,
00862 unsigned int dqmCellSize)
00863 {
00864
00865 if (FUShmBuffer::releaseSharedMemory())
00866 cout<<"FUShmBuffer::createShmBuffer: "
00867 <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00868 <<endl;
00869
00870
00871 int size =sizeof(unsigned int)*7;
00872 int shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00873 void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00874
00875 if(1!=shm_nattch(shmid)) {
00876 cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00877 shmdt(shmAddr);
00878 return 0;
00879 }
00880
00881 unsigned int* p=(unsigned int*)shmAddr;
00882 *p++=segmentationMode;
00883 *p++=nRawCells;
00884 *p++=nRecoCells;
00885 *p++=nDqmCells;
00886 *p++=rawCellSize;
00887 *p++=recoCellSize;
00888 *p++=dqmCellSize;
00889 shmdt(shmAddr);
00890
00891
00892 size =FUShmBuffer::size(segmentationMode,
00893 nRawCells,nRecoCells,nDqmCells,
00894 rawCellSize,recoCellSize,dqmCellSize);
00895 shmid =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00896 int semid=sem_create(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00897 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00898
00899 if (1!=shm_nattch(shmid)) {
00900 cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00901 shmdt(shmAddr);
00902 return 0;
00903 }
00904 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00905 nRawCells,nRecoCells,nDqmCells,
00906 rawCellSize,recoCellSize,dqmCellSize);
00907
00908 cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00909 cout<<" segmentationMode="<<segmentationMode<<endl;
00910
00911 buffer->initialize(shmid,semid);
00912
00913 return buffer;
00914 }
00915
00916
00917
00918 FUShmBuffer* FUShmBuffer::getShmBuffer()
00919 {
00920
00921 int size =sizeof(unsigned int)*7;
00922 int shmid =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00923 void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00924
00925 unsigned int *p=(unsigned int*)shmAddr;
00926 bool segmentationMode=*p++;
00927 unsigned int nRawCells =*p++;
00928 unsigned int nRecoCells =*p++;
00929 unsigned int nDqmCells =*p++;
00930 unsigned int rawCellSize =*p++;
00931 unsigned int recoCellSize =*p++;
00932 unsigned int dqmCellSize =*p++;
00933 shmdt(shmAddr);
00934
00935 cout<<"FUShmBuffer::getShmBuffer():"
00936 <<" segmentationMode="<<segmentationMode
00937 <<" nRawCells="<<nRawCells
00938 <<" nRecoCells="<<nRecoCells
00939 <<" nDqmCells="<<nDqmCells
00940 <<" rawCellSize="<<rawCellSize
00941 <<" recoCellSize="<<recoCellSize
00942 <<" dqmCellSize="<<dqmCellSize
00943 <<endl;
00944
00945
00946 size =FUShmBuffer::size(segmentationMode,
00947 nRawCells,nRecoCells,nDqmCells,
00948 rawCellSize,recoCellSize,dqmCellSize);
00949 shmid =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00950 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00951 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00952
00953 if (0==shm_nattch(shmid)) {
00954 cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00955 return 0;
00956 }
00957
00958 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00959 nRawCells,nRecoCells,nDqmCells,
00960 rawCellSize,recoCellSize,dqmCellSize);
00961
00962 cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00963 cout<<" segmentationMode="<<segmentationMode<<endl;
00964
00965 buffer->setClientPrcId(getpid());
00966
00967 return buffer;
00968 }
00969
00970
00971
00972 bool FUShmBuffer::releaseSharedMemory()
00973 {
00974
00975 int size =sizeof(unsigned int)*7;
00976 int shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00977 void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00978
00979 unsigned int*p=(unsigned int*)shmAddr;
00980 bool segmentationMode=*p++;
00981 unsigned int nRawCells =*p++;
00982 unsigned int nRecoCells =*p++;
00983 unsigned int nDqmCells =*p++;
00984 unsigned int rawCellSize =*p++;
00985 unsigned int recoCellSize =*p++;
00986 unsigned int dqmCellSize =*p++;
00987 shmdt(shmAddr);
00988
00989
00990
00991 size =FUShmBuffer::size(segmentationMode,
00992 nRawCells,nRecoCells,nDqmCells,
00993 rawCellSize,recoCellSize,dqmCellSize);
00994 int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0) return false;
00995 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return false;
00996 shmAddr =shm_attach(shmid); if (0==shmAddr) return false;
00997
00998 int att = 0;
00999 for(; att <10; att++)
01000 {
01001 if(shm_nattch(shmid)>1) {
01002 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
01003 <<", failed attempt to release shared memory."<<endl;
01004 ::sleep(1);
01005 }
01006 else
01007 break;
01008 }
01009
01010 if(att>=10) return false;
01011
01012 if (segmentationMode) {
01013 FUShmBuffer* buffer=
01014 new (shmAddr) FUShmBuffer(segmentationMode,
01015 nRawCells,nRecoCells,nDqmCells,
01016 rawCellSize,recoCellSize,dqmCellSize);
01017 int cellid;
01018 for (unsigned int i=0;i<nRawCells;i++) {
01019 cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01020 if ((shm_destroy(cellid)==-1)) return false;
01021 }
01022 for (unsigned int i=0;i<nRecoCells;i++) {
01023 cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01024 if ((shm_destroy(cellid)==-1)) return false;
01025 }
01026 for (unsigned int i=0;i<nDqmCells;i++) {
01027 cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01028 if ((shm_destroy(cellid)==-1)) return false;
01029 }
01030 }
01031 shmdt(shmAddr);
01032 if (sem_destroy(semid)==-1) return false;
01033 if (shm_destroy(shmid)==-1) return false;
01034 if (shm_destroy(shmidd)==-1) return false;
01035
01036 return true;
01037 }
01038
01039
01040
01041 unsigned int FUShmBuffer::size(bool segmentationMode,
01042 unsigned int nRawCells,
01043 unsigned int nRecoCells,
01044 unsigned int nDqmCells,
01045 unsigned int rawCellSize,
01046 unsigned int recoCellSize,
01047 unsigned int dqmCellSize)
01048 {
01049 unsigned int offset=
01050 sizeof(FUShmBuffer)+
01051 sizeof(unsigned int)*4*nRawCells+
01052 sizeof(evt::State_t)*nRawCells+
01053 sizeof(dqm::State_t)*nDqmCells;
01054
01055 unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01056 unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01057 unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01058
01059 unsigned int realSize =
01060 (segmentationMode) ?
01061 offset
01062 +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01063 :
01064 offset
01065 +rawCellTotalSize*nRawCells
01066 +recoCellTotalSize*nRecoCells
01067 +dqmCellTotalSize*nDqmCells;
01068
01069 unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01070
01071 return result;
01072 }
01073
01074
01075
01076 key_t FUShmBuffer::getShmDescriptorKey()
01077 {
01078 key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01079 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01080 <<"for file "<<shmKeyPath_<<"!"<<endl;
01081 return result;
01082 }
01083
01084
01085
01086 key_t FUShmBuffer::getShmKey()
01087 {
01088 key_t result=getuid()*1000+SHM_KEYID;
01089 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01090 <<"for file "<<shmKeyPath_<<"!"<<endl;
01091 return result;
01092 }
01093
01094
01095
01096 key_t FUShmBuffer::getSemKey()
01097 {
01098 key_t result=getuid()*1000+SEM_KEYID;
01099 if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01100 <<"for file "<<semKeyPath_<<"!"<<endl;
01101 return result;
01102 }
01103
01104
01105
01106 int FUShmBuffer::shm_create(key_t key,int size)
01107 {
01108
01109 int shmid=shmget(key,1,0644);
01110 if(shmid!=-1){
01111
01112 shmid_ds shmstat;
01113 int result=shmctl(shmid,IPC_STAT,&shmstat);
01114 cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01115 << " created by process " << shmstat.shm_cpid << " owned by "
01116 << shmstat.shm_perm.uid << " permissions "
01117 << hex << shmstat.shm_perm.mode << dec << endl;
01118 result=shmctl(shmid,IPC_RMID,&shmstat);
01119 }
01120 shmid=shmget(key,size,IPC_CREAT|0644);
01121 if (shmid==-1) {
01122 int err=errno;
01123 cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01124 <<strerror(err)<<endl;
01125 }
01126 return shmid;
01127 }
01128
01129
01130
01131 int FUShmBuffer::shm_get(key_t key,int size)
01132 {
01133 int shmid=shmget(key,size,0644);
01134 if (shmid==-1) {
01135 int err=errno;
01136 cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01137 <<strerror(err)<<endl;
01138 }
01139 return shmid;
01140 }
01141
01142
01143
01144 void* FUShmBuffer::shm_attach(int shmid)
01145 {
01146 void* result=shmat(shmid,NULL,0);
01147 if (0==result) {
01148 int err=errno;
01149 cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01150 <<strerror(err)<<endl;
01151 }
01152 return result;
01153 }
01154
01155
01156
01157 int FUShmBuffer::shm_nattch(int shmid)
01158 {
01159 shmid_ds shmstat;
01160 shmctl(shmid,IPC_STAT,&shmstat);
01161 return shmstat.shm_nattch;
01162 }
01163
01164
01165
01166 int FUShmBuffer::shm_destroy(int shmid)
01167 {
01168 shmid_ds shmstat;
01169 int result=shmctl(shmid,IPC_RMID,&shmstat);
01170 if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01171 return result;
01172 }
01173
01174
01175
01176 int FUShmBuffer::sem_create(key_t key,int nsem)
01177 {
01178 int semid=semget(key,nsem,IPC_CREAT|0666);
01179 if (semid<0) {
01180 int err=errno;
01181 cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01182 <<strerror(err)<<endl;
01183 }
01184 return semid;
01185 }
01186
01187
01188
01189 int FUShmBuffer::sem_get(key_t key,int nsem)
01190 {
01191 int semid=semget(key,nsem,0666);
01192 if (semid<0) {
01193 int err=errno;
01194 cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01195 <<strerror(err)<<endl;
01196 }
01197 return semid;
01198 }
01199
01200
01201
01202 int FUShmBuffer::sem_destroy(int semid)
01203 {
01204 int result=semctl(semid,0,IPC_RMID);
01205 if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01206 return result;
01207 }
01208
01209
01210
01212
01214
01215
01216 unsigned int FUShmBuffer::nextIndex(unsigned int offset,
01217 unsigned int nCells,
01218 unsigned int& iNext)
01219 {
01220 lock();
01221 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01222 pindex+=iNext;
01223 iNext=(iNext+1)%nCells;
01224 unsigned int result=*pindex;
01225 unlock();
01226 return result;
01227 }
01228
01229
01230
01231 void FUShmBuffer::postIndex(unsigned int index,
01232 unsigned int offset,
01233 unsigned int nCells,
01234 unsigned int& iLast)
01235 {
01236 lock();
01237 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01238 pindex+=iLast;
01239 *pindex=index;
01240 iLast=(iLast+1)%nCells;
01241 unlock();
01242 }
01243
01244
01245
01246 unsigned int FUShmBuffer::nextRawWriteIndex()
01247 {
01248 return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01249 }
01250
01251
01252
01253 unsigned int FUShmBuffer::nextRawReadIndex()
01254 {
01255 return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01256 }
01257
01258
01259
01260 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01261 {
01262 postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01263 }
01264
01265
01266
01267 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01268 {
01269 postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01270 }
01271
01272
01273
01274 unsigned int FUShmBuffer::nextRecoWriteIndex()
01275 {
01276 return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01277 }
01278
01279
01280
01281 unsigned int FUShmBuffer::nextRecoReadIndex()
01282 {
01283 return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01284 }
01285
01286
01287
01288 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01289 {
01290 postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01291 }
01292
01293
01294
01295 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01296 {
01297 postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01298 }
01299
01300
01301
01302 unsigned int FUShmBuffer::nextDqmWriteIndex()
01303 {
01304 return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01305 }
01306
01307
01308
01309 unsigned int FUShmBuffer::nextDqmReadIndex()
01310 {
01311 return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01312 }
01313
01314
01315
01316 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01317 {
01318 postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01319 }
01320
01321
01322
01323 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01324 {
01325 postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01326 }
01327
01328
01329
01330 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01331 {
01332 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01333 for (unsigned int i=0;i<nRawCells_;i++) {
01334 if ((*pevt++)==evtNumber) return i;
01335 }
01336 assert(false);
01337 return 0xffffffff;
01338 }
01339
01340
01341
01342 evt::State_t FUShmBuffer::evtState(unsigned int index)
01343 {
01344 assert(index<nRawCells_);
01345 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01346 pstate+=index;
01347 return *pstate;
01348 }
01349
01350
01351
01352 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01353 {
01354 assert(index<nDqmCells_);
01355 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01356 pstate+=index;
01357 return *pstate;
01358 }
01359
01360
01361
01362 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01363 {
01364 assert(index<nRawCells_);
01365 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01366 pevt+=index;
01367 return *pevt;
01368 }
01369
01370
01371
01372 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01373 {
01374 assert(index<nRawCells_);
01375 pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01376 prcid+=index;
01377 return *prcid;
01378 }
01379
01380
01381
01382 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01383 {
01384 assert(index<nRawCells_);
01385 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01386 ptstmp+=index;
01387 return *ptstmp;
01388 }
01389
01390
01391
01392 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01393 {
01394 assert(index<nClientsMax_);
01395 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01396 prcid+=index;
01397 return *prcid;
01398 }
01399
01400
01401
01402 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01403 {
01404 assert(index<nRawCells_);
01405 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01406 pstate+=index;
01407 lock();
01408 *pstate=state;
01409 unlock();
01410 return true;
01411 }
01412
01413
01414
01415 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01416 {
01417 assert(index<nDqmCells_);
01418 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01419 pstate+=index;
01420 lock();
01421 *pstate=state;
01422 unlock();
01423 return true;
01424 }
01425
01426
01427
01428 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01429 {
01430 assert(index<nRawCells_);
01431 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01432 pcount+=index;
01433 lock();
01434 *pcount=discard;
01435 unlock();
01436 return true;
01437 }
01438
01439
01440
01441 int FUShmBuffer::incEvtDiscard(unsigned int index)
01442 {
01443 int result = 0;
01444 assert(index<nRawCells_);
01445 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01446 pcount+=index;
01447 lock();
01448 (*pcount)++;
01449 result = *pcount;
01450 unlock();
01451 return result;
01452 }
01453
01454
01455
01456 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01457 {
01458 assert(index<nRawCells_);
01459 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01460 pevt+=index;
01461 lock();
01462 *pevt=evtNumber;
01463 unlock();
01464 return true;
01465 }
01466
01467
01468
01469 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01470 {
01471 assert(index<nRawCells_);
01472 pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01473 prcid+=index;
01474 lock();
01475 *prcid=prcId;
01476 unlock();
01477 return true;
01478 }
01479
01480
01481
01482 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01483 {
01484 assert(index<nRawCells_);
01485 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01486 ptstmp+=index;
01487 lock();
01488 *ptstmp=timeStamp;
01489 unlock();
01490 return true;
01491 }
01492
01493
01494
01495 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01496 {
01497 lock();
01498 assert(nClients_<nClientsMax_);
01499 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01500 for (unsigned int i=0;i<nClients_;i++) {
01501 if ((*prcid)==prcId) { unlock(); return false; }
01502 prcid++;
01503 }
01504 nClients_++;
01505 *prcid=prcId;
01506 unlock();
01507 return true;
01508 }
01509
01510
01511
01512 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01513 {
01514 lock();
01515 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01516 unsigned int iClient(0);
01517 while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01518 assert(iClient!=nClients_);
01519 pid_t* next=prcid; next++;
01520 while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01521 nClients_--;
01522 unlock();
01523 return true;
01524 }
01525
01526
01527
01528 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01529 {
01530 FUShmRawCell* result(0);
01531
01532 if (iCell>=nRawCells_) {
01533 cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01534 <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01535 return result;
01536 }
01537
01538 if (segmentationMode_) {
01539 key_t shmkey =rawCellShmKey(iCell);
01540 int shmid =shm_get(shmkey,rawCellTotalSize_);
01541 void* cellAddr=shm_attach(shmid);
01542 result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01543 }
01544 else {
01545 result=
01546 (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01547 }
01548
01549 return result;
01550 }
01551
01552
01553
01554 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01555 {
01556 FUShmRecoCell* result(0);
01557
01558 if (iCell>=nRecoCells_) {
01559 cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01560 <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01561 return result;
01562 }
01563
01564 if (segmentationMode_) {
01565 key_t shmkey =recoCellShmKey(iCell);
01566 int shmid =shm_get(shmkey,recoCellTotalSize_);
01567 void* cellAddr=shm_attach(shmid);
01568 result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01569 }
01570 else {
01571 result=
01572 (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01573 }
01574
01575 return result;
01576 }
01577
01578
01579
01580 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01581 {
01582 FUShmDqmCell* result(0);
01583
01584 if (iCell>=nDqmCells_) {
01585 cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01586 <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01587 return result;
01588 }
01589
01590 if (segmentationMode_) {
01591 key_t shmkey =dqmCellShmKey(iCell);
01592 int shmid =shm_get(shmkey,dqmCellTotalSize_);
01593 void* cellAddr=shm_attach(shmid);
01594 result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01595 }
01596 else {
01597 result=
01598 (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01599 }
01600
01601 return result;
01602 }
01603
01604
01605
01606 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01607 {
01608 assert(index<nRawCells_);
01609 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01610 pcount+=index;
01611 lock();
01612 assert(*pcount>0);
01613 --(*pcount);
01614 bool result=(*pcount==0);
01615 unlock();
01616 return result;
01617 }
01618
01619
01620
01621 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01622 {
01623 if (!segmentationMode_) {
01624 cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01625 return -1;
01626 }
01627 key_t* addr=(key_t*)((unsigned long)this+offset);
01628 for (unsigned int i=0;i<iCell;i++) ++addr;
01629 return *addr;
01630 }
01631
01632
01633
01634 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01635 {
01636 if (iCell>=nRawCells_) {
01637 cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01638 <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01639 return -1;
01640 }
01641 return shmKey(iCell,rawCellOffset_);
01642 }
01643
01644
01645
01646 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01647 {
01648 if (iCell>=nRecoCells_) {
01649 cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01650 <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01651 return -1;
01652 }
01653 return shmKey(iCell,recoCellOffset_);
01654 }
01655
01656
01657
01658 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01659 {
01660 if (iCell>=nDqmCells_) {
01661 cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01662 <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01663 return -1;
01664 }
01665 return shmKey(iCell,dqmCellOffset_);
01666 }
01667
01668
01669
01670 void FUShmBuffer::sem_init(int isem,int value)
01671 {
01672 if (semctl(semid(),isem,SETVAL,value)<0) {
01673 cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01674 }
01675 }
01676
01677
01678
01679 int FUShmBuffer::sem_wait(int isem)
01680 {
01681 struct sembuf sops[1];
01682 sops[0].sem_num=isem;
01683 sops[0].sem_op = -1;
01684 sops[0].sem_flg= 0;
01685 if (semop(semid(),sops,1)==-1) {
01686 cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01687 return -1;
01688 }
01689 return 0;
01690 }
01691
01692
01693
01694 void FUShmBuffer::sem_post(int isem)
01695 {
01696 struct sembuf sops[1];
01697 sops[0].sem_num=isem;
01698 sops[0].sem_op = 1;
01699 sops[0].sem_flg= 0;
01700 if (semop(semid(),sops,1)==-1) {
01701 cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01702 }
01703 }