CMS 3D CMS Logo

FUShmBuffer.cc

Go to the documentation of this file.
00001 
00002 //
00003 // FUShmBuffer
00004 // -----------
00005 //
00006 //            15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ShmBuffer/interface/FUShmBuffer.h"
00011 
00012 #include <unistd.h>
00013 #include <iostream>
00014 #include <string>
00015 #include <cassert>
00016 
00017 #include <sstream>
00018 #include <fstream>
00019 
00020 #include <cstdlib>
00021 #include <cstring>
00022 
00023 #define SHM_DESCRIPTOR_KEYID           1 /* Id used on ftok for 1. shmget key */
00024 #define SHM_KEYID                      2 /* Id used on ftok for 2. shmget key */
00025 #define SEM_KEYID                      1 /* Id used on ftok for semget key    */
00026 
00027 #define NSKIP_MAX                    100
00028 
00029 
00030 using namespace std;
00031 using namespace evf;
00032 
00033 const char* FUShmBuffer::shmKeyPath_ =
00034   (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
00035 const char* FUShmBuffer::semKeyPath_ =
00036   (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
00037 
00038 
00040 // construction/destruction
00042 
00043 //______________________________________________________________________________
00044 FUShmBuffer::FUShmBuffer(bool         segmentationMode,
00045                          unsigned int nRawCells,
00046                          unsigned int nRecoCells,
00047                          unsigned int nDqmCells,
00048                          unsigned int rawCellSize,
00049                          unsigned int recoCellSize,
00050                          unsigned int dqmCellSize)
00051   : segmentationMode_(segmentationMode)
00052   , nClientsMax_(16)
00053   , nRawCells_(nRawCells)
00054   , rawCellPayloadSize_(rawCellSize)
00055   , nRecoCells_(nRecoCells)
00056   , recoCellPayloadSize_(recoCellSize)
00057   , nDqmCells_(nDqmCells)
00058   , dqmCellPayloadSize_(dqmCellSize)
00059 {
00060   rawCellTotalSize_ =FUShmRawCell::size(rawCellPayloadSize_);
00061   recoCellTotalSize_=FUShmRecoCell::size(recoCellPayloadSize_);
00062   dqmCellTotalSize_ =FUShmDqmCell::size(dqmCellPayloadSize_);
00063 
00064   void* addr;
00065   
00066   rawWriteOffset_=sizeof(FUShmBuffer);
00067   addr=(void*)((unsigned int)this+rawWriteOffset_);
00068   new (addr) unsigned int[nRawCells_];
00069   
00070   rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
00071   addr=(void*)((unsigned int)this+rawReadOffset_);
00072   new (addr) unsigned int[nRawCells_];
00073  
00074   recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
00075   addr=(void*)((unsigned int)this+recoWriteOffset_);
00076   new (addr) unsigned int[nRecoCells_];
00077 
00078   recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
00079   addr=(void*)((unsigned int)this+recoReadOffset_);
00080   new (addr) unsigned int[nRecoCells_];
00081 
00082   dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
00083   addr=(void*)((unsigned int)this+dqmWriteOffset_);
00084   new (addr) unsigned int[nDqmCells_];
00085 
00086   dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
00087   addr=(void*)((unsigned int)this+dqmReadOffset_);
00088   new (addr) unsigned int[nDqmCells_];
00089 
00090   evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
00091   addr=(void*)((unsigned int)this+evtStateOffset_);
00092   new (addr) evt::State_t[nRawCells_];
00093   
00094   evtDiscardOffset_=evtStateOffset_+nRawCells_*sizeof(evt::State_t);
00095   addr=(void*)((unsigned int)this+evtDiscardOffset_);
00096   new (addr) unsigned int[nRawCells_];
00097   
00098   evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
00099   addr=(void*)((unsigned int)this+evtNumberOffset_);
00100   new (addr) unsigned int[nRawCells_];
00101   
00102   evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
00103   addr=(void*)((unsigned int)this+evtPrcIdOffset_);
00104   new (addr) pid_t[nRawCells_];
00105   
00106   evtTimeStampOffset_=evtPrcIdOffset_+nRawCells_*sizeof(pid_t);
00107   addr=(void*)((unsigned int)this+evtTimeStampOffset_);
00108   new (addr) time_t[nRawCells_];
00109   
00110   dqmStateOffset_=evtTimeStampOffset_+nRawCells_*sizeof(time_t);
00111   addr=(void*)((unsigned int)this+dqmStateOffset_);
00112   new (addr) dqm::State_t[nDqmCells_];
00113   
00114   clientPrcIdOffset_=dqmStateOffset_+nDqmCells_*sizeof(dqm::State_t);
00115   addr=(void*)((unsigned int)this+clientPrcIdOffset_);
00116   new (addr) pid_t[nClientsMax_];
00117 
00118   rawCellOffset_=dqmStateOffset_+nClientsMax_*sizeof(pid_t);
00119   
00120   if (segmentationMode_) {
00121     recoCellOffset_=rawCellOffset_+nRawCells_*sizeof(key_t);
00122     dqmCellOffset_ =recoCellOffset_+nRecoCells_*sizeof(key_t);
00123     addr=(void*)((unsigned int)this+rawCellOffset_);
00124     new (addr) key_t[nRawCells_];
00125     addr=(void*)((unsigned int)this+recoCellOffset_);
00126     new (addr) key_t[nRecoCells_];
00127     addr=(void*)((unsigned int)this+dqmCellOffset_);
00128     new (addr) key_t[nDqmCells_];
00129   }
00130   else {
00131     recoCellOffset_=rawCellOffset_+nRawCells_*rawCellTotalSize_;
00132     dqmCellOffset_ =recoCellOffset_+nRecoCells_*recoCellTotalSize_;
00133     for (unsigned int i=0;i<nRawCells_;i++) {
00134       addr=(void*)((unsigned int)this+rawCellOffset_+i*rawCellTotalSize_);
00135       new (addr) FUShmRawCell(rawCellSize);
00136     }
00137     for (unsigned int i=0;i<nRecoCells_;i++) {
00138       addr=(void*)((unsigned int)this+recoCellOffset_+i*recoCellTotalSize_);
00139       new (addr) FUShmRecoCell(recoCellSize);
00140     }
00141     for (unsigned int i=0;i<nDqmCells_;i++) {
00142       addr=(void*)((unsigned int)this+dqmCellOffset_+i*dqmCellTotalSize_);
00143       new (addr) FUShmDqmCell(dqmCellSize);
00144     }
00145   }
00146 }
00147 
00148 
00149 //______________________________________________________________________________
00150 FUShmBuffer::~FUShmBuffer()
00151 {
00152   
00153 }
00154 
00155 
00157 // implementation of member functions
00159 
00160 //______________________________________________________________________________
00161 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
00162 {
00163   shmid_=shmid;
00164   semid_=semid;
00165   
00166   if (segmentationMode_) {
00167     int    shmKeyId=666;
00168     key_t* keyAddr =(key_t*)((unsigned int)this+rawCellOffset_);
00169     for (unsigned int i=0;i<nRawCells_;i++) {
00170       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00171       int   shmid  =shm_create(*keyAddr,rawCellTotalSize_);
00172       void* shmAddr=shm_attach(shmid);
00173       new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
00174       shmdt(shmAddr);
00175       ++keyAddr;
00176     }
00177     keyAddr =(key_t*)((unsigned int)this+recoCellOffset_);
00178     for (unsigned int i=0;i<nRecoCells_;i++) {
00179       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00180       int   shmid  =shm_create(*keyAddr,recoCellTotalSize_);
00181       void* shmAddr=shm_attach(shmid);
00182       new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
00183       shmdt(shmAddr);
00184       ++keyAddr;
00185     }
00186     keyAddr =(key_t*)((unsigned int)this+dqmCellOffset_);
00187     for (unsigned int i=0;i<nDqmCells_;i++) {
00188       *keyAddr     =ftok(shmKeyPath_,shmKeyId++);
00189       int   shmid  =shm_create(*keyAddr,dqmCellTotalSize_);
00190       void* shmAddr=shm_attach(shmid);
00191       new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
00192       shmdt(shmAddr);
00193       ++keyAddr;
00194     }
00195   }
00196   
00197   for (unsigned int i=0;i<nRawCells_;i++) {
00198     FUShmRawCell* cell=rawCell(i);
00199     cell->initialize(i);
00200     if (segmentationMode_) shmdt(cell);
00201   }
00202   
00203   for (unsigned int i=0;i<nRecoCells_;i++) {
00204     FUShmRecoCell* cell=recoCell(i);
00205     cell->initialize(i);
00206     if (segmentationMode_) shmdt(cell);
00207   }
00208   
00209   for (unsigned int i=0;i<nDqmCells_;i++) {
00210     FUShmDqmCell* cell=dqmCell(i);
00211     cell->initialize(i);
00212     if (segmentationMode_) shmdt(cell);
00213   }
00214 
00215   reset();
00216 }
00217 
00218 
00219 //______________________________________________________________________________
00220 void FUShmBuffer::reset()
00221 {
00222   nClients_=0;
00223 
00224   // setup ipc semaphores
00225   sem_init(0,1);          // lock (binary)
00226   sem_init(1,nRawCells_); // raw  write semaphore
00227   sem_init(2,0);          // raw  read  semaphore
00228   sem_init(3,1);          // binary semaphore to schedule raw event for discard
00229   sem_init(4,0);          // binary semaphore to discard raw event
00230   sem_init(5,nRecoCells_);// reco write semaphore
00231   sem_init(6,0);          // reco send (read) semaphore
00232   sem_init(7,nDqmCells_); // dqm  write semaphore
00233   sem_init(8,0);          // dqm  send (read) semaphore
00234 
00235   sem_print();
00236 
00237   unsigned int *iWrite,*iRead;
00238   
00239   rawWriteNext_=0; rawWriteLast_=0; rawReadNext_ =0; rawReadLast_ =0;
00240   iWrite=(unsigned int*)((unsigned int)this+rawWriteOffset_);
00241   iRead =(unsigned int*)((unsigned int)this+rawReadOffset_);
00242   for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00243 
00244   recoWriteNext_=0; recoWriteLast_=0; recoReadNext_ =0; recoReadLast_ =0;
00245   iWrite=(unsigned int*)((unsigned int)this+recoWriteOffset_);
00246   iRead =(unsigned int*)((unsigned int)this+recoReadOffset_);
00247   for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00248 
00249   dqmWriteNext_=0; dqmWriteLast_=0; dqmReadNext_ =0; dqmReadLast_ =0;
00250   iWrite=(unsigned int*)((unsigned int)this+dqmWriteOffset_);
00251   iRead =(unsigned int*)((unsigned int)this+dqmReadOffset_);
00252   for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
00253   
00254   for (unsigned int i=0;i<nRawCells_;i++) {
00255     setEvtState(i,evt::EMPTY);
00256     setEvtDiscard(i,0);
00257     setEvtNumber(i,0xffffffff);
00258     setEvtPrcId(i,0);
00259     setEvtTimeStamp(i,0);
00260   }
00261 
00262   for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
00263 }
00264 
00265 
00266 //______________________________________________________________________________
00267 int FUShmBuffer::nbRawCellsToWrite() const
00268 {
00269   return semctl(semid(),1,GETVAL);
00270 }
00271 
00272 
00273 //______________________________________________________________________________
00274 int FUShmBuffer::nbRawCellsToRead() const
00275 {
00276   return semctl(semid(),2,GETVAL);
00277 }
00278 
00279 
00280 //______________________________________________________________________________
00281 FUShmRawCell* FUShmBuffer::rawCellToWrite()
00282 {
00283   waitRawWrite();
00284   unsigned int  iCell=nextRawWriteIndex();
00285   FUShmRawCell* cell =rawCell(iCell);
00286   evt::State_t  state=evtState(iCell);
00287   assert(state==evt::EMPTY);
00288   setEvtState(iCell,evt::RAWWRITING);
00289   setEvtDiscard(iCell,1);
00290   return cell;
00291 }
00292 
00293 
00294 //______________________________________________________________________________
00295 FUShmRawCell* FUShmBuffer::rawCellToRead()
00296 {
00297   waitRawRead();
00298   unsigned int iCell=nextRawReadIndex();
00299   FUShmRawCell* cell=rawCell(iCell);
00300   evt::State_t  state=evtState(iCell);
00301   assert(state==evt::RAWWRITTEN||state==evt::EMPTY);
00302   if (state==evt::RAWWRITTEN) {
00303     setEvtPrcId(iCell,getpid());
00304     setEvtState(iCell,evt::RAWREADING);
00305   }
00306   return cell;
00307 }
00308 
00309 
00310 //______________________________________________________________________________
00311 FUShmRecoCell* FUShmBuffer::recoCellToRead()
00312 {
00313   waitRecoRead();
00314   unsigned int   iCell   =nextRecoReadIndex();
00315   FUShmRecoCell* cell    =recoCell(iCell);
00316   unsigned int   iRawCell=cell->rawCellIndex();
00317   if (iRawCell<nRawCells_) {
00318     //evt::State_t   state=evtState(iRawCell);
00319     //assert(state==evt::RECOWRITTEN);
00320     setEvtState(iRawCell,evt::SENDING);
00321   }
00322   return cell;
00323 }
00324 
00325 
00326 //______________________________________________________________________________
00327 FUShmDqmCell* FUShmBuffer::dqmCellToRead()
00328 {
00329   waitDqmRead();
00330   unsigned int  iCell=nextDqmReadIndex();
00331   FUShmDqmCell* cell=dqmCell(iCell);
00332   dqm::State_t  state=dqmState(iCell);
00333   assert(state==dqm::WRITTEN||state==dqm::EMPTY);
00334   if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
00335   return cell;
00336 }
00337 
00338 
00339 //______________________________________________________________________________
00340 FUShmRawCell* FUShmBuffer::rawCellToDiscard()
00341 {
00342   waitRawDiscarded();
00343   FUShmRawCell* cell=rawCell(rawDiscardIndex_);
00344   evt::State_t  state=evtState(cell->index());
00345   assert(state==evt::PROCESSED||state==evt::SENT||state==evt::EMPTY);
00346   if (state!=evt::EMPTY) setEvtState(cell->index(),evt::DISCARDING);
00347   return cell;
00348 }
00349 
00350 
00351 //______________________________________________________________________________
00352 void FUShmBuffer::finishWritingRawCell(FUShmRawCell* cell)
00353 {
00354   evt::State_t state=evtState(cell->index());
00355   assert(state==evt::RAWWRITING);
00356   setEvtState(cell->index(),evt::RAWWRITTEN);
00357   setEvtNumber(cell->index(),cell->evtNumber());
00358   postRawIndexToRead(cell->index());
00359   if (segmentationMode_) shmdt(cell);
00360   postRawRead();
00361 }
00362 
00363 
00364 //______________________________________________________________________________
00365 void FUShmBuffer::finishReadingRawCell(FUShmRawCell* cell)
00366 {
00367   evt::State_t state=evtState(cell->index());
00368   assert(state==evt::RAWREADING);
00369   setEvtState(cell->index(),evt::RAWREAD);
00370   setEvtState(cell->index(),evt::PROCESSING);
00371   setEvtTimeStamp(cell->index(),time(0));
00372   if (segmentationMode_) shmdt(cell);
00373 }
00374 
00375 
00376 //______________________________________________________________________________
00377 void FUShmBuffer::finishReadingRecoCell(FUShmRecoCell* cell)
00378 {
00379   unsigned int iRawCell=cell->rawCellIndex();
00380   if (iRawCell<nRawCells_) {
00381     //evt::State_t state=evtState(cell->rawCellIndex());
00382     //assert(state==evt::SENDING);
00383     setEvtState(cell->rawCellIndex(),evt::SENT);
00384   }
00385   if (segmentationMode_) shmdt(cell);
00386 }
00387 
00388 
00389 //______________________________________________________________________________
00390 void FUShmBuffer::finishReadingDqmCell(FUShmDqmCell* cell)
00391 {
00392   dqm::State_t state=dqmState(cell->index());
00393   assert(state==dqm::SENDING||state==dqm::EMPTY);
00394   if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
00395   if (segmentationMode_) shmdt(cell);
00396 }
00397 
00398 
00399 //______________________________________________________________________________
00400 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell)
00401 {
00402   waitRawDiscard();
00403   if (rawCellReadyForDiscard(iCell)) {
00404     rawDiscardIndex_=iCell;
00405     evt::State_t  state=evtState(iCell);
00406     assert(state==evt::PROCESSING||state==evt::SENT||state==evt::EMPTY);
00407     if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
00408     postRawDiscarded();
00409   }
00410   else postRawDiscard();
00411 }
00412 
00413 
00414 //______________________________________________________________________________
00415 void FUShmBuffer::discardRawCell(FUShmRawCell* cell)
00416 {
00417   releaseRawCell(cell);
00418   postRawDiscard();
00419 }
00420 
00421 
00422 //______________________________________________________________________________
00423 void FUShmBuffer::discardRecoCell(unsigned int iCell)
00424 {
00425   FUShmRecoCell* cell=recoCell(iCell);
00426   unsigned int iRawCell=cell->rawCellIndex();
00427   if (iRawCell<nRawCells_) {
00428     //evt::State_t state=evtState(iRawCell);
00429     //assert(state==evt::SENT);
00430     scheduleRawCellForDiscard(iRawCell);
00431   }
00432   cell->clear();
00433   if (segmentationMode_) shmdt(cell);
00434   postRecoIndexToWrite(iCell);
00435   postRecoWrite();
00436 }
00437 
00438 
00439 //______________________________________________________________________________
00440 void FUShmBuffer::discardDqmCell(unsigned int iCell)
00441 {
00442   dqm::State_t state=dqmState(iCell);
00443   assert(state==dqm::EMPTY||state==dqm::SENT);
00444   setDqmState(iCell,dqm::DISCARDING);
00445   FUShmDqmCell* cell=dqmCell(iCell);
00446   cell->clear();
00447   if (segmentationMode_) shmdt(cell);
00448   setDqmState(iCell,dqm::EMPTY);
00449   postDqmIndexToWrite(iCell);
00450   postDqmWrite();
00451 }
00452 
00453 
00454 //______________________________________________________________________________
00455 void FUShmBuffer::releaseRawCell(FUShmRawCell* cell)
00456 {
00457   evt::State_t state=evtState(cell->index());
00458   assert(state==evt::DISCARDING||state==evt::RAWWRITING||state==evt::EMPTY);
00459   setEvtState(cell->index(),evt::EMPTY);
00460   setEvtDiscard(cell->index(),0);
00461   setEvtNumber(cell->index(),0xffffffff);
00462   setEvtPrcId(cell->index(),0);
00463   setEvtTimeStamp(cell->index(),0);
00464   cell->clear();
00465   postRawIndexToWrite(cell->index());
00466   if (segmentationMode_) shmdt(cell);
00467   postRawWrite();
00468 }
00469 
00470 
00471 //______________________________________________________________________________
00472 void FUShmBuffer::writeRawEmptyEvent()
00473 {
00474   FUShmRawCell* cell=rawCellToWrite();
00475   evt::State_t state=evtState(cell->index());
00476   assert(state==evt::RAWWRITING);
00477   setEvtState(cell->index(),evt::EMPTY);
00478   postRawIndexToRead(cell->index());
00479   if (segmentationMode_) shmdt(cell);
00480   postRawRead();
00481 }
00482 
00483 
00484 //______________________________________________________________________________
00485 void FUShmBuffer::writeRecoEmptyEvent()
00486 {
00487   waitRecoWrite();
00488   unsigned int   iCell=nextRecoWriteIndex();
00489   FUShmRecoCell* cell =recoCell(iCell);
00490   cell->clear();
00491   postRecoIndexToRead(iCell);
00492   if (segmentationMode_) shmdt(cell);
00493   postRecoRead();
00494 }
00495 
00496 
00497 //______________________________________________________________________________
00498 void FUShmBuffer::writeDqmEmptyEvent()
00499 {
00500   waitDqmWrite();
00501   unsigned int  iCell=nextDqmWriteIndex();
00502   FUShmDqmCell* cell=dqmCell(iCell);
00503   cell->clear();
00504   postDqmIndexToRead(iCell);
00505   if (segmentationMode_) shmdt(cell);
00506   postDqmRead();
00507 }
00508 
00509 
00510 //______________________________________________________________________________
00511 void FUShmBuffer::scheduleRawEmptyCellForDiscard()
00512 {
00513   FUShmRawCell* cell=rawCellToWrite();
00514   scheduleRawEmptyCellForDiscard(cell);
00515 }
00516 
00517 
00518 //______________________________________________________________________________
00519 void FUShmBuffer::scheduleRawEmptyCellForDiscard(FUShmRawCell* cell)
00520 {
00521   waitRawDiscard();
00522   if (rawCellReadyForDiscard(cell->index())) {
00523     rawDiscardIndex_=cell->index();
00524     setEvtState(cell->index(),evt::EMPTY);
00525     setEvtNumber(cell->index(),0xffffffff);
00526     setEvtPrcId(cell->index(),0);
00527     setEvtTimeStamp(cell->index(),0);
00528     removeClientPrcId(getpid());
00529     if (segmentationMode_) shmdt(cell);
00530     postRawDiscarded();
00531   }
00532   else postRawDiscard();
00533 }
00534 
00535 
00536 //______________________________________________________________________________
00537 bool FUShmBuffer::writeRecoInitMsg(unsigned int   outModId,
00538                                    unsigned int   fuProcessId,
00539                                    unsigned int   fuGuid,
00540                                    unsigned char *data,
00541                                    unsigned int   dataSize)
00542 {
00543   if (dataSize>recoCellPayloadSize_) {
00544     cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
00545     return false;
00546   }
00547   
00548   waitRecoWrite();
00549   unsigned int   iCell=nextRecoWriteIndex();
00550   FUShmRecoCell* cell =recoCell(iCell);
00551   cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
00552   postRecoIndexToRead(iCell);
00553   if (segmentationMode_) shmdt(cell);
00554   postRecoRead();
00555   return true;
00556 }
00557 
00558 
00559 //______________________________________________________________________________
00560 bool FUShmBuffer::writeRecoEventData(unsigned int   runNumber,
00561                                      unsigned int   evtNumber,
00562                                      unsigned int   outModId,
00563                                      unsigned int   fuProcessId,
00564                                      unsigned int   fuGuid,
00565                                      unsigned char *data,
00566                                      unsigned int   dataSize)
00567 {
00568   if (dataSize>recoCellPayloadSize_) {
00569     cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
00570     return false;
00571   }
00572   
00573   waitRecoWrite();
00574   unsigned int   iCell=nextRecoWriteIndex();
00575   FUShmRecoCell* cell =recoCell(iCell);
00576   unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
00577   //evt::State_t state=evtState(rawCellIndex);
00578   //assert(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT);
00579   setEvtState(rawCellIndex,evt::RECOWRITING);
00580   incEvtDiscard(rawCellIndex);
00581   cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
00582                        fuProcessId,fuGuid,data,dataSize);
00583   setEvtState(rawCellIndex,evt::RECOWRITTEN);
00584   postRecoIndexToRead(iCell);
00585   if (segmentationMode_) shmdt(cell);
00586   postRecoRead();
00587   return true;
00588 }
00589 
00590 
00591 //______________________________________________________________________________
00592 bool FUShmBuffer::writeErrorEventData(unsigned int runNumber,
00593                                       unsigned int fuProcessId,
00594                                       unsigned int iRawCell)
00595 {
00596   FUShmRawCell *raw=rawCell(iRawCell);
00597 
00598   unsigned int   dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
00599   unsigned char *data    =new unsigned char[dataSize];
00600   uint32_t      *pos     =(uint32_t*)data;
00601   // 06-Oct-2008, KAB - added a version number for the error event format.
00602   //
00603   // Version 1 had no version number, so the run number appeared in the
00604   // first value.  So, a reasonable test for version 1 is whether the
00605   // first value is larger than some relatively small cutoff (say 32).
00606   // Version 2 added the lumi block number.
00607   //
00608   *pos++=(uint32_t)2;  // protocol version number
00609   *pos++=(uint32_t)runNumber;
00610   // 06-Oct-2008, KAB - added space for lumi block number, but I don't know
00611   // exactly how to get its value just yet...
00612   *pos++=(uint32_t)1;   //evf::evtn::getlbn(fedAddr(/* which FED ID? */));
00613   *pos++=(uint32_t)raw->evtNumber();
00614   for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
00615   memcpy(pos,raw->payloadAddr(),raw->eventSize());
00616   
00617   // DEBUG
00618   /*
00619     if (1) {
00620     stringstream ss;
00621     ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
00622     ofstream fout;
00623     fout.open(ss.str().c_str(),ios::out|ios::binary);
00624     if (!fout.write((char*)data,dataSize))
00625     cout<<"Failed to write error event to "<<ss.str()<<endl;
00626     fout.close();
00627     
00628     stringstream ss2;
00629     ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
00630     ofstream fout2;
00631     fout2.open(ss2.str().c_str());
00632     fout2<<"dataSize = "<<dataSize<<endl;
00633     fout2<<"runNumber = "<<runNumber<<endl;
00634     fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
00635     fout2<<"eventSize = "<<raw->eventSize()<<endl;
00636     unsigned int totalSize(0);
00637     for (unsigned int i=0;i<1024;i++) {
00638     unsigned int fedSize = raw->fedSize(i);
00639     totalSize += fedSize;
00640     if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
00641     }
00642     fout2<<"totalSize = "<<totalSize<<endl;
00643     fout2.close();
00644     }
00645   */// END DEBUG
00646   
00647   waitRecoWrite();
00648   unsigned int   iRecoCell=nextRecoWriteIndex();
00649   FUShmRecoCell* reco     =recoCell(iRecoCell);
00650   setEvtState(iRawCell,evt::RECOWRITING);
00651   setEvtDiscard(iRawCell,1);
00652   reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
00653                         data,dataSize);
00654   delete [] data;
00655   setEvtState(iRawCell,evt::RECOWRITTEN);
00656   postRecoIndexToRead(iRecoCell);
00657   if (segmentationMode_) { shmdt(raw); shmdt(reco); }
00658   postRecoRead();
00659   return true;
00660 }
00661 
00662 
00663 //______________________________________________________________________________
00664 bool FUShmBuffer::writeDqmEventData(unsigned int   runNumber,
00665                                     unsigned int   evtAtUpdate,
00666                                     unsigned int   folderId,
00667                                     unsigned int   fuProcessId,
00668                                     unsigned int   fuGuid,
00669                                     unsigned char *data,
00670                                     unsigned int   dataSize)
00671 {
00672   if (dataSize>dqmCellPayloadSize_) {
00673     cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
00674     return false;
00675   }
00676   
00677   waitDqmWrite();
00678   unsigned int  iCell=nextDqmWriteIndex();
00679   FUShmDqmCell* cell=dqmCell(iCell);
00680   dqm::State_t state=dqmState(iCell);
00681   assert(state==dqm::EMPTY);
00682   setDqmState(iCell,dqm::WRITING);
00683   cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
00684   setDqmState(iCell,dqm::WRITTEN);
00685   postDqmIndexToRead(iCell);
00686   if (segmentationMode_) shmdt(cell);
00687   postDqmRead();
00688   return true;
00689 }
00690 
00691 
00692 //______________________________________________________________________________
00693 void FUShmBuffer::sem_print()
00694 {
00695   cout<<"--> current sem values:"
00696       <<endl
00697       <<" lock="<<semctl(semid(),0,GETVAL)
00698       <<endl
00699       <<" wraw="<<semctl(semid(),1,GETVAL)
00700       <<" rraw="<<semctl(semid(),2,GETVAL)
00701       <<endl
00702       <<" wdsc="<<semctl(semid(),3,GETVAL)
00703       <<" rdsc="<<semctl(semid(),4,GETVAL)
00704       <<endl
00705       <<" wrec="<<semctl(semid(),5,GETVAL)
00706       <<" rrec="<<semctl(semid(),6,GETVAL)
00707       <<endl
00708       <<" wdqm="<<semctl(semid(),7,GETVAL)
00709       <<" rdqm="<<semctl(semid(),8,GETVAL)
00710       <<endl;
00711 }
00712 
00713 
00714 //______________________________________________________________________________
00715 void FUShmBuffer::printEvtState(unsigned int index)
00716 {
00717   evt::State_t state=evtState(index);
00718   std::string stateName;
00719   if      (state==evt::EMPTY)      stateName="EMPTY";
00720   else if (state==evt::RAWWRITING) stateName="RAWWRITING";
00721   else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
00722   else if (state==evt::RAWREADING) stateName="RAWREADING";
00723   else if (state==evt::RAWREAD)    stateName="RAWREAD";
00724   else if (state==evt::PROCESSING) stateName="PROCESSING";
00725   else if (state==evt::PROCESSED)  stateName="PROCESSED";
00726   else if (state==evt::RECOWRITING)stateName="RECOWRITING";
00727   else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
00728   else if (state==evt::SENDING)    stateName="SENDING";
00729   else if (state==evt::SENT)       stateName="SENT";
00730   else if (state==evt::DISCARDING) stateName="DISCARDING";
00731   cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
00732 }
00733 
00734 
00735 //______________________________________________________________________________
00736 void FUShmBuffer::printDqmState(unsigned int index)
00737 {
00738   dqm::State_t state=dqmState(index);
00739   cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
00740 }
00741 
00742 
00743 //______________________________________________________________________________
00744 FUShmBuffer* FUShmBuffer::createShmBuffer(bool         segmentationMode,
00745                                           unsigned int nRawCells,
00746                                           unsigned int nRecoCells,
00747                                           unsigned int nDqmCells,
00748                                           unsigned int rawCellSize,
00749                                           unsigned int recoCellSize,
00750                                           unsigned int dqmCellSize)
00751 {
00752   // if necessary, release shared memory first!
00753   if (FUShmBuffer::releaseSharedMemory())
00754     cout<<"FUShmBuffer::createShmBuffer: "
00755         <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
00756         <<endl;
00757   
00758   // create bookkeeping shared memory segment
00759   int  size =sizeof(unsigned int)*7;
00760   int  shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00761   void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
00762   
00763   if(1!=shm_nattch(shmid)) {
00764     cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00765     shmdt(shmAddr);
00766     return 0;
00767   }
00768   
00769   unsigned int* p=(unsigned int*)shmAddr;
00770   *p++=segmentationMode;
00771   *p++=nRawCells;
00772   *p++=nRecoCells;
00773   *p++=nDqmCells;
00774   *p++=rawCellSize;
00775   *p++=recoCellSize;
00776   *p++=dqmCellSize;
00777   shmdt(shmAddr);
00778   
00779   // create the 'real' shared memory buffer
00780   size     =FUShmBuffer::size(segmentationMode,
00781                               nRawCells,nRecoCells,nDqmCells,
00782                               rawCellSize,recoCellSize,dqmCellSize);
00783   shmid    =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00784   int semid=sem_create(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00785   shmAddr  =shm_attach(shmid);                         if (0==shmAddr) return 0;
00786   
00787   if (1!=shm_nattch(shmid)) {
00788     cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
00789     shmdt(shmAddr);
00790     return 0;
00791   }
00792   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00793                                                nRawCells,nRecoCells,nDqmCells,
00794                                                rawCellSize,recoCellSize,dqmCellSize);
00795   
00796   cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
00797   cout<<"                                segmentationMode="<<segmentationMode<<endl;
00798   
00799   buffer->initialize(shmid,semid);
00800   
00801   return buffer;
00802 }
00803 
00804 
00805 //______________________________________________________________________________
00806 FUShmBuffer* FUShmBuffer::getShmBuffer()
00807 {
00808   // get bookkeeping shared memory segment
00809   int   size   =sizeof(unsigned int)*7;
00810   int   shmid  =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
00811   void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
00812   
00813   unsigned int *p=(unsigned int*)shmAddr;
00814   bool          segmentationMode=*p++;
00815   unsigned int  nRawCells       =*p++;
00816   unsigned int  nRecoCells      =*p++;
00817   unsigned int  nDqmCells       =*p++;
00818   unsigned int  rawCellSize     =*p++;
00819   unsigned int  recoCellSize    =*p++;
00820   unsigned int  dqmCellSize     =*p++;
00821   shmdt(shmAddr);
00822 
00823   cout<<"FUShmBuffer::getShmBuffer():"
00824       <<" segmentationMode="<<segmentationMode
00825       <<" nRawCells="<<nRawCells
00826       <<" nRecoCells="<<nRecoCells
00827       <<" nDqmCells="<<nDqmCells
00828       <<" rawCellSize="<<rawCellSize
00829       <<" recoCellSize="<<recoCellSize
00830       <<" dqmCellSize="<<dqmCellSize
00831       <<endl;
00832   
00833   // get the 'real' shared memory buffer
00834   size     =FUShmBuffer::size(segmentationMode,
00835                               nRawCells,nRecoCells,nDqmCells,
00836                               rawCellSize,recoCellSize,dqmCellSize);
00837   shmid    =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0)    return 0;
00838   int semid=sem_get(FUShmBuffer::getSemKey(),9);    if (semid<0)    return 0;
00839   shmAddr  =shm_attach(shmid);                      if (0==shmAddr) return 0;
00840   
00841   if (0==shm_nattch(shmid)) {
00842     cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
00843     return 0;
00844   }
00845   
00846   FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
00847                                                nRawCells,nRecoCells,nDqmCells,
00848                                                rawCellSize,recoCellSize,dqmCellSize);
00849   
00850   cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
00851   cout<<"                             segmentationMode="<<segmentationMode<<endl;
00852   
00853   buffer->setClientPrcId(getpid());
00854 
00855   return buffer;
00856 }
00857 
00858 
00859 //______________________________________________________________________________
00860 bool FUShmBuffer::releaseSharedMemory()
00861 {
00862   // get bookkeeping shared memory segment
00863   int   size   =sizeof(unsigned int)*7;
00864   int   shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
00865   void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
00866 
00867   unsigned int*p=(unsigned int*)shmAddr;
00868   bool         segmentationMode=*p++;
00869   unsigned int nRawCells       =*p++;
00870   unsigned int nRecoCells      =*p++;
00871   unsigned int nDqmCells       =*p++;
00872   unsigned int rawCellSize     =*p++;
00873   unsigned int recoCellSize    =*p++;
00874   unsigned int dqmCellSize     =*p++;
00875   shmdt(shmAddr);
00876 
00877   
00878   // get the 'real' shared memory segment
00879   size     =FUShmBuffer::size(segmentationMode,
00880                               nRawCells,nRecoCells,nDqmCells,
00881                               rawCellSize,recoCellSize,dqmCellSize);
00882   int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0)    return false;
00883   int semid=sem_get(FUShmBuffer::getSemKey(),9);   if (semid<0)    return false;
00884   shmAddr  =shm_attach(shmid);                     if (0==shmAddr) return false; 
00885 
00886   int att = 0;
00887   for(; att <10; att++)
00888     {
00889       if(shm_nattch(shmid)>1) {
00890         cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
00891              <<", failed attempt to release shared memory."<<endl;
00892         ::sleep(1);
00893       }
00894       else
00895         break;
00896     }
00897 
00898   if(att>=10) return false;
00899 
00900   if (segmentationMode) {
00901     FUShmBuffer* buffer=
00902       new (shmAddr) FUShmBuffer(segmentationMode,
00903                                 nRawCells,nRecoCells,nDqmCells,
00904                                 rawCellSize,recoCellSize,dqmCellSize);
00905     int cellid;
00906     for (unsigned int i=0;i<nRawCells;i++) {
00907       cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
00908       if ((shm_destroy(cellid)==-1)) return false;
00909     }
00910     for (unsigned int i=0;i<nRecoCells;i++) {
00911       cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
00912       if ((shm_destroy(cellid)==-1)) return false;
00913     }
00914     for (unsigned int i=0;i<nDqmCells;i++) {
00915       cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
00916       if ((shm_destroy(cellid)==-1)) return false;
00917     }
00918   }
00919   shmdt(shmAddr);
00920   if (sem_destroy(semid)==-1)  return false;
00921   if (shm_destroy(shmid)==-1)  return false;
00922   if (shm_destroy(shmidd)==-1) return false;
00923 
00924   return true;
00925 }
00926 
00927 
00928 //______________________________________________________________________________
00929 unsigned int FUShmBuffer::size(bool         segmentationMode,
00930                                unsigned int nRawCells,
00931                                unsigned int nRecoCells,
00932                                unsigned int nDqmCells,
00933                                unsigned int rawCellSize,
00934                                unsigned int recoCellSize,
00935                                unsigned int dqmCellSize)
00936 {
00937   unsigned int offset=
00938     sizeof(FUShmBuffer)+
00939     sizeof(unsigned int)*4*nRawCells+
00940     sizeof(evt::State_t)*nRawCells+
00941     sizeof(dqm::State_t)*nDqmCells;
00942   
00943   unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
00944   unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
00945   unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
00946   
00947   unsigned int realSize =
00948     (segmentationMode) ?
00949     offset
00950     +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
00951     :
00952     offset
00953     +rawCellTotalSize*nRawCells
00954     +recoCellTotalSize*nRecoCells
00955     +dqmCellTotalSize*nDqmCells;
00956 
00957   unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
00958   
00959   return result;
00960 }
00961 
00962 
00963 //______________________________________________________________________________
00964 key_t FUShmBuffer::getShmDescriptorKey()
00965 {
00966   key_t result=ftok(shmKeyPath_,SHM_DESCRIPTOR_KEYID);
00967   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: ftok() failed "
00968                              <<"for file "<<shmKeyPath_<<"!"<<endl;
00969   return result;
00970 }
00971 
00972 
00973 //______________________________________________________________________________
00974 key_t FUShmBuffer::getShmKey()
00975 {
00976   key_t result=ftok(shmKeyPath_,SHM_KEYID);
00977   if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
00978                              <<"for file "<<shmKeyPath_<<"!"<<endl;
00979   return result;
00980 }
00981 
00982 
00983 //______________________________________________________________________________
00984 key_t FUShmBuffer::getSemKey()
00985 {
00986   key_t result=ftok(semKeyPath_,SEM_KEYID);
00987   if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
00988                              <<"for file "<<semKeyPath_<<"!"<<endl;
00989   return result;
00990 }
00991 
00992 
00993 //______________________________________________________________________________
00994 int FUShmBuffer::shm_create(key_t key,int size)
00995 {
00996   int shmid=shmget(key,size,IPC_CREAT|0644);
00997   if (shmid==-1) {
00998     int err=errno;
00999     cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
01000         <<strerror(err)<<endl;
01001   }
01002   return shmid;
01003 }
01004 
01005 
01006 //______________________________________________________________________________
01007 int FUShmBuffer::shm_get(key_t key,int size)
01008 {
01009   int shmid=shmget(key,size,0644);
01010   if (shmid==-1) {
01011     int err=errno;
01012     cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
01013         <<strerror(err)<<endl;
01014   }
01015   return shmid;
01016 }
01017 
01018 
01019 //______________________________________________________________________________
01020 void* FUShmBuffer::shm_attach(int shmid)
01021 {
01022   void* result=shmat(shmid,NULL,0);
01023   if (0==result) {
01024     int err=errno;
01025     cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
01026         <<strerror(err)<<endl;
01027   }
01028   return result;
01029 }
01030 
01031 
01032 //______________________________________________________________________________
01033 int FUShmBuffer::shm_nattch(int shmid)
01034 {
01035   shmid_ds shmstat;
01036   shmctl(shmid,IPC_STAT,&shmstat);
01037   return shmstat.shm_nattch;
01038 }
01039 
01040 
01041 //______________________________________________________________________________
01042 int FUShmBuffer::shm_destroy(int shmid)
01043 {
01044   shmid_ds shmstat;
01045   int result=shmctl(shmid,IPC_RMID,&shmstat);
01046   if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
01047   return result;
01048 }
01049 
01050 
01051 //______________________________________________________________________________
01052 int FUShmBuffer::sem_create(key_t key,int nsem)
01053 {
01054   int semid=semget(key,nsem,IPC_CREAT|0666);
01055   if (semid<0) {
01056     int err=errno;
01057     cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
01058         <<strerror(err)<<endl;
01059   }
01060   return semid;
01061 }
01062 
01063 
01064 //______________________________________________________________________________
01065 int FUShmBuffer::sem_get(key_t key,int nsem)
01066 {
01067   int semid=semget(key,nsem,0666);
01068   if (semid<0) {
01069     int err=errno;
01070     cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
01071         <<strerror(err)<<endl;
01072   }
01073   return semid;
01074 }
01075 
01076 
01077 //______________________________________________________________________________
01078 int FUShmBuffer::sem_destroy(int semid)
01079 {
01080   int result=semctl(semid,0,IPC_RMID);
01081   if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
01082   return result;
01083 }
01084 
01085 
01086 
01088 // implementation of private member functions
01090 
01091 //______________________________________________________________________________
01092 unsigned int FUShmBuffer::nextIndex(unsigned int  offset,
01093                                     unsigned int  nCells,
01094                                     unsigned int& iNext)
01095 {
01096   lock();
01097   unsigned int* pindex=(unsigned int*)((unsigned int)this+offset);
01098   pindex+=iNext;
01099   iNext=(iNext+1)%nCells;
01100   unsigned int result=*pindex;
01101   unlock();
01102   return result;
01103 }
01104 
01105 
01106 //______________________________________________________________________________
01107 void FUShmBuffer::postIndex(unsigned int  index,
01108                             unsigned int  offset,
01109                             unsigned int  nCells,
01110                             unsigned int& iLast)
01111 {
01112   lock();
01113   unsigned int* pindex=(unsigned int*)((unsigned int)this+offset);
01114   pindex+=iLast;
01115   *pindex=index;
01116   iLast=(iLast+1)%nCells;
01117   unlock();
01118 }
01119 
01120 
01121 //______________________________________________________________________________
01122 unsigned int FUShmBuffer::nextRawWriteIndex()
01123 {
01124   return nextIndex(rawWriteOffset_,nRawCells_,rawWriteNext_);
01125 }
01126 
01127 
01128 //______________________________________________________________________________
01129 unsigned int FUShmBuffer::nextRawReadIndex()
01130 {
01131   return nextIndex(rawReadOffset_,nRawCells_,rawReadNext_);
01132 }
01133 
01134 
01135 //______________________________________________________________________________
01136 void FUShmBuffer::postRawIndexToWrite(unsigned int index)
01137 {
01138   postIndex(index,rawWriteOffset_,nRawCells_,rawWriteLast_);
01139 }
01140 
01141 
01142 //______________________________________________________________________________
01143 void FUShmBuffer::postRawIndexToRead(unsigned int index)
01144 {
01145   postIndex(index,rawReadOffset_,nRawCells_,rawReadLast_);
01146 }
01147 
01148 
01149 //______________________________________________________________________________
01150 unsigned int FUShmBuffer::nextRecoWriteIndex()
01151 {
01152   return nextIndex(recoWriteOffset_,nRecoCells_,recoWriteNext_);
01153 }
01154 
01155 
01156 //______________________________________________________________________________
01157 unsigned int FUShmBuffer::nextRecoReadIndex()
01158 {
01159   return nextIndex(recoReadOffset_,nRecoCells_,recoReadNext_);
01160 }
01161 
01162 
01163 //______________________________________________________________________________
01164 void FUShmBuffer::postRecoIndexToWrite(unsigned int index)
01165 {
01166   postIndex(index,recoWriteOffset_,nRecoCells_,recoWriteLast_);
01167 }
01168 
01169 
01170 //______________________________________________________________________________
01171 void FUShmBuffer::postRecoIndexToRead(unsigned int index)
01172 {
01173   postIndex(index,recoReadOffset_,nRecoCells_,recoReadLast_);
01174 }
01175 
01176 
01177 //______________________________________________________________________________
01178 unsigned int FUShmBuffer::nextDqmWriteIndex()
01179 {
01180   return nextIndex(dqmWriteOffset_,nDqmCells_,dqmWriteNext_);
01181 }
01182 
01183 
01184 //______________________________________________________________________________
01185 unsigned int FUShmBuffer::nextDqmReadIndex()
01186 {
01187   return nextIndex(dqmReadOffset_,nDqmCells_,dqmReadNext_);
01188 }
01189 
01190 
01191 //______________________________________________________________________________
01192 void FUShmBuffer::postDqmIndexToWrite(unsigned int index)
01193 {
01194   postIndex(index,dqmWriteOffset_,nDqmCells_,dqmWriteLast_);
01195 }
01196 
01197 
01198 //______________________________________________________________________________
01199 void FUShmBuffer::postDqmIndexToRead(unsigned int index)
01200 {
01201   postIndex(index,dqmReadOffset_,nDqmCells_,dqmReadLast_);
01202 }
01203 
01204 
01205 //______________________________________________________________________________
01206 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
01207 {
01208   unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01209   for (unsigned int i=0;i<nRawCells_;i++) {
01210     if ((*pevt++)==evtNumber) return i;
01211   }
01212   assert(false);
01213   return 0xffffffff;
01214 }
01215 
01216 
01217 //______________________________________________________________________________
01218 evt::State_t FUShmBuffer::evtState(unsigned int index)
01219 {
01220   assert(index<nRawCells_);
01221   evt::State_t *pstate=(evt::State_t*)((unsigned int)this+evtStateOffset_);
01222   pstate+=index;
01223   return *pstate;
01224 }
01225 
01226 
01227 //______________________________________________________________________________
01228 dqm::State_t FUShmBuffer::dqmState(unsigned int index)
01229 {
01230   assert(index<nDqmCells_);
01231   dqm::State_t *pstate=(dqm::State_t*)((unsigned int)this+dqmStateOffset_);
01232   pstate+=index;
01233   return *pstate;
01234 }
01235 
01236 
01237 //______________________________________________________________________________
01238 unsigned int FUShmBuffer::evtNumber(unsigned int index)
01239 {
01240   assert(index<nRawCells_);
01241   unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01242   pevt+=index;
01243   return *pevt;
01244 }
01245 
01246 
01247 //______________________________________________________________________________
01248 pid_t FUShmBuffer::evtPrcId(unsigned int index)
01249 {
01250   assert(index<nRawCells_);
01251   pid_t *prcid=(pid_t*)((unsigned int)this+evtPrcIdOffset_);
01252   prcid+=index;
01253   return *prcid;
01254 }
01255 
01256 
01257 //______________________________________________________________________________
01258 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
01259 {
01260   assert(index<nRawCells_);
01261   time_t *ptstmp=(time_t*)((unsigned int)this+evtTimeStampOffset_);
01262   ptstmp+=index;
01263   return *ptstmp;
01264 }
01265 
01266 
01267 //______________________________________________________________________________
01268 pid_t FUShmBuffer::clientPrcId(unsigned int index)
01269 {
01270   assert(index<nClientsMax_);
01271   pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01272   prcid+=index;
01273   return *prcid;
01274 }
01275 
01276 
01277 //______________________________________________________________________________
01278 bool FUShmBuffer::setEvtState(unsigned int index,evt::State_t state)
01279 {
01280   assert(index<nRawCells_);
01281   evt::State_t *pstate=(evt::State_t*)((unsigned int)this+evtStateOffset_);
01282   pstate+=index;
01283   lock();
01284   *pstate=state;
01285   unlock();
01286   return true;
01287 }
01288 
01289 
01290 //______________________________________________________________________________
01291 bool FUShmBuffer::setDqmState(unsigned int index,dqm::State_t state)
01292 {
01293   assert(index<nDqmCells_);
01294   dqm::State_t *pstate=(dqm::State_t*)((unsigned int)this+dqmStateOffset_);
01295   pstate+=index;
01296   lock();
01297   *pstate=state;
01298   unlock();
01299   return true;
01300 }
01301 
01302 
01303 //______________________________________________________________________________
01304 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
01305 {
01306   assert(index<nRawCells_);
01307   unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01308   pcount+=index;
01309   lock();
01310   *pcount=discard;
01311   unlock();
01312   return true;
01313 }
01314 
01315 
01316 //______________________________________________________________________________
01317 int FUShmBuffer::incEvtDiscard(unsigned int index)
01318 {
01319   int result = 0;
01320   assert(index<nRawCells_);
01321   unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01322   pcount+=index;
01323   lock();
01324   (*pcount)++;
01325   result = *pcount;
01326   unlock();
01327   return result;
01328 }
01329 
01330 
01331 //______________________________________________________________________________
01332 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
01333 {
01334   assert(index<nRawCells_);
01335   unsigned int *pevt=(unsigned int*)((unsigned int)this+evtNumberOffset_);
01336   pevt+=index;
01337   lock();
01338   *pevt=evtNumber;
01339   unlock();
01340   return true;
01341 }
01342 
01343 
01344 //______________________________________________________________________________
01345 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
01346 {
01347   assert(index<nRawCells_);
01348   pid_t* prcid=(pid_t*)((unsigned int)this+evtPrcIdOffset_);
01349   prcid+=index;
01350   lock();
01351   *prcid=prcId;
01352   unlock();
01353   return true;
01354 }
01355 
01356 
01357 //______________________________________________________________________________
01358 bool FUShmBuffer::setEvtTimeStamp(unsigned int index,time_t timeStamp)
01359 {
01360   assert(index<nRawCells_);
01361   time_t *ptstmp=(time_t*)((unsigned int)this+evtTimeStampOffset_);
01362   ptstmp+=index;
01363   lock();
01364   *ptstmp=timeStamp;
01365   unlock();
01366   return true;
01367 }
01368 
01369 
01370 //______________________________________________________________________________
01371 bool FUShmBuffer::setClientPrcId(pid_t prcId)
01372 {
01373   lock();
01374   assert(nClients_<nClientsMax_);
01375   pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01376   for (unsigned int i=0;i<nClients_;i++) {
01377     if ((*prcid)==prcId) { unlock();  return false; }
01378     prcid++;
01379   }
01380   nClients_++;
01381   *prcid=prcId;
01382   unlock();
01383   return true;
01384 }
01385 
01386 
01387 //______________________________________________________________________________
01388 bool FUShmBuffer::removeClientPrcId(pid_t prcId)
01389 {
01390   lock();
01391   pid_t *prcid=(pid_t*)((unsigned int)this+clientPrcIdOffset_);
01392   unsigned int iClient(0);
01393   while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
01394   assert(iClient!=nClients_);
01395   pid_t* next=prcid; next++;
01396   while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
01397   nClients_--;
01398   unlock();
01399   return true;
01400 }
01401 
01402 
01403 //______________________________________________________________________________
01404 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell)
01405 {
01406   FUShmRawCell* result(0);
01407   
01408   if (iCell>=nRawCells_) {
01409     cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
01410         <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
01411     return result;
01412   }
01413   
01414   if (segmentationMode_) {
01415     key_t         shmkey  =rawCellShmKey(iCell);
01416     int           shmid   =shm_get(shmkey,rawCellTotalSize_);
01417     void*         cellAddr=shm_attach(shmid);
01418     result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
01419   }
01420   else {
01421     result=
01422       (FUShmRawCell*)((unsigned int)this+rawCellOffset_+iCell*rawCellTotalSize_);
01423   }
01424   
01425   return result;
01426 }
01427 
01428 
01429 //______________________________________________________________________________
01430 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell)
01431 {
01432   FUShmRecoCell* result(0);
01433   
01434   if (iCell>=nRecoCells_) {
01435     cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
01436         <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
01437     return result;
01438   }
01439   
01440   if (segmentationMode_) {
01441     key_t         shmkey  =recoCellShmKey(iCell);
01442     int           shmid   =shm_get(shmkey,recoCellTotalSize_);
01443     void*         cellAddr=shm_attach(shmid);
01444     result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
01445   }
01446   else {
01447     result=
01448       (FUShmRecoCell*)((unsigned int)this+recoCellOffset_+iCell*recoCellTotalSize_);
01449   }
01450   
01451   return result;
01452 }
01453 
01454 
01455 //______________________________________________________________________________
01456 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell)
01457 {
01458   FUShmDqmCell* result(0);
01459   
01460   if (iCell>=nDqmCells_) {
01461     cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
01462         <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
01463     return result;
01464   }
01465   
01466   if (segmentationMode_) {
01467     key_t         shmkey  =dqmCellShmKey(iCell);
01468     int           shmid   =shm_get(shmkey,dqmCellTotalSize_);
01469     void*         cellAddr=shm_attach(shmid);
01470     result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
01471   }
01472   else {
01473     result=
01474       (FUShmDqmCell*)((unsigned int)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
01475   }
01476   
01477   return result;
01478 }
01479 
01480 
01481 //______________________________________________________________________________
01482 bool FUShmBuffer::rawCellReadyForDiscard(unsigned int index)
01483 {
01484   assert(index<nRawCells_);
01485   unsigned int *pcount=(unsigned int*)((unsigned int)this+evtDiscardOffset_);
01486   pcount+=index;
01487   lock();
01488   assert(*pcount>0);
01489   --(*pcount);
01490   bool result=(*pcount==0);
01491   unlock();
01492   return result;
01493 }
01494 
01495 
01496 //______________________________________________________________________________
01497 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
01498 {
01499   if (!segmentationMode_) {
01500     cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
01501     return -1;
01502   }
01503   key_t* addr=(key_t*)((unsigned int)this+offset);
01504   for (unsigned int i=0;i<iCell;i++) ++addr;
01505   return *addr;
01506 }
01507 
01508 
01509 //______________________________________________________________________________
01510 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
01511 {
01512   if (iCell>=nRawCells_) {
01513     cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
01514         <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
01515     return -1;
01516   }
01517   return shmKey(iCell,rawCellOffset_);
01518 }
01519 
01520 
01521 //______________________________________________________________________________
01522 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
01523 {
01524   if (iCell>=nRecoCells_) {
01525     cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
01526         <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
01527     return -1;
01528   }
01529   return shmKey(iCell,recoCellOffset_);
01530 }
01531 
01532 
01533 //______________________________________________________________________________
01534 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
01535 {
01536   if (iCell>=nDqmCells_) {
01537     cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
01538         <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
01539     return -1;
01540   }
01541   return shmKey(iCell,dqmCellOffset_);
01542 }
01543 
01544 
01545 //______________________________________________________________________________
01546 void FUShmBuffer::sem_init(int isem,int value)
01547 {
01548   if (semctl(semid(),isem,SETVAL,value)<0) {
01549     cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
01550   }
01551 }
01552 
01553 
01554 //______________________________________________________________________________
01555 void FUShmBuffer::sem_wait(int isem)
01556 {
01557   struct sembuf sops[1];
01558   sops[0].sem_num=isem;
01559   sops[0].sem_op =  -1;
01560   sops[0].sem_flg=   0;
01561   if (semop(semid(),sops,1)==-1) {
01562     cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
01563   }
01564 }
01565 
01566 
01567 //______________________________________________________________________________
01568 void FUShmBuffer::sem_post(int isem)
01569 {
01570   struct sembuf sops[1];
01571   sops[0].sem_num=isem;
01572   sops[0].sem_op =   1;
01573   sops[0].sem_flg=   0;
01574   if (semop(semid(),sops,1)==-1) {
01575     cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
01576   }
01577 }

Generated on Tue Jun 9 17:34:48 2009 for CMSSW by  doxygen 1.5.4