00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
00011 #include "EventFilter/FEDInterface/interface/GlobalEventNumber.h"
00012 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00013
00014 #include <unistd.h>
00015 #include <iostream>
00016 #include <string>
00017 #include <cassert>
00018
00019 #include <sstream>
00020 #include <fstream>
00021
00022 #include <cstdlib>
00023 #include <cstring>
00024 #include <stdint.h>
00025
00026
00027
00028
00029
00030 #define SHM_DESCRIPTOR_KEYID 1
00031 #define SHM_KEYID 2
00032 #define SEM_KEYID 1
00033
00034 #define NSKIP_MAX 100
00035
00036
00037 using namespace std;
00038 using namespace evf;
00039
00040
00041
00042 const char* FUShmBuffer::shmKeyPath_ =
00043 (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00044 const char* FUShmBuffer::semKeyPath_ =
00045 (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00046
00047
00049
00051
00052
00053 FUShmBuffer::FUShmBuffer(bool segmentationMode,
00054 unsigned int nRawCells,
00055 unsigned int nRecoCells,
00056 unsigned int nDqmCells,
00057 unsigned int rawCellSize,
00058 unsigned int recoCellSize,
00059 unsigned int dqmCellSize)
00060 : segmentationMode_(segmentationMode)
00061 , nClientsMax_(128)
00062 , nRawCells_(nRawCells)
00063 , rawCellPayloadSize_(rawCellSize)
00064 , nRecoCells_(nRecoCells)
00065 , recoCellPayloadSize_(recoCellSize)
00066 , nDqmCells_(nDqmCells)
00067 , dqmCellPayloadSize_(dqmCellSize)
00068 {
00069 rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00070 recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00071 dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00072
00073 void* addr;
00074
00075 rawWriteOffset_=sizeof(FUShmBuffer);
00076 addr=(void*)((unsigned long)this+rawWriteOffset_);
00077 new (addr) unsigned int[nRawCells_];
00078
00079 rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00080 addr=(void*)((unsigned long)this+rawReadOffset_);
00081 new (addr) unsigned int[nRawCells_];
00082
00083 recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00084 addr=(void*)((unsigned long)this+recoWriteOffset_);
00085 new (addr) unsigned int[nRecoCells_];
00086
00087 recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00088 addr=(void*)((unsigned long)this+recoReadOffset_);
00089 new (addr) unsigned int[nRecoCells_];
00090
00091 dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00092 addr=(void*)((unsigned long)this+dqmWriteOffset_);
00093 new (addr) unsigned int[nDqmCells_];
00094
00095 dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00096 addr=(void*)((unsigned long)this+dqmReadOffset_);
00097 new (addr) unsigned int[nDqmCells_];
00098
00099 evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00100 addr=(void*)((unsigned long)this+evtStateOffset_);
00101 new (addr) evt::State_t[nRawCells_];
00102
00103 evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00104 addr=(void*)((unsigned long)this+evtDiscardOffset_);
00105 new (addr) unsigned int[nRawCells_];
00106
00107 evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00108 addr=(void*)((unsigned long)this+evtNumberOffset_);
00109 new (addr) unsigned int[nRawCells_];
00110
00111 evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00112 addr=(void*)((unsigned long)this+evtPrcIdOffset_);
00113 new (addr) pid_t[nRawCells_];
00114
00115 evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00116 addr=(void*)((unsigned long)this+evtTimeStampOffset_);
00117 new (addr) time_t[nRawCells_];
00118
00119 dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00120 addr=(void*)((unsigned long)this+dqmStateOffset_);
00121 new (addr) dqm::State_t[nDqmCells_];
00122
00123 clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00124 addr=(void*)((unsigned long)this+clientPrcIdOffset_);
00125 new (addr) pid_t[nClientsMax_];
00126
00127 rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00128
00129 if (segmentationMode_) {
00130 recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00131 dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00132 addr=(void*)((unsigned long)this+rawCellOffset_);
00133 new (addr) key_t[nRawCells_];
00134 addr=(void*)((unsigned long)this+recoCellOffset_);
00135 new (addr) key_t[nRecoCells_];
00136 addr=(void*)((unsigned long)this+dqmCellOffset_);
00137 new (addr) key_t[nDqmCells_];
00138 }
00139 else {
00140 recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00141 dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00142 for (unsigned int i=0;i<nRawCells_;i++) {
00143 addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
00144 new (addr) FUShmRawCell(rawCellSize);
00145 }
00146 for (unsigned int i=0;i<nRecoCells_;i++) {
00147 addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
00148 new (addr) FUShmRecoCell(recoCellSize);
00149 }
00150 for (unsigned int i=0;i<nDqmCells_;i++) {
00151 addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
00152 new (addr) FUShmDqmCell(dqmCellSize);
00153 }
00154 }
00155 }
00156
00157
00158
00159 FUShmBuffer::~FUShmBuffer()
00160 {
00161
00162 }
00163
00164
00166
00168
00169
00170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00171 {
00172 shmid_=shmid;
00173 semid_=semid;
00174
00175 if (segmentationMode_) {
00176 int shmKeyId=666;
00177 key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
00178 for (unsigned int i=0;i<nRawCells_;i++) {
00179 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00180 int shmid =shm_create(*keyAddr,rawCellTotalSize_);
00181 void* shmAddr=shm_attach(shmid);
00182 new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00183 shmdt(shmAddr);
00184 ++keyAddr;
00185 }
00186 keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
00187 for (unsigned int i=0;i<nRecoCells_;i++) {
00188 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00189 int shmid =shm_create(*keyAddr,recoCellTotalSize_);
00190 void* shmAddr=shm_attach(shmid);
00191 new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00192 shmdt(shmAddr);
00193 ++keyAddr;
00194 }
00195 keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
00196 for (unsigned int i=0;i<nDqmCells_;i++) {
00197 *keyAddr =ftok(shmKeyPath_,shmKeyId++);
00198 int shmid =shm_create(*keyAddr,dqmCellTotalSize_);
00199 void* shmAddr=shm_attach(shmid);
00200 new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00201 shmdt(shmAddr);
00202 ++keyAddr;
00203 }
00204 }
00205
00206 for (unsigned int i=0;i<nRawCells_;i++) {
00207 FUShmRawCell* cell=rawCell(i);
00208 cell->initialize(i);
00209 if (segmentationMode_) shmdt(cell);
00210 }
00211
00212 for (unsigned int i=0;i<nRecoCells_;i++) {
00213 FUShmRecoCell* cell=recoCell(i);
00214 cell->initialize(i);
00215 if (segmentationMode_) shmdt(cell);
00216 }
00217
00218 for (unsigned int i=0;i<nDqmCells_;i++) {
00219 FUShmDqmCell* cell=dqmCell(i);
00220 cell->initialize(i);
00221 if (segmentationMode_) shmdt(cell);
00222 }
00223
00224 reset();
00225 }
00226
00227
00228
00229 void FUShmBuffer::reset()
00230 {
00231 nClients_=0;
00232
00233
00234 sem_init(0,1);
00235 sem_init(1,nRawCells_);
00236 sem_init(2,0);
00237 sem_init(3,1);
00238 sem_init(4,0);
00239 sem_init(5,nRecoCells_);
00240 sem_init(6,0);
00241 sem_init(7,nDqmCells_);
00242 sem_init(8,0);
00243
00244 sem_print();
00245
00246 unsigned int *iWrite,*iRead;
00247
00248 rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00249 iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
00250 iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
00251 for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00252
00253 recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00254 iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
00255 iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
00256 for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00257
00258 dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00259 iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
00260 iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
00261 for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00262
00263 for (unsigned int i=0;i<nRawCells_;i++) {
00264 setEvtState(i,evt::EMPTY);
00265 setEvtDiscard(i,0);
00266 setEvtNumber(i,0xffffffff);
00267 setEvtPrcId(i,0);
00268 setEvtTimeStamp(i,0);
00269 }
00270
00271 for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00272 }
00273
00274
00275
00276 int FUShmBuffer::nbRawCellsToWrite() const
00277 {
00278 return semctl(semid(),1,GETVAL);
00279 }
00280
00281
00282
00283 int FUShmBuffer::nbRawCellsToRead() const
00284 {
00285 return semctl(semid(),2,GETVAL);
00286 }
00287
00288
00289
00290 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00291 {
00292 waitRawWrite();
00293 unsigned int iCell=nextRawWriteIndex();
00294 FUShmRawCell* cell =rawCell(iCell);
00295 evt::State_t state=evtState(iCell);
00296 assert(state==evt::EMPTY);
00297 setEvtState(iCell,evt::RAWWRITING);
00298 setEvtDiscard(iCell,1);
00299 return cell;
00300 }
00301
00302
00303
00304 FUShmRawCell* FUShmBuffer::rawCellToRead()
00305 {
00306 waitRawRead();
00307 unsigned int iCell=nextRawReadIndex();
00308 FUShmRawCell* cell=rawCell(iCell);
00309 evt::State_t state=evtState(iCell);
00310 assert(state==evt::RAWWRITTEN
00311 ||state==evt::EMPTY
00312 ||state==evt::STOP
00313 ||state==evt::LUMISECTION);
00314 if (state==evt::RAWWRITTEN) {
00315 setEvtPrcId(iCell,getpid());
00316 setEvtState(iCell,evt::RAWREADING);
00317 }
00318 return cell;
00319 }
00320
00321
00322
00323 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00324 {
00325 waitRecoRead();
00326 unsigned int iCell =nextRecoReadIndex();
00327 FUShmRecoCell* cell =recoCell(iCell);
00328 unsigned int iRawCell=cell->rawCellIndex();
00329 if (iRawCell<nRawCells_) {
00330
00331
00332 setEvtState(iRawCell,evt::SENDING);
00333 }
00334 return cell;
00335 }
00336
00337
00338
00339 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00340 {
00341 waitDqmRead();
00342 unsigned int iCell=nextDqmReadIndex();
00343 FUShmDqmCell* cell=dqmCell(iCell);
00344 dqm::State_t state=dqmState(iCell);
00345 assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00346 if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00347 return cell;
00348 }
00349
00350
00351
00352 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00353 {
00354 waitRawDiscarded();
00355 FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00356 evt::State_t state=evtState(cell->index());
00357 assert(state==evt::PROCESSED
00358 ||state==evt::SENT
00359 ||state==evt::EMPTY
00360 ||state==evt::STOP
00361 ||state==evt::LUMISECTION);
00362 if (state!=evt::EMPTY
00363 && state!=evt::LUMISECTION
00364 && state!=evt::STOP
00365 )
00366 setEvtState(cell->index(),evt::DISCARDING);
00367 return cell;
00368 }
00369
00370
00371
00372 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00373 {
00374 evt::State_t state=evtState(cell->index());
00375 assert(state==evt::RAWWRITING);
00376 setEvtState(cell->index(),evt::RAWWRITTEN);
00377 setEvtNumber(cell->index(),cell->evtNumber());
00378 postRawIndexToRead(cell->index());
00379 if (segmentationMode_) shmdt(cell);
00380 postRawRead();
00381 }
00382
00383
00384
00385 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00386 {
00387 evt::State_t state=evtState(cell->index());
00388 assert(state==evt::RAWREADING);
00389 setEvtState(cell->index(),evt::RAWREAD);
00390 setEvtState(cell->index(),evt::PROCESSING);
00391 setEvtTimeStamp(cell->index(),time(0));
00392 if (segmentationMode_) shmdt(cell);
00393 }
00394
00395
00396
00397 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00398 {
00399 unsigned int iRawCell=cell->rawCellIndex();
00400 if (iRawCell<nRawCells_) {
00401
00402
00403 setEvtState(cell->rawCellIndex(),evt::SENT);
00404 }
00405 if (segmentationMode_) shmdt(cell);
00406 }
00407
00408
00409
00410 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00411 {
00412 dqm::State_t state=dqmState(cell->index());
00413 assert(state==dqm::SENDING||state==dqm::EMPTY);
00414 if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00415 if (segmentationMode_) shmdt(cell);
00416 }
00417
00418
00419
00420 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00421 {
00422 waitRawDiscard();
00423 if (rawCellReadyForDiscard(iCell)) {
00424 rawDiscardIndex_=iCell;
00425 evt::State_t state=evtState(iCell);
00426 assert(state==evt::PROCESSING
00427 ||state==evt::SENT
00428 ||state==evt::EMPTY
00429 ||state==evt::STOP
00430 ||state==evt::LUMISECTION
00431 );
00432 if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00433 postRawDiscarded();
00434 }
00435 else postRawDiscard();
00436 }
00437
00438
00439
00440 void FUShmBuffer::scheduleRawCellForDiscardServerSide(unsigned int iCell)
00441 {
00442 waitRawDiscard();
00443 if (rawCellReadyForDiscard(iCell)) {
00444 rawDiscardIndex_=iCell;
00445 evt::State_t state=evtState(iCell);
00446 assert(state==evt::RAWWRITTEN
00447 );
00448 setEvtState(iCell,evt::PROCESSED);
00449 postRawDiscarded();
00450 }
00451 else postRawDiscard();
00452 }
00453
00454
00455
00456 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00457 {
00458 releaseRawCell(cell);
00459 postRawDiscard();
00460 }
00461
00462
00463
00464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00465 {
00466 FUShmRecoCell* cell=recoCell(iCell);
00467 unsigned int iRawCell=cell->rawCellIndex();
00468 if (iRawCell<nRawCells_) {
00469
00470
00471 scheduleRawCellForDiscard(iRawCell);
00472 }
00473 cell->clear();
00474 if (segmentationMode_) shmdt(cell);
00475 postRecoIndexToWrite(iCell);
00476 postRecoWrite();
00477 }
00478
00479
00480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
00481 {
00482 FUShmRecoCell* cell=recoCell(iCell);
00483 cell->clear();
00484 if (segmentationMode_) shmdt(cell);
00485 postRecoIndexToWrite(iCell);
00486 postRecoWrite();
00487 }
00488
00489
00490
00491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00492 {
00493 dqm::State_t state=dqmState(iCell);
00494 assert(state==dqm::EMPTY||state==dqm::SENT);
00495 setDqmState(iCell,dqm::DISCARDING);
00496 FUShmDqmCell* cell=dqmCell(iCell);
00497 cell->clear();
00498 if (segmentationMode_) shmdt(cell);
00499 setDqmState(iCell,dqm::EMPTY);
00500 postDqmIndexToWrite(iCell);
00501 postDqmWrite();
00502 }
00503
00504
00505
00506 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00507 {
00508 evt::State_t state=evtState(cell->index());
00509 if(!(
00510 state==evt::DISCARDING
00511 ||state==evt::RAWWRITING
00512 ||state==evt::EMPTY
00513 ||state==evt::STOP
00514 ||state==evt::LUMISECTION
00515 )) std::cout << "=================releaseRawCell state " << state
00516 << std::endl;
00517
00518 assert(
00519 state==evt::DISCARDING
00520 ||state==evt::RAWWRITING
00521 ||state==evt::EMPTY
00522 ||state==evt::STOP
00523 ||state==evt::LUMISECTION
00524 );
00525 setEvtState(cell->index(),evt::EMPTY);
00526 setEvtDiscard(cell->index(),0);
00527 setEvtNumber(cell->index(),0xffffffff);
00528 setEvtPrcId(cell->index(),0);
00529 setEvtTimeStamp(cell->index(),0);
00530 cell->clear();
00531 postRawIndexToWrite(cell->index());
00532 if (segmentationMode_) shmdt(cell);
00533 postRawWrite();
00534 }
00535
00536
00537
00538 void FUShmBuffer::writeRawEmptyEvent()
00539 {
00540 FUShmRawCell* cell=rawCellToWrite();
00541 evt::State_t state=evtState(cell->index());
00542 assert(state==evt::RAWWRITING);
00543 setEvtState(cell->index(),evt::STOP);
00544 postRawIndexToRead(cell->index());
00545 if (segmentationMode_) shmdt(cell);
00546 postRawRead();
00547 }
00548
00549
00550 void FUShmBuffer::writeRawLumiSectionEvent(unsigned int ls)
00551 {
00552 FUShmRawCell* cell=rawCellToWrite();
00553 cell->setLumiSection(ls);
00554 evt::State_t state=evtState(cell->index());
00555 assert(state==evt::RAWWRITING);
00556 setEvtState(cell->index(),evt::LUMISECTION);
00557 postRawIndexToRead(cell->index());
00558 if (segmentationMode_) shmdt(cell);
00559 postRawRead();
00560 }
00561
00562
00563
00564 void FUShmBuffer::writeRecoEmptyEvent()
00565 {
00566 waitRecoWrite();
00567 unsigned int iCell=nextRecoWriteIndex();
00568 FUShmRecoCell* cell =recoCell(iCell);
00569 cell->clear();
00570 postRecoIndexToRead(iCell);
00571 if (segmentationMode_) shmdt(cell);
00572 postRecoRead();
00573 }
00574
00575
00576
00577 void FUShmBuffer::writeDqmEmptyEvent()
00578 {
00579 waitDqmWrite();
00580 unsigned int iCell=nextDqmWriteIndex();
00581 FUShmDqmCell* cell=dqmCell(iCell);
00582 cell->clear();
00583 postDqmIndexToRead(iCell);
00584 if (segmentationMode_) shmdt(cell);
00585 postDqmRead();
00586 }
00587
00588
00589
00590 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00591 {
00592 FUShmRawCell* cell=rawCellToWrite();
00593 rawDiscardIndex_=cell->index();
00594 setEvtState(cell->index(),evt::STOP);
00595 setEvtNumber(cell->index(),0xffffffff);
00596 setEvtPrcId(cell->index(),0);
00597 setEvtTimeStamp(cell->index(),0);
00598 postRawDiscarded();
00599 }
00600
00601
00602
00603 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00604 {
00605 waitRawDiscard();
00606 if (rawCellReadyForDiscard(cell->index())) {
00607 rawDiscardIndex_=cell->index();
00608 setEvtState(cell->index(),evt::STOP);
00609 setEvtNumber(cell->index(),0xffffffff);
00610 setEvtPrcId(cell->index(),0);
00611 setEvtTimeStamp(cell->index(),0);
00612 removeClientPrcId(getpid());
00613 if (segmentationMode_) shmdt(cell);
00614 postRawDiscarded();
00615 }
00616 else postRawDiscard();
00617 }
00618
00619
00620 void FUShmBuffer::scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell* cell)
00621 {
00622
00623 if (rawCellReadyForDiscard(cell->index())) {
00624 rawDiscardIndex_=cell->index();
00625 setEvtState(cell->index(),evt::STOP);
00626 setEvtNumber(cell->index(),0xffffffff);
00627 setEvtPrcId(cell->index(),0);
00628 setEvtTimeStamp(cell->index(),0);
00629
00630 if (segmentationMode_) shmdt(cell);
00631 postRawDiscarded();
00632 }
00633 else postRawDiscard();
00634 }
00635
00636
00637
00638 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
00639 unsigned int fuProcessId,
00640 unsigned int fuGuid,
00641 unsigned char *data,
00642 unsigned int dataSize)
00643 {
00644 if (dataSize>recoCellPayloadSize_) {
00645 cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00646 return false;
00647 }
00648
00649 waitRecoWrite();
00650 unsigned int iCell=nextRecoWriteIndex();
00651 FUShmRecoCell* cell =recoCell(iCell);
00652 cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00653 postRecoIndexToRead(iCell);
00654 if (segmentationMode_) shmdt(cell);
00655 postRecoRead();
00656 return true;
00657 }
00658
00659
00660
00661 bool FUShmBuffer::writeRecoEventData(unsigned int runNumber,
00662 unsigned int evtNumber,
00663 unsigned int outModId,
00664 unsigned int fuProcessId,
00665 unsigned int fuGuid,
00666 unsigned char *data,
00667 unsigned int dataSize)
00668 {
00669 if (dataSize>recoCellPayloadSize_) {
00670 cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00671 return false;
00672 }
00673
00674 waitRecoWrite();
00675 unsigned int iCell=nextRecoWriteIndex();
00676 FUShmRecoCell* cell =recoCell(iCell);
00677 unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00678
00679
00680 setEvtState(rawCellIndex,evt::RECOWRITING);
00681 incEvtDiscard(rawCellIndex);
00682 cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00683 fuProcessId,fuGuid,data,dataSize);
00684 setEvtState(rawCellIndex,evt::RECOWRITTEN);
00685 postRecoIndexToRead(iCell);
00686 if (segmentationMode_) shmdt(cell);
00687 postRecoRead();
00688 return true;
00689 }
00690
00691
00692
00693 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00694 unsigned int fuProcessId,
00695 unsigned int iRawCell)
00696 {
00697 FUShmRawCell *raw=rawCell(iRawCell);
00698
00699 unsigned int dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00700 unsigned char *data =new unsigned char[dataSize];
00701 uint32_t *pos =(uint32_t*)data;
00702
00703
00704
00705
00706
00707
00708
00709 *pos++=(uint32_t)2;
00710 *pos++=(uint32_t)runNumber;
00711 *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
00712 *pos++=(uint32_t)raw->evtNumber();
00713 for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00714 memcpy(pos,raw->payloadAddr(),raw->eventSize());
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746 waitRecoWrite();
00747 unsigned int iRecoCell=nextRecoWriteIndex();
00748 FUShmRecoCell* reco =recoCell(iRecoCell);
00749 setEvtState(iRawCell,evt::RECOWRITING);
00750 setEvtDiscard(iRawCell,1);
00751 reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00752 data,dataSize);
00753 delete [] data;
00754 setEvtState(iRawCell,evt::RECOWRITTEN);
00755 postRecoIndexToRead(iRecoCell);
00756 if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00757 postRecoRead();
00758 return true;
00759 }
00760
00761
00762
00763 bool FUShmBuffer::writeDqmEventData(unsigned int runNumber,
00764 unsigned int evtAtUpdate,
00765 unsigned int folderId,
00766 unsigned int fuProcessId,
00767 unsigned int fuGuid,
00768 unsigned char *data,
00769 unsigned int dataSize)
00770 {
00771 if (dataSize>dqmCellPayloadSize_) {
00772 cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00773 return false;
00774 }
00775
00776 waitDqmWrite();
00777 unsigned int iCell=nextDqmWriteIndex();
00778 FUShmDqmCell* cell=dqmCell(iCell);
00779 dqm::State_t state=dqmState(iCell);
00780 assert(state==dqm::EMPTY);
00781 setDqmState(iCell,dqm::WRITING);
00782 cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00783 setDqmState(iCell,dqm::WRITTEN);
00784 postDqmIndexToRead(iCell);
00785 if (segmentationMode_) shmdt(cell);
00786 postDqmRead();
00787 return true;
00788 }
00789
00790
00791
00792 void FUShmBuffer::sem_print()
00793 {
00794 cout<<"--> current sem values:"
00795 <<endl
00796 <<" lock="<<semctl(semid(),0,GETVAL)
00797 <<endl
00798 <<" wraw="<<semctl(semid(),1,GETVAL)
00799 <<" rraw="<<semctl(semid(),2,GETVAL)
00800 <<endl
00801 <<" wdsc="<<semctl(semid(),3,GETVAL)
00802 <<" rdsc="<<semctl(semid(),4,GETVAL)
00803 <<endl
00804 <<" wrec="<<semctl(semid(),5,GETVAL)
00805 <<" rrec="<<semctl(semid(),6,GETVAL)
00806 <<endl
00807 <<" wdqm="<<semctl(semid(),7,GETVAL)
00808 <<" rdqm="<<semctl(semid(),8,GETVAL)
00809 <<endl;
00810 }
00811
00812
00813
00814 void FUShmBuffer::printEvtState(unsigned int index)
00815 {
00816 evt::State_t state=evtState(index);
00817 std::string stateName;
00818 if (state==evt::EMPTY) stateName="EMPTY";
00819 else if (state==evt::STOP) stateName="STOP";
00820 else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00821 else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00822 else if (state==evt::RAWREADING) stateName="RAWREADING";
00823 else if (state==evt::RAWREAD) stateName="RAWREAD";
00824 else if (state==evt::PROCESSING) stateName="PROCESSING";
00825 else if (state==evt::PROCESSED) stateName="PROCESSED";
00826 else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00827 else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00828 else if (state==evt::SENDING) stateName="SENDING";
00829 else if (state==evt::SENT) stateName="SENT";
00830 else if (state==evt::DISCARDING) stateName="DISCARDING";
00831 cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00832 }
00833
00834
00835
00836 void FUShmBuffer::printDqmState(unsigned int index)
00837 {
00838 dqm::State_t state=dqmState(index);
00839 cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00840 }
00841
00842
00843
00844 FUShmBuffer* FUShmBuffer::createShmBuffer(bool segmentationMode,
00845 unsigned int nRawCells,
00846 unsigned int nRecoCells,
00847 unsigned int nDqmCells,
00848 unsigned int rawCellSize,
00849 unsigned int recoCellSize,
00850 unsigned int dqmCellSize)
00851 {
00852
00853 if (FUShmBuffer::releaseSharedMemory())
00854 cout<<"FUShmBuffer::createShmBuffer: "
00855 <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00856 <<endl;
00857
00858
00859 int size =sizeof(unsigned int)*7;
00860 int shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00861 void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00862
00863 if(1!=shm_nattch(shmid)) {
00864 cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00865 shmdt(shmAddr);
00866 return 0;
00867 }
00868
00869 unsigned int* p=(unsigned int*)shmAddr;
00870 *p++=segmentationMode;
00871 *p++=nRawCells;
00872 *p++=nRecoCells;
00873 *p++=nDqmCells;
00874 *p++=rawCellSize;
00875 *p++=recoCellSize;
00876 *p++=dqmCellSize;
00877 shmdt(shmAddr);
00878
00879
00880 size =FUShmBuffer::size(segmentationMode,
00881 nRawCells,nRecoCells,nDqmCells,
00882 rawCellSize,recoCellSize,dqmCellSize);
00883 shmid =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00884 int semid=sem_create(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00885 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00886
00887 if (1!=shm_nattch(shmid)) {
00888 cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00889 shmdt(shmAddr);
00890 return 0;
00891 }
00892 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00893 nRawCells,nRecoCells,nDqmCells,
00894 rawCellSize,recoCellSize,dqmCellSize);
00895
00896 cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00897 cout<<" segmentationMode="<<segmentationMode<<endl;
00898
00899 buffer->initialize(shmid,semid);
00900
00901 return buffer;
00902 }
00903
00904
00905
00906 FUShmBuffer* FUShmBuffer::getShmBuffer()
00907 {
00908
00909 int size =sizeof(unsigned int)*7;
00910 int shmid =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00911 void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00912
00913 unsigned int *p=(unsigned int*)shmAddr;
00914 bool segmentationMode=*p++;
00915 unsigned int nRawCells =*p++;
00916 unsigned int nRecoCells =*p++;
00917 unsigned int nDqmCells =*p++;
00918 unsigned int rawCellSize =*p++;
00919 unsigned int recoCellSize =*p++;
00920 unsigned int dqmCellSize =*p++;
00921 shmdt(shmAddr);
00922
00923 cout<<"FUShmBuffer::getShmBuffer():"
00924 <<" segmentationMode="<<segmentationMode
00925 <<" nRawCells="<<nRawCells
00926 <<" nRecoCells="<<nRecoCells
00927 <<" nDqmCells="<<nDqmCells
00928 <<" rawCellSize="<<rawCellSize
00929 <<" recoCellSize="<<recoCellSize
00930 <<" dqmCellSize="<<dqmCellSize
00931 <<endl;
00932
00933
00934 size =FUShmBuffer::size(segmentationMode,
00935 nRawCells,nRecoCells,nDqmCells,
00936 rawCellSize,recoCellSize,dqmCellSize);
00937 shmid =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
00938 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
00939 shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
00940
00941 if (0==shm_nattch(shmid)) {
00942 cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00943 return 0;
00944 }
00945
00946 FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00947 nRawCells,nRecoCells,nDqmCells,
00948 rawCellSize,recoCellSize,dqmCellSize);
00949
00950 cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00951 cout<<" segmentationMode="<<segmentationMode<<endl;
00952
00953 buffer->setClientPrcId(getpid());
00954
00955 return buffer;
00956 }
00957
00958
00959
00960 bool FUShmBuffer::releaseSharedMemory()
00961 {
00962
00963 int size =sizeof(unsigned int)*7;
00964 int shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00965 void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00966
00967 unsigned int*p=(unsigned int*)shmAddr;
00968 bool segmentationMode=*p++;
00969 unsigned int nRawCells =*p++;
00970 unsigned int nRecoCells =*p++;
00971 unsigned int nDqmCells =*p++;
00972 unsigned int rawCellSize =*p++;
00973 unsigned int recoCellSize =*p++;
00974 unsigned int dqmCellSize =*p++;
00975 shmdt(shmAddr);
00976
00977
00978
00979 size =FUShmBuffer::size(segmentationMode,
00980 nRawCells,nRecoCells,nDqmCells,
00981 rawCellSize,recoCellSize,dqmCellSize);
00982 int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0) return false;
00983 int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return false;
00984 shmAddr =shm_attach(shmid); if (0==shmAddr) return false;
00985
00986 int att = 0;
00987 for(; att <10; att++)
00988 {
00989 if(shm_nattch(shmid)>1) {
00990 cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
00991 <<", failed attempt to release shared memory."<<endl;
00992 ::sleep(1);
00993 }
00994 else
00995 break;
00996 }
00997
00998 if(att>=10) return false;
00999
01000 if (segmentationMode) {
01001 FUShmBuffer* buffer=
01002 new (shmAddr) FUShmBuffer(segmentationMode,
01003 nRawCells,nRecoCells,nDqmCells,
01004 rawCellSize,recoCellSize,dqmCellSize);
01005 int cellid;
01006 for (unsigned int i=0;i<nRawCells;i++) {
01007 cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
01008 if ((shm_destroy(cellid)==-1)) return false;
01009 }
01010 for (unsigned int i=0;i<nRecoCells;i++) {
01011 cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
01012 if ((shm_destroy(cellid)==-1)) return false;
01013 }
01014 for (unsigned int i=0;i<nDqmCells;i++) {
01015 cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
01016 if ((shm_destroy(cellid)==-1)) return false;
01017 }
01018 }
01019 shmdt(shmAddr);
01020 if (sem_destroy(semid)==-1) return false;
01021 if (shm_destroy(shmid)==-1) return false;
01022 if (shm_destroy(shmidd)==-1) return false;
01023
01024 return true;
01025 }
01026
01027
01028
01029 unsigned int FUShmBuffer::size(bool segmentationMode,
01030 unsigned int nRawCells,
01031 unsigned int nRecoCells,
01032 unsigned int nDqmCells,
01033 unsigned int rawCellSize,
01034 unsigned int recoCellSize,
01035 unsigned int dqmCellSize)
01036 {
01037 unsigned int offset=
01038 sizeof(FUShmBuffer)+
01039 sizeof(unsigned int)*4*nRawCells+
01040 sizeof(evt::State_t)*nRawCells+
01041 sizeof(dqm::State_t)*nDqmCells;
01042
01043 unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
01044 unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
01045 unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
01046
01047 unsigned int realSize =
01048 (segmentationMode) ?
01049 offset
01050 +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
01051 :
01052 offset
01053 +rawCellTotalSize*nRawCells
01054 +recoCellTotalSize*nRecoCells
01055 +dqmCellTotalSize*nDqmCells;
01056
01057 unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
01058
01059 return result;
01060 }
01061
01062
01063
01064 key_t FUShmBuffer::getShmDescriptorKey()
01065 {
01066 key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
01067 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
01068 <<"for file "<<shmKeyPath_<<"!"<<endl;
01069 return result;
01070 }
01071
01072
01073
01074 key_t FUShmBuffer::getShmKey()
01075 {
01076 key_t result=getuid()*1000+SHM_KEYID;
01077 if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
01078 <<"for file "<<shmKeyPath_<<"!"<<endl;
01079 return result;
01080 }
01081
01082
01083
01084 key_t FUShmBuffer::getSemKey()
01085 {
01086 key_t result=getuid()*1000+SEM_KEYID;
01087 if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
01088 <<"for file "<<semKeyPath_<<"!"<<endl;
01089 return result;
01090 }
01091
01092
01093
01094 int FUShmBuffer::shm_create(key_t key,int size)
01095 {
01096
01097 int shmid=shmget(key,1,0644);
01098 if(shmid!=-1){
01099
01100 shmid_ds shmstat;
01101 int result=shmctl(shmid,IPC_STAT,&shmstat);
01102 cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
01103 << " created by process " << shmstat.shm_cpid << " owned by "
01104 << shmstat.shm_perm.uid << " permissions "
01105 << hex << shmstat.shm_perm.mode << dec << endl;
01106 result=shmctl(shmid,IPC_RMID,&shmstat);
01107 }
01108 shmid=shmget(key,size,IPC_CREAT|0644);
01109 if (shmid==-1) {
01110 int err=errno;
01111 cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01112 <<strerror(err)<<endl;
01113 }
01114 return shmid;
01115 }
01116
01117
01118
01119 int FUShmBuffer::shm_get(key_t key,int size)
01120 {
01121 int shmid=shmget(key,size,0644);
01122 if (shmid==-1) {
01123 int err=errno;
01124 cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01125 <<strerror(err)<<endl;
01126 }
01127 return shmid;
01128 }
01129
01130
01131
01132 void* FUShmBuffer::shm_attach(int shmid)
01133 {
01134 void* result=shmat(shmid,NULL,0);
01135 if (0==result) {
01136 int err=errno;
01137 cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01138 <<strerror(err)<<endl;
01139 }
01140 return result;
01141 }
01142
01143
01144
01145 int FUShmBuffer::shm_nattch(int shmid)
01146 {
01147 shmid_ds shmstat;
01148 shmctl(shmid,IPC_STAT,&shmstat);
01149 return shmstat.shm_nattch;
01150 }
01151
01152
01153
01154 int FUShmBuffer::shm_destroy(int shmid)
01155 {
01156 shmid_ds shmstat;
01157 int result=shmctl(shmid,IPC_RMID,&shmstat);
01158 if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01159 return result;
01160 }
01161
01162
01163
01164 int FUShmBuffer::sem_create(key_t key,int nsem)
01165 {
01166 int semid=semget(key,nsem,IPC_CREAT|0666);
01167 if (semid<0) {
01168 int err=errno;
01169 cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01170 <<strerror(err)<<endl;
01171 }
01172 return semid;
01173 }
01174
01175
01176
01177 int FUShmBuffer::sem_get(key_t key,int nsem)
01178 {
01179 int semid=semget(key,nsem,0666);
01180 if (semid<0) {
01181 int err=errno;
01182 cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01183 <<strerror(err)<<endl;
01184 }
01185 return semid;
01186 }
01187
01188
01189
01190 int FUShmBuffer::sem_destroy(int semid)
01191 {
01192 int result=semctl(semid,0,IPC_RMID);
01193 if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01194 return result;
01195 }
01196
01197
01198
01200
01202
01203
01204 unsigned int FUShmBuffer::nextIndex(unsigned int offset,
01205 unsigned int nCells,
01206 unsigned int& iNext)
01207 {
01208 lock();
01209 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01210 pindex+=iNext;
01211 iNext=(iNext+1)%nCells;
01212 unsigned int result=*pindex;
01213 unlock();
01214 return result;
01215 }
01216
01217
01218
01219 void FUShmBuffer::postIndex(unsigned int index,
01220 unsigned int offset,
01221 unsigned int nCells,
01222 unsigned int& iLast)
01223 {
01224 lock();
01225 unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
01226 pindex+=iLast;
01227 *pindex=index;
01228 iLast=(iLast+1)%nCells;
01229 unlock();
01230 }
01231
01232
01233
01234 unsigned int FUShmBuffer::nextRawWriteIndex()
01235 {
01236 return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01237 }
01238
01239
01240
01241 unsigned int FUShmBuffer::nextRawReadIndex()
01242 {
01243 return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01244 }
01245
01246
01247
01248 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01249 {
01250 postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01251 }
01252
01253
01254
01255 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01256 {
01257 postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01258 }
01259
01260
01261
01262 unsigned int FUShmBuffer::nextRecoWriteIndex()
01263 {
01264 return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01265 }
01266
01267
01268
01269 unsigned int FUShmBuffer::nextRecoReadIndex()
01270 {
01271 return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01272 }
01273
01274
01275
01276 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01277 {
01278 postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01279 }
01280
01281
01282
01283 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01284 {
01285 postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01286 }
01287
01288
01289
01290 unsigned int FUShmBuffer::nextDqmWriteIndex()
01291 {
01292 return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01293 }
01294
01295
01296
01297 unsigned int FUShmBuffer::nextDqmReadIndex()
01298 {
01299 return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01300 }
01301
01302
01303
01304 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01305 {
01306 postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01307 }
01308
01309
01310
01311 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01312 {
01313 postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01314 }
01315
01316
01317
01318 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01319 {
01320 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01321 for (unsigned int i=0;i<nRawCells_;i++) {
01322 if ((*pevt++)==evtNumber) return i;
01323 }
01324 assert(false);
01325 return 0xffffffff;
01326 }
01327
01328
01329
01330 evt::State_t FUShmBuffer::evtState(unsigned int index)
01331 {
01332 assert(index<nRawCells_);
01333 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01334 pstate+=index;
01335 return *pstate;
01336 }
01337
01338
01339
01340 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01341 {
01342 assert(index<nDqmCells_);
01343 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01344 pstate+=index;
01345 return *pstate;
01346 }
01347
01348
01349
01350 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01351 {
01352 assert(index<nRawCells_);
01353 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01354 pevt+=index;
01355 return *pevt;
01356 }
01357
01358
01359
01360 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01361 {
01362 assert(index<nRawCells_);
01363 pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01364 prcid+=index;
01365 return *prcid;
01366 }
01367
01368
01369
01370 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01371 {
01372 assert(index<nRawCells_);
01373 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01374 ptstmp+=index;
01375 return *ptstmp;
01376 }
01377
01378
01379
01380 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01381 {
01382 assert(index<nClientsMax_);
01383 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01384 prcid+=index;
01385 return *prcid;
01386 }
01387
01388
01389
01390 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01391 {
01392 assert(index<nRawCells_);
01393 evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
01394 pstate+=index;
01395 lock();
01396 *pstate=state;
01397 unlock();
01398 return true;
01399 }
01400
01401
01402
01403 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01404 {
01405 assert(index<nDqmCells_);
01406 dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
01407 pstate+=index;
01408 lock();
01409 *pstate=state;
01410 unlock();
01411 return true;
01412 }
01413
01414
01415
01416 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01417 {
01418 assert(index<nRawCells_);
01419 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01420 pcount+=index;
01421 lock();
01422 *pcount=discard;
01423 unlock();
01424 return true;
01425 }
01426
01427
01428
01429 int FUShmBuffer::incEvtDiscard(unsigned int index)
01430 {
01431 int result = 0;
01432 assert(index<nRawCells_);
01433 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01434 pcount+=index;
01435 lock();
01436 (*pcount)++;
01437 result = *pcount;
01438 unlock();
01439 return result;
01440 }
01441
01442
01443
01444 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01445 {
01446 assert(index<nRawCells_);
01447 unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
01448 pevt+=index;
01449 lock();
01450 *pevt=evtNumber;
01451 unlock();
01452 return true;
01453 }
01454
01455
01456
01457 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01458 {
01459 assert(index<nRawCells_);
01460 pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
01461 prcid+=index;
01462 lock();
01463 *prcid=prcId;
01464 unlock();
01465 return true;
01466 }
01467
01468
01469
01470 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01471 {
01472 assert(index<nRawCells_);
01473 time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
01474 ptstmp+=index;
01475 lock();
01476 *ptstmp=timeStamp;
01477 unlock();
01478 return true;
01479 }
01480
01481
01482
01483 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01484 {
01485 lock();
01486 assert(nClients_<nClientsMax_);
01487 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01488 for (unsigned int i=0;i<nClients_;i++) {
01489 if ((*prcid)==prcId) { unlock(); return false; }
01490 prcid++;
01491 }
01492 nClients_++;
01493 *prcid=prcId;
01494 unlock();
01495 return true;
01496 }
01497
01498
01499
01500 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01501 {
01502 lock();
01503 pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
01504 unsigned int iClient(0);
01505 while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01506 assert(iClient!=nClients_);
01507 pid_t* next=prcid; next++;
01508 while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01509 nClients_--;
01510 unlock();
01511 return true;
01512 }
01513
01514
01515
01516 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01517 {
01518 FUShmRawCell* result(0);
01519
01520 if (iCell>=nRawCells_) {
01521 cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01522 <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01523 return result;
01524 }
01525
01526 if (segmentationMode_) {
01527 key_t shmkey =rawCellShmKey(iCell);
01528 int shmid =shm_get(shmkey,rawCellTotalSize_);
01529 void* cellAddr=shm_attach(shmid);
01530 result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01531 }
01532 else {
01533 result=
01534 (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
01535 }
01536
01537 return result;
01538 }
01539
01540
01541
01542 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01543 {
01544 FUShmRecoCell* result(0);
01545
01546 if (iCell>=nRecoCells_) {
01547 cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01548 <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01549 return result;
01550 }
01551
01552 if (segmentationMode_) {
01553 key_t shmkey =recoCellShmKey(iCell);
01554 int shmid =shm_get(shmkey,recoCellTotalSize_);
01555 void* cellAddr=shm_attach(shmid);
01556 result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01557 }
01558 else {
01559 result=
01560 (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
01561 }
01562
01563 return result;
01564 }
01565
01566
01567
01568 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01569 {
01570 FUShmDqmCell* result(0);
01571
01572 if (iCell>=nDqmCells_) {
01573 cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01574 <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01575 return result;
01576 }
01577
01578 if (segmentationMode_) {
01579 key_t shmkey =dqmCellShmKey(iCell);
01580 int shmid =shm_get(shmkey,dqmCellTotalSize_);
01581 void* cellAddr=shm_attach(shmid);
01582 result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01583 }
01584 else {
01585 result=
01586 (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01587 }
01588
01589 return result;
01590 }
01591
01592
01593
01594 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01595 {
01596 assert(index<nRawCells_);
01597 unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
01598 pcount+=index;
01599 lock();
01600 assert(*pcount>0);
01601 --(*pcount);
01602 bool result=(*pcount==0);
01603 unlock();
01604 return result;
01605 }
01606
01607
01608
01609 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01610 {
01611 if (!segmentationMode_) {
01612 cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01613 return -1;
01614 }
01615 key_t* addr=(key_t*)((unsigned long)this+offset);
01616 for (unsigned int i=0;i<iCell;i++) ++addr;
01617 return *addr;
01618 }
01619
01620
01621
01622 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01623 {
01624 if (iCell>=nRawCells_) {
01625 cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01626 <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01627 return -1;
01628 }
01629 return shmKey(iCell,rawCellOffset_);
01630 }
01631
01632
01633
01634 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01635 {
01636 if (iCell>=nRecoCells_) {
01637 cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01638 <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01639 return -1;
01640 }
01641 return shmKey(iCell,recoCellOffset_);
01642 }
01643
01644
01645
01646 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01647 {
01648 if (iCell>=nDqmCells_) {
01649 cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01650 <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01651 return -1;
01652 }
01653 return shmKey(iCell,dqmCellOffset_);
01654 }
01655
01656
01657
01658 void FUShmBuffer::sem_init(int isem,int value)
01659 {
01660 if (semctl(semid(),isem,SETVAL,value)<0) {
01661 cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01662 }
01663 }
01664
01665
01666
01667 int FUShmBuffer::sem_wait(int isem)
01668 {
01669 struct sembuf sops[1];
01670 sops[0].sem_num=isem;
01671 sops[0].sem_op = -1;
01672 sops[0].sem_flg= 0;
01673 if (semop(semid(),sops,1)==-1) {
01674 cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01675 return -1;
01676 }
01677 return 0;
01678 }
01679
01680
01681
01682 void FUShmBuffer::sem_post(int isem)
01683 {
01684 struct sembuf sops[1];
01685 sops[0].sem_num=isem;
01686 sops[0].sem_op = 1;
01687 sops[0].sem_flg= 0;
01688 if (semop(semid(),sops,1)==-1) {
01689 cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01690 }
01691 }